# ENSF 612 - Engineering Large Scale Data Analytics Systems
# Heart Disease Classification Project
## Adam Kerr (10146363)

In [1]:
import pyspark
from pyspark.sql import SQLContext

#### Create spark context

In [2]:
# Local Spark context with as many worker threads as logical cores, plus an
# SQLContext used below to read the CSV into a DataFrame.
sc = pyspark.SparkContext(master='local[*]')
sqlContext = SQLContext(sparkContext=sc)

#### Read data into dataframe

In [3]:
# Load heart.csv: first row is the header, column types are inferred from the data.
df = sqlContext.read.csv('heart.csv', header=True, inferSchema=True)

#### Get column name from index

In [4]:
# Column names of `df`, in schema order.
col_names = df.columns


def col_from_idx(i):
    """Return the name of the column at position ``i`` in ``col_names``."""
    name = col_names[i]
    return name

#### Get list of columns from a binary mask
The mask is given as an integer: if bit i is set, the column at index i is included.

In [5]:
def mask_to_col(mask):
    """Translate an integer bitmask into the column names it selects.

    Bit ``i`` of ``mask`` (for ``i`` in 0..13) selects the column at index
    ``i``; bits above 13 are ignored. Names are returned in column order.
    """
    selected = [bit for bit in range(14) if mask & (1 << bit)]
    return [col_from_idx(bit) for bit in selected]

#### Create masks based on the number of different features
This creates one mask for every non-empty combination of the first n features (the integers 1 to 2^n - 1).

In [6]:
def num_features_to_masks(n):
    """Return every non-zero integer mask over ``n`` feature bits.

    The result is ``[1, 2, ..., 2**n - 1]``. Each entry encodes one non-empty
    subset of the first ``n`` columns (bit i set selects column i).
    """
    return list(range(1, 2 ** n))


# Quick check: the 15 non-empty subsets of 4 features.
print(num_features_to_masks(4))

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#### Combine features into a feature vector

In [8]:
# Pack every column except the last one (presumably the 'target' label, which
# is used as labelCol below) into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=df.columns[:-1])
stages = [assembler]

partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)

#### Set 80% of data for training and 20% for testing

In [9]:
# 80/20 train/test split with a fixed seed so the split is reproducible.
train, test = preppedDataDF.randomSplit(weights=[0.8, 0.2], seed=0)

#### Train the model

In [10]:
# Logistic regression with default hyper-parameters; 'target' is the label column.
lr = LogisticRegression(labelCol='target')
lrModel = lr.fit(dataset=train)

#### Make predictions on test set

In [11]:
# Score the held-out rows with the fitted model.
pred = lrModel.transform(dataset=test)

In [12]:
# Bring the true labels and the predicted labels back to the driver as Row lists.
y, y_pred = (
    test.select("target").collect(),
    pred.select("prediction").collect(),
)

#### Evaluate model
The metric is the area under the ROC curve

In [13]:
# Area under the ROC curve on the test predictions (areaUnderROC is the
# evaluator's default metric; it is spelled out here for readability).
evaluator = BinaryClassificationEvaluator(
    labelCol='target',
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
evaluator.evaluate(pred)

0.9002976190476191

#### The inner function trains a logistic regression model on the provided features and returns the area under the ROC curve
This runs once per feature combination and takes a long time. The function is therefore wrapped in a closure that counts calls and prints the percentage completed.

In [14]:
def train_classifier_on_selected_features(num_features):
    """Build a scorer that fits logistic regression on a chosen feature subset.

    The returned closure counts its own calls so that it can print progress
    while it is mapped over every non-empty combination of features.

    Args:
        num_features: number of candidate feature columns. The closure is
            expected to be called once per non-empty subset, i.e.
            ``2**num_features - 1`` times, and that count is the 100% mark
            for the progress printout.

    Returns:
        A function ``inner(features)``. It takes a list of feature column
        names and returns the area under the ROC curve on a 20% test split.
    """
    # Only non-empty subsets are evaluated (masks start at 1), so there are
    # 2**n - 1 calls; max(..., 1) avoids dividing by zero when n == 0.
    num_combos = max(2**num_features - 1, 1)

    n = 0

    def train_classifier_on_selected_features_inner(features):
        """Train on ``features`` and return the test-set AUC (areaUnderROC).

        ``features`` is not modified. Any 'target' entry in it is ignored so
        that the label is never assembled into the feature vector.
        """
        nonlocal n

        # Build a new column list instead of appending 'target' to the
        # caller's list, which would silently mutate it.
        columns = [c for c in features if c != 'target'] + ['target']
        new_df = df.select(columns)

        # Every selected column except the trailing label becomes a feature.
        assembler = VectorAssembler(inputCols=new_df.schema.names[:-1], outputCol="features")
        pipelineModel = Pipeline(stages=[assembler]).fit(new_df)
        preppedDataDF = pipelineModel.transform(new_df)

        # 80/20 split with a fixed seed (0).
        train, test = preppedDataDF.randomSplit([0.8, 0.2], 0)

        lrModel = LogisticRegression(labelCol='target').fit(train)
        pred = lrModel.transform(test)

        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol='target')

        n += 1
        print(str(n / num_combos * 100) + '%')

        return evaluator.evaluate(pred)

    return train_classifier_on_selected_features_inner

In [15]:
import pandas as pd

#### Once the data is computed it is saved as a .csv so that it does not need to be recomputed

In [16]:
import ast

# Score every non-empty subset of the 13 feature columns once, keep the best
# 10% and cache them in best.csv; later runs just reload the cache.
num_features = 13

try:
    best_df = pd.read_csv('best.csv')
except FileNotFoundError:
    masks = num_features_to_masks(num_features)
    feature_combos = list(map(mask_to_col, masks))
    scores = list(map(train_classifier_on_selected_features(num_features), feature_combos))

    # zipWithIndex pairs each score with its position i. masks[i] == i + 1,
    # so the position maps straight back to the mask and thus to its columns.
    rdd = sc.parallelize(scores)
    rdd = rdd.zipWithIndex()
    rdd = rdd.sortByKey(ascending=False)
    rdd = rdd.map(lambda x: (x[0], mask_to_col(x[1] + 1)))
    best = rdd.take(int(2**num_features * 0.1))

    best_df = pd.DataFrame(best, columns=['Score', 'Features'])
    # index=False keeps a spurious "Unnamed: 0" column out of the reloaded file.
    best_df.to_csv('best.csv', index=False)
else:
    # Rebuild `best` (a list of (score, feature list) pairs) from the cache,
    # so later cells can rely on it whether or not the search was rerun.
    # The CSV stores each list as its Python repr, so parse it back. Drop any
    # 'target' entry: it is the label, not a feature.
    best = [
        (score, [c for c in ast.literal_eval(feats) if c != 'target'])
        for score, feats in zip(best_df['Score'], best_df['Features'])
    ]

best_df[['Score', 'Features']]

Unnamed: 0,Score,Features
0,0.988095,"['sex', 'cp', 'trestbps', 'restecg', 'oldpeak'..."
1,0.987143,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
2,0.985714,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
3,0.982857,"['cp', 'thalach', 'ca', 'thal', 'target']"
4,0.982759,"['sex', 'cp', 'trestbps', 'oldpeak', 'slope', ..."
...,...,...
814,0.921429,"['age', 'fbs', 'restecg', 'thalach', 'slope', ..."
815,0.921429,"['cp', 'chol', 'fbs', 'thalach', 'oldpeak', 'c..."
816,0.921212,"['restecg', 'exang', 'slope', 'ca', 'target']"
817,0.921212,"['age', 'cp', 'trestbps', 'chol', 'exang', 'ca..."


In [17]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### This function now performs 3-fold cross-validation to tune the hyper-parameters (regularization strength and elastic-net mixing)


In [18]:
def cross_validate_on_selected_features(num_items):
    """Build a scorer that tunes logistic regression by 3-fold cross-validation.

    Args:
        num_items: number of times the returned closure is expected to be
            called; it is the 100% mark for the progress printout.

    Returns:
        A function ``inner(features)``. It takes a list of feature column
        names and grid-searches ``elasticNetParam`` and ``regParam`` using
        3-fold cross-validation on an 80% training split. It returns the
        area under the ROC curve on the remaining 20%.
    """
    # max(..., 1) avoids dividing by zero in the progress printout.
    total = max(num_items, 1)
    n = 0

    def cross_validate_on_selected_features_inner(features):
        """Cross-validate on ``features`` and return the held-out AUC.

        ``features`` is not modified. Any 'target' entry in it is ignored so
        that the label is never assembled into the feature vector.
        """
        nonlocal n

        # Build a new column list instead of appending 'target' to the
        # caller's list, which would silently mutate it.
        columns = [c for c in features if c != 'target'] + ['target']
        new_df = df.select(columns)

        # Every selected column except the trailing label becomes a feature.
        assembler = VectorAssembler(inputCols=new_df.schema.names[:-1], outputCol="features")
        pipelineModel = Pipeline(stages=[assembler]).fit(new_df)
        preppedDataDF = pipelineModel.transform(new_df)

        # 80/20 split with a fixed seed (0).
        train, test = preppedDataDF.randomSplit([0.8, 0.2], 0)

        lr = LogisticRegression(labelCol='target')
        pipeline = Pipeline(stages=[lr])

        # Tuning lr.threshold would not change this score: the threshold only
        # affects the 'prediction' column, while AUC uses 'rawPrediction'.
        paramGrid = (ParamGridBuilder()
                     .addGrid(lr.elasticNetParam, [0, 0.5, 1])
                     .addGrid(lr.regParam, [0.1, 0.01])
                     .build())

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=BinaryClassificationEvaluator(labelCol='target'),
                                  numFolds=3)

        cvModel = crossval.fit(train)
        pred = cvModel.transform(test)

        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol='target')

        n += 1
        print(str(n / total * 100) + '%')

        return evaluator.evaluate(pred)

    return cross_validate_on_selected_features_inner

#### As before, this data is also saved as a .csv to prevent unnecessary recomputation

In [19]:
import ast

# Re-score the candidate feature subsets from best_df using cross-validated
# hyper-parameter tuning, and cache the ranked results in cv_best.csv.
try:
    cv_best_df = pd.read_csv('cv_best.csv')
except FileNotFoundError:
    # best_df's Features column holds lists when built in this session, and
    # their string repr when read back from best.csv; normalise to lists.
    # Any 'target' entry is the label, not a feature, so drop it.
    best_features = []
    for feats in best_df['Features']:
        if isinstance(feats, str):
            feats = ast.literal_eval(feats)
        best_features.append([c for c in feats if c != 'target'])

    # Pass copies so the scorer cannot alter best_features in place.
    cv_scores = list(map(cross_validate_on_selected_features(len(best_features)),
                         [list(f) for f in best_features]))

    # Keep each score paired with the exact feature list it was computed for.
    # A position in best_features is not a bit mask, so it must not be
    # decoded with mask_to_col.
    rdd = sc.parallelize(list(zip(cv_scores, best_features)))
    rdd = rdd.sortByKey(ascending=False)
    cv_best = rdd.take(int(2**13 * 0.1))

    cv_best_df = pd.DataFrame(cv_best, columns=['Score', 'Features'])
    # index=False keeps a spurious "Unnamed: 0" column out of the reloaded file.
    cv_best_df.to_csv('cv_best.csv', index=False)

cv_best_df[['Score', 'Features']]

Unnamed: 0,Score,Features
0,0.991429,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
1,0.990000,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
2,0.987143,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
3,0.987143,"['cp', 'chol', 'restecg', 'exang', 'oldpeak', ..."
4,0.985714,"['sex', 'cp', 'fbs', 'restecg', 'thalach', 'ex..."
...,...,...
76,0.964387,"['cp', 'thalach', 'exang', 'slope', 'ca', 'tha..."
77,0.964387,"['sex', 'cp', 'trestbps', 'oldpeak', 'ca', 'th..."
78,0.964387,"['sex', 'cp', 'trestbps', 'fbs', 'oldpeak', 'c..."
79,0.964286,"['age', 'sex', 'chol', 'thalach', 'oldpeak', '..."
