In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score

# Load the raw CSV, discard the `id` column, and keep only rows with no missing values.
raw_cardio = pd.read_csv('cardio_train_new.csv')
cardio_data = raw_cardio.drop('id', axis=1).dropna()

# Keep only rows whose two blood-pressure readings (ap_hi and ap_lo) both fall
# inside the closed interval [40, 220]; `between` is inclusive on both ends.
BP_LOWER, BP_UPPER = 40, 220
bp_in_range = (
    cardio_data.ap_hi.between(BP_LOWER, BP_UPPER)
    & cardio_data.ap_lo.between(BP_LOWER, BP_UPPER)
)
cardio_data = cardio_data.loc[bp_in_range]

# Target dtype for each column, applied one column at a time in the listed order.
column_dtypes = [
    ('gender', 'str'),
    ('height', 'float64'),
    ('ap_hi', 'float64'),
    ('ap_lo', 'float64'),
    ('cholesterol', 'category'),
    ('gluc', 'category'),
    ('smoke', 'bool'),
    ('alco', 'bool'),
    ('active', 'bool'),
    ('cardio', 'int64'),
]
for column_name, target_dtype in column_dtypes:
    cardio_data[column_name] = cardio_data[column_name].astype(target_dtype)

# Collapse `cholesterol` into a two-level string feature: rows whose value equals 1
# become '1', every other value becomes '2'.
# Done as one vectorised comparison + map instead of a Python-level loop over rows;
# the resulting column holds the same '1'/'2' strings as before.
cholesterol = cardio_data.cholesterol
new_cholesterol = (cholesterol == 1).map({True: '1', False: '2'})
cardio_data['new_cholesterol'] = new_cholesterol

# Collapse `gluc` into a two-level string feature: rows whose value equals 1
# become '1', every other value becomes '2'.
# Done as one vectorised comparison + map instead of a Python-level loop over rows;
# the resulting column holds the same '1'/'2' strings as before.
gluc = cardio_data.gluc
new_gluc = (gluc == 1).map({True: '1', False: '2'})
cardio_data['new_gluc'] = new_gluc

# The two-level `new_cholesterol` / `new_gluc` columns replace the originals,
# so remove both source columns in a single call.
cardio_data = cardio_data.drop(['cholesterol', 'gluc'], axis=1)

In [2]:
# Spark bootstrap. findspark.init(...) is called before `import pyspark`, exactly as
# in the original ordering; presumably pyspark only becomes importable once findspark
# has registered this Spark installation -- do not reorder these statements.
import findspark

# NOTE(review): machine-specific absolute path to the Spark distribution.
SPARK_INSTALL_DIR = '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'
findspark.init(SPARK_INSTALL_DIR)

import pyspark
# Star import kept as-is: cells outside this view may rely on names it provides.
# In the visible cells only SparkSession is used.
from pyspark.sql import *

# Reuse an already-running session if there is one, otherwise start a new one.
session_builder = SparkSession.builder.appName('logistic_regression_adv')
spark = session_builder.getOrCreate()

# NOTE: these three names were already imported from scikit-learn at the top of the
# notebook; from this cell onward they refer to the pyspark.ml classes instead.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

In [3]:
# Hand the cleaned pandas frame to Spark. No explicit schema is passed, so the Spark
# column types are derived from the pandas data (see the printSchema output below).
cardio_data_pyspark = spark.createDataFrame(cardio_data)

In [4]:
# Sanity check: show the Python type of the pandas frame and of its Spark counterpart,
# then the Spark column names and the schema of the Spark frame.
for frame in (cardio_data, cardio_data_pyspark):
    print(type(frame))
print(cardio_data_pyspark.columns)
cardio_data_pyspark.printSchema()

<class 'pandas.core.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
['age', 'gender', 'height', 'weight', 'ap_hi', 'ap_lo', 'smoke', 'alco', 'active', 'cardio', 'new_cholesterol', 'new_gluc']
root
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- ap_hi: double (nullable = true)
 |-- ap_lo: double (nullable = true)
 |-- smoke: boolean (nullable = true)
 |-- alco: boolean (nullable = true)
 |-- active: boolean (nullable = true)
 |-- cardio: long (nullable = true)
 |-- new_cholesterol: string (nullable = true)
 |-- new_gluc: string (nullable = true)



In [5]:
from pyspark.ml.feature import VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer

In [6]:
def _indexer_and_encoder(source_col, index_col, vector_col):
    """Build the StringIndexer / OneHotEncoder pair for one string column.

    The indexer writes `index_col` from `source_col`; the encoder reads
    `index_col` and writes `vector_col`. Returns (indexer, encoder).
    """
    indexer = StringIndexer(inputCol=source_col, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=vector_col)
    return indexer, encoder


# One indexer/encoder pair per string-typed feature column.
gender_indexer, gender_encoder = _indexer_and_encoder(
    'gender', 'genderIndex', 'genderVec')
cholesterol_index, cholesterol_encoder = _indexer_and_encoder(
    'new_cholesterol', 'cholesterolIndex', 'cholesterolVec')
gluc_indexer, gluc_encoder = _indexer_and_encoder(
    'new_gluc', 'glucIndex', 'glucVec')

In [7]:
# Columns packed, in this order, into the single `features` vector column.
# The *Vec entries are the one-hot encoder outputs; the rest are raw columns.
feature_columns = [
    'age', 'genderVec', 'height', 'weight',
    'ap_hi', 'ap_lo',
    'smoke', 'alco', 'active',
    'cholesterolVec', 'glucVec',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [8]:
# For each string feature: fit its indexer on the current frame, add the index column,
# then add the one-hot vector column. Order matches the original cell
# (gender, cholesterol, gluc).
# NOTE(review): `cardio_data_pyspark` is rebound to the transformed frame, so this cell
# is meant to be run once on a freshly created DataFrame.
categorical_stages = (
    (gender_indexer, gender_encoder),
    (cholesterol_index, cholesterol_encoder),
    (gluc_indexer, gluc_encoder),
)
for indexer, encoder in categorical_stages:
    indexed = indexer.fit(cardio_data_pyspark).transform(cardio_data_pyspark)
    cardio_data_pyspark = encoder.transform(indexed)

# Finally pack every model input into the `features` vector column.
cardio_data_pyspark = assembler.transform(cardio_data_pyspark)

In [10]:
# Materialise the transformed Spark DataFrame as a pandas DataFrame.
# NOTE(review): `t` is not referenced by any cell visible here, and this cell's execution
# count (10) is higher than the following cell's (9), i.e. it was run out of order.
t = cardio_data_pyspark.toPandas()

In [9]:
# NOTE(review): a bare expression is only echoed when it is the last statement of a
# cell, so this `.columns` lookup produces no output here.
cardio_data_pyspark.columns
# Print the schema, which now also lists the index, one-hot vector and `features` columns.
cardio_data_pyspark.printSchema()

root
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- ap_hi: double (nullable = true)
 |-- ap_lo: double (nullable = true)
 |-- smoke: boolean (nullable = true)
 |-- alco: boolean (nullable = true)
 |-- active: boolean (nullable = true)
 |-- cardio: long (nullable = true)
 |-- new_cholesterol: string (nullable = true)
 |-- new_gluc: string (nullable = true)
 |-- genderIndex: double (nullable = true)
 |-- genderVec: vector (nullable = true)
 |-- cholesterolIndex: double (nullable = true)
 |-- cholesterolVec: vector (nullable = true)
 |-- glucIndex: double (nullable = true)
 |-- glucVec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# 80/20 train/test split. A fixed seed is passed so the split -- and therefore every
# metric reported by the model cells below -- does not change from run to run.
train_data, test_data = cardio_data_pyspark.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, scored against the `cardio` label.
# The evaluator must read the continuous `rawPrediction` column that the classifiers
# below emit by default. Feeding it the hard 0/1 `prediction` column leaves the ROC
# curve with a single operating point, so the number printed as "AUC" would not be
# the threshold-swept area.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='cardio',
                                          metricName='areaUnderROC')

In [None]:
# Three logistic-regression configurations, given as (regParam, maxIter) pairs.
# All use `cardio` as the label column.
logistic_settings = [(0, 100), (1, 100), (0, 200)]
Logistic_model_1, Logistic_model_2, Logistic_model_3 = [
    LogisticRegression(regParam=reg_param, maxIter=max_iter, labelCol='cardio')
    for reg_param, max_iter in logistic_settings
]

# Fit every configuration on the training split ...
l1, l2, l3 = [
    estimator.fit(train_data)
    for estimator in (Logistic_model_1, Logistic_model_2, Logistic_model_3)
]

# ... and score the held-out split with each fitted model.
pred1, pred2, pred3 = [fitted.transform(test_data) for fitted in (l1, l2, l3)]

# Report the evaluator's metric for each model, in order.
AUC1 = evaluator.evaluate(pred1)
print("Logistic Regression Model 1: " + str(AUC1))
AUC2 = evaluator.evaluate(pred2)
print("Logistic Regression Model 2: " + str(AUC2))
AUC3 = evaluator.evaluate(pred3)
print("Logistic Regression Model 3: " + str(AUC3))

In [None]:
# Two decision trees that differ only in the impurity criterion (gini vs. entropy).
decision_tree_model_1, decision_tree_model_2 = [
    DecisionTreeClassifier(impurity=criterion, labelCol='cardio')
    for criterion in ('gini', 'entropy')
]

# Fit on the training split, then score the held-out split.
d1, d2 = [
    estimator.fit(train_data)
    for estimator in (decision_tree_model_1, decision_tree_model_2)
]
pred1, pred2 = [fitted.transform(test_data) for fitted in (d1, d2)]

# Report the evaluator's metric for each tree, in order.
AUC1 = evaluator.evaluate(pred1)
print("Decision Tree Model 1: " + str(AUC1))
AUC2 = evaluator.evaluate(pred2)
print("Decision Tree Model 2: " + str(AUC2))

In [None]:
# Three random-forest configurations, given as (numTrees, impurity) pairs.
forest_settings = [(5, 'gini'), (10, 'gini'), (5, 'entropy')]
random_forest_model_1, random_forest_model_2, random_forest_model_3 = [
    RandomForestClassifier(numTrees=tree_count, impurity=criterion, labelCol='cardio')
    for tree_count, criterion in forest_settings
]

# Fit every configuration on the training split ...
clf1, clf2, clf3 = [
    estimator.fit(train_data)
    for estimator in (random_forest_model_1, random_forest_model_2, random_forest_model_3)
]

# ... and score the held-out split with each fitted forest.
pred1, pred2, pred3 = [fitted.transform(test_data) for fitted in (clf1, clf2, clf3)]

# Report the evaluator's metric for each forest, in order.
AUC1 = evaluator.evaluate(pred1)
print("Random Forest Model 1: " + str(AUC1))
AUC2 = evaluator.evaluate(pred2)
print("Random Forest Model 2: " + str(AUC2))
AUC3 = evaluator.evaluate(pred3)
print("Random Forest Model 3: " + str(AUC3))