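# Predicting `DIED` with Spark ML classifiers

Trains Logistic Regression, Decision Tree, Random Forest and Linear SVC models on the preprocessed train split. For each model it reports accuracy, weighted precision, weighted recall and F1 on both the train and the test split.

## Imports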

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.feature import VectorAssembler

## Utils

In [2]:
def transform_data(data, input_cols, output_col):
    """Pack ``input_cols`` into one vector column for Spark ML.

    The returned DataFrame keeps only two columns, in this order: the
    assembled ``'features'`` vector and the ``output_col`` label column.
    """
    vectorizer = VectorAssembler(inputCols=input_cols, outputCol="features")
    return vectorizer.transform(data).select(['features', output_col])

def evaluate_model(model, data, model_name, date_type, label_col='DIED'):
    """Score ``model`` on ``data`` and print its classification metrics.

    Parameters
    ----------
    model : fitted Spark ML model
        Its ``transform`` output must contain a ``'prediction'`` column.
    data : pyspark.sql.DataFrame
        Frame with the model's feature column and ``label_col``.
    model_name : str
        Name shown in the printed banner.
    date_type : str
        Label of the split being evaluated (e.g. ``'train'`` / ``'test'``),
        printed as "Data Type".
    label_col : str, default 'DIED'
        Ground-truth column that predictions are compared against. The default
        is the column this function previously hard-coded, so existing calls
        behave as before.

    Returns
    -------
    dict
        Maps ``'accuracy'``, ``'weightedPrecision'``, ``'weightedRecall'`` and
        ``'f1'`` to their values.
    """
    # Predict on data
    predictions = model.transform(data)

    # One evaluator; the metric is swapped per call through a param-map
    # override instead of building four near-identical evaluators.
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol=label_col)
    metrics = {
        metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
        for metric in ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1')
    }

    # Print results
    print('-------------------------------------------------------------------------------------------------------------------')
    print(f'---------------------------------------------- Model: {model_name} -----------------------------------------------')
    print('-------------------------------------------------------------------------------------------------------------------')
    print(f'Data Type: {date_type}')
    print(f"Accuracy: {metrics['accuracy']}")
    print(f"Weighted Precision: {metrics['weightedPrecision']}")
    print(f"Weighted Recall: {metrics['weightedRecall']}")
    print(f"F1 Score: {metrics['f1']}")

    return metrics

## Logistic Regression

In [3]:
def logistic_regression(train_data, test_data, output_col):
    """Fit a logistic regression on ``train_data`` and report train/test metrics.

    Parameters
    ----------
    train_data, test_data : pyspark.sql.DataFrame
        Frames with a ``'features'`` vector column and the ``output_col``
        label (e.g. the output of ``transform_data``).
    output_col : str
        Label column the model is trained on.

    Returns
    -------
    The fitted logistic-regression model, so it can be inspected or reused
    without refitting.
    """
    # Create Logistic Regression model
    lr = LogisticRegression(featuresCol='features', labelCol=output_col)

    # Fit model to training data
    lr_model = lr.fit(train_data)

    # NOTE(review): output_col is not forwarded to evaluate_model; confirm the
    # evaluator scores against the same label column the model was trained on.
    evaluate_model(lr_model, train_data, 'Logistic Regression', 'train')
    evaluate_model(lr_model, test_data, 'Logistic Regression', 'test')
    return lr_model

## Decision Tree

In [4]:
def decision_tree(train_data, test_data, output_col):
    """Fit a decision tree on ``train_data`` and report train/test metrics.

    Parameters
    ----------
    train_data, test_data : pyspark.sql.DataFrame
        Frames with a ``'features'`` vector column and the ``output_col``
        label (e.g. the output of ``transform_data``).
    output_col : str
        Label column the model is trained on.

    Returns
    -------
    The fitted decision-tree model, so it can be inspected or reused
    without refitting.
    """
    # Create Decision Tree model
    dt = DecisionTreeClassifier(featuresCol='features', labelCol=output_col)

    # Fit model to training data
    dt_model = dt.fit(train_data)

    # NOTE(review): output_col is not forwarded to evaluate_model; confirm the
    # evaluator scores against the same label column the model was trained on.
    evaluate_model(dt_model, train_data, 'Decision Tree', 'train')
    evaluate_model(dt_model, test_data, 'Decision Tree', 'test')
    return dt_model

## Random Forest

In [5]:
def random_forest(train_data, test_data, output_col, seed=None):
    """Fit a random forest on ``train_data`` and report train/test metrics.

    Parameters
    ----------
    train_data, test_data : pyspark.sql.DataFrame
        Frames with a ``'features'`` vector column and the ``output_col``
        label (e.g. the output of ``transform_data``).
    output_col : str
        Label column the model is trained on.
    seed : int, optional
        Random seed for the forest. ``None`` (default) keeps Spark's own
        default seed, i.e. the original behaviour; pass an int to make runs
        reproducible under a fixed seed.

    Returns
    -------
    The fitted random-forest model, so it can be inspected or reused
    without refitting.
    """
    # Create Random Forest model
    rf = RandomForestClassifier(featuresCol='features', labelCol=output_col)
    if seed is not None:
        # Only set when given: passing seed=None into the constructor would
        # store a None param instead of leaving Spark's default in place.
        rf.setSeed(seed)

    # Fit model to training data
    rf_model = rf.fit(train_data)

    # NOTE(review): output_col is not forwarded to evaluate_model; confirm the
    # evaluator scores against the same label column the model was trained on.
    evaluate_model(rf_model, train_data, 'Random Forest', 'train')
    evaluate_model(rf_model, test_data, 'Random Forest', 'test')
    return rf_model

In [6]:
def map_reduce(train_data, test_data, output_col):
    """Fit a Linear SVC on ``train_data`` and report train/test metrics.

    Despite its name, this function trains ``LinearSVC`` (a linear support
    vector classifier). ``linear_svc`` is provided as an accurately named
    alias; ``map_reduce`` is kept so existing callers keep working.

    Spark's ``LinearSVC`` only supports binary labels, so ``output_col`` must
    hold two classes.

    Parameters
    ----------
    train_data, test_data : pyspark.sql.DataFrame
        Frames with a ``'features'`` vector column and the ``output_col``
        label (e.g. the output of ``transform_data``).
    output_col : str
        Label column the model is trained on.

    Returns
    -------
    The fitted Linear SVC model, so it can be inspected or reused without
    refitting.
    """
    # Create Linear SVC model
    svc = LinearSVC(featuresCol='features', labelCol=output_col)

    # Fit model to training data
    svc_model = svc.fit(train_data)

    # NOTE(review): output_col is not forwarded to evaluate_model; confirm the
    # evaluator scores against the same label column the model was trained on.
    evaluate_model(svc_model, train_data, 'Linear SVC', 'train')
    evaluate_model(svc_model, test_data, 'Linear SVC', 'test')
    return svc_model


# Accurately named alias for the Linear SVC trainer above.
linear_svc = map_reduce

## Pipeline

In [7]:
def pipeline(train_data, test_data, output_col):
    """Train and report every model, one after another, on the same splits.

    Order: Logistic Regression, Decision Tree, Random Forest, Linear SVC.
    """
    trainers = (logistic_regression, decision_tree, random_forest, map_reduce)
    for train_and_report in trainers:
        train_and_report(train_data, test_data, output_col)

In [8]:
spark = SparkSession.builder.appName('ml_models').getOrCreate()

train_data_path = 'dataset/04-preprocessed/train.csv'
test_data_path = 'dataset/04-preprocessed/test.csv'

# Full column list of the preprocessed CSVs:
# USMER,MEDICAL_UNIT,SEX,PATIENT_TYPE,DIED,INTUBED,PNEUMONIA,AGE,PREGNANT,DIABETES,COPD,ASTHMA,INMSUPR,HIPERTENSION,OTHER_DISEASE,CARDIOVASCULAR,OBESITY,RENAL_CHRONIC,TOBACCO,CLASIFFICATION_FINAL,ICU
# DIED is the label; INTUBED, PREGNANT and ICU are not used as features.
input_cols = ["USMER","MEDICAL_UNIT","SEX","PATIENT_TYPE","PNEUMONIA","AGE","DIABETES","COPD","ASTHMA","INMSUPR","HIPERTENSION","OTHER_DISEASE","CARDIOVASCULAR","OBESITY","RENAL_CHRONIC","TOBACCO","CLASIFFICATION_FINAL"]
# Alternative, smaller feature set (kept for reference, not used):
# input_cols = ['SEX', 'PATIENT_TYPE', 'AGE', 'ASTHMA','HIPERTENSION', 'OTHER_DISEASE','OBESITY', 'TOBACCO', "CLASIFFICATION_FINAL"]
output_col = 'DIED'

try:
    train_data = spark.read.csv(train_data_path, header=True, inferSchema=True)
    test_data = spark.read.csv(test_data_path, header=True, inferSchema=True)

    # The assembled frames feed 4 fits and 8 evaluations (each computing 4
    # metrics). Without caching, every one of those Spark jobs re-reads the
    # CSVs and re-assembles the feature vectors.
    transformed_train_data = transform_data(train_data, input_cols, output_col).cache()
    transformed_test_data = transform_data(test_data, input_cols, output_col).cache()

    pipeline(transformed_train_data, transformed_test_data, output_col)
finally:
    # Release the session (and any cached data) even if a step above fails.
    spark.stop()

25/05/08 15:25:11 WARN Utils: Your hostname, Ajits-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 216.37.99.170 instead (on interface en0)
25/05/08 15:25:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 15:25:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/08 15:25:15 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/08 15:25:15 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


-------------------------------------------------------------------------------------------------------------------
---------------------------------------------- Model: Logistic Regression -----------------------------------------------
-------------------------------------------------------------------------------------------------------------------
Data Type: train
Accuracy: 0.9402707692307692
Weighted Precision: 0.9326813019982092
Weighted Recall: 0.9402707692307692
F1 Score: 0.9352444623904975
-------------------------------------------------------------------------------------------------------------------
---------------------------------------------- Model: Logistic Regression -----------------------------------------------
-------------------------------------------------------------------------------------------------------------------
Data Type: test
Accuracy: 0.9387171522950254
Weighted Precision: 0.9313049247226775
Weighted Recall: 0.9387171522950254
F1 Score: 0.9337591575