In [None]:
!pip install pyspark
# Import necessary libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Download CSV
!wget https://raw.githubusercontent.com/neelamdoshi/Spark_neelam/main/diabetes.csv

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DiabetesLogisticRegressionWithImputer") \
    .getOrCreate()

# Load and inspect the data
data = spark.read.csv("diabetes.csv", header=True, inferSchema=True)
data.printSchema()
data.show(5)

# Step 1: In these columns a 0 is not a valid measurement, so it stands for
# "not recorded". Turn those zeros into nulls so the later imputation step
# fills them in like any other missing value.
columns_with_zero_as_missing = ["Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI"]

# Rebuild the DataFrame in one projection. Listed columns map 0 -> null; every
# other column passes through unchanged. Column names and order are preserved.
data = data.select(
    [
        when(col(name) == 0, None).otherwise(col(name)).alias(name)
        if name in columns_with_zero_as_missing
        else col(name)
        for name in data.columns
    ]
)

# Step 2: Median imputation of missing (null) values.
# Every column except the target "Outcome" is a feature. The Imputer is only
# configured here. It is fitted as the first pipeline stage below, so its
# medians come from the training split alone and test rows never influence
# preprocessing.
feature_cols = [name for name in data.columns if name != "Outcome"]
imputed_feature_cols = [f"{name}_imputed" for name in feature_cols]
imputer = Imputer(
    inputCols=feature_cols,
    outputCols=imputed_feature_cols,
    strategy="median",
)

# Step 3: Feature assembly and scaling.
# Pack the imputed columns into one vector, then standardize that vector.
assembler = VectorAssembler(inputCols=imputed_feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# The label is Outcome cast to double. It is added to the raw (un-imputed)
# data because imputation happens inside the pipeline.
labeled_data = data.withColumn("label", col("Outcome").cast("double"))

# Step 4: Logistic regression on the scaled feature vector.
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

# Step 5: Create a Pipeline with stages: Imputer -> Assembler -> Scaler -> Logistic Regression
pipeline = Pipeline(stages=[imputer, assembler, scaler, lr])

# Step 6: Train-test split of the raw data. Everything the pipeline learns
# (imputation medians, scaling statistics, model coefficients) is fitted on
# train_data only.
train_data, test_data = labeled_data.randomSplit([0.8, 0.2], seed=42)

# Step 7: Train the model.
model = pipeline.fit(train_data)

# Inspect the imputation the trained pipeline actually applies: stage 0 of the
# fitted PipelineModel is the Imputer fitted on the training split.
model.stages[0].transform(train_data).select(imputed_feature_cols).show(5)

# Step 8: Make predictions on the test data.
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(5)

# Step 9: Evaluate the model on the held-out test split.
# BinaryClassificationEvaluator scores rawPrediction with areaUnderROC, which is
# a ranking metric, not accuracy. Report it under its real name and compute
# accuracy separately from the hard 0/1 predictions.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)

accuracy_evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction", labelCol="label", metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test AUC-ROC: {auc:.4f}")
print(f"Test accuracy: {accuracy:.4f}")

# Stop Spark session
spark.stop()
