In [31]:
# --- Spark bootstrap -------------------------------------------------------
# findspark.init is called with an explicit Spark install path *before*
# `import pyspark`; presumably it makes that install importable, so keep this
# order. NOTE(review): the path is machine-specific.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
# Single shared session for the whole notebook (reused if one already exists).
spark = SparkSession.builder.appName('722_I4').getOrCreate()
from pyspark.sql.functions import mean,when
from pyspark.sql.functions import count, col, desc
# Imported under an alias so Python's builtin `sum` is not shadowed; a later
# cell calls the builtin `sum` on Column objects.
from pyspark.sql.functions import sum as spark_sum
from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.functions import vector_to_array

import pandas as pd
import matplotlib.pyplot as plt

In [32]:
# Read the first CSV with a header row and let Spark infer column types,
# then print the resulting schema as a sanity check.
df1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('Diabetes_1.csv')
)
df1.printSchema()


                                                                                

root
 |-- ID: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)



In [33]:
# Read the second CSV (header row, inferred types) and print its schema.
csv_reader = spark.read.options(header=True, inferSchema=True)
df2 = csv_reader.csv('Diabetes_2.csv')
df2.printSchema()

                                                                                

root
 |-- ID: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- HbA1c_level: double (nullable = true)
 |-- blood_glucose_level: integer (nullable = true)
 |-- diabetes: integer (nullable = true)



[Stage 106935:>                                                     (0 + 1) / 1]                                                                                

In [34]:
# Remove the two lab-measurement columns from df2 and confirm via the schema.
lab_columns = ['blood_glucose_level', 'HbA1c_level']
df2 = df2.drop(*lab_columns)
df2.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- diabetes: integer (nullable = true)



In [None]:
def null_counts(frame):
    """Return a one-row DataFrame with the number of nulls in each column of `frame`."""
    return frame.select(
        [spark_sum(col(name).isNull().cast("int")).alias(name) for name in frame.columns]
    )


def fill_with_mean(frame, column):
    """Fill nulls in a continuous `column` with the mean of its non-null values.

    Returns `frame` unchanged when the column has no non-null values.
    """
    column_mean = frame.agg(mean(col(column))).first()[0]
    if column_mean is None:
        return frame
    return frame.fillna({column: column_mean})


def fill_with_mode(frame, column):
    """Fill nulls in a categorical/flag `column` with its most frequent non-null value.

    Ties are broken by the value itself so the result is deterministic.
    Returns `frame` unchanged when the column has no non-null values.
    """
    most_common = (
        frame.where(col(column).isNotNull())
        .groupBy(column)
        .agg(count(column).alias('count'))
        .orderBy(desc('count'), col(column))
        .first()
    )
    if most_common is None:
        return frame
    return frame.fillna({column: most_common[0]})


# Null counts before imputation.
null_counts(df1).show()
null_counts(df2).show()

df1 = fill_with_mean(df1, 'age')
df1 = fill_with_mode(df1, 'gender')

# hypertension / heart_disease are integer 0/1 flags. Filling them with the
# column mean is not meaningful: the fractional mean is cast to the integer
# column type by fillna and so silently becomes 0. Use the mode explicitly.
for flag_column in ('hypertension', 'heart_disease'):
    df2 = fill_with_mode(df2, flag_column)
df2 = fill_with_mean(df2, 'bmi')
# NOTE(review): `diabetes` is the prediction target. Imputing it fabricates
# labels; consider df2.dropna(subset=['diabetes']) instead. Kept as an
# imputation here so row counts downstream are unchanged.
df2 = fill_with_mode(df2, 'diabetes')
df2 = fill_with_mode(df2, 'smoking_history')

print("Number of entries: ", df1.count())
print("Number of entries: ", df2.count())
# Null counts after imputation (expected to be all zeros).
missing_counts = null_counts(df1)
missing_counts.show()
missing_counts = null_counts(df2)
missing_counts.show()

                                                                                

+---+------+---+
| ID|gender|age|
+---+------+---+
|  0|   164|157|
+---+------+---+



                                                                                

+---+------------+-------------+---------------+---+--------+
| ID|hypertension|heart_disease|smoking_history|bmi|diabetes|
+---+------------+-------------+---------------+---+--------+
|  0|         161|           95|            253|160|     184|
+---+------------+-------------+---------------+---+--------+



[Stage 107011:(0 + 0) / 2][Stage 107012:(0 + 2) / 2][Stage 107013:(0 + 0) / 1]1]

In [None]:

# Keep only rows whose BMI lies in the closed interval [0, 100].
df2 = df2.where(col('bmi').between(0, 100))

# Pull the filtered frame to the driver and plot BMI against row position.
df_pd = df2.toPandas()
bmi_values = df_pd['bmi']
row_positions = range(len(bmi_values))

fig, ax = plt.subplots()
ax.scatter(row_positions, bmi_values)
ax.set_title('Scatter plot for BMI')
ax.set_xlabel('Index')
ax.set_ylabel('Value')
plt.show()

In [None]:
# Combine the two tables on their shared key; the inner join keeps only IDs
# present in both. The key is spelled exactly as in the schemas ('ID') so the
# join does not depend on Spark's case-insensitive column resolution.
df = df1.join(df2, on='ID', how='inner')
df.printSchema()
print("Number of entries: ", df.count())

In [None]:
def bucket_expr(source_column, edges, labels):
    """Build a Column that maps a numeric column to a label per interval.

    Bucket i covers [edges[i], edges[i + 1]); the last bucket also includes
    its upper edge so a value equal to the final edge is not left unlabelled.
    Values outside [edges[0], edges[-1]] produce null.
    """
    if len(edges) != len(labels) + 1:
        raise ValueError("edges must contain exactly one more entry than labels")
    value = col(source_column)
    expr = when(value < edges[0], None)
    last_bucket = len(labels) - 1
    for position, label in enumerate(labels):
        lower, upper = edges[position], edges[position + 1]
        under_upper = (value <= upper) if position == last_bucket else (value < upper)
        expr = expr.when((value >= lower) & under_upper, label)
    return expr


age_bins = [0, 2, 4, 13, 20, 30, 50, 65, 81]
age_labels = ["Infants", "Toddlers", "Children", "Teenagers", "Young", "Middle-aged", "Older adults", "Seniors"]

age_column_expr = bucket_expr("age", age_bins, age_labels)
df = df.withColumn("Age group", age_column_expr)

# Edges are the lower bound of each class. With half-open buckets, edges of
# 24.99 / 29.99 / ... would push a BMI of exactly 24.99 (etc.) into the next
# class up, so the round cut-offs are used instead. The closed top bucket keeps
# bmi == 100, which the earlier outlier filter lets through.
bmi_bins = [0, 18.5, 25, 30, 35, 40, 100]
bmi_labels = ["Underweight", "Normal weight", "Overweight", "Obesity I", "Obesity II", "Obesity III"]

bmi_column_expr = bucket_expr("bmi", bmi_bins, bmi_labels)
df = df.withColumn("Bmi class", bmi_column_expr)
df.show()

In [None]:
medical_features = ['hypertension', 'heart_disease']

# Add the flag columns together row-wise. The running total starts at the
# integer 0, exactly as Python's builtin sum() does.
history_total = 0
for flag_name in medical_features:
    history_total = history_total + col(flag_name)

df = df.withColumn("Medical_History_Count", history_total)
df.printSchema()

In [None]:
def one_hot_expand(frame, source_column):
    """Expand `source_column` into one 0/1 integer column per category.

    The column is indexed with StringIndexer, one-hot encoded (no category
    dropped), and the encoded vector is unpacked into columns named
    "<source_column>_<label>". The temporary index/vector/array columns are
    removed before returning.

    Returns:
        (expanded_frame, labels), where labels are the StringIndexer labels in
        index order.
    """
    index_column = f"{source_column}_index"
    encoded_column = f"{source_column}_encoded"
    array_column = f"{source_column}_array"

    indexer_model = StringIndexer(inputCol=source_column, outputCol=index_column).fit(frame)
    frame = indexer_model.transform(frame)

    encoder = OneHotEncoder(inputCols=[index_column], outputCols=[encoded_column], dropLast=False)
    frame = encoder.fit(frame).transform(frame)

    frame = frame.withColumn(array_column, vector_to_array(encoded_column))
    category_labels = indexer_model.labels
    for position, label in enumerate(category_labels):
        frame = frame.withColumn(f"{source_column}_{label}", col(array_column)[position].cast("int"))

    return frame.drop(index_column, encoded_column, array_column), category_labels


categorical_columns = ["Age group", "Bmi class", "gender", "smoking_history", "Medical_History_Count"]

category_labels_by_column = {}
for source_column in categorical_columns:
    df, category_labels_by_column[source_column] = one_hot_expand(df, source_column)

# Number of categories per column. The indexer is fitted on the same frame it
# transforms, so the label count equals the number of distinct index values
# without launching an extra Spark job per column.
num_categories_age = len(category_labels_by_column["Age group"])
num_categories_bmi = len(category_labels_by_column["Bmi class"])
num_categories_gender = len(category_labels_by_column["gender"])
num_categories_smoking = len(category_labels_by_column["smoking_history"])
num_categories_medical = len(category_labels_by_column["Medical_History_Count"])


In [None]:

# Columns retained for modelling, grouped by origin. The concatenation order
# below is the final column order of `df`.
flag_and_label_columns = ['hypertension', 'heart_disease', 'diabetes']
age_group_columns = [
    'Age group_Middle-aged', 'Age group_Older adults', 'Age group_Seniors', 'Age group_Young',
    'Age group_Children', 'Age group_Teenagers', 'Age group_Toddlers', 'Age group_Infants',
]
bmi_class_columns = [
    'Bmi class_Overweight', 'Bmi class_Normal weight', 'Bmi class_Obesity I',
    'Bmi class_Underweight', 'Bmi class_Obesity II', 'Bmi class_Obesity III',
]
gender_columns = ['gender_Female', 'gender_Male', 'gender_Other']
smoking_columns = [
    'smoking_history_never', 'smoking_history_former', 'smoking_history_current',
    'smoking_history_not current', 'smoking_history_ever',
]
medical_count_columns = ['Medical_History_Count_0', 'Medical_History_Count_1', 'Medical_History_Count_2']

columns_to_keep = (
    flag_and_label_columns
    + age_group_columns
    + bmi_class_columns
    + gender_columns
    + smoking_columns
    + medical_count_columns
)

df = df.select(*columns_to_keep)

In [None]:
# Rows of df2 whose smoking history is something other than 'No Info'.
has_smoking_info = df2.smoking_history != 'No Info'
df_select = df2.where(has_smoking_info)
print("Number of entries: ", df_select.count())


In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Every column except the label is a candidate feature. The loop variable is
# deliberately not called `col`, to avoid confusion with pyspark's `col`.
input_columns = [name for name in df.columns if name != 'diabetes']
assembler = VectorAssembler(inputCols=input_columns, outputCol="features")
data = assembler.transform(df)

x = data.select("features")
y = data.select("diabetes")

# A fixed seed makes the forest, and therefore the importance ranking,
# reproducible; without it the estimator's default seed is not guaranteed to
# be stable between sessions.
clf = RandomForestClassifier(numTrees=100, labelCol="diabetes", featuresCol="features", seed=42)
model = clf.fit(data)

# featureImportances is an ML Vector; convert it explicitly instead of relying
# on implicit iteration, and store plain floats.
importances = model.featureImportances
feature_importance = {
    name: float(score) for name, score in zip(input_columns, importances.toArray())
}

sorted_feature_importance = sorted(feature_importance.items(), key=lambda item: item[1], reverse=True)
for feature, importance in sorted_feature_importance:
    print(f"{feature}: {importance}")

In [None]:
# Remove the listed feature columns in a single call, then show what remains.
non_important_features = ['Age group_Toddlers', 'Age group_Infants', 'gender_Other']
df = df.drop(*non_important_features)
df.columns

In [None]:
# Class distribution before balancing.
diabetes_counts = df.groupBy('diabetes').count().collect()
class_counts = {row['diabetes']: row['count'] for row in diabetes_counts}
print(class_counts)

# df_high: rows labelled 0; df_low: rows labelled 1.
df_high = df.filter(col('diabetes') == 0)
df_low = df.filter(col('diabetes') == 1)

# Reuse the counts collected above instead of running two more count jobs.
high_count = class_counts.get(0, 0)
low_count = class_counts.get(1, 0)
if high_count == 0:
    raise ValueError("no rows with diabetes == 0; cannot compute a sampling fraction")
# Sampling without replacement needs a fraction in [0, 1].
fraction = min(1.0, low_count / high_count)

# Seeded so the same subset is drawn on every run. The result is cached because
# later cells split and re-evaluate df_reduced several times and must all see
# the same sampled rows.
df_high_reduce = df_high.sample(withReplacement=False, fraction=fraction, seed=42)
df_reduced = df_low.union(df_high_reduce).cache()

# Class distribution after balancing (sample() is approximate, so the two
# counts are close but not necessarily equal).
reduced_diabetes_counts = df_reduced.groupBy('diabetes').count().collect()
print({row['diabetes']: row['count'] for row in reduced_diabetes_counts})


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import DecisionTreeClassifier

# Cast the label to double before modelling.
# NOTE(review): presumably required by the RDD-based MulticlassMetrics used
# below — confirm.
df_reduced = df_reduced.withColumn("diabetes", col("diabetes").cast("double"))

# Every column other than the label is a feature.
feature_cols = [name for name in df_reduced.columns if name != "diabetes"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data, test_data = df_reduced.randomSplit([0.8, 0.2], seed=42)

# Assemble features and fit the tree in one pipeline, then score the hold-out split.
dt_classifier = DecisionTreeClassifier(labelCol="diabetes", featuresCol="features")
pipeline = Pipeline(stages=[assembler, dt_classifier])
dt_model = pipeline.fit(train_data)
predictions_dt = dt_model.transform(test_data)

accuracy_evaluator_dt = MulticlassClassificationEvaluator(
    labelCol="diabetes", predictionCol="prediction", metricName="accuracy"
)
accuracy_dt = accuracy_evaluator_dt.evaluate(predictions_dt)

# MulticlassMetrics takes an RDD of (prediction, label) pairs.
predictionLabels_dt = predictions_dt.select("prediction", "diabetes").rdd
metrics_dt = MulticlassMetrics(predictionLabels_dt)

# (precision, recall, F1) for each class label.
scores_by_class_dt = {
    label: (metrics_dt.precision(label), metrics_dt.recall(label), metrics_dt.fMeasure(label))
    for label in (0.0, 1.0)
}
precision_0_dt, recall_0_dt, f1Score_0_dt = scores_by_class_dt[0.0]
precision_1_dt, recall_1_dt, f1Score_1_dt = scores_by_class_dt[1.0]

print(f"Decision Tree - Accuracy: {accuracy_dt:.4f}")
print(f"Decision Tree - 0 - Precision: {precision_0_dt:.4f}")
print(f"Decision Tree - 0 - Recall: {recall_0_dt:.4f}")
print(f"Decision Tree - 0 - F1 Score: {f1Score_0_dt:.4f}")
print(f"Decision Tree - 1 - Precision: {precision_1_dt:.4f}")
print(f"Decision Tree - 1 - Recall: {recall_1_dt:.4f}")
print(f"Decision Tree - 1 - F1 Score: {f1Score_1_dt:.4f}")


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Same feature assembler and 80/20 split (seed 42) as the decision-tree cell.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data, test_data = df_reduced.randomSplit([0.8, 0.2], seed=42)

# Seed the forest explicitly: its row/feature subsampling is random, and the
# estimator's default seed is not guaranteed to be stable between sessions.
rf_classifier = RandomForestClassifier(labelCol="diabetes", featuresCol="features", seed=42)
pipeline = Pipeline(stages=[assembler, rf_classifier])
rf_model = pipeline.fit(train_data)

predictions_rf = rf_model.transform(test_data)

accuracy_rf = MulticlassClassificationEvaluator(
    labelCol="diabetes", predictionCol="prediction", metricName="accuracy"
).evaluate(predictions_rf)

# MulticlassMetrics takes an RDD of (prediction, label) pairs.
predictionLabels_rf = predictions_rf.select("prediction", "diabetes").rdd
metrics_rf = MulticlassMetrics(predictionLabels_rf)

# Per-class precision / recall / F1 for label 0.0 and label 1.0.
precision_0_rf = metrics_rf.precision(label=0.0)
recall_0_rf = metrics_rf.recall(label=0.0)
f1Score_0_rf = metrics_rf.fMeasure(label=0.0)
precision_1_rf = metrics_rf.precision(label=1.0)
recall_1_rf = metrics_rf.recall(label=1.0)
f1Score_1_rf = metrics_rf.fMeasure(label=1.0)

print(f"Random Forest - Accuracy: {accuracy_rf:.4f}")
print(f"Random Forest - 0 - Precision: {precision_0_rf:.4f}")
print(f"Random Forest - 0 - Recall: {recall_0_rf:.4f}")
print(f"Random Forest - 0 - F1 Score: {f1Score_0_rf:.4f}")
print(f"Random Forest - 1 - Precision: {precision_1_rf:.4f}")
print(f"Random Forest - 1 - Recall: {recall_1_rf:.4f}")
print(f"Random Forest - 1 - F1 Score: {f1Score_1_rf:.4f}")


In [None]:
from pyspark.ml.classification import GBTClassifier

# Same feature assembler and 80/20 split (seed 42) as the other model cells.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data, test_data = df_reduced.randomSplit([0.8, 0.2], seed=42)

# Explicit seed so the fit does not depend on the estimator's default seed,
# which is not guaranteed to be stable between sessions.
gbt_classifier = GBTClassifier(labelCol="diabetes", featuresCol="features", seed=42)
pipeline = Pipeline(stages=[assembler, gbt_classifier])
gbt_model = pipeline.fit(train_data)

predictions_gbt = gbt_model.transform(test_data)

accuracy_gbt = MulticlassClassificationEvaluator(
    labelCol="diabetes", predictionCol="prediction", metricName="accuracy"
).evaluate(predictions_gbt)

# MulticlassMetrics takes an RDD of (prediction, label) pairs.
predictionLabels_gbt = predictions_gbt.select("prediction", "diabetes").rdd
metrics_gbt = MulticlassMetrics(predictionLabels_gbt)

# Per-class precision / recall / F1 for label 0.0 and label 1.0.
precision_0_gbt = metrics_gbt.precision(label=0.0)
recall_0_gbt = metrics_gbt.recall(label=0.0)
f1Score_0_gbt = metrics_gbt.fMeasure(label=0.0)
precision_1_gbt = metrics_gbt.precision(label=1.0)
recall_1_gbt = metrics_gbt.recall(label=1.0)
f1Score_1_gbt = metrics_gbt.fMeasure(label=1.0)

print(f"Gradient Boosted Trees - Accuracy: {accuracy_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - Precision: {precision_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - Recall: {recall_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - F1 Score: {f1Score_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - Precision: {precision_1_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - Recall: {recall_1_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - F1 Score: {f1Score_1_gbt:.4f}")


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Same feature assembler and 80/20 split (seed 42) as the other model cells.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data, test_data = df_reduced.randomSplit([0.8, 0.2], seed=42)

gbt_classifier = GBTClassifier(labelCol="diabetes", featuresCol="features", seed=42)
pipeline = Pipeline(stages=[assembler, gbt_classifier])

# 3 depths x 2 iteration counts x 2 step sizes = 12 candidates, each fitted
# once per fold.
paramGrid = (ParamGridBuilder()
             .addGrid(gbt_classifier.maxDepth, [2, 4, 6])
             .addGrid(gbt_classifier.maxIter, [10, 20])
             .addGrid(gbt_classifier.stepSize, [0.1, 0.01])
             .build())

# The seed fixes the fold assignment, so the selected parameters are
# reproducible from run to run.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="diabetes", predictionCol="prediction", metricName="accuracy"),
                          numFolds=5,
                          seed=42)
cvModel = crossval.fit(train_data)


# bestModel is the fitted pipeline for the winning parameter map; the
# classifier is its last stage. Read the chosen values through the model's
# public getters rather than the private `_java_obj` handle.
bestPipeline = cvModel.bestModel
bestGBT = bestPipeline.stages[-1]
print("Best Max Depth: ", bestGBT.getMaxDepth())
print("Best Max Iterations: ", bestGBT.getMaxIter())
print("Best Step Size: ", bestGBT.getStepSize())


In [None]:
# Score the hold-out split with the cross-validated model.
# Note: this reuses (and overwrites) the *_gbt names from the untuned GBT cell.
predictions_gbt = cvModel.transform(test_data)

tuned_accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="diabetes", predictionCol="prediction", metricName="accuracy"
)
accuracy_gbt = tuned_accuracy_evaluator.evaluate(predictions_gbt)

# MulticlassMetrics takes an RDD of (prediction, label) pairs.
predictionLabels_gbt = predictions_gbt.select("prediction", "diabetes").rdd
metrics_gbt = MulticlassMetrics(predictionLabels_gbt)

# (precision, recall, F1) for each class label.
tuned_scores_by_class = {
    label: (metrics_gbt.precision(label), metrics_gbt.recall(label), metrics_gbt.fMeasure(label))
    for label in (0.0, 1.0)
}
precision_0_gbt, recall_0_gbt, f1Score_0_gbt = tuned_scores_by_class[0.0]
precision_1_gbt, recall_1_gbt, f1Score_1_gbt = tuned_scores_by_class[1.0]

print(f"Gradient Boosted Trees - Accuracy: {accuracy_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - Precision: {precision_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - Recall: {recall_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 0 - F1 Score: {f1Score_0_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - Precision: {precision_1_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - Recall: {recall_1_gbt:.4f}")
print(f"Gradient Boosted Trees - 1 - F1 Score: {f1Score_1_gbt:.4f}")