# Import libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, LongType, StringType, DoubleType
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, when , isnan, count, udf
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, LinearSVC, DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, Evaluator
import pyspark.sql.functions as F
from itertools import combinations
import os

# import requests
# import re

# Set up

In [2]:
# Folder (relative path) that read_data() joins with "heart_disease.csv" to locate the input file.
DATA_FOLDER = "../data"

# 1. Load Data

In [3]:
def read_data(spark: SparkSession) -> DataFrame:
    """
    Load ``heart_disease.csv`` from ``DATA_FOLDER`` into a DataFrame.

    The file is read as CSV with its first row treated as the header, and
    Spark is asked to infer the column types instead of being given an
    explicit schema. The DataFrame is printed (schema summary only, no rows)
    before it is returned.
    """
    csv_path = os.path.join(DATA_FOLDER, "heart_disease.csv")

    # Same two reader options as separate .option() calls, set in one go.
    reader = spark.read.format("csv").options(header="true", inferSchema="true")
    loaded = reader.load(csv_path)

    print(loaded)
    return loaded

# 2. Clean Data

In [4]:
def data_cleaning(df: DataFrame) -> DataFrame:
    """
    Reduce the raw data to the modelling columns and clamp out-of-range values.

    Steps:
      * rows whose ``target`` is null are dropped;
      * only the columns listed in ``columns`` are kept, in that order;
      * ``trestbps`` values below 100 are raised to 100;
      * ``oldpeak`` is clamped to the range [0, 4];
      * the 0/1 flag columns in ``binary_columns`` are clamped to [0, 1];
      * ``age`` is cast to ``IntegerType``.

    Null feature values are passed through unchanged: a comparison against
    null matches no ``when`` branch, so the ``otherwise`` branch returns the
    (null) value itself.

    All column rewrites are expressed as a single ``select`` projection
    rather than a chain of ``withColumn`` calls, because every ``withColumn``
    adds another projection to the query plan.

    NOTE: a previously commented-out draft that scraped ABS/CDC smoking-rate
    pages to impute missing ``smoke`` values has been removed from this
    function; ``smoke`` is returned as-is, nulls included.

    :param df: raw DataFrame containing at least the columns in ``columns``.
    :return: the cleaned DataFrame (also printed as a schema summary).
    """
    columns = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 'smoke', 'fbs', 'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 'exang', 'oldpeak', 'slope', 'target']
    binary_columns = ["painloc", "painexer", 'fbs', 'prop', 'nitr', 'pro', 'diuretic']

    # Replacement expression for every column that needs cleaning; columns
    # without an entry here are selected unchanged.
    overrides = {
        "trestbps": when(F.col("trestbps") < 100, 100).otherwise(F.col("trestbps")),
        "oldpeak": when(F.col("oldpeak") < 0, 0).when(F.col("oldpeak") > 4, 4).otherwise(F.col("oldpeak")),
        "age": F.col("age").cast(IntegerType()),
    }
    for col_name in binary_columns:
        overrides[col_name] = when(F.col(col_name) < 0, 0).when(F.col(col_name) > 1, 1).otherwise(F.col(col_name))

    # alias() keeps the original column name on each rewritten expression.
    projection = [
        overrides[col_name].alias(col_name) if col_name in overrides else F.col(col_name)
        for col_name in columns
    ]

    df = df.dropna(subset=["target"]).select(projection)
    print(df)

    return df

# 3., 4., and 5.: Split Data into Train and Test, Train binary classification models on data, Select Final Model

In [5]:
def pipeline(df: DataFrame):
    """
    Tune four binary classifiers with cross-validation and print the winner.

    Every numeric column except ``target`` is used as a feature (treating
    every numeric attribute as non-categorical; this is questionable).
    Missing feature values are mean-imputed and the imputed columns are
    assembled into a single ``features`` vector.

    The data is split 90/10 (seed 42) into train and test. For each of
    RandomForest, LogisticRegression, LinearSVC and DecisionTree a 5-fold
    ``CrossValidator`` (area under ROC as the tuning metric) is fitted on the
    train split over that model's parameter grid, and the resulting model is
    scored on the test split with ROC AUC, F1 and accuracy. The model with
    the largest sum of those three test metrics is reported as the final
    selection.

    :param df: cleaned DataFrame with numeric feature columns and a
        ``target`` label column.
    :return: None; results are printed.
    :raises ValueError: if ``target`` is not among the numeric columns.
    """

    numeric_features = [f.name for f in df.schema.fields if isinstance(f.dataType, (DoubleType, FloatType, IntegerType, LongType))]

    # The label must not leak into the feature vector.
    numeric_features.remove("target")

    imputed_columns_numeric = [f"Imputed{v}" for v in numeric_features]
    imputer_numeric = Imputer(inputCols=numeric_features, outputCols=imputed_columns_numeric, strategy="mean")

    assembler = VectorAssembler(
        inputCols=imputed_columns_numeric,
        outputCol="features"
    )

    rf = RandomForestClassifier(labelCol='target', featuresCol='features')
    lr = LogisticRegression(labelCol='target', featuresCol='features', maxIter=1000)
    svc = LinearSVC(labelCol='target', featuresCol='features')
    dt = DecisionTreeClassifier(labelCol='target', featuresCol='features')
    class_dict = {"RandFor": rf, "LogReg": lr, "SVC": svc, "DecTree": dt}

    PG_rf = ParamGridBuilder().addGrid(rf.numTrees, [20, 50]).build()
    PG_lr = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
    PG_svc = ParamGridBuilder().addGrid(svc.regParam, [0.01, 0.1]).build()
    PG_dt = ParamGridBuilder().addGrid(dt.maxDepth, [4, 6, 8, 10, 12]).addGrid(dt.minInstancesPerNode, [1, 2, 4, 6]).build()

    PG_dict = {"RandFor": PG_rf, "LogReg": PG_lr, "SVC": PG_svc, "DecTree": PG_dt}

    evaluator_roc_auc = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='f1')
    evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')

    train, test = df.randomSplit([0.9, 0.1], seed=42)

    full_scores_dict = {}
    scores_dict = {}

    for ML_model, model_class in class_dict.items():
        # Named model_pipeline so it does not shadow this function's own name.
        model_pipeline = Pipeline(stages=[imputer_numeric, assembler, model_class])

        # evaluator_roc_auc spells out the same settings that
        # BinaryClassificationEvaluator(labelCol='target') gets by default,
        # so it is reused for tuning instead of building a new one per model.
        crossval = CrossValidator(estimator=model_pipeline, estimatorParamMaps=PG_dict[ML_model], evaluator=evaluator_roc_auc, numFolds=5, seed=42)
        cvModel = crossval.fit(train)
        predictions = cvModel.transform(test)

        roc_auc = evaluator_roc_auc.evaluate(predictions)
        f1 = evaluator_f1.evaluate(predictions)
        accuracy = evaluator_accuracy.evaluate(predictions)

        print(f"{ML_model} ROC AUC: {roc_auc}, F1: {f1}, Accuracy: {accuracy}")

        full_scores_dict[ML_model] = [roc_auc, f1, accuracy]
        # Selection score: unweighted sum of the three test metrics.
        scores_dict[ML_model] = roc_auc + f1 + accuracy

    best_model = max(scores_dict, key=scores_dict.get)

    # NOTE: these are the per-metric maxima across ALL models, so they are
    # not necessarily the scores of best_model itself.
    best_roc_auc, best_f1, best_accuracy = (
        max(metric_values) for metric_values in zip(*full_scores_dict.values())
    )

    print(f"Best Model: {best_model}, Best ROC_AUC: {best_roc_auc}, Best F1: {best_f1}, Best Accuracy: {best_accuracy}")

    final_dict = {"RandFor": "RandomForest", "LogReg": "LogisticRegression", "SVC": "SVC", "DecTree": "DecisionTree"}
    print(f"Final Selected Model: {final_dict[best_model]}")


In [6]:
def main():
    """
    Run the whole workflow: load the CSV, clean it, then train and compare models.

    A Spark session named "Heart Disease" is obtained (or reused) and is
    always stopped afterwards, even if one of the steps raises.
    """
    session_builder = SparkSession.builder.appName("Heart Disease")
    spark = session_builder.getOrCreate()

    try:
        # load -> clean -> model, each step feeding the next
        pipeline(data_cleaning(read_data(spark)))
    finally:
        spark.stop()

In [7]:
# Run the workflow only when executed as a notebook/script, not when imported.
if __name__ == "__main__":
    main()

24/06/07 01:02:29 WARN Utils: Your hostname, Elinas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.18.247.249 instead (on interface en0)
24/06/07 01:02:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/07 01:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


DataFrame[age: string, sex: int, painloc: int, painexer: int, relrest: int, pncaden: string, cp: int, trestbps: int, htn: int, chol: int, smoke: int, cigs: int, years: int, fbs: int, dm: int, famhist: int, restecg: int, ekgmo: int, ekgday(day: int, ekgyr: int, dig: int, prop: int, nitr: int, pro: int, diuretic: int, proto: int, thaldur: double, thaltime: double, met: double, thalach: int, thalrest: int, tpeakbps: int, tpeakbpd: int, dummy: int, trestbpd: int, exang: int, xhypo: int, oldpeak: double, slope: int, rldv5: int, rldv5e: int, ca: int, restckm: string, exerckm: int, restef: double, restwm: int, exeref: double, exerwm: int, thal: int, thalsev: int, thalpul: int, earlobe: int, cmo: int, cday: int, cyr: int, target: int]
DataFrame[age: int, sex: int, painloc: int, painexer: int, cp: int, trestbps: int, smoke: int, fbs: int, prop: int, nitr: int, pro: int, diuretic: int, thaldur: double, thalach: int, exang: int, oldpeak: double, slope: int, target: int]


24/06/07 01:02:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


RandFor ROC AUC: 0.8837792642140471, F1: 0.7794238683127573, Accuracy: 0.7777777777777778


24/06/07 01:03:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/07 01:03:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


LogReg ROC AUC: 0.8879598662207357, F1: 0.8213256157076383, Accuracy: 0.8194444444444444
SVC ROC AUC: 0.8913043478260869, F1: 0.8213256157076383, Accuracy: 0.8194444444444444
DecTree ROC AUC: 0.8269230769230769, F1: 0.8333333333333334, Accuracy: 0.8333333333333334
Best Model: SVC, Best ROC_AUC: 0.8913043478260869, Best F1: 0.8333333333333334, Best Accuracy: 0.8333333333333334
Final Selected Model: SVC
