In [None]:
!pip install pyspark
!pip install findspark
!pip install pyspark numpy pandas matplotlib seaborn scikit-lear

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

In [None]:
import os

# Initialize Spark session
spark = SparkSession.builder.appName("HeartDiseasePrediction").getOrCreate()

# Location of the CSV. Set the HEART_DATA_PATH environment variable to point at your copy
# instead of editing this machine-specific default path.
DATA_PATH = os.environ.get("HEART_DATA_PATH", "C:/Users/sutha/Desktop/heart_2020_cleaned.csv")

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.show(5)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Class balance of the target. This cell runs before the Yes/No -> 1/0 conversion below,
# so the bars carry the raw category values rather than 0/1.
df_pd = df.select("HeartDisease").toPandas()
plt.figure(figsize=(6, 4))
# hue=x with legend=False replaces passing `palette` without `hue` (deprecated in seaborn 0.13).
sns.countplot(data=df_pd, x="HeartDisease", hue="HeartDisease", palette="coolwarm", legend=False)
plt.xlabel("Heart Disease")
plt.ylabel("Count")
plt.title("Distribution of Heart Disease Cases")
plt.show()

In [None]:
# Preprocess the target variable and binary columns: map "Yes" to 1 and everything else to 0.
def _yes_no_to_int(column_name):
    """Return a Spark column expression: 1 where the value is "Yes" (or already 1), else 0.

    The comparison runs on the string form of the column and accepts "1" as well as "Yes".
    As a result, re-running this cell on columns that were already converted to 1/0 leaves
    them unchanged. Comparing an int column with "Yes" directly would turn every 1 into 0,
    or raise a cast error when Spark's ANSI mode is on.
    """
    return when(col(column_name).cast("string").isin("Yes", "1"), 1).otherwise(0)


df = df.withColumn("HeartDisease", _yes_no_to_int("HeartDisease"))

# Convert binary columns from Yes/No to 1/0
binary_cols = ['Smoking', 'AlcoholDrinking', 'Stroke', 'DiffWalking',
               'PhysicalActivity', 'Asthma', 'KidneyDisease', 'SkinCancer']
for c in binary_cols:
    df = df.withColumn(c, _yes_no_to_int(c))

# Categorical columns to be indexed and one-hot encoded
categorical_cols = ['Sex', 'AgeCategory', 'Race', 'Diabetic', 'GenHealth']

# Create indexers and encoders for each categorical variable.
# handleInvalid="keep" gives unseen/invalid categories their own index instead of failing.
indexers = [StringIndexer(inputCol=c, outputCol=c+"_Index", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c+"_Index", outputCol=c+"_OHE") for c in categorical_cols]

# Numerical columns
numerical_cols = ['BMI', 'PhysicalHealth', 'MentalHealth', 'SleepTime']

# Assemble features: include one-hot encoded categorical columns, binary columns, and numerical columns
assemblerInputs = [c+"_OHE" for c in categorical_cols] + binary_cols + numerical_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [None]:
# Chain every indexer, then every encoder, then the assembler into one preprocessing pipeline
pipeline_stages = [*indexers, *encoders, assembler]
pipeline = Pipeline(stages=pipeline_stages)

# Fit the stages on the cleaned frame, then apply them to produce the "features" vector
pipelineModel = pipeline.fit(df)
df_transformed = pipelineModel.transform(df)

# Keep just the model inputs: the assembled feature vector and the 0/1 label
data = df_transformed.select(["features", "HeartDisease"])
data.show(5)

In [None]:
# Split into training (80%) and testing (20%) sets (seeded for reproducibility)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Cache both splits. Spark evaluates lazily, so without caching, the repeated cross-validation
# passes and evaluator calls below would recompute the CSV read and preprocessing pipeline
# behind these DataFrames each time.
train_data = train_data.cache()
test_data = test_data.cache()

# Initialize BinaryClassificationEvaluator using areaUnderROC as the metric
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="HeartDisease", metricName="areaUnderROC")

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

def train_model(model, paramGrid, num_folds=5, parallelism=2, seed=42):
    """Tune ``model`` with k-fold cross-validation, then score it on the test split.

    Reads the notebook-level ``train_data``, ``test_data`` and ``evaluator`` (areaUnderROC)
    defined in the train/test split cell.

    Args:
        model: Spark ML estimator. Features are already assembled, so it is the only
            pipeline stage.
        paramGrid: ParamMaps (from ``ParamGridBuilder().build()``) to search over.
        num_folds: Number of cross-validation folds (default 5, as before).
        parallelism: Number of models CrossValidator may fit concurrently (default 2, as before).
        seed: Seed for the random fold assignment. PySpark's default CrossValidator seed is
            derived from ``hash()`` of the class name. Python randomizes that hash per
            interpreter session unless PYTHONHASHSEED is set, so an explicit seed keeps the
            folds, and therefore the chosen hyperparameters, stable across kernel restarts.

    Returns:
        Tuple ``(cvModel, predictions, auc)``: the fitted CrossValidatorModel, its predictions
        on ``test_data``, and the test-set area under the ROC curve.
    """
    # Build a Pipeline with the model as the final stage (features already preprocessed)
    pipeline_model = Pipeline(stages=[model])

    cv = CrossValidator(estimator=pipeline_model,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=num_folds,
                        parallelism=parallelism,
                        seed=seed)

    # Fit the model on training data
    cvModel = cv.fit(train_data)
    # Generate predictions on the held-out test data
    predictions = cvModel.transform(test_data)
    # Evaluate using AUC
    auc = evaluator.evaluate(predictions)
    return cvModel, predictions, auc

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression: search regularization strength x L1/L2 mix via cross-validation
lr = LogisticRegression(labelCol="HeartDisease", featuresCol="features", maxIter=100)
paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build()
)

cvModel_lr, predictions_lr, auc_lr = train_model(lr, paramGrid_lr)
print("Logistic Regression AUC:", auc_lr)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the tuned Logistic Regression model on the held-out test set.
# "weightedPrecision", "weightedRecall" and "f1" are averages over both classes, weighted by
# class frequency, so they are labelled "Weighted ..." below. If the positive class is a
# minority (see the class-balance plot), those averages are dominated by the negative class.
# Precision and recall for HeartDisease = 1 are therefore reported separately as well.
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="accuracy"
)

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedPrecision"
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedRecall"
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="f1"
)

# Per-class metrics for the positive label (HeartDisease = 1)
evaluator_precision_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0
)

evaluator_recall_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="HeartDisease", metricName="areaUnderROC"
)

# Compute metrics
accuracy = evaluator_accuracy.evaluate(predictions_lr)
precision = evaluator_precision.evaluate(predictions_lr)
recall = evaluator_recall.evaluate(predictions_lr)
f1_score = evaluator_f1.evaluate(predictions_lr)
precision_pos = evaluator_precision_pos.evaluate(predictions_lr)
recall_pos = evaluator_recall_pos.evaluate(predictions_lr)
# Named auc_value rather than `auc`: a later ROC cell imports sklearn.metrics.auc, and a
# module-level `auc` float would shadow that function if this cell were re-run afterwards.
auc_value = evaluator_auc.evaluate(predictions_lr)

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"Weighted F1-Score: {f1_score:.4f}")
print(f"Precision (HeartDisease = 1): {precision_pos:.4f}")
print(f"Recall (HeartDisease = 1): {recall_pos:.4f}")
print(f"AUC: {auc_value:.4f}")

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Single decision tree: search tree depth x minimum rows per node via cross-validation
dt = DecisionTreeClassifier(labelCol="HeartDisease", featuresCol="features")
paramGrid_dt = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])
    .addGrid(dt.minInstancesPerNode, [1, 5])
    .build()
)

cvModel_dt, predictions_dt, auc_dt = train_model(dt, paramGrid_dt)
print("Decision Tree AUC:", auc_dt)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the tuned Decision Tree model on the held-out test set.
# "weightedPrecision", "weightedRecall" and "f1" are averages over both classes, weighted by
# class frequency, so they are labelled "Weighted ..." below. If the positive class is a
# minority (see the class-balance plot), those averages are dominated by the negative class.
# Precision and recall for HeartDisease = 1 are therefore reported separately as well.
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="accuracy"
)

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedPrecision"
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedRecall"
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="f1"
)

# Per-class metrics for the positive label (HeartDisease = 1)
evaluator_precision_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0
)

evaluator_recall_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="HeartDisease", metricName="areaUnderROC"
)

# Compute metrics
accuracy = evaluator_accuracy.evaluate(predictions_dt)
precision = evaluator_precision.evaluate(predictions_dt)
recall = evaluator_recall.evaluate(predictions_dt)
f1_score = evaluator_f1.evaluate(predictions_dt)
precision_pos = evaluator_precision_pos.evaluate(predictions_dt)
recall_pos = evaluator_recall_pos.evaluate(predictions_dt)
# Named auc_value rather than `auc`: a later ROC cell imports sklearn.metrics.auc, and a
# module-level `auc` float would shadow that function if this cell were re-run afterwards.
auc_value = evaluator_auc.evaluate(predictions_dt)

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"Weighted F1-Score: {f1_score:.4f}")
print(f"Precision (HeartDisease = 1): {precision_pos:.4f}")
print(f"Recall (HeartDisease = 1): {recall_pos:.4f}")
print(f"AUC: {auc_value:.4f}")

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest: search ensemble size x tree depth via cross-validation
rf = RandomForestClassifier(labelCol="HeartDisease", featuresCol="features")
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

cvModel_rf, predictions_rf, auc_rf = train_model(rf, paramGrid_rf)
print("Random Forest AUC:", auc_rf)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the tuned Random Forest model on the held-out test set.
# "weightedPrecision", "weightedRecall" and "f1" are averages over both classes, weighted by
# class frequency, so they are labelled "Weighted ..." below. If the positive class is a
# minority (see the class-balance plot), those averages are dominated by the negative class.
# Precision and recall for HeartDisease = 1 are therefore reported separately as well.
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="accuracy"
)

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedPrecision"
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedRecall"
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="f1"
)

# Per-class metrics for the positive label (HeartDisease = 1)
evaluator_precision_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0
)

evaluator_recall_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="HeartDisease", metricName="areaUnderROC"
)

# Compute metrics
accuracy = evaluator_accuracy.evaluate(predictions_rf)
precision = evaluator_precision.evaluate(predictions_rf)
recall = evaluator_recall.evaluate(predictions_rf)
f1_score = evaluator_f1.evaluate(predictions_rf)
precision_pos = evaluator_precision_pos.evaluate(predictions_rf)
recall_pos = evaluator_recall_pos.evaluate(predictions_rf)
# Named auc_value rather than `auc`: a later ROC cell imports sklearn.metrics.auc, and a
# module-level `auc` float would shadow that function if this cell were re-run afterwards.
auc_value = evaluator_auc.evaluate(predictions_rf)

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"Weighted F1-Score: {f1_score:.4f}")
print(f"Precision (HeartDisease = 1): {precision_pos:.4f}")
print(f"Recall (HeartDisease = 1): {recall_pos:.4f}")
print(f"AUC: {auc_value:.4f}")
# ---- Gradient-Boosted Trees ----
from pyspark.ml.classification import GBTClassifier

# Boosted ensemble with a fixed 50 boosting iterations; only tree depth is searched
gbt = GBTClassifier(labelCol="HeartDisease", featuresCol="features", maxIter=50)
paramGrid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10])
    .build()
)

cvModel_gbt, predictions_gbt, auc_gbt = train_model(gbt, paramGrid_gbt)
print("Gradient-Boosted Trees AUC:", auc_gbt)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the tuned Gradient-Boosted Trees model on the held-out test set.
# "weightedPrecision", "weightedRecall" and "f1" are averages over both classes, weighted by
# class frequency, so they are labelled "Weighted ..." below. If the positive class is a
# minority (see the class-balance plot), those averages are dominated by the negative class.
# Precision and recall for HeartDisease = 1 are therefore reported separately as well.
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="accuracy"
)

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedPrecision"
)

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="weightedRecall"
)

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="f1"
)

# Per-class metrics for the positive label (HeartDisease = 1)
evaluator_precision_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0
)

evaluator_recall_pos = MulticlassClassificationEvaluator(
    labelCol="HeartDisease", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="HeartDisease", metricName="areaUnderROC"
)

# Compute metrics
accuracy = evaluator_accuracy.evaluate(predictions_gbt)
precision = evaluator_precision.evaluate(predictions_gbt)
recall = evaluator_recall.evaluate(predictions_gbt)
f1_score = evaluator_f1.evaluate(predictions_gbt)
precision_pos = evaluator_precision_pos.evaluate(predictions_gbt)
recall_pos = evaluator_recall_pos.evaluate(predictions_gbt)
# Named auc_value rather than `auc`: a later ROC cell imports sklearn.metrics.auc, and a
# module-level `auc` float would shadow that function if this cell were re-run afterwards.
auc_value = evaluator_auc.evaluate(predictions_gbt)

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"Weighted F1-Score: {f1_score:.4f}")
print(f"Precision (HeartDisease = 1): {precision_pos:.4f}")
print(f"Recall (HeartDisease = 1): {recall_pos:.4f}")
print(f"AUC: {auc_value:.4f}")

In [None]:
# (display name, selection key, test AUC) for every tuned model, in reporting order
auc_rows = [
    ("Logistic Regression", "LogisticRegression", auc_lr),
    ("Decision Tree", "DecisionTree", auc_dt),
    ("Random Forest", "RandomForest", auc_rf),
    ("GBT", "GBT", auc_gbt),
]

print("AUC Comparison:")
for display_name, _, score in auc_rows:
    print(f"{display_name}:", score)

# The model with the highest test AUC wins
auc_dict = {key: score for _, key, score in auc_rows}
best_model_name = max(auc_dict, key=auc_dict.__getitem__)
print("Best Model:", best_model_name)

In [None]:
from pyspark.sql.functions import col
import pandas as pd
import scipy.stats as stats

# Function to compute correlation
def compute_corr(predictions, model_name):
    """Report how closely the predicted heart-disease probability tracks the true label.

    Args:
        predictions: Predictions frame exposing ``select(...).toPandas()`` (the Spark
            predictions returned by ``train_model``), with a ``HeartDisease`` label column
            and a ``probability`` vector column.
        model_name: Name shown in the printed header.

    Returns:
        Tuple ``(pearson, spearman)`` of correlation coefficients; both are also printed.
    """
    frame = predictions.select("HeartDisease", "probability").toPandas()

    # Element 1 of each probability vector is the score for class 1 (heart disease)
    frame["Predicted_Prob"] = [float(vec[1]) for vec in frame["probability"]]
    labels, scores = frame["HeartDisease"], frame["Predicted_Prob"]

    pearson_corr, _ = stats.pearsonr(labels, scores)
    spearman_corr, _ = stats.spearmanr(labels, scores)

    for line in (
        f"**{model_name} Correlations:**",
        f" - Pearson Correlation: {pearson_corr:.4f}",
        f" - Spearman Correlation: {spearman_corr:.4f}",
        "",
    ):
        print(line)

    return pearson_corr, spearman_corr

# Compute correlation for every model, in reporting order (each call prints its own report)
corr_inputs = [
    ("Logistic Regression", predictions_lr),
    ("Decision Tree", predictions_dt),
    ("Random Forest", predictions_rf),
    ("Gradient-Boosted Trees", predictions_gbt),
]
corr_results = {name: compute_corr(preds, name) for name, preds in corr_inputs}
corr_lr, corr_dt, corr_rf, corr_gbt = (corr_results[name] for name, _ in corr_inputs)

# Pick the model whose Pearson coefficient (element 0 of each result) is highest
best_corr_model = max(corr_results, key=lambda name: corr_results[name][0])
print(f"Best Model Based on Pearson Correlation: {best_corr_model}")

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType

# Choose best predictions based on the best model selected.
# A dict lookup replaces the if/elif chain, which had no else branch. With the chain, an
# unexpected best_model_name left best_predictions undefined, or still holding the value
# from an earlier run of this cell. Now it fails loudly instead.
predictions_by_model = {
    "LogisticRegression": predictions_lr,
    "DecisionTree": predictions_dt,
    "RandomForest": predictions_rf,
    "GBT": predictions_gbt,
}
if best_model_name not in predictions_by_model:
    raise ValueError(f"Unknown best_model_name: {best_model_name!r}")
best_predictions = predictions_by_model[best_model_name]

# Plain-Python helper, wrapped as a Spark UDF right below, that turns a probability
# vector into a 0-100 risk score
def prob_to_score(probability):
    """Return the class-1 probability (element 1 of ``probability``) scaled to 0-100 as a float."""
    positive_prob = probability[1]
    return float(positive_prob * 100)
# Row-wise Python UDF returning a double; applied to the "probability" vector column
prob_to_score_udf = udf(prob_to_score, returnType=DoubleType())

best_predictions = best_predictions.withColumn("RiskScore", prob_to_score_udf("probability"))

# Define risk categories based on score thresholds
def risk_category(score, low_threshold=33, high_threshold=66):
    """Map a 0-100 risk score to "Low", "Medium" or "High".

    Scores below ``low_threshold`` are "Low", scores below ``high_threshold`` are "Medium",
    and everything else is "High". The defaults match the original fixed cut-offs (33 / 66).
    A missing score (None, e.g. a SQL NULL passed in through the UDF) yields None instead
    of raising TypeError inside the Spark job.
    """
    if score is None:
        return None
    if score < low_threshold:
        return "Low"
    if score < high_threshold:
        return "Medium"
    return "High"
# Row-wise Python UDF returning a string category derived from RiskScore
risk_category_udf = udf(risk_category, returnType=StringType())
best_predictions = best_predictions.withColumn("RiskCategory", risk_category_udf("RiskScore"))

# Preview the label next to its derived score and bucket
best_predictions.select(["HeartDisease", "RiskScore", "RiskCategory"]).show(n=5)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Collect just the scores to the driver for plotting
best_predictions_pd = best_predictions.select("RiskScore").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(best_predictions_pd["RiskScore"], bins=30, color="skyblue", edgecolor="black", alpha=0.7)
ax.set(xlabel="Risk Score", ylabel="Frequency", title="Distribution of Predicted Risk Scores")
ax.grid(axis="y", linestyle="--", alpha=0.7)
plt.show()

In [None]:
risk_category_pd = best_predictions.select("RiskCategory").toPandas()

# Bars (and their colours) in increasing-risk order
risk_order = ["Low", "Medium", "High"]

plt.figure(figsize=(8, 5))
# Passing `palette` without `hue` is deprecated since seaborn 0.13. Mapping hue to the same
# column, with legend=False, keeps one colour per category without adding a redundant legend.
sns.countplot(
    data=risk_category_pd,
    x="RiskCategory",
    hue="RiskCategory",
    order=risk_order,
    hue_order=risk_order,
    palette="coolwarm",
    legend=False,
)
plt.xlabel("Risk Category")
plt.ylabel("Count")
plt.title("Distribution of Risk Categories")
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt 
def get_roc_data(predictions):
    """Compute the ROC curve and its area for the positive class (HeartDisease = 1).

    Args:
        predictions: Spark predictions DataFrame with ``probability`` (vector) and
            ``HeartDisease`` (0/1 label) columns; it is collected to the driver via toPandas().

    Returns:
        Tuple ``(fpr, tpr, roc_auc)`` from ``sklearn.metrics.roc_curve`` and ``auc``.
    """
    # Imported locally under private names so that a module-level variable named `auc`
    # (e.g. an evaluator result) cannot shadow sklearn's function when cells are re-run.
    from sklearn.metrics import auc as _sk_auc, roc_curve as _sk_roc_curve

    pred_pd = predictions.select("probability", "HeartDisease").toPandas()
    y_true = pred_pd["HeartDisease"]
    # Element 1 of each probability vector is the score for the positive class
    y_scores = pred_pd["probability"].apply(lambda x: x[1])
    fpr, tpr, _ = _sk_roc_curve(y_true, y_scores)
    roc_auc = _sk_auc(fpr, tpr)
    return fpr, tpr, roc_auc

# ROC curves (scikit-learn) for every model. The per-model areas are stored as roc_auc_*
# so that the Spark-computed auc_lr / auc_dt / auc_rf / auc_gbt used for model selection
# are not silently overwritten. Otherwise, re-running the comparison cell after this one
# would report these values instead.
fpr_lr, tpr_lr, roc_auc_lr = get_roc_data(predictions_lr)
fpr_dt, tpr_dt, roc_auc_dt = get_roc_data(predictions_dt)
fpr_rf, tpr_rf, roc_auc_rf = get_roc_data(predictions_rf)
fpr_gbt, tpr_gbt, roc_auc_gbt = get_roc_data(predictions_gbt)

roc_curves = [
    ("Logistic Regression", fpr_lr, tpr_lr, roc_auc_lr),
    ("Decision Tree", fpr_dt, tpr_dt, roc_auc_dt),
    ("Random Forest", fpr_rf, tpr_rf, roc_auc_rf),
    ("GBT", fpr_gbt, tpr_gbt, roc_auc_gbt),
]

fig, ax = plt.subplots(figsize=(8, 6))
for curve_label, curve_fpr, curve_tpr, curve_auc in roc_curves:
    ax.plot(curve_fpr, curve_tpr, label=f"{curve_label} (AUC = {curve_auc:.2f})")

# Diagonal reference line for a no-skill classifier
ax.plot([0, 1], [0, 1], linestyle="--", color="gray", label="Random Guessing")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve Comparison")
ax.legend()
ax.grid()
plt.show()