Цель: используя данный датасет: https://archive.ics.uci.edu/ml/datasets/heart+disease построить предиктивную модель классификации в spark ml. необходимо отразить все элементы DS процесса: дата препроцессинг-eda (анализ данных)-feature selection-тренировка и тюнинг параметров модели-оценка. Сравнить результаты работы алгоритма классификации из spark ml с xgboost из внешней библиотеки.

https://www.kaggle.com/ronitf/heart-disease-uci

# Spark ML

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.appName("ml").getOrCreate()

In [3]:
df = spark.read.csv("heart.csv", header="true", inferSchema="true")
df.show(3)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 3 rows



In [4]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [5]:
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
df.printSchema()

root
 |-- age: double (nullable = true)
 |-- sex: double (nullable = true)
 |-- cp: double (nullable = true)
 |-- trestbps: double (nullable = true)
 |-- chol: double (nullable = true)
 |-- fbs: double (nullable = true)
 |-- restecg: double (nullable = true)
 |-- thalach: double (nullable = true)
 |-- exang: double (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: double (nullable = true)
 |-- ca: double (nullable = true)
 |-- thal: double (nullable = true)
 |-- target: double (nullable = true)



In [6]:
train, test = df.randomSplit([0.7, 0.3])

In [7]:
featuresCols = df.columns
featuresCols.remove('target')

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=2)

In [8]:
gbt = GBTClassifier(labelCol="target")

In [9]:
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()

evaluator = BinaryClassificationEvaluator(labelCol=gbt.getLabelCol(), rawPredictionCol=gbt.getPredictionCol())

cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)

In [10]:
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [11]:
pipelineModel = pipeline.fit(train)

In [12]:
predictions = pipelineModel.transform(test)

In [13]:
predictions.show(3)

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+--------------------+--------------------+--------------------+--------------------+----------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+--------------------+--------------------+--------------------+--------------------+----------+
|34.0|1.0|3.0|   118.0|182.0|0.0|    0.0|  174.0|  0.0|    0.0|  2.0|0.0| 2.0|   1.0|[34.0,1.0,3.0,118...|[34.0,1.0,3.0,118...|[-2.2231326123792...|[0.01158644595035...|       1.0|
|35.0|0.0|0.0|   138.0|183.0|0.0|    1.0|  182.0|  0.0|    1.4|  2.0|0.0| 2.0|   1.0|[35.0,0.0,0.0,138...|[35.0,0.0,0.0,138...|[-2.2746970159485...|[0.01046297826455...|       1.0|
|35.0|1.0|0.0|   120.0|198.0|0.0|    1.0|  130.0|  1.0|    1.6|  1.0|0.0| 3.0|   0.0|[35.0,1.0,

In [14]:
roc = evaluator.evaluate(predictions)
print("areaUnderROC on our test set: %g" % roc)

areaUnderROC on our test set: 0.774119


# XgBoost/CatBoost

In [15]:
import pandas as pd
from catboost import CatBoostClassifier

from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

In [16]:
df = pd.read_csv('heart.csv')
df.head()

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
1,37,1,2,130,250,0,1,187,0,3.5,0,0,2,1
2,41,0,1,130,204,0,0,172,0,1.4,2,0,2,1
3,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
4,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1


In [17]:
df.describe()

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
count,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0,303.0
mean,54.366337,0.683168,0.966997,131.623762,246.264026,0.148515,0.528053,149.646865,0.326733,1.039604,1.39934,0.729373,2.313531,0.544554
std,9.082101,0.466011,1.032052,17.538143,51.830751,0.356198,0.52586,22.905161,0.469794,1.161075,0.616226,1.022606,0.612277,0.498835
min,29.0,0.0,0.0,94.0,126.0,0.0,0.0,71.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,47.5,0.0,0.0,120.0,211.0,0.0,0.0,133.5,0.0,0.0,1.0,0.0,2.0,0.0
50%,55.0,1.0,1.0,130.0,240.0,0.0,1.0,153.0,0.0,0.8,1.0,0.0,2.0,1.0
75%,61.0,1.0,2.0,140.0,274.5,0.0,1.0,166.0,1.0,1.6,2.0,1.0,3.0,1.0
max,77.0,1.0,3.0,200.0,564.0,1.0,2.0,202.0,1.0,6.2,2.0,4.0,3.0,1.0


In [18]:
X = df.iloc[:,:-1]
Y = df.iloc[:,-1]
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3)

In [19]:
model = CatBoostClassifier()

grid = {'learning_rate': [0.03, 0.1, 1],
        'depth': [4, 6, 10],
        'iterations':[10, 20, 30]}

randomized_search_result = model.randomized_search(grid,
                                                   X=X_train,
                                                   y=y_train,
                                                   plot=False)


bestTest = 0.5183644749
bestIteration = 2

0:	loss: 0.5183645	best: 0.5183645 (0)	total: 72.7ms	remaining: 655ms

bestTest = 0.5573043228
bestIteration = 9

1:	loss: 0.5573043	best: 0.5183645 (0)	total: 82.6ms	remaining: 331ms

bestTest = 0.4474559415
bestIteration = 9

2:	loss: 0.4474559	best: 0.4474559 (2)	total: 92ms	remaining: 215ms

bestTest = 0.4023051369
bestIteration = 19

3:	loss: 0.4023051	best: 0.4023051 (3)	total: 109ms	remaining: 164ms

bestTest = 0.4724037724
bestIteration = 19

4:	loss: 0.4724038	best: 0.4023051 (3)	total: 128ms	remaining: 128ms

bestTest = 0.4027079698
bestIteration = 17

5:	loss: 0.4027080	best: 0.4023051 (3)	total: 148ms	remaining: 98.7ms

bestTest = 0.4567402385
bestIteration = 0

6:	loss: 0.4567402	best: 0.4023051 (3)	total: 167ms	remaining: 71.6ms

bestTest = 0.4567402385
bestIteration = 0

7:	loss: 0.4567402	best: 0.4023051 (3)	total: 198ms	remaining: 49.4ms

bestTest = 0.5685722334
bestIteration = 9

8:	loss: 0.5685722	best: 0.4023051 (3)	total:

In [20]:
preds_class = model.predict(X_test)

In [21]:
print("areaUnderROC on our test set: %g" % roc_auc_score(y_test, preds_class))

areaUnderROC on our test set: 0.687075
