A chemist had two chemical flasks labeled 0 and 1 which consist of two
different chemicals. He extracted 3 features from these chemicals in order to
distinguish between them, you provided the results derived by the chemicals and
your task is to create a model that will label chemical 0 or 1 given its three features
and built-in docker and use some library to display that in frontend.
Note : Use only pyspark

In [1]:
import os

from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName("chemicals").getOrCreate()
filePath = 'indian_liver_patient.csv'
df=spark.read.format("csv").option("header","true").load(filePath)

In [3]:
from pyspark.sql.functions import countDistinct

# Assuming you have a DataFrame 'df'
distinct_counts = df.agg(*(countDistinct(col).alias(col) for col in df.columns))

# The 'distinct_counts' DataFrame will contain the number of unique values in each column
distinct_counts.show()

+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
|Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin_and_Globulin_Ratio|Dataset|
+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
| 72|     2|            113|              80|                 263|                     152|                       177|            58|     40|                        69|      2|
+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+



In [4]:
from pyspark.sql.functions import count

# Assuming you have a DataFrame 'df'
value_counts = {}

# Iterate over the columns
for col in df.columns:
    # Group by the column and count the occurrences of each unique value
    col_value_counts = df.groupBy(col).agg(count('*').alias('count'))
    
    # Collect the value counts into a dictionary
    value_counts[col] = col_value_counts

# Print the value counts for each column
for col, counts in value_counts.items():
    print(f"Value counts for column '{col}':")
    counts.show()


Value counts for column 'Age':
+---+-----+
|Age|count|
+---+-----+
| 51|   10|
|  7|    2|
| 54|    8|
| 15|    1|
| 11|    1|
| 29|    7|
| 69|    2|
| 42|   21|
| 73|    2|
| 64|    6|
| 30|   10|
| 34|    8|
|  8|    1|
| 28|    8|
| 22|    9|
| 85|    1|
| 52|    7|
| 35|   12|
| 16|    3|
| 47|    6|
+---+-----+
only showing top 20 rows

Value counts for column 'Gender':
+------+-----+
|Gender|count|
+------+-----+
|Female|  142|
|  Male|  441|
+------+-----+

Value counts for column 'Total_Bilirubin':
+---------------+-----+
|Total_Bilirubin|count|
+---------------+-----+
|            2.6|    5|
|            8.2|    1|
|            7.3|    3|
|            3.1|    2|
|           14.2|    1|
|           16.6|    1|
|             15|    1|
|            4.2|    2|
|             11|    1|
|           15.8|    1|
|           22.5|    1|
|           16.4|    1|
|              3|    2|
|           16.7|    1|
|           22.8|    1|
|              8|    1|
|            2.7|    9|
|      

In [5]:
df.show()

+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
|Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin_and_Globulin_Ratio|Dataset|
+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
| 65|Female|            0.7|             0.1|                 187|                      16|                        18|           6.8|    3.3|                       0.9|      1|
| 62|  Male|           10.9|             5.5|                 699|                      64|                       100|           7.5|    3.2|                      0.74|      1|
| 62|  Male|            7.3|             4.1|                 490|                      60|                        

In [6]:
df.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Total_Bilirubin: string (nullable = true)
 |-- Direct_Bilirubin: string (nullable = true)
 |-- Alkaline_Phosphotase: string (nullable = true)
 |-- Alamine_Aminotransferase: string (nullable = true)
 |-- Aspartate_Aminotransferase: string (nullable = true)
 |-- Total_Protiens: string (nullable = true)
 |-- Albumin: string (nullable = true)
 |-- Albumin_and_Globulin_Ratio: string (nullable = true)
 |-- Dataset: string (nullable = true)



In [7]:
df = df.dropna()

Processing the Gender categorical feature 

In [42]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import monotonically_increasing_id
# Assuming you have a DataFrame 'df' with a column named 'Age'

# StringIndexer to convert the 'Age' column to numeric indices
indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndex')

# OneHotEncoder to perform one-hot encoding on the 'AgeIndex' column
encoder = OneHotEncoder(inputCols=['GenderIndex'], outputCols=['GenderVec'])

# Create a pipeline to execute the StringIndexer and OneHotEncoder in sequence
pipeline = Pipeline(stages=[indexer, encoder])

# Fit and transform the DataFrame using the pipeline
encoded_df = pipeline.fit(df).transform(df)
df_with_index = encoded_df.withColumn("Index", monotonically_increasing_id())

# Show the encoded DataFrame

df_with_index.printSchema()
df_with_index.show()

root
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Total_Bilirubin: double (nullable = true)
 |-- Direct_Bilirubin: double (nullable = true)
 |-- Alkaline_Phosphotase: double (nullable = true)
 |-- Alamine_Aminotransferase: double (nullable = true)
 |-- Aspartate_Aminotransferase: double (nullable = true)
 |-- Total_Protiens: double (nullable = true)
 |-- Albumin: double (nullable = true)
 |-- Albumin_and_Globulin_Ratio: string (nullable = true)
 |-- Dataset: string (nullable = true)
 |-- GenderIndex: double (nullable = false)
 |-- GenderVec: vector (nullable = true)
 |-- Index: long (nullable = false)

+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+-----------+-------------+-----+
|Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin

Processing the float columns represented as string

In [58]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

numeric_columns = df_with_index.select([col(column).cast("float").alias(column) for column in df_with_index.columns[2:] 
                                    #  if encoded_df.schema[column].dataType == StringType()
                                      if column not in ["GenderIndex", "GenderVec"] 
                                      ])
print(numeric_columns.schema.json())
# numeric_columns = df.columns[2:-1]  # Select numeric columns except 'Age', 'Gender', and 'Dataset'
# for column in numeric_columns:
#     df = df.withColumn(column, df[column].cast("double"))
numeric_columns.printSchema()
numeric_columns.show()

{"fields":[{"metadata":{},"name":"Total_Bilirubin","nullable":true,"type":"float"},{"metadata":{},"name":"Direct_Bilirubin","nullable":true,"type":"float"},{"metadata":{},"name":"Alkaline_Phosphotase","nullable":true,"type":"float"},{"metadata":{},"name":"Alamine_Aminotransferase","nullable":true,"type":"float"},{"metadata":{},"name":"Aspartate_Aminotransferase","nullable":true,"type":"float"},{"metadata":{},"name":"Total_Protiens","nullable":true,"type":"float"},{"metadata":{},"name":"Albumin","nullable":true,"type":"float"},{"metadata":{},"name":"Albumin_and_Globulin_Ratio","nullable":true,"type":"float"},{"metadata":{},"name":"Dataset","nullable":true,"type":"float"},{"metadata":{},"name":"Index","nullable":false,"type":"float"}],"type":"struct"}
root
 |-- Total_Bilirubin: float (nullable = true)
 |-- Direct_Bilirubin: float (nullable = true)
 |-- Alkaline_Phosphotase: float (nullable = true)
 |-- Alamine_Aminotransferase: float (nullable = true)
 |-- Aspartate_Aminotransferase: flo

In [70]:
# Select the required columns from encoded_df

encoded_column = df_with_index.select(col("GenderVec"),col("index"))
# Concatenate the numeric_columns and encoded_columns
all_columns = numeric_columns .join(encoded_column,on=["index"])
all_columns = all_columns.drop("index")


In [71]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from xgboost import XGBClassifier
# from xgboost.spark import SparkXGBRegressor
inputCols = all_columns.columns
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
all_columns = assembler.transform(all_columns)


In [73]:
inputCols

['Total_Bilirubin',
 'Direct_Bilirubin',
 'Alkaline_Phosphotase',
 'Alamine_Aminotransferase',
 'Aspartate_Aminotransferase',
 'Total_Protiens',
 'Albumin',
 'Albumin_and_Globulin_Ratio',
 'Dataset',
 'GenderVec']

In [81]:
train_data, test_data = all_columns.randomSplit([0.7, 0.3], seed=42)
lr = LogisticRegression(labelCol="Dataset", featuresCol="features")
model = lr.fit(train_data)
y_pred = model.transform(test_data)


from pyspark.ml.evaluation import BinaryClassificationEvaluator


evaluator = BinaryClassificationEvaluator(labelCol='Dataset', rawPredictionCol='prediction')
classification_score = evaluator.evaluate(y_pred)
print("Classification Score:", classification_score)


DataFrame[Total_Bilirubin: float, Direct_Bilirubin: float, Alkaline_Phosphotase: float, Alamine_Aminotransferase: float, Aspartate_Aminotransferase: float, Total_Protiens: float, Albumin: float, Albumin_and_Globulin_Ratio: float, Dataset: float, GenderVec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
Classification Score: 1.0


In [97]:
distinct_values = all_columns.select('Dataset').distinct().collect()
distinct_values
y_pred.groupby("rawPrediction").agg(count('*').alias("raw_count")).show()
y_pred.agg(countDistinct("rawPrediction")).show()

+--------------------+---------+
|       rawPrediction|raw_count|
+--------------------+---------+
|[-7.1966301932299...|        1|
|[-7.2346698557783...|        1|
|[-7.1115042709243...|        1|
|[-7.1290993200220...|        1|
|[-7.2022189799144...|        2|
|[-6.2328763421645...|        1|
|[-7.1544401363139...|        1|
|[-6.9685857828833...|        1|
|[-7.1102081351194...|        1|
|[-7.2149595027539...|        1|
|[-7.1227013555836...|        1|
|[-7.1797752125417...|        1|
|[-6.8063873783382...|        1|
|[-6.3820925068507...|        1|
|[-7.1645508771414...|        1|
|[-7.1987485235449...|        1|
|[-7.1731584920510...|        1|
|[-7.0801493082289...|        1|
|[-7.1570912716788...|        1|
|[-7.1339444817286...|        1|
+--------------------+---------+
only showing top 20 rows

+--------------------+
|count(rawPrediction)|
+--------------------+
|                 144|
+--------------------+



In [37]:
import streamlit as st 




+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+--------------------+--------------------+--------------------+----------+
|Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin_and_Globulin_Ratio|Dataset|            features|       rawPrediction|         probability|prediction|
+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+--------------------+--------------------+--------------------+----------+
| 12|  Male|            0.8|             0.2|               302.0|                    47.0|                      67.0|           6.7|    3.5|                       1.1|    2.0|[0.8,0.2,302.0,47...|[-9.8920319843369...|[1.69731648685040...|       1

In [99]:
import streamlit as st
# Define a function to preprocess the input data
def preprocess_input(data):
    data = spark.createDataFrame(data)
    encoded_df = pipeline.fit(data).transform(data)
    df_with_index = encoded_df.withColumn("Index", monotonically_increasing_id())
    numeric_columns = df_with_index.select([col(column).cast("float").alias(column) for column in df_with_index.columns[2:] 
                                        #  if encoded_df.schema[column].dataType == StringType()
                                        if column not in ["GenderIndex", "GenderVec"] 
                                        ])
    encoded_column = df_with_index.select(col("GenderVec"),col("index"))
    # Concatenate the numeric_columns and encoded_columns
    all_columns = numeric_columns .join(encoded_column,on=["index"])
    all_columns = all_columns.drop("index")

    return all_columns

def make_predictions(data):
    # Preprocess the input data
    preprocessed_data = preprocess_input(data)

    # Convert the preprocessed data to a DMatrix
    dmatrix = model.DMatrix(preprocessed_data)

    # Make predictions using the XGBoost model
    predictions = model.predict(dmatrix)

    return predictions
def main():
    # Set the app title
    st.title("Indian Liver Patient")
    st.write("Displaying the results of the predictions")

    st.write(f"classification_score : {classification_score}")

    # Create input fields for the features
    age = st.number_input("Age", value=12)
    gender = st.selectbox("Gender", ["Male", "Female"])
    total_bilirubin = st.number_input("Total Bilirubin", value=0.8)
    direct_bilirubin = st.number_input("Direct Bilirubin", value=0.2)
    alkaline_phosphotase = st.number_input("Alkaline Phosphotase", value=302.0)
    alamine_aminotransferase = st.number_input("Alamine Aminotransferase", value=47.0)
    aspartate_aminotransferase = st.number_input("Aspartate Aminotransferase", value=67.0)
    total_protiens = st.number_input("Total Protiens", value=6.7)
    albumin = st.number_input("Albumin", value=3.5)
    albumin_globulin_ratio = st.number_input("Albumin and Globulin Ratio", value=1.1)

    # Create a dictionary with the input data
    input_data = {
        "Age": age,
        "Gender": gender,
        "Total_Bilirubin": total_bilirubin,
        "Direct_Bilirubin": direct_bilirubin,
        "Alkaline_Phosphotase": alkaline_phosphotase,
        "Alamine_Aminotransferase": alamine_aminotransferase,
        "Aspartate_Aminotransferase": aspartate_aminotransferase,
        "Total_Protiens": total_protiens,
        "Albumin": albumin,
        "Albumin_and_Globulin_Ratio": albumin_globulin_ratio
    }

    # Make predictions using the input data
    predictions = make_predictions(input_data)

    # Display the predictions
    st.header("Predictions")
    st.write(predictions)

if __name__ == "__main__":
    main()

2023-06-09 15:39:12.007 
  command:

    streamlit run c:\Users\arunk\ineuron-assignments\ASSESSMENT COMPLETION\Machine_learning_Assessment\env\lib\site-packages\ipykernel_launcher.py [ARGUMENTS]


Py4JError: An error occurred while calling o2408.legacyInferArrayTypeFromFirstElement. Trace:
py4j.Py4JException: Method legacyInferArrayTypeFromFirstElement([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

