## Khởi tạo Spark

In [1]:
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = (SparkSession
         .builder
         .appName("Heart Disease")
         .getOrCreate())

## Đọc và load tập dữ liệu Boston housing

In [72]:
heartDF = (spark.read
            .option("HEADER", True)
            .option("inferSchema", True)
            .csv("C:/Users/Phung Huy Hoang/Downloads/framingham.csv")
           )

heartDF.show()

+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|male|age|education|currentSmoker|cigsPerDay|BPMeds|prevalentStroke|prevalentHyp|diabetes|totChol|sysBP|diaBP|  BMI|heartRate|glucose|TenYearCHD|
+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|   1| 39|        4|            0|         0|     0|              0|           0|       0|    195|106.0| 70.0|26.97|       80|     77|         0|
|   0| 46|        2|            0|         0|     0|              0|           0|       0|    250|121.0| 81.0|28.73|       95|     76|         0|
|   1| 48|        1|            1|        20|     0|              0|           0|       0|    245|127.5| 80.0|25.34|       75|     70|         0|
|   0| 61|        3|            1|        30|     0|              0|           1|       0|    225|150.0| 95.0|28.58|       6

## Tập dữ liệu Framingham Heart Disease
`male`: 1 là `nam`, 0 là `nữ`

`age`: Độ tuổi

`education`: Trình độ giáo dục

`currentSmoker`: Có đang hút thuốc hay không

`cigsPerDay`: Số điếu thuốc mỗi ngày

`BPMeds`: Có đang dùng thuốc huyết áp hay không

`prevalentStroke`: Trước đó có bị đột quỵ hay không

`prevalentHyp`: Có bị tăng huyết áp hay không

`diabetes`: Có bị tiểu đường hay không

`totChol`: Tổng mức cholesterol

`sysBP`: Huyết áp tâm thu

`diaBP`: Huyết áp tâm trương

`BMI`: Chỉ số khối cơ thể

`heartRate`: Nhịp tim

`glucose`: Lượng đường trong máu

Biến dự báo:

`TenYearCHD`: Nguy cơ mắc bệnh tim mạch vành trong 10 năm

## Xem các loại biến trong tập dữ liệu

In [49]:
heartDF.dtypes

[('male', 'int'),
 ('age', 'int'),
 ('education', 'string'),
 ('currentSmoker', 'int'),
 ('cigsPerDay', 'string'),
 ('BPMeds', 'string'),
 ('prevalentStroke', 'int'),
 ('prevalentHyp', 'int'),
 ('diabetes', 'int'),
 ('totChol', 'string'),
 ('sysBP', 'double'),
 ('diaBP', 'double'),
 ('BMI', 'string'),
 ('heartRate', 'string'),
 ('glucose', 'string'),
 ('TenYearCHD', 'int')]

Các biến được lưu trong file CSV dưới dạng số. Tuy nhiên khi đọc vào Spark, một số biến lại trở thành dạng `string` bởi vì trong biến có chứa giá trị `'NA'`.

Những biến nào chứa giá trị `'NA'` sẽ thay bằng giá trị ... (chưa nghĩ ra)

In [73]:
from pyspark.sql.functions import *
heartDF = heartDF.withColumn('education', regexp_replace('education', 'NA', '-9999')).\
    withColumn('education', col("education").cast('int')).\
    withColumn('cigsPerDay', regexp_replace('cigsPerDay', 'NA', '-9999')).\
    withColumn('cigsPerDay', col("cigsPerDay").cast('int')).\
    withColumn('BPMeds', regexp_replace('BPMeds', 'NA', '-9999')).\
    withColumn('BPMeds', col("BPMeds").cast('int')).\
    withColumn('totChol', regexp_replace('totChol', 'NA', '-9999')).\
    withColumn('totChol', col("totChol").cast('int')).\
    withColumn('BMI', regexp_replace('BMI', 'NA', '-9999')).\
    withColumn('BMI', col("BMI").cast('double')).\
    withColumn('heartRate', regexp_replace('heartRate', 'NA', '-9999')).\
    withColumn('heartRate', col("heartRate").cast('int')).\
    withColumn('glucose', regexp_replace('glucose', 'NA', '-9999')).\
    withColumn('glucose', col("glucose").cast('int'))

heartDF.dtypes

[('male', 'int'),
 ('age', 'int'),
 ('education', 'int'),
 ('currentSmoker', 'int'),
 ('cigsPerDay', 'int'),
 ('BPMeds', 'int'),
 ('prevalentStroke', 'int'),
 ('prevalentHyp', 'int'),
 ('diabetes', 'int'),
 ('totChol', 'int'),
 ('sysBP', 'double'),
 ('diaBP', 'double'),
 ('BMI', 'double'),
 ('heartRate', 'int'),
 ('glucose', 'int'),
 ('TenYearCHD', 'int')]

## Chia dữ liệu thành train/test set

In [74]:
(trainDF, testDF) = heartDF.randomSplit([.8, .2], seed=1)

+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|male|age|education|currentSmoker|cigsPerDay|BPMeds|prevalentStroke|prevalentHyp|diabetes|totChol|sysBP|diaBP|  BMI|heartRate|glucose|TenYearCHD|
+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|   1| 39|        4|            0|         0|     0|              0|           0|       0|    195|106.0| 70.0|26.97|       80|     77|         0|
|   0| 46|        2|            0|         0|     0|              0|           0|       0|    250|121.0| 81.0|28.73|       95|     76|         0|
|   1| 48|        1|            1|        20|     0|              0|           0|       0|    245|127.5| 80.0|25.34|       75|     70|         0|
|   0| 61|        3|            1|        30|     0|              0|           1|       0|    225|150.0| 95.0|28.58|       6

## Biến đổi train data theo định dạng của Spark

In [75]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['male','age','education','currentSmoker',\
                                       'cigsPerDay','BPMeds','prevalentStroke',\
                                       'prevalentHyp','diabetes','totChol','sysBP',\
                                       'diaBP','BMI','heartRate', 'glucose'],
                            outputCol='features') 
assembler_train = assembler.transform(trainDF)
final_train = assembler_train.select('features','TenYearCHD')
final_train.show(3)

+--------------------+----------+
|            features|TenYearCHD|
+--------------------+----------+
|[0.0,32.0,2.0,1.0...|         0|
|(15,[1,2,9,10,11,...|         0|
|[0.0,33.0,2.0,1.0...|         0|
+--------------------+----------+
only showing top 3 rows



# 1. Decision Tree

## Huấn luyện mô hình Decision Tree Regressor trên train data

**1.1** Huấn luyện mô hình trên `final_train` với `DecisionTreeRegressor` với `labelCol` là `'TenYearCHD'` và `featuresCol` là `'features'`

In [70]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol = 'TenYearCHD', featuresCol = 'features')
dtModel = dt.fit(final_train)

Py4JJavaError: An error occurred while calling o738.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 27, 192.168.70.1, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$2458/1145457780: (struct<male_double_VectorAssembler_a0a845897161:double,age_double_VectorAssembler_a0a845897161:double,education_double_VectorAssembler_a0a845897161:double,currentSmoker_double_VectorAssembler_a0a845897161:double,cigsPerDay_double_VectorAssembler_a0a845897161:double,BPMeds_double_VectorAssembler_a0a845897161:double,prevalentStroke_double_VectorAssembler_a0a845897161:double,prevalentHyp_double_VectorAssembler_a0a845897161:double,diabetes_double_VectorAssembler_a0a845897161:double,totChol_double_VectorAssembler_a0a845897161:double,sysBP:double,diaBP:double,BMI:double,heartRate_double_VectorAssembler_a0a845897161:double,glucose_double_VectorAssembler_a0a845897161:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1181)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2193)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2194)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1176)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:274)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.$anonfun$train$1(DecisionTreeRegressor.scala:126)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:114)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:44)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$2458/1145457780: (struct<male_double_VectorAssembler_a0a845897161:double,age_double_VectorAssembler_a0a845897161:double,education_double_VectorAssembler_a0a845897161:double,currentSmoker_double_VectorAssembler_a0a845897161:double,cigsPerDay_double_VectorAssembler_a0a845897161:double,BPMeds_double_VectorAssembler_a0a845897161:double,prevalentStroke_double_VectorAssembler_a0a845897161:double,prevalentHyp_double_VectorAssembler_a0a845897161:double,diabetes_double_VectorAssembler_a0a845897161:double,totChol_double_VectorAssembler_a0a845897161:double,sysBP:double,diaBP:double,BMI:double,heartRate_double_VectorAssembler_a0a845897161:double,glucose_double_VectorAssembler_a0a845897161:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1181)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2193)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 25 more
