In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# 1. Start Spark session
spark = SparkSession.builder.appName("ModelTraining").getOrCreate()

# 2. Load data
train_df = spark.read.csv("train_set.csv", header=True, inferSchema=True)
test_df = spark.read.csv("test_set.csv", header=True, inferSchema=True)

In [9]:
# 3. Assemble features (you need to list your feature columns here)
feature_cols = [col for col in train_df.columns if col != "TenYearCHD"]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# # 4. Model
lr = LogisticRegression(featuresCol="features", labelCol="TenYearCHD")

# 5. Build a pipeline
pipeline = Pipeline(stages=[assembler, lr])

# 6. Define hyperparameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# 7. Evaluator
evaluator = BinaryClassificationEvaluator(labelCol="TenYearCHD", metricName="areaUnderROC")

# 8. CrossValidator (grid search with CV)
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,   # 3-fold CV
    parallelism=4 # how many models to train in parallel (based on your cluster resources)
)

# 9. Train model (with tuning)
cv_model = crossval.fit(train_df)

# 10. Pick best model
best_model = cv_model.bestModel

# 11. Evaluate on test set
predictions = best_model.transform(test_df)

auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

# Optional: Show some predictions
predictions.select("features", "TenYearCHD", "prediction", "probability").show()

Test AUC: 0.6626451466830516
+--------------------+----------+----------+--------------------+
|            features|TenYearCHD|prediction|         probability|
+--------------------+----------+----------+--------------------+
|(16,[0,1,2,7,8,9,...|         0|       1.0|[0.39640490073695...|
|(16,[1,2,5,7,8,9,...|         0|       1.0|[0.34072652066599...|
|(16,[0,1,2,7,8,9,...|         1|       1.0|[0.37401917567010...|
|[1.0,-0.301586855...|         0|       1.0|[0.17318863776027...|
|[1.0,0.5151064948...|         0|       0.0|[0.52081678073214...|
|(16,[0,1,2,7,8,9,...|         0|       1.0|[0.38615563751711...|
|[1.0,0.6317769734...|         0|       1.0|[0.46442068248917...|
|(16,[1,2,7,8,9,10...|         0|       0.0|[0.63698235568789...|
|(16,[1,2,7,8,9,10...|         0|       1.0|[0.40223385161777...|
|[1.0,-0.418257333...|         0|       0.0|[0.58493670272350...|
|(16,[1,2,7,8,9,10...|         1|       0.0|[0.70520468904405...|
|[1.0,-1.351621162...|         0|       0.0|[0.

In [None]:
y_test  = predictions.select("TenYearCHD").rdd.flatMap(lambda x: x).collect()
y_pred = predictions.select("prediction").rdd.flatMap(lambda x: x).collect()

# Calculate metrics
test_accuracy = accuracy_score(y_test, y_pred)
test_precision = precision_score(y_test, y_pred, average="weighted")
test_recall = recall_score(y_test, y_pred, average="weighted")
test_f1 = f1_score(y_test, y_pred, average="weighted")
test_roc_auc = roc_auc_score(y_test, y_pred)

# # Print metrics
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Precision (Weighted): {test_precision}")
print(f"Test Recall (Weighted): {test_recall}")
print(f"Test F1 Score (Weighted): {test_f1}")
print(f"Test ROC AUC: {test_roc_auc}")

Test Accuracy: 0.6403301886792453
Test Precision (Weighted): 0.789055268778925
Test Recall (Weighted): 0.6403301886792453
Test F1 Score (Weighted): 0.6891549582581097
Test ROC AUC: 0.6034274563077487
