In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import IntegerType, FloatType, LongType, StringType, DoubleType
from pyspark.sql.dataframe import DataFrame
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, Evaluator
import pyspark.sql.functions as F
from pyspark.sql.functions import when, col
from itertools import combinations
import os

In [2]:
DATA_FOLDER = "../data"

NUMBER_OF_FOLDS = 3
SPLIT_SEED = 7576
TRAIN_TEST_SPLIT = 0.9

In [3]:
import sys
sys.executable

'/tmp/demos/bin/python3'

## Get data from S3

In [None]:
s3 = boto3.client('s3',
                  aws_access_key_id='',
                  aws_secret_access_key='',
                  aws_session_token='')


bucket_name = 'de300spring2024'
object_key = 'emily_kohlberg/hw/heart_disease.csv'

In [None]:
csv_obj = s3.get_object(Bucket=bucket_name, Key=object_key)
body = csv_obj['Body']
csv_string = body.read().decode('utf-8')

In [None]:
raw_data = pd.read_csv(BytesIO(csv_string.encode()))
raw_data

## Get data from data

In [4]:
def read_data(spark: SparkSession) -> DataFrame:
    """
    read data; since the data has the header we let spark guess the schema
    """
    
    data = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(os.path.join(DATA_FOLDER,"heart_disease.csv"))

    return data

## Cleaning

In [5]:
def retain_cols(data: DataFrame) -> DataFrame:
    columns_to_retain = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 'smoke', 
                         'fbs', 'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 
                         'exang', 'oldpeak', 'slope', 'target']
    
    filtered_data = data.select(columns_to_retain)
    return filtered_data
    
def replace_out_of_range(data: DataFrame) -> DataFrame:
    data = data.withColumn('painloc', when(col('painloc') < 0, 0).when(col('painloc') > 1, 1).otherwise(col('painloc')))
    data = data.withColumn('painexer', when(col('painexer') < 0, 0).when(col('painexer') > 1, 1).otherwise(col('painexer')))
    data = data.withColumn('trestbps', when(col('trestbps') < 100, 100).otherwise(col('trestbps')))
    data = data.withColumn('oldpeak', when(col('oldpeak') < 0, 0).when(col('oldpeak') > 4, 4).otherwise(col('oldpeak')))
    data = data.withColumn('fbs', when(col('fbs') < 0, 0).when(col('fbs') > 1, 1).otherwise(col('fbs')))
    data = data.withColumn('prop', when(col('prop') < 0, 0).when(col('prop') > 1, 1).otherwise(col('prop')))
    data = data.withColumn('nitr', when(col('nitr') < 0, 0).when(col('nitr') > 1, 1).otherwise(col('nitr')))
    data = data.withColumn('pro', when(col('pro') < 0, 0).when(col('pro') > 1, 1).otherwise(col('pro')))
    data = data.withColumn('diuretic', when(col('diuretic') < 0, 0).when(col('diuretic') > 1, 1).otherwise(col('diuretic')))
    data = data.withColumn('exang', when(col('exang') < 0, 0).when(col('exang') > 1, 1).otherwise(col('exang')))
    data = data.withColumn('slope', when(col('slope') < 1, None).when(col('slope') > 3, None).otherwise(col('slope')))
    return data
    
def replace_nulls_with_mean(data: DataFrame) -> DataFrame:
    columns_for_imputation = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 
                     'fbs', 'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 
                     'exang', 'oldpeak', 'slope', 'target']
    
    for column in columns_for_imputation:
        mean_value = data.select(F.mean(col(column))).collect()[0][0]
        if mean_value is not None:
            data = data.withColumn(column, when(col(column).isNull(), mean_value).otherwise(col(column)))
    return data

In [None]:
def smoke_1(data: DataFrame) -> DataFrame:
    url1 = 'https://www.abs.gov.au/statistics/health/health-conditions-and-risks/smoking-and-vaping/latest-release'
    response = requests.get(url1)
        
    # get the HTML file as a string
    html_content = response.content
    
    # create a selector object
    full_sel = Selector(text=html_content)
    
    # select all tables in page -> returns a SelectorList object
    tables = full_sel.xpath('//table')
    smokers_by_age = tables[1]
    # get the rows
    rows = smokers_by_age.xpath('./tbody//tr')

    def parse_row_1(row:Selector) -> List[str]:
        '''
        Parses a html row into a list of individual elements
        '''
        cells = row.xpath('.//th | .//td')
        row_data = []
        
        for i, cell in enumerate(cells):
            if i == 0 or i == 10:
                cell_text = cell.xpath('normalize-space(.)').get()
                cell_text = re.sub(r'<.*?>', ' ', cell_text)  # Remove remaining HTML tags
                # if there are br tags, there will be some binary characters
                cell_text = cell_text.replace('\xa0', '')  # Remove \xa0 characters
                row_data.append(cell_text)
        
        return row_data
    
    table_data = [parse_row_1(row) for row in rows]

    def get_rate_1(age):
        try:
            age = int(age)
            for i, row in enumerate(table_data):
                if i < len(table_data) - 1:
                    cutoff = row[0].split('–')[1]
                    if age <= int(cutoff):
                        return float(row[1])
                else:
                    return float(row[1])
        except:
            return np.nan
    
    # Register the UDF
    get_rate_1_udf = F.udf(lambda age: get_rate_1(age) / 100, DoubleType())

    data = data.withColumn('smoke_1', when(col('smoke_1').isNull(), get_rate_1_udf(col('age'))).otherwise(col('smoke_1')))

    return data

def smoke_2(data: DataFrame) -> DataFrame:
    url2 = 'https://www.cdc.gov/tobacco/data_statistics/fact_sheets/adult_data/cig_smoking/index.htm'
    response = requests.get(url2)

    # Create a scrapy Selector from the response content
    selector = Selector(text=response.content)

    ul_sel_list = selector.xpath('//ul[@class="block-list"]')
    genders = ul_sel_list[0]
    ages = ul_sel_list[1]

    def clean_gender_percents(rows):
        dict = {}
        for row in rows:
            gender = 'woman' if 'women' in row.split('(')[0] else 'man'
            percent = float(row.split('(')[1].split('%')[0])
            dict[gender] = float(percent)
        return dict

    def clean_age_percents(rows):
        for i, row in enumerate(rows):
            if i < len(rows) - 1:
                age = int(row.split('–')[1].split(' ')[0])
            else:
                age = int(row.split(' ')[7])
                
            percent = float(row.split('(')[1].split('%')[0])
            rows[i] = [age, percent]
        return rows

    def parse_row_2(row:Selector) -> List[str]:
        '''
        Parses a html row into a list of individual elements
        '''
        cells = row.xpath('./li')
        row_data = []
        
        for i, cell in enumerate(cells):
            cell_text = cell.xpath('normalize-space(.)').get()
            cell_text = re.sub(r'<.*?>', ' ', cell_text)  # Remove remaining HTML tags
            # if there are br tags, there will be some binary characters
            cell_text = cell_text.replace('\xa0', '')  # Remove \xa0 characters
            row_data.append(cell_text)
        
        return row_data

    per_by_gender = clean_gender_percents(parse_row_2(genders))
    per_by_age = clean_age_percents(parse_row_2(ages))

    def get_rate_2(sex, age):
        if sex == 0:
            try:
                age = int(age)
                for i, row in enumerate(per_by_age):
                    if i < len(per_by_age) - 1:
                        if age <= row[0]:
                            return row[1]
                    else:
                        return row[1]
            except:
                return np.nan
        else:
            try:
                age = int(age)
                for i, row in enumerate(per_by_age):
                    if i < len(per_by_age) - 1:
                        if age <= row[0]:
                            return row[1] * per_by_gender['man'] / per_by_gender['woman']
                    else:
                        return row[1] * per_by_gender['man'] / per_by_gender['woman']
            except:
                return np.nan

    # Register the UDF
    get_rate_2_udf = F.udf(lambda sex, age: get_rate_2(sex, age) / 100, DoubleType())

    data = data.withColumn('smoke_2', when(col('smoke_2').isNull(), get_rate_2_udf(col('sex'), col('age'))).otherwise(col('smoke_2')))

    return data 

def impute_smoke(data: DataFrame) -> DataFrame:
    data = data.withColumn('smoke_1', F.col('smoke'))
    data = data.withColumn('smoke_2', F.col('smoke'))

    data = smoke_1(data)
    data = smoke_2(data)

    data = data.drop('smoke')
    
    return data

## Prediction Model

In [6]:
# Custom CompositeEvaluator
class CompositeEvaluator(Evaluator):
    def __init__(self):
        super().__init__()
        self.auc_evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        self.accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
        self.precision_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedPrecision")
        self.recall_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedRecall")
        self.f1_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")

        self.weights = {
            "AUC": 0.4,
            "accuracy": 0.2,
            "precision": 0.1,
            "recall": 0.1,
            "f1": 0.2
        }

    def isLargerBetter(self):
        return True

    def _evaluate(self, dataset):
        auc = self.auc_evaluator.evaluate(dataset)
        accuracy = self.accuracy_evaluator.evaluate(dataset)
        precision = self.precision_evaluator.evaluate(dataset)
        recall = self.recall_evaluator.evaluate(dataset)
        f1 = self.f1_evaluator.evaluate(dataset)

        composite_score = (self.weights["AUC"] * auc +
                           self.weights["accuracy"] * accuracy +
                           self.weights["precision"] * precision +
                           self.weights["recall"] * recall +
                           self.weights["f1"] * f1)

        return composite_score

In [7]:
def pipeline(data: DataFrame):

    # clean
    data = retain_cols(data)
    data = replace_out_of_range(data)

    # drop null targets
    data = data.dropna(subset=["target"])

    # make age an int
    data = data.withColumn("age", data["age"].cast(IntegerType()))

    numeric_features = [f.name for f in data.schema.fields if isinstance(f.dataType, (DoubleType, FloatType, IntegerType, LongType))]
    numeric_features.remove("target")

    # numeric columns
    imputed_columns = [f"Imputed{v}" for v in numeric_features]
    imputer_numeric = Imputer(inputCols=numeric_features, outputCols=imputed_columns, strategy = "mean")


    data.show()
    
    # Assemble feature columns into a single feature vector
    assembler = VectorAssembler(
        inputCols=imputed_columns, 
        outputCol="features"
        )

    # Define binary classification models
    classifiers = {
        "RandomForest": RandomForestClassifier(labelCol="target", featuresCol="features"),
        "LogisticRegression": LogisticRegression(labelCol="target", featuresCol="features"),
        "GBTClassifier": GBTClassifier(labelCol="target", featuresCol="features"),
        "DecisionTree": DecisionTreeClassifier(labelCol="target", featuresCol="features")

    }

    # Define parameter grids for each classifier
    param_grids = {
        "RandomForest": ParamGridBuilder() \
            .addGrid(classifiers["RandomForest"].maxDepth, [4, 6, 8]) \
            .addGrid(classifiers["RandomForest"].numTrees, [50, 100]) \
            .build(),
        "LogisticRegression": ParamGridBuilder() \
            .addGrid(classifiers["LogisticRegression"].regParam, [0.01, 0.1]) \
            .build(),
        "GBTClassifier": ParamGridBuilder() \
            .addGrid(classifiers["GBTClassifier"].maxDepth, [4, 6, 8]) \
            .addGrid(classifiers["GBTClassifier"].maxIter, [20, 50]) \
            .build(),
        "DecisionTree": ParamGridBuilder() \
            .addGrid(classifiers["DecisionTree"].maxDepth, [4, 6, 8]) \
            .addGrid(classifiers["DecisionTree"].minInstancesPerNode, [1, 2, 4]) \
            .build()
    }
    
    # Set up the composite evaluator
    composite_evaluator = CompositeEvaluator()

    # Split the data into training and test sets
    train_data, test_data = data.randomSplit([TRAIN_TEST_SPLIT, 1-TRAIN_TEST_SPLIT], seed=SPLIT_SEED)

    # Cache the training data to improve performance
    train_data.cache()

    best_model = None
    best_model_name = ""
    best_composite_score = float('-inf')
    
    # Iterate through each classifier
    for model_name, classifier in classifiers.items():
        # Create the pipeline
        pipeline = Pipeline(stages=[imputer_numeric, assembler, classifier])  

        # Set up the cross-validator
        crossval = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=param_grids[model_name],
            evaluator=composite_evaluator,
            numFolds=NUMBER_OF_FOLDS,
            seed=SPLIT_SEED)

        # Train the cross-validated pipeline model
        cvModel = crossval.fit(train_data)

        # Make predictions on the test data
        predictions = cvModel.transform(test_data)

        # Evaluate the model using the composite score
        composite_score = composite_evaluator.evaluate(predictions)
        print(f"{model_name} Composite Score: {composite_score:.4f}")

        # Update the best model if current model is better
        if composite_score > best_composite_score:
            best_composite_score = composite_score
            best_model = cvModel.bestModel.stages[-1]
            best_model_name = model_name
            best_metrics = {
                "composite_score": composite_score,
                "AUC": composite_evaluator.auc_evaluator.evaluate(predictions),
                "accuracy": composite_evaluator.accuracy_evaluator.evaluate(predictions),
                "precision": composite_evaluator.precision_evaluator.evaluate(predictions),
                "recall": composite_evaluator.recall_evaluator.evaluate(predictions),
                "f1": composite_evaluator.f1_evaluator.evaluate(predictions)
            }
            
    # Print the best model information
    print(f"Best Model: {best_model_name}")
    print(f"Best Model Composite Score: {best_composite_score:.4f}")
    print(f"Best Model Metrics - AUC: {best_metrics['AUC']:.4f}, Accuracy: {best_metrics['accuracy']:.4f}, Precision: {best_metrics['precision']:.4f}, Recall: {best_metrics['recall']:.4f}, F1-Score: {best_metrics['f1']:.4f}")


    # Retrieve and print the best model parameters
    if best_model_name == "RandomForest":
        selected_max_depth = best_model.getOrDefault(best_model.getParam("maxDepth"))
        selected_num_trees = best_model.getOrDefault(best_model.getParam("numTrees"))
        print(f"Selected Maximum Tree Depth: {selected_max_depth}")
        print(f"Selected Number of Trees: {selected_num_trees}")
    elif best_model_name == "LogisticRegression":
        selected_reg_param = best_model.getOrDefault(best_model.getParam("regParam"))
        print(f"Selected Regularization Parameter: {selected_reg_param}")
    elif best_model_name == "GBTClassifier":
        selected_max_depth = best_model.getOrDefault(best_model.getParam("maxDepth"))
        selected_max_iter = best_model.getOrDefault(best_model.getParam("maxIter"))
        print(f"Selected Maximum Tree Depth: {selected_max_depth}")
        print(f"Selected Maximum Iterations: {selected_max_iter}")
    elif best_model_name == "DecisionTree":
        selected_max_depth = best_model.getOrDefault(best_model.getParam("maxDepth"))
        selected_min_instances_per_node = best_model.getOrDefault(best_model.getParam("minInstancesPerNode"))
        print(f"Selected Maximum Tree Depth: {selected_max_depth}")
        print(f"Selected Minimum Instances Per Node: {selected_min_instances_per_node}")



In [8]:
def set_spark_configurations():
    conf = SparkConf()
    conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")
    conf.set("spark.driver.memory", "8g")
    conf.set("spark.executor.memory", "8g")
    conf.set("spark.executor.cores", "4")
    conf.set("spark.driver.maxResultSize", "4g")
    return conf

def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Predict Heart Disease") \
        .config(conf=set_spark_configurations()) \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .getOrCreate()


    data = read_data(spark)
    
    pipeline(data)



    spark.stop()
    

In [9]:
main()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/27 17:36:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
|age|sex|painloc|painexer| cp|trestbps|smoke|fbs|prop|nitr|pro|diuretic|thaldur|thalach|exang|oldpeak|slope|target|
+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
| 63|  1|   null|    null|  1|     145| null|  1|   0|   0|  0|       0|   10.5|    150|    0|    2.3|    3|     0|
| 67|  1|   null|    null|  4|     160| null|  0|   1|   0|  0|       0|    9.5|    108|    1|    1.5|    2|     1|
| 67|  1|   null|    null|  4|     120| null|  0|   1|   0|  0|       0|    8.5|    129|    1|    2.6|    2|     1|
| 37|  1|   null|    null|  3|     130| null|  0|   1|   0|  0|       0|   13.0|    187|    0|    3.5|    3|     0|
| 41|  0|   null|    null|  2|     130| null|  0|   0|   0|  0|       0|    7.0|    172|    0|    1.4|    1|     0|
| 56|  1|   null|    null|  2|     120| null|  0|   0|   0|  0|       0|

24/05/27 17:36:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/05/27 17:37:30 WARN DAGScheduler: Broadcasting large task binary with size 1138.6 KiB
24/05/27 17:37:31 WARN DAGScheduler: Broadcasting large task binary with size 1482.2 KiB
24/05/27 17:37:32 WARN DAGScheduler: Broadcasting large task binary with size 1184.3 KiB
24/05/27 17:37:33 WARN DAGScheduler: Broadcasting large task binary with size 1196.9 KiB
24/05/27 17:37:33 WARN DAGScheduler: Broadcasting large task binary with size 1196.9 KiB
24/05/27 17:37:33 WARN DAGScheduler: Broadcasting large task binary with size 1196.9 KiB
24/05/27 17:37:34 WARN DAGScheduler: Broadcasting large task binary with size 1196.9 KiB
24/05/27 17:38:01 WARN DAGScheduler: Broadcasting large task binary with size 1142.0 KiB
24/05/27 17:38:01 WARN DAGScheduler: Broadcasting large task binary with size 1460.4 KiB
24/05/27 17:38:02 W

RandomForest Composite Score: 0.8103


24/05/27 17:40:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/27 17:40:23 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

LogisticRegression Composite Score: 0.8412


24/05/27 18:05:10 WARN DAGScheduler: Broadcasting large task binary with size 1000.7 KiB
24/05/27 18:05:11 WARN DAGScheduler: Broadcasting large task binary with size 1007.3 KiB
24/05/27 18:05:12 WARN DAGScheduler: Broadcasting large task binary with size 1017.2 KiB
24/05/27 18:05:14 WARN DAGScheduler: Broadcasting large task binary with size 1015.9 KiB
24/05/27 18:05:15 WARN DAGScheduler: Broadcasting large task binary with size 1016.4 KiB
24/05/27 18:05:16 WARN DAGScheduler: Broadcasting large task binary with size 1016.9 KiB
24/05/27 18:05:17 WARN DAGScheduler: Broadcasting large task binary with size 1018.0 KiB
24/05/27 18:05:18 WARN DAGScheduler: Broadcasting large task binary with size 1020.0 KiB
24/05/27 18:05:19 WARN DAGScheduler: Broadcasting large task binary with size 1023.8 KiB
24/05/27 18:05:20 WARN DAGScheduler: Broadcasting large task binary with size 1029.9 KiB
24/05/27 18:05:21 WARN DAGScheduler: Broadcasting large task binary with size 1038.8 KiB
24/05/27 18:05:22 WAR

GBTClassifier Composite Score: 0.8275


                                                                                

DecisionTree Composite Score: 0.7952
Best Model: LogisticRegression
Best Model Composite Score: 0.8412
Best Model Metrics - AUC: 0.8853, Accuracy: 0.8118, Precision: 0.8118, Recall: 0.8118, F1-Score: 0.8118
Selected Regularization Parameter: 0.1


The best model is LogisticRegression with metrics - AUC: 0.8853, Accuracy: 0.8118, Precision: 0.8118, Recall: 0.8118, F1-Score: 0.8118
The selected regularization parameter was 0.1.