# I. Hướng dẫn

## Khởi tạo Spark

In [3]:
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = (SparkSession
         .builder
         .appName("Classification and Regression")
         .getOrCreate())

## Đọc và load tập dữ liệu Iris

In [2]:
irisDF = (spark.read
          .option("HEADER", True)
          .option("inferSchema", True)
          .csv("./data/iris.csv")
         )

irisDF.show(5)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      class|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



## Chuyển cột `class` (kiểu string) thành `label` (kiểu double)

In [3]:
from pyspark.ml.feature import StringIndexer

class_indexer = StringIndexer(inputCol = 'class', outputCol = 'label')

irisDFindexed = class_indexer.fit(irisDF).transform(irisDF)

irisDFindexed.show(5)

+------------+-----------+------------+-----------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|      class|label|
+------------+-----------+------------+-----------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|  0.0|
+------------+-----------+------------+-----------+-----------+-----+
only showing top 5 rows



## Tập dữ liệu Iris

`sepal_length`: chiều dài đài hoa (cm)

`sepal_width`: chiều rộng đài hoa (cm)

`petal_length`: chiều dài cánh hoa (cm)

`petal_width`: chiều rộng cánh hoa (cm)

`class/label`: loại hoa

![Iris dataset](./image/iris.png)

## Chia dữ liệu thành train/test set

In [4]:
(trainDF, testDF) = irisDFindexed.randomSplit([.8, .2], seed = 1)

## Xem các loại biến trong tập dữ liệu

In [5]:
irisDFindexed.dtypes

[('sepal_length', 'double'),
 ('sepal_width', 'double'),
 ('petal_length', 'double'),
 ('petal_width', 'double'),
 ('class', 'string'),
 ('label', 'double')]

## Biến đổi train data và test data theo định dạng của Spark

In [7]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ['sepal_length','sepal_width','petal_length','petal_width'],
                            outputCol = 'features')
assembler_train = assembler.transform(trainDF)

X_train = assembler_train.select('features', 'label')
X_train.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[4.3,3.0,1.1,0.1]|  0.0|
|[4.4,2.9,1.4,0.2]|  0.0|
|[4.4,3.0,1.3,0.2]|  0.0|
|[4.4,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
+-----------------+-----+
only showing top 5 rows



## Sử dụng Logistic Regression

### 1.1 Tạo mô hình Logistic Regression

Tạo một một hình Logistic Regression và huấn luyện mô hình trên `X_train` với `labelCol` là `'label'` và `featuresCol` là `'features'`

In [10]:
from pyspark.ml.classification import LogisticRegression

logit = LogisticRegression(featuresCol = "features", labelCol = "label")

logitModel = logit.fit(X_train)

### 1.2. Áp dụng mô hình trên test data

Áp dụng biến đổi cho tập test tương tự như trên tập train. In ra vài dòng sau khi biến đổi để xem kết quả.

In [8]:
assembler_test = assembler.transform(testDF)
X_test = assembler_test.select('features', 'label')
X_test.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[4.5,2.3,1.3,0.3]|  0.0|
|[4.8,3.1,1.6,0.2]|  0.0|
|[4.8,3.4,1.6,0.2]|  0.0|
|[4.8,3.4,1.9,0.2]|  0.0|
|[4.9,2.5,4.5,1.7]|  2.0|
+-----------------+-----+
only showing top 5 rows



Dự đoán trên test data

In [11]:
predictions = logitModel.transform(X_test)
predictions.select("prediction", "label").show(5)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  2.0|
+----------+-----+
only showing top 5 rows



### 1.3. Đánh giá mô hình

Tính giá trị `Accuracy` của mô hình trên tập test

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy = 0.961538
Test Error = 0.0384615


### 1.4. Tạo ML pipeline và đánh giá dùng phương pháp cross validation

In [37]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pprint

pp = pprint.PrettyPrinter(indent = 4)

# Create a LogisticRegression instance. This instance is an Estimator.
logit = LogisticRegression(featuresCol = "features", labelCol = "label")

# Define indexer
indexer = StringIndexer(inputCol = 'class', 
                        outputCol = 'label')

# Define assembler
assembler = VectorAssembler(inputCols = ['sepal_length','sepal_width','petal_length','petal_width'],
                            outputCol = 'features')

# Configure an ML pipeline, which consists of two stages: indexer, assembler, and logit.
pipeline = Pipeline(stages = [indexer, assembler, logit])

# Specify evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol = "label", 
    predictionCol = "prediction",
    metricName = "accuracy"
)

# Specify parameters
paramGrid = (ParamGridBuilder()
            .addGrid(logit.regParam , [0.01, 0.1, 1])
            .build())

# Train/test split
(trainDF, testDF) = irisDF.randomSplit([.8, .2], seed = 1)

# Setup CrossValidator 
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
cv = CrossValidator(estimator = logit, 
                    evaluator = evaluator, 
                    estimatorParamMaps = paramGrid, 
                    numFolds = 3, 
                    parallelism = 2, 
                    seed = 1)

# Run cross-validation on training data, and choose the best set of parameters
logitModel = pipeline.fit(trainDF)

# Make predictions on test data. logitModel uses the best model found (regParam = 0.1)
prediction = logitModel.transform(testDF)
result = prediction.select("features", "label", "prediction").collect()

# Print some predictions
for row in result[0:5]:
    pp.pprint("features=%s, label=%s -> prediction=%s" % 
              (row.features, row.label, row.prediction))

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g" % (1.0 - accuracy))

'features=[4.5,2.3,1.3,0.3], label=2.0 -> prediction=2.0'
'features=[4.8,3.1,1.6,0.2], label=2.0 -> prediction=2.0'
'features=[4.8,3.4,1.6,0.2], label=2.0 -> prediction=2.0'
'features=[4.8,3.4,1.9,0.2], label=2.0 -> prediction=2.0'
'features=[4.9,2.5,4.5,1.7], label=1.0 -> prediction=0.0'
Test Error = 0.0384615


# II. Áp dụng

## Câu 1 - Áp dụng `LogisticRegression` với tập dữ liệu `Auto`

Câu hỏi này sử dụng Logistic Regression trên tập dữ liệu `Auto` để dự đoán một xe cho trước có `mpg` là `high` hay `low`.

**Auto Data Set Description**

A data frame with 392 observations on the following 9 variables.

- `mpg`: miles per gallon

- `cylinders`: Number of cylinders between 4 and 8

- `displacement`: Engine displacement (cu. inches)

- `horsepower`: Engine horsepower

- `weight`: Vehicle weight (lbs.)

- `acceleration`: Time to accelerate from 0 to 60 mph (sec.)

- `year`: Model year (modulo 100)

- `origin`: Origin of car (1. American, 2. European, 3. Japanese)

- `name`: Vehicle name

**1.1.** Tạo một binary variable nhận giá trị 1 (`high`) với các xe có `mpg` lớn hơn median mpg, và nhận giá trị 0 (`low`) cho các xe còn lại.

In [1]:
# Viết code của bạn ở đây


**1.2.** Áp dụng Logistic Regression cho tập dữ liệu với các giá trị siêu tham số `regParam` khác nhau để dự đoán `mpg`. Cho biết cross-validation error ứng với các giá trị khác nhau của siêu tham số này. Nhận xét kết quả thu được. Tham khảo document về Logistic Regression của Spark ở [LogisticRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html#pyspark.ml.classification.LogisticRegression).

In [63]:
# Viết code của bạn ở đây


## Câu 2 - So sánh các mô hình phân loại

Thực hiện việc train tất cả các mô hình `LogisticRegression`, `DecisionTreeClassifier` và `RandomForestClassifier`, `GBTClassifier`, `MultilayerPerceptronClassifier`, `LinearSVC`, `NaiveBayes` trên tập dữ liệu HeartDisease (https://archive.ics.uci.edu/ml/datasets/heart+Disease). Điều chỉnh các siêu tham số của các mô hình để chọn mô hình tốt nhất dùng cross validation. So sánh và nhận xét về kết quả của các mô hình. Để tránh lặp lại các bước xử lý giống nhau nhiều lần như ở trên bạn nên tạo pipeline các bước xử lý. Tham khảo cách tạo pipeline cho mô hình ở [đây](https://spark.apache.org/docs/latest/ml-pipeline.html). Tham khảo document về các classifier của Spark ở [classification module](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#classification).

Bên dưới là một số gợi ý.

In [4]:
heart = (spark.read
          .option("HEADER", True)
          .option("inferSchema", True)
          .csv("./data/HeartDisease.csv")
         )

heart.show(5)

+---+---+---+------------+------+----+---+-------+-----+-----+-------+-----+---+----------+---+
|_c0|Age|Sex|   ChestPain|RestBP|Chol|Fbs|RestECG|MaxHR|ExAng|Oldpeak|Slope| Ca|      Thal|AHD|
+---+---+---+------------+------+----+---+-------+-----+-----+-------+-----+---+----------+---+
|  1| 63|  1|     typical|   145| 233|  1|      2|  150|    0|    2.3|    3|  0|     fixed| No|
|  2| 67|  1|asymptomatic|   160| 286|  0|      2|  108|    1|    1.5|    2|  3|    normal|Yes|
|  3| 67|  1|asymptomatic|   120| 229|  0|      2|  129|    1|    2.6|    2|  2|reversable|Yes|
|  4| 37|  1|  nonanginal|   130| 250|  0|      0|  187|    0|    3.5|    3|  0|    normal| No|
|  5| 41|  0|  nontypical|   130| 204|  0|      2|  172|    0|    1.4|    1|  0|    normal| No|
+---+---+---+------------+------+----+---+-------+-----+-----+-------+-----+---+----------+---+
only showing top 5 rows



In [5]:
heart.count()

303

In [6]:
len(heart.columns)

15

In [7]:
heart.dtypes

[('_c0', 'int'),
 ('Age', 'int'),
 ('Sex', 'int'),
 ('ChestPain', 'string'),
 ('RestBP', 'int'),
 ('Chol', 'int'),
 ('Fbs', 'int'),
 ('RestECG', 'int'),
 ('MaxHR', 'int'),
 ('ExAng', 'int'),
 ('Oldpeak', 'double'),
 ('Slope', 'int'),
 ('Ca', 'string'),
 ('Thal', 'string'),
 ('AHD', 'string')]

In [8]:
heart.withColumn('Ca', heart.Ca.cast('int'))

DataFrame[_c0: int, Age: int, Sex: int, ChestPain: string, RestBP: int, Chol: int, Fbs: int, RestECG: int, MaxHR: int, ExAng: int, Oldpeak: double, Slope: int, Ca: int, Thal: string, AHD: string]

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

catInputCols = ['ChestPain', 'RestECG', 'Slope', 'Thal']
catOutputCols = [x + "Index" for x in catInputCols]
oheCatOutputCols = [x + "OHE" for x in catInputCols]

catIndexer = StringIndexer(inputCols = catInputCols, outputCols = catOutputCols)

catOHEncoder = OneHotEncoder(inputCols = catOutputCols, 
                          outputCols = oheCatOutputCols)

pipeline = Pipeline(stages = [catIndexer, catOHEncoder])

# fit the pipeline model and transform the data as defined
pipelineModel = pipeline.fit(heart)

# view the transformed data
transformed_heart = pipelineModel.transform(heart)
transformed_heart.select(catInputCols + oheCatOutputCols).show(5)

+------------+-------+-----+----------+-------------+-------------+-------------+-------------+
|   ChestPain|RestECG|Slope|      Thal| ChestPainOHE|   RestECGOHE|     SlopeOHE|      ThalOHE|
+------------+-------+-----+----------+-------------+-------------+-------------+-------------+
|     typical|      2|    3|     fixed|    (3,[],[])|(2,[1],[1.0])|    (2,[],[])|(3,[2],[1.0])|
|asymptomatic|      2|    2|    normal|(3,[0],[1.0])|(2,[1],[1.0])|(2,[1],[1.0])|(3,[0],[1.0])|
|asymptomatic|      2|    2|reversable|(3,[0],[1.0])|(2,[1],[1.0])|(2,[1],[1.0])|(3,[1],[1.0])|
|  nonanginal|      0|    3|    normal|(3,[1],[1.0])|(2,[0],[1.0])|    (2,[],[])|(3,[0],[1.0])|
|  nontypical|      2|    1|    normal|(3,[2],[1.0])|(2,[1],[1.0])|(2,[0],[1.0])|(3,[0],[1.0])|
+------------+-------+-----+----------+-------------+-------------+-------------+-------------+
only showing top 5 rows



In [None]:
# Viết code của bạn ở đây


## Câu 3 - So sánh các mô hình hồi quy

Thực hiện việc train tất cả các mô hình `LinearRegression`, `DecisionTreeRegression` và `RandomForestRegression`, `GBTRegression` trên tập một tập dữ liệu cho bài toán hồi quy (có 10-100 thuộc tính gồm cả thuộc tính phân loại và thuộc tính số và có 100-1000 đối tượng) mà bạn quan tâm ở [UCI Regression Dataset](https://archive.ics.uci.edu/ml/datasets.php?format=&task=reg&att=&area=&numAtt=10to100&numIns=100to1000&type=&sort=nameUp&view=table). Điều chỉnh các siêu tham số của các mô hình để chọn mô hình tốt nhất dùng cross validation. Để tránh lặp lại các bước xử lý giống nhau nhiều lần như ở trên bạn nên tạo pipeline các bước xử lý. Tham khảo cách tạo pipeline cho mô hình ở [đây](https://spark.apache.org/docs/latest/ml-pipeline.html). Tham khảo document về các classifier của Spark ở [regression module](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#regression).

In [16]:
# Viết code của bạn ở đây
