In [1]:
from pprint import pprint
import findspark
findspark.init()

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkSession') \
                    .getOrCreate()

In [4]:
data = pd.read_csv('processed.cleveland.csv')
print(data.shape)
data.head()

(303, 14)


Unnamed: 0,Age,Gender,CP,TrestBPS,Cholestrol,Fbs,RestECG,Thalach,Exang,OldPeak,Slope,Ca,Thal,Pred
0,63,1,1,145,233,1,2,150,0,2.3,3,0.0,6.0,0
1,67,1,4,160,286,0,2,108,1,1.5,2,3.0,3.0,2
2,67,1,4,120,229,0,2,129,1,2.6,2,2.0,7.0,1
3,37,1,3,130,250,0,0,187,0,3.5,3,0.0,3.0,0
4,41,0,2,130,204,0,2,172,0,1.4,1,0.0,3.0,0


In [5]:
### 1 = male, 0 = female
print(data.isnull().sum())

data['Pred'] = data.Pred.map({0: 0, 1: 1, 2: 1, 3: 1, 4: 1})
data['Thal'] = data.Thal.fillna(data.Thal.mean())
data['Ca'] = data.Ca.fillna(data.Ca.mean())
data.info()

Age           0
Gender        0
CP            0
TrestBPS      0
Cholestrol    0
Fbs           0
RestECG       0
Thalach       0
Exang         0
OldPeak       0
Slope         0
Ca            4
Thal          2
Pred          0
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 303 entries, 0 to 302
Data columns (total 14 columns):
Age           303 non-null int64
Gender        303 non-null int64
CP            303 non-null int64
TrestBPS      303 non-null int64
Cholestrol    303 non-null int64
Fbs           303 non-null int64
RestECG       303 non-null int64
Thalach       303 non-null int64
Exang         303 non-null int64
OldPeak       303 non-null float64
Slope         303 non-null int64
Ca            303 non-null float64
Thal          303 non-null float64
Pred          303 non-null int64
dtypes: float64(3), int64(11)
memory usage: 33.2 KB


In [6]:
s_data = spark.createDataFrame(data)
s_data.printSchema()
s_data.show()

root
 |-- Age: long (nullable = true)
 |-- Gender: long (nullable = true)
 |-- CP: long (nullable = true)
 |-- TrestBPS: long (nullable = true)
 |-- Cholestrol: long (nullable = true)
 |-- Fbs: long (nullable = true)
 |-- RestECG: long (nullable = true)
 |-- Thalach: long (nullable = true)
 |-- Exang: long (nullable = true)
 |-- OldPeak: double (nullable = true)
 |-- Slope: long (nullable = true)
 |-- Ca: double (nullable = true)
 |-- Thal: double (nullable = true)
 |-- Pred: long (nullable = true)

+---+------+---+--------+----------+---+-------+-------+-----+-------+-----+---+----+----+
|Age|Gender| CP|TrestBPS|Cholestrol|Fbs|RestECG|Thalach|Exang|OldPeak|Slope| Ca|Thal|Pred|
+---+------+---+--------+----------+---+-------+-------+-----+-------+-----+---+----+----+
| 63|     1|  1|     145|       233|  1|      2|    150|    0|    2.3|    3|0.0| 6.0|   0|
| 67|     1|  4|     160|       286|  0|      2|    108|    1|    1.5|    2|3.0| 3.0|   1|
| 67|     1|  4|     120|       229|  0|

In [7]:
s_data.columns

['Age',
 'Gender',
 'CP',
 'TrestBPS',
 'Cholestrol',
 'Fbs',
 'RestECG',
 'Thalach',
 'Exang',
 'OldPeak',
 'Slope',
 'Ca',
 'Thal',
 'Pred']

In [10]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
  inputCols=['Age','Gender','CP','TrestBPS','Cholestrol','Fbs',
             'RestECG','Thalach','Exang','OldPeak','Slope','Ca','Thal'],
    outputCol="features")
X = assembler.transform(s_data)
X.take(5)

[Row(Age=63, Gender=1, CP=1, TrestBPS=145, Cholestrol=233, Fbs=1, RestECG=2, Thalach=150, Exang=0, OldPeak=2.3, Slope=3, Ca=0.0, Thal=6.0, Pred=0, features=DenseVector([63.0, 1.0, 1.0, 145.0, 233.0, 1.0, 2.0, 150.0, 0.0, 2.3, 3.0, 0.0, 6.0])),
 Row(Age=67, Gender=1, CP=4, TrestBPS=160, Cholestrol=286, Fbs=0, RestECG=2, Thalach=108, Exang=1, OldPeak=1.5, Slope=2, Ca=3.0, Thal=3.0, Pred=1, features=DenseVector([67.0, 1.0, 4.0, 160.0, 286.0, 0.0, 2.0, 108.0, 1.0, 1.5, 2.0, 3.0, 3.0])),
 Row(Age=67, Gender=1, CP=4, TrestBPS=120, Cholestrol=229, Fbs=0, RestECG=2, Thalach=129, Exang=1, OldPeak=2.6, Slope=2, Ca=2.0, Thal=7.0, Pred=1, features=DenseVector([67.0, 1.0, 4.0, 120.0, 229.0, 0.0, 2.0, 129.0, 1.0, 2.6, 2.0, 2.0, 7.0])),
 Row(Age=37, Gender=1, CP=3, TrestBPS=130, Cholestrol=250, Fbs=0, RestECG=0, Thalach=187, Exang=0, OldPeak=3.5, Slope=3, Ca=0.0, Thal=3.0, Pred=0, features=DenseVector([37.0, 1.0, 3.0, 130.0, 250.0, 0.0, 0.0, 187.0, 0.0, 3.5, 3.0, 0.0, 3.0])),
 Row(Age=41, Gender=0, C

In [11]:
s_data1 = X.select("features",'Pred')
train, test = s_data1.randomSplit([0.7,0.3])

In [12]:
from pyspark.ml.classification import LogisticRegression

In [13]:
lr = LogisticRegression(labelCol="Pred",featuresCol="features")
model=lr.fit(train)
predict_train=model.transform(train)
predict_test=model.transform(test)
predict_test.select("Pred","prediction").show(10)

+----+----------+
|Pred|prediction|
+----+----------+
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   1|       0.0|
|   0|       0.0|
|   1|       1.0|
|   1|       0.0|
|   0|       0.0|
+----+----------+
only showing top 10 rows



In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator=BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='Pred')

predict_test.select("Pred","rawPrediction","prediction","probability").show(5)


+----+--------------------+----------+--------------------+
|Pred|       rawPrediction|prediction|         probability|
+----+--------------------+----------+--------------------+
|   0|[5.71676747613526...|       0.0|[0.99672046096523...|
|   0|[3.00297161873899...|       0.0|[0.95270819422812...|
|   0|[3.82572753661468...|       0.0|[0.97866264160538...|
|   0|[3.51622470652619...|       0.0|[0.97114590250225...|
|   0|[1.57376540683172...|       0.0|[0.82831973443878...|
+----+--------------------+----------+--------------------+
only showing top 5 rows



In [15]:
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))

print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set is 0.9328746433965428
The area under ROC for test set is 0.8941647597254001
