In [1]:
import pyspark
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.pandas as ps
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, count, lit, min, max, mean, stddev
from pyspark.sql.functions import monotonically_increasing_id

from pyspark.ml.stat import Correlation, ChiSquareTest
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes, MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LinearSVC

from ucimlrepo import fetch_ucirepo 
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"




In [2]:
# fetch dataset 
cdc_diabetes_health_indicators = fetch_ucirepo(id=891) 
  
# data (as pandas dataframes) 
X = cdc_diabetes_health_indicators.data.features[:100000]
y = cdc_diabetes_health_indicators.data.targets[:100000]

# Remove duplicate rows
combined = pd.concat([X, y], axis=1).drop_duplicates()

# Check for identical X with different y and remove them
inconsistent_indices = combined[combined.duplicated(subset=combined.columns[:-1], keep=False) & combined.duplicated(subset=[combined.columns[-1]], keep=False)].index
if not inconsistent_indices.empty:
    combined = combined.drop(inconsistent_indices)

# Separate features and target after cleaning
X = combined.iloc[:, :-1]
y = pd.DataFrame(combined.iloc[:, -1], columns=['Diabetes_binary'])

In [3]:
from pyspark import SparkConf
# Initialize SparkSession
conf = SparkConf() \
        .setAppName("CDC Diabetes Health Indicators") \
        .setMaster("local[64]") \
        .set("spark.executor.memory", "4g") \
        .set("spark.driver.memory", "4g") \
        .set("spark.executor.cores", "2")  # Set number of cores per executor

# Create Spark session with custom configuration
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
# Converting a pandas DataFrame to a PySpark DataFrame
# X is the feature, y is the target variable
X_pyspark = spark.createDataFrame(X)
y_pyspark = spark.createDataFrame(y)

# Showing the Architecture of a PySpark DataFrame
X_pyspark.printSchema()
y_pyspark.printSchema()

24/10/11 17:51:10 WARN Utils: Your hostname, lyudmil-ROG-Zephyrus-M16-GU603ZX-GU603ZX resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlo1)
24/10/11 17:51:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/11 17:51:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- HighBP: long (nullable = true)
 |-- HighChol: long (nullable = true)
 |-- CholCheck: long (nullable = true)
 |-- BMI: long (nullable = true)
 |-- Smoker: long (nullable = true)
 |-- Stroke: long (nullable = true)
 |-- HeartDiseaseorAttack: long (nullable = true)
 |-- PhysActivity: long (nullable = true)
 |-- Fruits: long (nullable = true)
 |-- Veggies: long (nullable = true)
 |-- HvyAlcoholConsump: long (nullable = true)
 |-- AnyHealthcare: long (nullable = true)
 |-- NoDocbcCost: long (nullable = true)
 |-- GenHlth: long (nullable = true)
 |-- MentHlth: long (nullable = true)
 |-- PhysHlth: long (nullable = true)
 |-- DiffWalk: long (nullable = true)
 |-- Sex: long (nullable = true)
 |-- Age: long (nullable = true)
 |-- Education: long (nullable = true)
 |-- Income: long (nullable = true)

root
 |-- Diabetes_binary: long (nullable = true)



In [4]:
USED_FEATURES = ['Diet', 'cardiovascular', 'unhealthy_behavior', 'healthcare', 'Fruits', 'Veggies', 'HighChol', 'HighBP', 'Smoker', 'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost']

In [5]:
# Summing Fruits and Veggies to indicate the healthiness of a diet
# Summing HighChol and HighBP to indicate overall cardiovascular risk
# Summing Smoker and HvyAlcoholConsump to indicate healthcare accessibility

X_pyspark = X_pyspark.withColumn('Diet', F.col('Fruits') + F.col('Veggies'))

X_pyspark = X_pyspark.withColumn('cardiovascular', F.col('HighChol')  + F.col('HighBP'))

X_pyspark = X_pyspark.withColumn('unhealthy_behavior', F.col('Smoker') + F.col('HvyAlcoholConsump'))

In [6]:
# 0：no healthcare and with cost problem。
# 1：no healthcare and no cost problem。
# 2：with healthcare but with cost problem。
# 3：with healthcare and no cost problem。

X_pyspark = X_pyspark.withColumn('healthcare',
    F.when((F.col('AnyHealthcare') == 1) & (F.col('NoDocbcCost') == 0), 3)
     .when((F.col('AnyHealthcare') == 1) & (F.col('NoDocbcCost') == 1), 2)
     .when((F.col('AnyHealthcare') == 0) & (F.col('NoDocbcCost') == 0), 1)
     .when((F.col('AnyHealthcare') == 0) & (F.col('NoDocbcCost') == 1), 0)
     .otherwise(-1))


In [7]:
# Min-Max Scaling: GenHlth, Age, Education, Income, BMI, MentHlth, PhysHlth, Diet, cardiovascular, unhealthy_behavior, healthcare
min_max_features = ["GenHlth", "Age", "Education", "Income", "BMI", "MentHlth", "PhysHlth", "Diet", "cardiovascular", "unhealthy_behavior", "healthcare"]

for feature in min_max_features:
    min_val = X_pyspark.agg(min(col(feature))).collect()[0][0]
    max_val = X_pyspark.agg(max(col(feature))).collect()[0][0]
    X_pyspark = X_pyspark.withColumn(
        feature,
        (col(feature) - min_val) / (max_val - min_val)
    )

# Standardization: BMI, MentHlth, PhysHlth
# standardize_features = ["BMI", "MentHlth", "PhysHlth"]

# for feature in standardize_features:
#     mean_val = X_pyspark.agg(mean(col(feature))).collect()[0][0]
#     stddev_val = X_pyspark.agg(stddev(col(feature))).collect()[0][0]
#     X_pyspark = X_pyspark.withColumn(
#         feature,
#         (col(feature) - mean_val) / stddev_val
#     )


                                                                                

In [8]:
X_pyspark.columns[:-4]

['HighBP',
 'HighChol',
 'CholCheck',
 'BMI',
 'Smoker',
 'Stroke',
 'HeartDiseaseorAttack',
 'PhysActivity',
 'Fruits',
 'Veggies',
 'HvyAlcoholConsump',
 'AnyHealthcare',
 'NoDocbcCost',
 'GenHlth',
 'MentHlth',
 'PhysHlth',
 'DiffWalk',
 'Sex',
 'Age',
 'Education',
 'Income']

In [9]:
X_pyspark.columns[-4:]

['Diet', 'cardiovascular', 'unhealthy_behavior', 'healthcare']

In [10]:
X_pyspark_without_used_features = X_pyspark.drop(*USED_FEATURES)
X_pyspark_without_used_features.show()

+---------+-------------------+------+--------------------+------------+-------+-------------------+-------------------+--------+---+-------------------+---------+-------------------+
|CholCheck|                BMI|Stroke|HeartDiseaseorAttack|PhysActivity|GenHlth|           MentHlth|           PhysHlth|DiffWalk|Sex|                Age|Education|             Income|
+---------+-------------------+------+--------------------+------------+-------+-------------------+-------------------+--------+---+-------------------+---------+-------------------+
|        1|0.32558139534883723|     0|                   0|           0|    1.0|                0.6|                0.5|       1|  0| 0.6666666666666666|      0.6| 0.2857142857142857|
|        0| 0.1511627906976744|     0|                   0|           1|    0.5|                0.0|                0.0|       0|  0|                0.5|      1.0|                0.0|
|        1|0.18604651162790697|     0|                   0|           0|    1.0|

In [11]:
assembler = VectorAssembler(inputCols=X_pyspark_without_used_features.columns, outputCol="features")
X_vector = assembler.transform(X_pyspark).select("features")
pca = PCA(k=6, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(X_vector)

# Transform the dataset with the PCA model
X_pca = pca_model.transform(X_vector)

                                                                                

In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F

# Assuming you have the PCA model `pca_model` and the original feature names

# Step 1: Extract PCA loadings
pca_loadings = pca_model.pc.toArray()  # Eigenvectors for PCA
columns = X_pyspark_without_used_features.columns  # Original feature columns

# Step 2: Create a DataFrame for PCA loadings
loadings_data = [(float(pca_loadings[i][j]), columns[j], f"PC{i+1}") for i in range(pca_loadings.shape[0]) for j in range(pca_loadings.shape[1])]
loadings_df = spark.createDataFrame(loadings_data, ["loading", "feature", "principal_component"])

# Step 3: Calculate absolute contributions
loadings_df = loadings_df.withColumn("abs_loading", F.abs(loadings_df["loading"]))

# Step 4: Sort by principal component and then by absolute loading
sorted_loadings_df = loadings_df.orderBy("abs_loading", ascending=[False])

# Show the sorted contributions
sorted_loadings_df.show(30, truncate=False)


+--------------------+--------------------+-------------------+-------------------+
|loading             |feature             |principal_component|abs_loading        |
+--------------------+--------------------+-------------------+-------------------+
|0.9290178552998998  |BMI                 |PC10               |0.9290178552998998 |
|0.8360084376199224  |Stroke              |PC5                |0.8360084376199224 |
|0.7580561343492052  |PhysActivity        |PC13               |0.7580561343492052 |
|0.7276552943594699  |HeartDiseaseorAttack|PC4                |0.7276552943594699 |
|0.5144842424645347  |CholCheck           |PC9                |0.5144842424645347 |
|-0.4973506109055958 |CholCheck           |PC5                |0.4973506109055958 |
|0.4942905799906531  |PhysActivity        |PC9                |0.4942905799906531 |
|0.488599594581108   |HeartDiseaseorAttack|PC11               |0.488599594581108  |
|0.43826028204976203 |GenHlth             |PC4                |0.43826028204

In [13]:
result_features = sorted_loadings_df.dropDuplicates(["feature"]).select("feature").limit(6)

# Show the result
result_features.show(truncate=False)

+--------------------+
|feature             |
+--------------------+
|CholCheck           |
|BMI                 |
|Stroke              |
|HeartDiseaseorAttack|
|GenHlth             |
|PhysActivity        |
+--------------------+



In [14]:
important_features = [row['feature'] for row in result_features.select('feature').collect()]
important_features

['CholCheck',
 'BMI',
 'Stroke',
 'HeartDiseaseorAttack',
 'GenHlth',
 'PhysActivity']

In [38]:
# pca_df = X_pca.select('pca_features')

# # Define a function to extract the elements of the vector and create new columns
# def extract_components(row):
#     return [float(x) for x in row[0]]  # Extracting values from the vector

# # Use rdd and map to convert the pca_features to separate columns
# pca_components = pca_df.rdd.map(extract_components)

# # Create a new DataFrame with the principal components
# pca_X_pyspark = spark.createDataFrame(pca_components, schema=[f'PC{i+1}' for i in range(4)])

# # Show the new DataFrame with principal components
# pca_X_pyspark.show(truncate=False)

In [15]:
# Select only the desired columns from X_pyspark
X_subset = X_pyspark.select(*important_features,'Diet', 'cardiovascular', 'unhealthy_behavior', 'healthcare')

from pyspark.sql.functions import monotonically_increasing_id

# Show the result
X_subset.show()


+---------+-------------------+------+--------------------+-------+------------+----+--------------+------------------+------------------+
|CholCheck|                BMI|Stroke|HeartDiseaseorAttack|GenHlth|PhysActivity|Diet|cardiovascular|unhealthy_behavior|        healthcare|
+---------+-------------------+------+--------------------+-------+------------+----+--------------+------------------+------------------+
|        1|0.32558139534883723|     0|                   0|    1.0|           0| 0.5|           1.0|               0.5|               1.0|
|        0| 0.1511627906976744|     0|                   0|    0.5|           1| 0.0|           0.0|               0.5|               0.0|
|        1|0.18604651162790697|     0|                   0|    1.0|           0| 0.5|           1.0|               0.0|0.6666666666666666|
|        1| 0.1744186046511628|     0|                   0|   0.25|           1| 1.0|           0.5|               0.0|               1.0|
|        1|0.13953488372093

In [16]:
y_pyspark = y_pyspark.withColumn("index", monotonically_increasing_id())
X_subset = X_subset.withColumn("index", monotonically_increasing_id())

# Perform the join on the index
merged_df = X_subset.join(y_pyspark, on="index", how="inner")

# Drop the index column if it's no longer needed
merged_df = merged_df.drop("index")

train_data, test_data = merged_df.randomSplit([0.8, 0.2], seed=1234)

In [17]:
merged_df.columns

['CholCheck',
 'BMI',
 'Stroke',
 'HeartDiseaseorAttack',
 'GenHlth',
 'PhysActivity',
 'Diet',
 'cardiovascular',
 'unhealthy_behavior',
 'healthcare',
 'Diabetes_binary']

In [18]:
# Assemble the features
assembler = VectorAssembler(inputCols=merged_df.columns[:-1], outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

In [19]:
train_data.show()

                                                                                

+---------+--------------------+------+--------------------+-------+------------+----+--------------+------------------+------------------+---------------+--------------------+
|CholCheck|                 BMI|Stroke|HeartDiseaseorAttack|GenHlth|PhysActivity|Diet|cardiovascular|unhealthy_behavior|        healthcare|Diabetes_binary|            features|
+---------+--------------------+------+--------------------+-------+------------+----+--------------+------------------+------------------+---------------+--------------------+
|        0|0.023255813953488372|     0|                   0|    0.5|           1| 1.0|           0.0|               0.5|               1.0|              0|[0.0,0.0232558139...|
|        0| 0.03488372093023256|     0|                   1|    0.5|           0| 1.0|           0.0|               0.5|               0.0|              0|(10,[1,3,4,6,8],[...|
|        0|0.046511627906976744|     0|                   0|   0.25|           1| 0.0|           0.0|              

In [20]:
train_data.columns

['CholCheck',
 'BMI',
 'Stroke',
 'HeartDiseaseorAttack',
 'GenHlth',
 'PhysActivity',
 'Diet',
 'cardiovascular',
 'unhealthy_behavior',
 'healthcare',
 'Diabetes_binary',
 'features']

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

def get_hyperparameters(model):
    best_model = model.bestModel

    # Extract and print the best hyperparameters
    best_params = best_model.extractParamMap()

    # Print each parameter and its value
    for param, value in best_params.items():
        print(f"{param.name}: {value}")

        
def get_metrics(predictions):
    # Accuracy
    accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Diabetes_binary", predictionCol="prediction", metricName="accuracy")
    accuracy = accuracy_evaluator.evaluate(predictions)

    # F1-Score
    f1_evaluator = MulticlassClassificationEvaluator(labelCol="Diabetes_binary", predictionCol="prediction", metricName="f1")
    f1_score = f1_evaluator.evaluate(predictions)

    # Recall
    recall_evaluator = MulticlassClassificationEvaluator(labelCol="Diabetes_binary", predictionCol="prediction", metricName="weightedRecall")
    recall = recall_evaluator.evaluate(predictions)

    # Precision
    precision_evaluator = MulticlassClassificationEvaluator(labelCol="Diabetes_binary", predictionCol="prediction", metricName="weightedPrecision")
    precision = precision_evaluator.evaluate(predictions)

    # Step 3: Print the evaluation results
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1-Score: {f1_score:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"Precision: {precision:.4f}")


In [57]:
# Logistic Regression
lr = LogisticRegression(labelCol="Diabetes_binary", featuresCol="features")

# Param grid for hyperparameter tuning
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-validation
cv_lr = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid_lr,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5, parallelism=64)

# Fit model
lr_model = cv_lr.fit(train_data)

                                                                                00]

In [58]:
get_hyperparameters(lr_model)

aggregationDepth: 2
elasticNetParam: 0.0
family: auto
featuresCol: features
fitIntercept: True
labelCol: Diabetes_binary
maxBlockSizeInMB: 0.0
maxIter: 100
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
regParam: 0.01
standardization: True
threshold: 0.5
tol: 1e-06


In [60]:
lr_predictions = lr_model.transform(test_data)
get_metrics(lr_predictions)

Accuracy: 0.8615
F1-Score: 0.8217
Recall: 0.8615
Precision: 0.8246


In [49]:
# Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="Diabetes_binary", featuresCol="features")

paramGrid_dt = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 20]) \
    .addGrid(dt.maxBins, [32, 64]) \
    .build()

cv_dt = CrossValidator(estimator=dt,
                    estimatorParamMaps=paramGrid_dt,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5, parallelism=64)

dt_model = cv_dt.fit(train_data)


                                                                                0]0]]]

In [59]:
get_hyperparameters(dt_model)

cacheNodeIds: False
checkpointInterval: 10
featuresCol: features
impurity: gini
labelCol: Diabetes_binary
leafCol: 
maxBins: 64
maxDepth: 10
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 8624640193437594967


In [61]:
dt_predictions = dt_model.transform(test_data)
get_metrics(dt_predictions)

Accuracy: 0.8603
F1-Score: 0.8258
Recall: 0.8603
Precision: 0.8242


In [64]:
# Random Forest Classifier
rf = RandomForestClassifier(labelCol="Diabetes_binary", featuresCol="features")

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 20]) \
    .build()

cv_rf = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid_rf,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5, parallelism=32)

rf_model = cv_rf.fit(train_data)

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
get_hyperparameters(rf_model)

In [None]:
rf_predictions = rf_model.transform(test_data)
get_metrics(rf_predictions)

In [None]:
# Gradient-Boosted Tree Classifier
gbt = GBTClassifier(labelCol="Diabetes_binary", featuresCol="features", maxIter=100)

paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 20]) \
    .build()

cv_gbt = CrossValidator(estimator=gbt,
                    estimatorParamMaps=paramGrid_gbt,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5, parallelism=64)

gbt_model = cv_gbt.fit(train_data)


                                                                                00]]]0]

In [None]:
get_hyperparameters(gbt_model)

In [None]:
gbt_predictions = gbt_model.transform(test_data)
get_metrics(gbt_predictions)

In [None]:
# Support Vector Machine
svm = LinearSVC(labelCol="Diabetes_binary", featuresCol="features")

paramGrid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .build()

cv = CrossValidator(estimator=svm,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5,  parallelism=64)

svm_model = cv.fit(train_data)


In [None]:
get_hyperparameters(svm_model)

In [None]:
svm_predictions = svm_model.transform(test_data)
get_metrics(svm_predictions)

In [None]:
# Naive Bayes Classifier
nb = NaiveBayes(labelCol="Diabetes_binary", featuresCol="features")

paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, [0.5, 1.0, 1.5]) \
    .build()

cv_nb = CrossValidator(estimator=nb,
                    estimatorParamMaps=paramGrid_nb,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5,  parallelism=64)

nb_model = cv_nb.fit(train_data)


In [None]:
get_hyperparameters(nb_model)

In [None]:
nb_predictions = nb_model.transform(test_data)
get_metrics(nb_predictions)

In [None]:
# MLP classifier
layers = 3
mlp = MultilayerPerceptronClassifier(labelCol="Diabetes_binary", featuresCol="features", layers=layers, seed=1234)

paramGrid_mlp = ParamGridBuilder() \
    .addGrid(mlp.layers, [[len(X_pyspark.columns), 5, 4, 2], [len(X_pyspark.columns), 10, 5, 2]]) \
    .build()

cv_mlp = CrossValidator(estimator=mlp,
                    estimatorParamMaps=paramGrid_mlp,
                    evaluator=BinaryClassificationEvaluator(labelCol="Diabetes_binary"),
                    numFolds=5,  parallelism=64)

mlp_model = cv_mlp.fit(train_data)

In [None]:
get_hyperparameters(mlp_model)

In [None]:
mlp_predictions = mlp_model.transform(test_data)
get_metrics(mlp_predictions)