**Spark Pipeline**

Aditya Pulikal / MSc DSAI / L022

In [None]:
!pip install pyspark

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.types as tp

# Obtain the Spark session used by every read and ML stage below.
# getOrCreate() reuses an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("DiabetesPredictionPipeline")
    .getOrCreate()
)

# Location of the input CSV. It is named once so the load below cannot
# drift between an absolute and a relative path.
DATA_PATH = 'diabetes.csv'

# Explicit schema for the CSV: every column is nullable; BMI and
# DiabetesPedigreeFunction are doubles, all remaining columns are integers.
# Supplying it avoids loading every column as a string.
my_schema = tp.StructType([
    tp.StructField(name='Pregnancies', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Glucose', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BloodPressure', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='SkinThickness', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Insulin', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BMI', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='DiabetesPedigreeFunction', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='Age', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Outcome', dataType=tp.IntegerType(), nullable=True),
])

# Load the file exactly once, already typed. The first line of the CSV is a
# header row and is skipped rather than parsed as data.
my_data = spark.read.csv(DATA_PATH, schema=my_schema, header=True)

# Show the resulting column types as a quick sanity check.
my_data.printSchema()

# Turn zero readings into nulls so a later imputation step can fill them.
from pyspark.sql.functions import when

# Columns in which a value of 0 is replaced by null.
# NOTE(review): presumably 0 is a placeholder for "not recorded" in these
# measurements -- confirm against the dataset description.
cols_to_check = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

# Rebuild the frame in a single projection: listed columns get the
# zero -> null mapping, every other column passes through unchanged.
# Column order and names are kept as they were.
my_data = my_data.select([
    when(my_data[name] == 0, None).otherwise(my_data[name]).alias(name)
    if name in cols_to_check
    else my_data[name]
    for name in my_data.columns
])

# Now you can proceed with the rest of your pipeline


# --- Pipeline stages -------------------------------------------------------

# Columns whose missing values are filled in, and the names of the filled-in
# copies ("<column>_imputed") that the imputer writes next to them.
impute_inputs = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
impute_outputs = [f"{name}_imputed" for name in impute_inputs]

# Stage 1: replace missing values with the per-column median.
imputer = Imputer(
    strategy="median",
    inputCols=impute_inputs,
    outputCols=impute_outputs,
)

# Stage 2: pack the predictors into one vector column. The imputed copies
# are used in place of the raw measurement columns.
feature_cols = ['Pregnancies', *impute_outputs, 'DiabetesPedigreeFunction', 'Age']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Stage 3: logistic regression on the assembled vector, predicting Outcome.
lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)

# Chain the stages so they are fitted and applied in this order.
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Fixed seed for the train/test split. Without it every run draws a
# different partition, so the reported accuracy cannot be reproduced.
SPLIT_SEED = 42

# Hold out roughly 20% of the rows for evaluation.
xtrain, xtest = my_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Fit all stages (imputer, assembler, classifier) on the training rows only,
# so the imputation medians are not influenced by the held-out rows.
pipeline_model = pipeline.fit(xtrain)

# Apply the fitted stages to the held-out rows; adds a "prediction" column.
predictions = pipeline_model.transform(xtest)

# Score the predictions: fraction of held-out rows whose predicted label
# equals Outcome.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Outcome",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy}")

# Release the Spark session's resources now that the run is complete.
spark.stop()









root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)

Accuracy: 0.7763157894736842
