In [1]:
## Подключаем библиотеки и датасет. 

In [2]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

import os

# Any results you write to the current directory are saved as output.

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
csv = spark.read.csv('../work/data/lab1_new_df.csv', inferSchema=True, header=True)
csv.show(10)

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

### Подготовка данных для модели классификации (модель обучения дереву принятия решений)

In [5]:
data = csv.select('startingAirport', 'destinationAirport', ((col('isBasicEconomy')).cast('Int')),
                  ((col('isRefundable')).cast('Int')),((col('isNonStop')).cast('Int')),
    'baseFare', 'totalFare',((col('totalTravelDistance') > 1500).cast('Int').alias('label')))
data.show(10)

+---------------+------------------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+--------------+------------+---------+--------+---------+-----+
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        0|  213.02|    251.1|    0|
|            ATL|               BOS|             0|           0|        0|  213.02|    251.1|    0|


### Разделяем данные
Мы используем 70% данных для тренировки и 30% для тестирования


In [6]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 189669  Testing Rows: 80676


In [7]:
train.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1| 78602|
|    0|111067|
+-----+------+



#### Определяем пайплайн
пайплайн состоит из серии этапов преобразования и оценки, которые обычно подготавливают DataFrame для моделирования, а затем обучают прогнозирующую модель. В этом случае вы создадите пайплайн с семью этапами:
1. **StringIndexer**, оценщик, преобразующий строковые значения в индексы для категориальных объектов
2. **VectorAssemble**, объединяющий категориальные признаки в единый вектор
3. **VectorIndex**, который создает индексы для вектора категориальных признаков
4. **VectorAssembler**, который создает вектор непрерывных числовых объектов
5. **MinMaxScaleca**, который нормализует непрерывные числовые характеристики
6. **VectorAssembler**, который создает вектор категориальных и непрерывных признаков
7. **GBTClassifier**, который обучает классификационную модель.model.

In [8]:
strIdx = StringIndexer(inputCols = ["startingAirport","destinationAirport"], outputCols = ["startingAirportIdx","destinationAirportIdx"])
catVect = VectorAssembler(inputCols = ["startingAirportIdx","destinationAirportIdx", "isRefundable", "isNonStop", "isBasicEconomy"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["baseFare","totalFare"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
GBMc = GBTClassifier(featuresCol="features",labelCol="label", seed=55)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, GBMc])

#### Запуск пайплайна для тренировки модели
Запустим пайплайн в качестве оценщика обучающих данных, чтобы обучить модель.

In [9]:
piplineModel = pipeline.fit(train)

### Генерация предсказания меток бинарной классификации
Преобразование тестовых данных со всеми этапами и обученной моделью в пайплайне для генерации предсказаний меток.

In [10]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "label")
predicted.show(100, truncate=False)

+-------------------------------------------------------------+----------+-----+
|features                                                     |prediction|label|
+-------------------------------------------------------------+----------+-----+
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.2425596076884054] |0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.2425596076884054] |0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.2425596076884054] |0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.2425596076884054] |0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24467335550419933]|0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24467335550419933]|0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24467335550419933]|0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24467335550419933]|0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24467335550419933]|0.0       |0    |
|[6.0,9.0,0.0,0.0,0.0,0.2336

Изучая результаты, некоторые истинные значения Label прогнозируются как 0. Давайте оценим модель.

## Оценка модели классификации
Мы вычислим *Confusion Matrix* и *Area Under ROC* (Receiver Operating Characteristic) для оценки модели.
### Вычисляем матрицу путаницы
Классификаторы обычно оцениваются путем создания *Confusion Matrix*, которая указывает количество:

- True Positives - Истинно положительных результатов
- True Negatives - Истинно отрицательные результаты
- False Positives - Ложноположительные результаты
- False Negatives - Ложноотрицательные результаты
  
На основе этих основных показателей могут быть рассчитаны другие показатели оценки, такие как *точность*, *отзыв* и *F1*.

In [11]:
def show_metrics(predicted):
    tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
    fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
    tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
    fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
    pr = tp / (tp + fp)
    re = tp / (tp + fn)
    metrics = spark.createDataFrame([
     ("TP", tp),
     ("FP", fp),
     ("TN", tn),
     ("FN", fn),
     ("Precision", pr),
     ("Recall", re),
     ("F1", 2*pr*re/(re+pr))],["metric", "value"])
    metrics.show()

show_metrics(predicted)

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           28745.0|
|       FP|            1060.0|
|       TN|           46125.0|
|       FN|            4746.0|
|Precision|0.9644354973997651|
|   Recall|0.8582902869427608|
|       F1|0.9082722446916077|
+---------+------------------+



Все показатели на высоком уровне

### Просмотрите область под ROC
Другим способом оценки эффективности модели классификации является измерение площади под кривой ROC (рабочей характеристики приемника) для модели. 
Библиотека spark.ml включает класс **BinaryClassificationEvaluator**, который мы можем использовать для вычисления этого. 
Кривая ROC показывает истинно положительные и ложноположительные показатели, построенные для различных пороговых значений.

In [12]:
def calc_aur(prediction):
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    return evaluator.evaluate(prediction)
print ("AUR = ", calc_aur(prediction))

AUR =  0.981424282661924


Итак, AUR показывает, что модель в порядке. Посмотрим глубже.

### Просмотр необработанного прогноза и вероятности
Прогноз основан на оценке необработанного прогноза, которая описывает помеченную точку в логистической функции. Это необработанное предсказание затем преобразуется в прогнозируемую метку 0 или 1 на основе вектора вероятности, который указывает достоверность для каждого возможного значения метки (в данном случае 0 и 1). Значение с наибольшей достоверностью выбирается в качестве прогноза.

In [13]:
prediction.select('rawPrediction', 'probability', 'prediction', 'label').show(100, truncate=False)

+------------------------------------------+-----------------------------------------+----------+-----+
|rawPrediction                             |probability                              |prediction|label|
+------------------------------------------+-----------------------------------------+----------+-----+
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0.17170675611103603]  |0.0       |0    |
|[0.7867895677481926,-0.7867895677481926]  |[0.828293243888964,0

Обратите внимание, что результаты включают строки, в которых вероятность для 0 (первое значение в векторе вероятности) лишь немного выше, чем вероятность для 1 (второе значение в векторе вероятности). 

Порог распознавания по умолчанию (граница, которая определяет, будет ли вероятность предсказана как 1 или 0) установлен равным 0,5; 
таким образом, всегда используется прогноз с наибольшей вероятностью, независимо от того, насколько близко к порогу.

И мы можем видеть из приведенных выше результатов, что для тех *label* 1, для которых мы предсказали 0, у многих из них вероятность 1 лишь немного меньше порогового значения 0,5.

## Настройка параметров
Чтобы найти параметры с наилучшей производительностью, мы можем использовать класс **CrossValidator** для оценки каждой комбинации параметров, определенных в **ParameterGrid**, по нескольким *складкам* данных, разделенных на обучающие и проверочные наборы данных. 

Обратите внимание, что это может занять много времени, поскольку каждая комбинация параметров пробуется несколько раз.

### Измените порог распознавания
Оценка AUC, по-видимому, указывает на достаточно хорошую модель, но показатели производительности, по-видимому, указывают на то, что она предсказывает большое количество *Ложноотрицательных* меток (т.е. она предсказывает 0, когда истинная метка равна 1), что приводит к низкому *Отзыву*. Мы можем улучшить это, понизив порог. И наоборот, иногда мы можем захотеть устранить большое количество ложноположительных результатов, повысив порог.

В этом случае позволим **CrossValidator** найти лучшую максимальную глубину дерева от 2, 4 и 10, максимальное количество интервалов для дискретизации непрерывных функций от 20, 100 и количество итераций от 10 до 20.

In [14]:
paramGrid = ParamGridBuilder()\
    .addGrid(GBMc.maxDepth,[2, 4, 10])\
    .addGrid(GBMc.maxBins, [20, 100])\
    .addGrid(GBMc.maxIter, [10, 20])\
    .build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [15]:
newPrediction = model.transform(test)
newPredicted = newPrediction.select('features', 'prediction', 'label')
newPredicted.show()

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    1|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    1|
|[6.0,9.0,0.0,0.0,...|       0.0|    1|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
|[6.0,9.0,0.0,0.0,...|       0.0|    0|
+--------------------+----------+-----+
only showing top 20 rows



In [16]:
show_metrics(newPrediction)

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           30415.0|
|       FP|            1234.0|
|       TN|           45951.0|
|       FN|            3076.0|
|Precision|0.9610098265348036|
|   Recall|0.9081544295482369|
|       F1|0.9338348173165489|
+---------+------------------+



Выглядит довольно хорошо! Новая модель улучшает показатель *Кecall* с 0,85 до 0,86, показатель *F1* с 0,0 до 0,91 без ущерба для других показателей.


In [17]:
print ("AUR = ", calc_aur(newPrediction))

AUR =  0.9899915880664999


## Сохранение модели
Сохраним лучшую модель для дальнейшего использования.

In [18]:
model.save('./model')

### Подготовка данных для модели регрессии

In [19]:
data = csv.select('startingAirport', 'destinationAirport', ((col('isBasicEconomy')).cast('Int')),
                  ((col('isRefundable')).cast('Int')),((col('isNonStop')).cast('Int')),
    'baseFare', 'totalFare',((col('totalTravelDistance') > 1500).cast('Int').alias('label')))
data.show(10)

+---------------+------------------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+--------------+------------+---------+--------+---------+-----+
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        1|  217.67|    248.6|    0|
|            ATL|               BOS|             0|           0|        0|  213.02|    251.1|    0|
|            ATL|               BOS|             0|           0|        0|  213.02|    251.1|    0|


#### Определяем пайплайн
* **LogisticRegression**, который обучает классификационную модель.model.

In [20]:
strIdx = StringIndexer(inputCols = ["startingAirport","destinationAirport"], outputCols = ["startingAirportIdx","destinationAirportIdx"])
catVect = VectorAssembler(inputCols = ["startingAirportIdx","destinationAirportIdx", "isRefundable", "isNonStop", "isBasicEconomy"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["baseFare","totalFare"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LinearRegression(labelCol='label', featuresCol='features')
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

### Подготавливаем данные

In [21]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 188745  Testing Rows: 81600


In [22]:
pipelineModel = pipeline.fit(train)

### Генерация предсказания меток линейной регрессии

In [23]:
prediction = pipelineModel.transform(test)
prediction = prediction.select('features', 'prediction', 'label')
prediction.show(100, truncate=False)

+-------------------------------------------------------------+-------------------+-----+
|features                                                     |prediction         |label|
+-------------------------------------------------------------+-------------------+-----+
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24269982939243967]|0.2571751073677433 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,0.0,0.0,0.0,0.23363388103193128,0.24481479914837218]|0.2671596998545098 |0    |
|[6.0,9.0,

### Оценка модели регрессии

In [24]:
def evaluateFunc(prediction, metric):
    evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName=metric)
    return evaluator.evaluate(prediction)

print ("mae = ", evaluateFunc(prediction,'mae'))
print ("r2 = ", evaluateFunc(prediction,'r2'))


mae =  0.3797906300047922
r2 =  0.22481763696572843


### Настройка параметров линейной регрессии

In [25]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.5, 0.7]) \
    .addGrid(lr.maxIter, [25, 50, 75]) \
    .build()

linearCv = CrossValidator(
    estimator=pipeline, 
    evaluator=RegressionEvaluator(), 
    estimatorParamMaps=paramGrid, 
    numFolds=2
)

model = linearCv.fit(train)

In [None]:
newPrediction = model.transform(test)
newPredicted = newPrediction.select('features', 'prediction', 'label')
newPredicted.show()

In [27]:
def evaluateFunc(prediction, metric):
    evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName=metric)
    return evaluator.evaluate(prediction)

print ("mae = ", evaluateFunc(newPrediction,'mae'))
print ("r2 = ", evaluateFunc(newPrediction,'r2'))

mae =  0.3797906300047922
r2 =  0.22481763696572843


Полученные показатели оказались прежними, что означает, что прежние значения оказались наилучшими

##### В ходе лабораторной работы ознакомился с машинным обучением и его реализацией с помощью Spark ML