## Лабораторная работа № 2
### Машинное обучение на больших данных
#### Цель и задачи работы:
1. Познакомиться с базовыми алгоритмами машинного обучения;
2. Познакомиться с реализацией машинного обучения в библиотеке Spark ML
3. Получить навыки разработки программного обеспечения для анализа данных с
использованием pyspark.
### Порядок выполнения работы:
1. Выполнить анализ выбранного датасета с помощью двух алгоритмов машинного
обучения: Линейной регрессии и Бустинга над решающими деревьями 
2. Выполнить обучение и валидацию модели, рассчитайте значения метрик классификации и
регрессии.
3. Выполнить подбор гиперпараметров моделей по сетке.

## Ход работы

#### Создание сессии и загрузка датасета

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

from pyspark.ml.regression import LinearRegression

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('SOBDLab2') \
    .getOrCreate()
data = spark.read.parquet("C:/Users/Kir/SOBDLab1/data/cleared.parquet")
data.printSchema()


root
 |-- startingAirport: integer (nullable = true)
 |-- destinationAirport: integer (nullable = true)
 |-- fareBasisCode: integer (nullable = true)
 |-- travelDuration: integer (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)



### Линейная регрессия

#### Подготовка данных

In [3]:
boolVect = VectorAssembler(inputCols=['isBasicEconomy', 'isNonStop'], outputCol="boolFeatures")
numVect = VectorAssembler(inputCols=['travelDuration', 'totalTravelDistance'], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=['boolFeatures', "normFeatures"], outputCol="features")
pipeline = Pipeline(stages=[boolVect, numVect, minMax, featVect])
scalerModel = pipeline.fit(data)
final_data = scalerModel.transform(data.sample(0.25))
final_data = final_data.select('features', 'baseFare')
train_data, test_data = final_data.randomSplit([0.8, 0.2])
train_data.show()


+--------------------+--------+
|            features|baseFare|
+--------------------+--------+
|[0.0,0.0,0.001064...|  412.09|
|[0.0,0.0,0.001064...|   440.0|
|[0.0,0.0,0.001064...|   458.6|
|[0.0,0.0,0.001064...|  472.56|
|[0.0,0.0,0.004259...|  533.02|
|[0.0,0.0,0.010649...|  609.65|
|[0.0,0.0,0.010649...|  609.65|
|[0.0,0.0,0.010649...|  640.93|
|[0.0,0.0,0.010649...|  640.93|
|[0.0,0.0,0.011714...|  718.15|
|[0.0,0.0,0.011714...|  720.93|
|[0.0,0.0,0.020234...|  725.58|
|[0.0,0.0,0.028753...|  671.63|
|[0.0,0.0,0.045793...|  689.31|
|[0.0,0.0,0.051118...|  534.89|
|[0.0,0.0,0.051118...|  576.74|
|[0.0,0.0,0.051118...|  637.21|
|[0.0,0.0,0.051118...|  637.21|
|[0.0,0.0,0.051118...|  646.52|
|[0.0,0.0,0.051118...|  655.82|
+--------------------+--------+
only showing top 20 rows



#### Создание и тренировка модели

In [5]:
lr = LinearRegression(featuresCol='features', labelCol='baseFare', predictionCol='predicted_Fare')
lr_model = lr.fit(train_data)


#### Работа модели на тестовых данных и её оценка

In [6]:
predictions = lr_model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="baseFare", predictionCol="predicted_Fare", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(labelCol="baseFare", predictionCol="predicted_Fare", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

coefficients = lr_model.coefficients
intercept = lr_model.intercept

print("Coefficients: ", coefficients)
print("Intercept: {:.3f}".format(intercept))


Root Mean Squared Error (RMSE) on test data: 115.846
R-squared (R2) on test data: 0.420
Coefficients:  [-155.6829752541083,-13.771848845841525,146.4634809594328,251.65592684825504]
Intercept: 183.403


#### Гиперпараметры модели и кросс-валидация

In [7]:
grid_search = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.5, 1.0]) \
    .build()
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid_search,
                    evaluator=evaluator)
cv_model = cv.fit(train_data.sample(0.05))


In [8]:
print("R-squared (R2): ", cv_model.bestModel.summary.r2)
print("Root Mean Squared Error (RMSE): ", cv_model.bestModel.summary.rootMeanSquaredError)
cv_model.bestModel.extractParamMap()


R-squared (R2):  0.42049630332598986
Root Mean Squared Error (RMSE):  115.72215739244177


{Param(parent='LinearRegression_b501eccc4179', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_b501eccc4179', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5,
 Param(parent='LinearRegression_b501eccc4179', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_b501eccc4179', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LinearRegression_b501eccc4179', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_b501eccc4179', name='labelCol', doc='label column name.'): 'baseFare',
 Param(parent='LinearRegression_b501eccc4179', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredEr

### Бинарная классификация

#### Подготовка данных

In [2]:
num_rows = data.select("isNonStop").count()
true_rows = data.select("isNonStop").where(data.isNonStop == 1).count()
weights = {0: (num_rows - true_rows) / num_rows, 1: true_rows / num_rows}
print(f'Count of true rows:{true_rows / num_rows}')
print(f'Count of false rows:{(num_rows - true_rows) / num_rows}')


Count of true rows:0.2831524746501752
Count of false rows:0.7168475253498248


In [4]:
def addweight(value):
    if (value == 1):
        return weights[0]
    else:
        return weights[1]


duration_cast = udf(lambda x: addweight(x), DoubleType())
data1 = data.withColumn('weight', duration_cast('isNonStop'))
data1.show()


+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+------------------+
|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|            weight|
+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+------------------+
|              6|                 1|        13049|            74|          0|         false|       false|     true|  124.65|    148.6|             6|                185|0.7168475253498248|
|              6|                 1|        10108|            80|          0|         false|       false|     true|  199.07|    228.6|             4|                185|0.7168475253498248|
|              6|                 1|         6695|     

In [5]:
assembler = VectorAssembler(inputCols=["travelDuration", "baseFare", "totalTravelDistance"], outputCol="features")
final_data = assembler.transform(data1)
final_data = final_data.withColumn('isNonStop', data.isNonStop.cast(IntegerType()))
final_data = final_data.sample(0.1)
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=100)


#### Создание и тренировка модели

In [6]:
gbm = GBTClassifier(featuresCol='features', labelCol='isNonStop', weightCol='weight')
gbm_model = gbm.fit(train_data)


#### Работа модели на тестовых данных и её оценка

In [7]:
predictions = gbm_model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol="isNonStop")
print('Area Under Curve:', evaluator.evaluate(predictions))


Area Under Curve: 0.9999646821590664


In [8]:
predictions.select("features", "prediction", "isNonStop").show()

+--------------------+----------+---------+
|            features|prediction|isNonStop|
+--------------------+----------+---------+
|[154.0,552.56,947.0]|       1.0|        1|
| [262.0,338.6,947.0]|       0.0|        0|
|[158.0,264.19,947.0]|       1.0|        1|
| [294.0,264.2,947.0]|       0.0|        0|
|[278.0,479.07,947.0]|       0.0|        0|
|[413.0,469.77,600.0]|       0.0|        0|
|[270.0,213.95,947.0]|       0.0|        0|
|[271.0,213.95,956.0]|       0.0|        0|
|[279.0,213.95,956.0]|       0.0|        0|
|[279.0,213.95,947.0]|       0.0|        0|
|[282.0,213.95,956.0]|       0.0|        0|
|[320.0,213.95,956.0]|       0.0|        0|
|[419.0,213.95,956.0]|       0.0|        0|
|[634.0,191.63,146...|       0.0|        0|
|[267.0,161.86,947.0]|       0.0|        0|
|[507.0,161.86,185...|       0.0|        0|
|[265.0,202.79,947.0]|       0.0|        0|
|[271.0,202.79,956.0]|       0.0|        0|
|[275.0,202.79,947.0]|       0.0|        0|
|[279.0,202.79,947.0]|       0.0

In [20]:
tp = float(predictions.filter("prediction == 1.0 AND isNonStop == 1").count())
fp = float(predictions.filter("prediction == 1.0 AND isNonStop == 0").count())
tn = float(predictions.filter("prediction == 0.0 AND isNonStop == 0").count())
fn = float(predictions.filter("prediction == 0.0 AND isNonStop == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", pr),
    ("Recall", re),
    ("F1", 2 * pr * re / (re + pr))], ["metric", "value"])
metrics.show()


+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          419335.0|
|       FP|            1761.0|
|       TN|         1062204.0|
|       FN|             518.0|
|Precision|0.9958180557402587|
|   Recall|0.9987662348488637|
|       F1| 0.997289966454565|
+---------+------------------+



#### Гиперпараметры модели и кросс-валидация

In [9]:
paramGrid = ParamGridBuilder() \
    .addGrid(gbm.maxIter, [5, 10]) \
    .addGrid(gbm.maxDepth, [3, 5]) \
    .build()
cv = CrossValidator(estimator=gbm, evaluator=evaluator, estimatorParamMaps=paramGrid,
                    numFolds=2)

model = cv.fit(train_data.sample(0.01))


In [15]:
newPrediction = model.transform(test_data)
newPredicted = newPrediction.select("features", "prediction", "isNonStop")
newPredicted.show()


+--------------------+----------+---------+
|            features|prediction|isNonStop|
+--------------------+----------+---------+
|[154.0,552.56,947.0]|       1.0|        1|
| [262.0,338.6,947.0]|       0.0|        0|
|[158.0,264.19,947.0]|       1.0|        1|
| [294.0,264.2,947.0]|       0.0|        0|
|[278.0,479.07,947.0]|       0.0|        0|
|[413.0,469.77,600.0]|       0.0|        0|
|[270.0,213.95,947.0]|       0.0|        0|
|[271.0,213.95,956.0]|       0.0|        0|
|[279.0,213.95,956.0]|       0.0|        0|
|[279.0,213.95,947.0]|       0.0|        0|
|[282.0,213.95,956.0]|       0.0|        0|
|[320.0,213.95,956.0]|       0.0|        0|
|[419.0,213.95,956.0]|       0.0|        0|
|[634.0,191.63,146...|       0.0|        0|
|[267.0,161.86,947.0]|       0.0|        0|
|[507.0,161.86,185...|       0.0|        0|
|[265.0,202.79,947.0]|       0.0|        0|
|[271.0,202.79,956.0]|       0.0|        0|
|[275.0,202.79,947.0]|       0.0|        0|
|[279.0,202.79,947.0]|       0.0

In [16]:
print('Area Under Curve:', evaluator.evaluate(newPrediction))
model.bestModel.extractParamMap()

Area Under Curve: 0.999871308494693


{Param(parent='GBTClassifier_52590a739e88', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='GBTClassifier_52590a739e88', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='GBTClassifier_52590a739e88', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for r

In [34]:
tp2 = float(newPrediction.filter("prediction == 1.0 AND isNonStop == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND isNonStop == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND isNonStop == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND isNonStop == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
    ("TP", tp2),
    ("FP", fp2),
    ("TN", tn2),
    ("FN", fn2),
    ("Precision", pr2),
    ("Recall", re2),
    ("F1", 2 * pr2 * re2 / (re2 + pr2))], ["metric", "value"])
metrics2.show()


+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          419335.0|
|       FP|            2493.0|
|       TN|         1061472.0|
|       FN|             518.0|
|Precision| 0.994090008249808|
|   Recall|0.9987662348488637|
|       F1|0.9964226351788861|
+---------+------------------+

