In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sql_f
import pandas as pd

spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName("Project_Learning")\
    .getOrCreate()

sc = spark.sparkContext

In [None]:
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/CSC590_Project/data/combined_data.csv")

In [None]:
df.printSchema()
df.count()

root
 |-- DifficultyWalking: string (nullable = true)
 |-- AlcoholDrinking: string (nullable = true)
 |-- PhysicalHealthDays: integer (nullable = true)
 |-- Smoking: string (nullable = true)
 |-- Diabetic: string (nullable = true)
 |-- AgeCategory: string (nullable = true)
 |-- PhysicalActivity: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- MentalHealthDays: integer (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- SleepHours: integer (nullable = true)
 |-- HadSkinCancer: string (nullable = true)



565817

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import math

# List of categorical columns to be indexed
categorical_columns = [
    'DifficultyWalking', 'AlcoholDrinking', 'Smoking', 'Diabetic',
    'AgeCategory', 'PhysicalActivity', 'HadStroke', 'Sex',
    'HadAsthma', 'HadKidneyDisease', 'Race', 'GeneralHealth',
    'HadSkinCancer'
]

# List of categorical columns to be one-hot encoded
one_hot_columns = [
    'Diabetic', 'Race'
]

# List of numerical columns
numerical_columns = [
    'PhysicalHealthDays', 'MentalHealthDays', 'BMI', 'SleepHours'
]

# Explicitly remap the target variable to numeric
df = df.withColumn("HadHeartAttack_Numeric",
    sql_f.when(sql_f.col("HadHeartAttack") == "Yes", 1.0)
    .otherwise(0.0)
)

# Create StringIndexers for categorical columns
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_indexed", handleInvalid="keep")
    for column in categorical_columns
]

# OneHotEncode only Diabetic and Race
ohe = [
    OneHotEncoder(inputCol="Diabetic_indexed", outputCol="Diabetic_ohe"),
    OneHotEncoder(inputCol="Race_indexed", outputCol="Race_ohe")
]

# Combine all features
feature_columns = [col+"_indexed" for col in categorical_columns if col not in ["Diabetic", "Race"]] \
                  + ["Diabetic_ohe", "Race_ohe"] \
                  + numerical_columns

# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
    handleInvalid="skip"
)

# address the class imbalance
minority_count = df.filter(sql_f.col("HadHeartAttack_Numeric") == 1).count()
majority_count = df.filter(sql_f.col("HadHeartAttack_Numeric") == 0).count()
total_count = df.count()

# ---------------------------------------
# Different weighting strategies
# ---------------------------------------
# 1. No weighting (all 1.0)
df = df.withColumn("weight_no", sql_f.lit(1.0))

# 2. Ratio weighting (as in your original approach)
ratio = minority_count / float(majority_count)
df = df.withColumn("weight_ratio",
    sql_f.when(sql_f.col("HadHeartAttack_Numeric") == 0, ratio).otherwise(1.0)
)

# 3. Square root ratio
sqrt_ratio = math.sqrt(ratio)

df = df.withColumn("weight_sqrt_ratio",
    sql_f.when(sql_f.col("HadHeartAttack_Numeric") == 0, sqrt_ratio).otherwise(1.0)
)

# 4. Hybrid weighting (ratio and sqrt_ratio)
adjusted_ratio = (ratio + sqrt_ratio) / 2.0
df = df.withColumn("weight_adjusted_ratio",
    sql_f.when(sql_f.col("HadHeartAttack_Numeric") == 0, adjusted_ratio).otherwise(1.0)
)

# 5. Inverse frequency weighting (weights are inverse to class frequency)
#    For binary classification, you can define something like:
#    weight = total_count / (count_of_class * number_of_classes)
#    This gives minority class higher weight.
minority_weight = (total_count / (minority_count * 2.0))
majority_weight = (total_count / (majority_count * 2.0))
df = df.withColumn("weight_invfreq",
    sql_f.when(sql_f.col("HadHeartAttack_Numeric") == 1, minority_weight).otherwise(majority_weight)
)

# 6. Logarithmic ratio
log_ratio = math.log1p(ratio)  # log(1 + ratio)
df = df.withColumn("weight_log_ratio",
    sql_f.when(sql_f.col("HadHeartAttack_Numeric") == 0, log_ratio).otherwise(1.0)
)

# Split the data
train_data, test_data = df.randomSplit([0.7, 0.3], seed=123)

rf = RandomForestClassifier(
    labelCol="HadHeartAttack_Numeric",
    featuresCol="features",
    seed=123
)

pipeline = Pipeline(stages=indexers + ohe + [assembler, rf])

# Create a parameter grid that tries out different weighting columns.
# Note: weightCol should be a column name existing in the DataFrame.
paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [100])
    .addGrid(rf.maxDepth, [10])
    #.addGrid(rf.weightCol, ["weight_no", "weight_ratio", "weight_sqrt_ratio", "weight_adjusted_ratio", "weight_invfreq", "weight_log_ratio"])
    .addGrid(rf.weightCol, ["weight_ratio", "weight_sqrt_ratio", "weight_adjusted_ratio", "weight_invfreq", "weight_log_ratio"])
    .build()
)

evaluator = BinaryClassificationEvaluator(
    labelCol="HadHeartAttack_Numeric",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2,
    seed=123
)

In [None]:
# Fit the cross-validator
cv_model = crossval.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

In [None]:
# Make predictions using the best model
predictions = best_model.transform(test_data)

In [None]:
# Retrieve the average cross-validation metrics for each parameter combination
metrics = cv_model.avgMetrics  # One metric per ParamMap in paramGrid

# Pair each metric with the corresponding parameter set from paramGrid
param_map_and_metrics = list(zip(paramGrid, metrics))

# Create a dictionary to store the best results per weight strategy
best_per_weight = {}

for param_map, metric in param_map_and_metrics:
    # Directly access parameter values from the ParamMap
    numTrees = param_map[rf.numTrees]
    maxDepth = param_map[rf.maxDepth]
    weightCol = param_map[rf.weightCol]

    # If we haven't seen this weightCol yet, or if this result is better than a previous one, update it
    if weightCol not in best_per_weight or metric > best_per_weight[weightCol]["metric"]:
        best_per_weight[weightCol] = {
            "numTrees": numTrees,
            "maxDepth": maxDepth,
            "metric": metric
        }

# Sort the best strategies by their metric in descending order
best_strategies_sorted = sorted(best_per_weight.items(), key=lambda x: x[1]["metric"], reverse=True)

print("Best Result from Each Weight Strategy:")
for weightCol, info in best_strategies_sorted:
    print(f"\nWeight Column: {weightCol}")
    print(f"  numTrees: {info['numTrees']}")
    print(f"  maxDepth: {info['maxDepth']}")
    print(f"  Average CV Metric (Area Under PR): {info['metric']}")

Best Result from Each Weight Strategy:

Weight Column: weight_sqrt_ratio
  numTrees: 100
  maxDepth: 10
  Average CV Metric (Area Under PR): 0.29479956375322686

Weight Column: weight_adjusted_ratio
  numTrees: 100
  maxDepth: 10
  Average CV Metric (Area Under PR): 0.2926725744884703

Weight Column: weight_ratio
  numTrees: 100
  maxDepth: 10
  Average CV Metric (Area Under PR): 0.28584454082850047

Weight Column: weight_log_ratio
  numTrees: 100
  maxDepth: 10
  Average CV Metric (Area Under PR): 0.2856961035175649

Weight Column: weight_invfreq
  numTrees: 100
  maxDepth: 10
  Average CV Metric (Area Under PR): 0.28563687112281244


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Extract and print the best parameters
best_rf_model = best_model.stages[-1]
best_params = best_rf_model.extractParamMap()

print("Best Hyperparameters:")
for param, value in best_params.items():
    if "weightCol" in param.name or "numTrees" in param.name or "maxDepth" in param.name:
        print(f"{param.name}: {value}")

# Evaluate the model
auc_pr = evaluator.evaluate(predictions)
print(f"\nArea Under PR Curve (AUC): {auc_pr}")

# Display the confusion matrix
predictions_and_labels = predictions.select(['prediction', 'HadHeartAttack_Numeric']).rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = MulticlassMetrics(predictions_and_labels)
print("\nConfusion Matrix:")
print(metrics.confusionMatrix().toArray())

for label in [0.0, 1.0]:
    print(f"\nClass {label}:")
    print(f"  Precision: {metrics.precision(label)}")
    print(f"  Recall: {metrics.recall(label)}")
    print(f"  F1 Score: {metrics.fMeasure(label)}")

# Feature Importance
rf_model = best_model.stages[-1]
feature_importances = list(zip(feature_columns, rf_model.featureImportances))
feature_importances.sort(key=lambda x: x[1], reverse=True)

print("\nFeature Importances:")
for feature, importance in feature_importances:
    print(f"{feature}: {importance}")

Best Hyperparameters:
maxDepth: 10
numTrees: 100
weightCol: weight_sqrt_ratio

Area Under PR Curve (AUC): 0.2860954534340103

Confusion Matrix:
[[152020.   5551.]
 [  8788.   3132.]]

Class 0.0:
  Precision: 0.9453509775633053
  Recall: 0.9647714363683673
  F1 Score: 0.9549624818219794

Class 1.0:
  Precision: 0.36070482552113325
  Recall: 0.262751677852349
  F1 Score: 0.30403339319516576

Feature Importances:
AgeCategory_indexed: 0.23444493852467702
HadStroke_indexed: 0.14397770783202324
DifficultyWalking_indexed: 0.1401660608789722
GeneralHealth_indexed: 0.08733611953656205
Race_ohe: 0.07165750568222788
Diabetic_ohe: 0.06451648208327365
Sex_indexed: 0.050396613322974834
Smoking_indexed: 0.03671769641306137
HadKidneyDisease_indexed: 0.029503834830845403
AlcoholDrinking_indexed: 0.019516572940138716
HadSkinCancer_indexed: 0.010125354781691962
PhysicalActivity_indexed: 0.006330038941189653
BMI: 0.005676626307444707
HadAsthma_indexed: 0.002652238578379918
SleepHours: 0.002288468989539327