## Spark Machine Learning
Модуль машинного обучения Spark предоставляет базовый набор инструментов ([документация](https://spark.apache.org/docs/latest/ml-guide.html)):
* Алгоритмы классификации, регрессии, кластеризации, совместной фильтрации.
* Методы работы с признаками.
* Конвейеры (pipelines), описывающие основные стадии моделирования.
* Сохранение и загрузка моделей и конвейеров.
* Утилиты для линейной алгебры, статистики, обработки данных и др.
<br/>

Кроме того, Spark ML позволяет добавлять свои методы и реализовывать недостающие алгоритмы.



Spark ML состоит из двух библиотек, отличающихся типом построения данных:
* spark.ml – библиотека машинного обучения, основанная на DataFrame API;
* spark.mllib – библиотека машинного обучения, основанная на RDD API.

Начиная с версии 2.0 основной библиотекой является spark.ml, но spark.mllib содержит типы данных, используемые в spark.ml.



В качестве примера возмем задачу предсказания дней с низкой влажностью. Описание задачи и набор данных в задании.
Это задача бинарной классификации. Необходимо построить модель, предсказывающую по различным погодным показателям, измеренным в утренние часы, будет ли в этот день низкая влажность или нет.

In [None]:
# Подключаем google диск
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
path = '/content/drive/MyDrive/data/daily_weather.csv' # путь к данным на диске

In [1]:
! pip install pyspark
! pip install pyarrow

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=2e901d3a75a952f84c34dbac52e71a80ca680e843930e463a7289034784d3af2
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" # без этой строчки постоянно будет возникать предупреждение с просьбой установить эту переменную в значение 1

Точкой входа в Spark-приложение для создания DataFrame является SparkSession, в котором определяются параметры конфигурации: название приложения, кластерный менеджер (т.е. каким образом подключиться — локально, к Kubernates или YARN и т.д.), количество выделяемых ядер и памяти. Пример инициализации может выглядеть так:
```
spark = SparkSession \
    .builder \
    .config("spark.ui.enabled", "true") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "6g") \
    .config("spark.executor.memoryOverhead", "200m") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
```



### Создание сессии

In [3]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark

In [4]:
# Загрузка данных в Spark DataFrame
df = spark.read.csv('daily_weather.csv', header=True, inferSchema=True)
# аргумент inferSchema указывает на вывод типа данных

Для понимания данных полезно знать кого типа колонки присутствуют в наборе данных. Чаще всего для вывода схемы DataFrame используется метод `printSchema`:

In [5]:
df.printSchema()

root
 |-- number: integer (nullable = true)
 |-- air_pressure_9am: double (nullable = true)
 |-- air_temp_9am: double (nullable = true)
 |-- avg_wind_direction_9am: double (nullable = true)
 |-- avg_wind_speed_9am: double (nullable = true)
 |-- max_wind_direction_9am: double (nullable = true)
 |-- max_wind_speed_9am: double (nullable = true)
 |-- rain_accumulation_9am: double (nullable = true)
 |-- rain_duration_9am: double (nullable = true)
 |-- relative_humidity_9am: double (nullable = true)
 |-- relative_humidity_3pm: double (nullable = true)



In [6]:
# Сводная статистика
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
number,1095,547.0,316.24357700987383,0,1094
air_pressure_9am,1092,918.8825513138094,3.184161180386833,907.9900000000024,929.3200000000012
air_temp_9am,1090,64.93300141287072,11.175514003175877,36.752000000000685,98.90599999999992
avg_wind_direction_9am,1091,142.2355107005759,69.13785928889189,15.500000000000046,343.4
avg_wind_speed_9am,1092,5.50828424225493,4.5528134655317185,0.69345139999974,23.554978199999763
max_wind_direction_9am,1092,148.95351796516923,67.23801294602953,28.89999999999991,312.19999999999993
max_wind_speed_9am,1091,7.019513529175272,5.598209170780958,1.1855782000000479,29.84077959999996
rain_accumulation_9am,1089,0.20307895225211126,1.5939521253574893,0.0,24.01999999999907
rain_duration_9am,1092,294.1080522756142,1598.0787786601481,0.0,17704.0


In [7]:
# определяем столбцы, которые далее будут использоваться в классификаторе
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

### Удаление неиспользуемых и отсутствующих данных
Столбец number не несет полезной информации, поэтому его можно удалить из датасета. Для удаления колонок из DataFrame используется метод drop, аргументами которого является одно или несколько названий колонок.
Также удалим все строки с отсутсвующими значениями.

In [8]:
df = df.drop('number')

In [9]:
df = df.na.drop()

In [10]:
# проверим кол-во строк и столбцов в датафрейме
df.count(), len(df.columns)

(1064, 10)

### Создание категориальной переменной
Чтобы указать, является ли влажность низкой, создадим категориальную переменную. Пусть если значение меньше 25%, то категориальной переменной присваивается значение 0, в противном случае категориальная переменная принимает значение 1. Категориальную переменную можно создат в виде столбца  DataFrame с помощью `Binarizer`

In [11]:
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")
binarizedDF = binarizer.transform(df)

Аргумент *threshold* задает пороговое значение для переменной, *inputCol* - входной столбец для чтения, *outputCol* - имя нового категориального столбца. Вторая строка применяет двоичный анализатор и создает новый DataFrame с категориальным столбцом.<br/>
Выведем первые четыре значения нового DataFrame:

In [12]:
binarizedDF.select("relative_humidity_3pm","label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



### Проверка сбалансированности данных
Подсчитаем количество записей в каждом классе, чтобы проверить набор данных на сбалансированность.

In [13]:
binarizedDF.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  535|
|  1.0|  529|
+-----+-----+



### Агрегация признаков
Объединим признаки, которые будут использоваться для составления прогноза, в один вектор, необходимый для обучения моделей в PySpark MLib. Аргумент *inputCols* задает список имен столбцов, определенных ранее, а *outputCol* - имя нового столбца. Вторая строка создает новый DataFrame с агрегированными объектами в столбце.

In [14]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)

### Моделирование
Для построения моделей Spark ML предлагает следующие группы алгоритмов:
* [Классификация и регрессия](https://spark.apache.org/docs/latest/ml-classification-regression.html)
* [Кластеризация](https://spark.apache.org/docs/latest/ml-clustering.html)
* [Коллаборативная фильтрация](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html)
* [Поиск часто встречающихся шаблонов](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html)





### Разделение данных на обучающую и тестовую выборки
Перед тем, как перейти к построению модели, необходимо разбить набор данных на обучающую и тестовую выборки. Для этого в Spark есть стандартный метод `randomSplit()`. Первый аргумент задает количество частей, на которые нужно разделить данные, и приблизительный размер каждой из них. В данном случае определяется два набора из 80% и 20%. Обычно начальное значение можно не указывать, но здесь используем конкретное значение, чтобы получить одно и то же дерево решений при разных запусках.

In [15]:
(trainingData, testData) = assembled.randomSplit([0.8,0.2], seed = 13234 )

Проверим размеры полученных наборов данных.

In [16]:
trainingData.count(), testData.count()

(846, 218)

### Создание и обучение дерева решений
Для решения рассматриваемой задачи создадим дерево решений: аргумент *labelCol* - предсказываемый столбец (таргет), *featuresCol* задает столбец агрегированных объектов (признаки), *predictionCol* – название колонки с результатом, *maxDepth* - критерий остановки для индукции дерева на основе максимальной глубины дерева, *minInstancesPerNode* - критерий остановки для индукции дерева на основе минимального количества выборок в узле, *impurity* - мера энтропии, используемая для разделения узлов.

In [17]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", predictionCol="prediction", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")

Обучение дерева решений путем выполнения его в Pipeline:

In [18]:
pipeline = Pipeline(stages=[dt]) # stages определяет последовательность действий в контейнере
model = pipeline.fit(trainingData)

Выполним прогноз, используя набор тестовых данных:

In [19]:
predictions = model.transform(testData)

По первым десяти строкам в прогнозе видно, что предсказание соответствует входным данным:

In [20]:
predictions.select("prediction", "label").show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



### Сохранение предсказания в формате CSV
Cохраним прогнозы в CSV-файл (только столбцы *prediction* и *labe*l).

In [22]:
predictions.select("prediction", "label").write.save(path="predictions.csv",
                                                     format="com.databricks.spark.csv",
                                                     header='true')

### Оценка качества модели

In [23]:
# Оценка качества модели
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

`MulticlassClassificationEvaluator` позволяет определять различные метрики
модели. Для этого создадим экземпляр MulticlassClassificationEvaluator.
Первые два аргумента указывают имена столбцов меток и прогнозов, а третий аргумент указывает, что нам нужна общая точность.

In [24]:
evaluator = MulticlassClassificationEvaluator(). \
    setLabelCol("label"). \
    setPredictionCol("prediction"). \
    setMetricName("accuracy")

Можно вычислить accuracy, вызвав функцию evaluate():

In [25]:
evaluator.evaluate(predictions)

0.7844036697247706

### Отображение матрицы путаницы
Полезным способом оценки модели является матрица ошибок:
* True Positive (TP) – label is positive and prediction is also positive
* True Negative (TN) – label is negative and prediction is also negative
* False Positive (FP) – label is negative but prediction is positive
* False Negative (FN) – label is positive but prediction is negative

В Spark ML нет методов, вычисляющих матрицу ошибок непосредственно, но её легко вычислить непосредственно:

In [26]:
tp = predictions[(predictions.label == 1) & (predictions.prediction == 1)].count()
tn = predictions[(predictions.label == 0) & (predictions.prediction == 0)].count()
fp = predictions[(predictions.label == 0) & (predictions.prediction == 1)].count()
fn = predictions[(predictions.label == 1) & (predictions.prediction == 0)].count()

print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("Total", predictions.count())

True Positives: 84
True Negatives: 87
False Positives: 19
False Negatives: 28
Total 218


In [27]:
r = float(tp)/(tp + fn)
print("recall", r)

p = float(tp)/(tp + fp)
print("precision", p)

recall 0.75
precision 0.8155339805825242


In [28]:
print(f"Confusion Matrix:\n{tp:>4}\t{fp:>4}\n{fn:>4}\t{tn:>4}")

Confusion Matrix:
  84	  19
  28	  87


Желательно, чтобы значения на главной диагонали матрицы были большими, а на побочной – маленькими.

### Оптимизация деревьев решений
Повысить качество модели можно, подобрав гиперпараметры.

#### Гиперпараметры дерева решений
С полным списоком гиперпараметров можно ознакомиться [здесь](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html).<br/>
Рассмотрим наиболее важные гиперпараметры:
* Максимальная глубина (**maxDepth**) — это максимальное количество связанных решений, которые классификатор примет для классификации примера (используется для избежания переобучения).
* Мера примеси (**impurity**) — хорошие правила делят целевые значения обучающих данных на относительно однородные или «чистые» подмножества. Выбор наилучшего правила означает минимизацию нечистоты двух подмножеств, которые оно вызывает. В основном используются две меры примеси: gini и entropy.
* Минимальный информационный прирост (**minInfoGain**) — это гиперпараметр, который определяет минимальный информационный прирост или уменьшение примесей для правил принятия решений‑кандидатов.





### Реализация
Для начала необходимо создать стандартный контейнер аналогично рассмотренному выше.

In [29]:
# Аналогично предыдущему разделу создаём pipeline
pipeline_with_optimization = pipeline

Затем в `ParamGridBuilder` определяем возможные варианты интересующих гиперпараметров:

In [30]:
# Определяю гиперпараметры
from pyspark.ml.tuning import ParamGridBuilder

paramGrid_with_optimization = ParamGridBuilder(). \
    addGrid(dt.impurity, ["gini", "entropy"]). \
    addGrid(dt.maxDepth, [5, 8, 10]). \
    addGrid(dt.maxBins, [20, 30]). \
    addGrid(dt.minInfoGain, [0.0]). \
    build()

# impurity - Примесь
# maxDepth - Максимальная глубина дерева
# maxBins - Максимальное количество бинов (развилок) в дереве
# minInfoGain- Минимальный прирост информации

Далее передаем в `TrainValidationSplit` правила построения модели (pipeline_with_optimization), метрику сравнения моделей (multiclassEval), возможные варианты гиперпараметров (paramGrid_with_optimization) и соотношение, на которое разобьётся train (trainRatio), т.е. dataset train во время обучения поделится на две выборки: на одной модели будут обучаться, а с помощью второй модели будут сравниваться между собой.

In [49]:
# Построение модели для поиска оптимального варианта
from pyspark.ml.tuning import TrainValidationSplit

# estimator - контейнер с логикой построения модели
# evaluator - метрика для сравнения моделей
# estimatorParamMaps - варьируемые гиперпараметры
# trainRatio - соотношением для разбивки выборки train во время обучения

validator = TrainValidationSplit(seed=1234, estimator=pipeline_with_optimization, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid_with_optimization, trainRatio=0.9)
model_with_optimization = validator.fit(trainingData)

Можно посмотреть результаты работы каждой модели, а также их гиперпараметры.

In [50]:
metrics = model_with_optimization.validationMetrics #оценки
params = model_with_optimization.getEstimatorParamMaps() #гиперпараметры
metrics_and_params = list(zip(metrics, params))
metrics_and_params.sort(key=lambda x: x[0], reverse=True)

metrics_and_params

[(0.7681197412063686,
  {Param(parent='DecisionTreeClassifier_e111fb162418', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
   Param(parent='DecisionTreeClassifier_e111fb162418', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_e111fb162418', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 30,
   Param(parent='DecisionTreeClassifier_e111fb162418', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0}),
 (0.7681197412063686,
  {Param(parent='DecisionTreeClassifier_e111fb162418', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gi

С помощью метода `bestModel` «вытаскиваем» лучшую модель из набора.

In [51]:
# Результат лучшей модели на тестовой выборке
predictions = model_with_optimization.transform(testData)

predictions.select("features", "label", "prediction").show()


+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[911.270000000013...|  1.0|       1.0|
|[912.300000000012...|  1.0|       0.0|
|[912.410000000007...|  1.0|       0.0|
|[912.600000000005...|  1.0|       1.0|
|[912.790000000002...|  1.0|       1.0|
|[912.800000000002...|  1.0|       1.0|
|[913.200000000008...|  1.0|       1.0|
|[913.260000000007...|  1.0|       1.0|
|[913.320000000003...|  0.0|       0.0|
|[913.490000000008...|  1.0|       1.0|
|[913.520000000012...|  1.0|       1.0|
|[914.060000000007...|  1.0|       1.0|
|[914.070000000001...|  1.0|       1.0|
|[914.154388930716...|  0.0|       0.0|
|[914.300000000007...|  1.0|       1.0|
|[914.360000000011...|  1.0|       1.0|
|[914.400000000012...|  1.0|       1.0|
|[914.460000000007...|  1.0|       0.0|
|[914.560000000013...|  1.0|       1.0|
|[914.600000000007...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 20 rows



In [52]:
evaluator.evaluate(predictions.select("features", "label", "prediction"))

0.7706422018348624

### Случайные леса решений
Точность прогноза можно ьакже повысить, используя не одно дерево, а много деревьев, каждое из которых дает разумные, но разные и независимые оценки правильного целевого значения. Их коллективный средний прогноз должен быть близок к истинному ответу больше, чем прогноз любого отдельного дерева. Это есть алгоритм *случайного леса*.

Предсказание случайного леса — это просто средневзвешенное значение предсказаний отдельных деревьев. Для категориальной цели это может быть большинство голосов или наиболее вероятное значение, основанное на среднем значении вероятностей, полученных деревьями. Случайные леса, как и деревья решений, также поддерживают регрессию, и прогноз леса в этом случае представляет собой среднее число, предсказанное каждым деревом.

Хотя случайные леса являются более мощным и сложным методом классификации, разработка модели практически ничем не отличается от разработки дерева.

Сначала заменим в stages модель дерева на модель случайного леса:


In [53]:
from pyspark.ml.classification import RandomForestClassifier

# Поменяем модель с дерева на лес
classifier_forest = RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setPredictionCol("prediction")

pipeline_forest = Pipeline(stages=[classifier_forest])
model = pipeline_forest.fit(trainingData)

Добавим еще один гиперпараметр: numTrees – количество деревьев в лесе.

In [54]:
paramGrid_forest = ParamGridBuilder(). \
    addGrid(classifier_forest.impurity, ["gini"]). \
    addGrid(classifier_forest.maxDepth, [5]). \
    addGrid(classifier_forest.maxBins, [20]). \
    addGrid(classifier_forest.minInfoGain, [0.0]). \
    addGrid(classifier_forest.numTrees, [2, 4, 6]). \
    build()

Аналогично предыдущему разделу тренируем модели и извлекаю наилучшую:

In [55]:
# Построение модели для поиска оптимального варианта
validator = TrainValidationSplit(seed=1234, estimator=pipeline_forest, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid_forest, trainRatio=0.8)
model_forest = validator.fit(trainingData)

# Извлекаем RandomForestClassifier() из PipelineModel
predictions = model_forest.transform(testData)

predictions.select("features", "label", "prediction").show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[911.270000000013...|  1.0|       1.0|
|[912.300000000012...|  1.0|       0.0|
|[912.410000000007...|  1.0|       1.0|
|[912.600000000005...|  1.0|       1.0|
|[912.790000000002...|  1.0|       1.0|
|[912.800000000002...|  1.0|       1.0|
|[913.200000000008...|  1.0|       1.0|
|[913.260000000007...|  1.0|       1.0|
|[913.320000000003...|  0.0|       0.0|
|[913.490000000008...|  1.0|       1.0|
|[913.520000000012...|  1.0|       1.0|
|[914.060000000007...|  1.0|       1.0|
|[914.070000000001...|  1.0|       1.0|
|[914.154388930716...|  0.0|       0.0|
|[914.300000000007...|  1.0|       1.0|
|[914.360000000011...|  1.0|       1.0|
|[914.400000000012...|  1.0|       0.0|
|[914.460000000007...|  1.0|       0.0|
|[914.560000000013...|  1.0|       1.0|
|[914.600000000007...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 20 rows



In [56]:
evaluator.evaluate(predictions.select("features", "label", "prediction"))

0.7889908256880734

## Переход от string к integer



### 1. Использование udf функции
Использование таких функций позволяет работать с sparkDataframe, как с pandasDataframe, но без использования `toPandas()`, что позволяет сэкономить время.

```
# Перевод двух столбцов (month и y) из типа string в тип integer

# Создаем udf функции с правилами перевода
"""Transforms yes/no to digit 1/0"""
@F.pandas_udf(T.IntegerType())
def y_to_digit(y: pd.Series) -> pd.Series:
    return (y == 'yes')

"""Transforms months to digit"""
@F.pandas_udf(T.IntegerType())
def month_to_digit(month: pd.Series) -> pd.Series:
    months = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}
    return month.map(months)

# Трансформируем столбцы по правилам перевода
data_after_udf = data_raw.withColumn('y', y_to_digit(F.col('y'))).withColumn('month', month_to_digit(F.col('month')))

```

### 2. OneHotEncoder или преобразование в бинарные вектора
```
# Преобразование категориальных колонок в бинарные вектора
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index', handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol = stringIndexer.getOutputCol(), outputCol = categoricalCol + "classVec")
    stages += [stringIndexer, encoder]
```



# Gradient-Boosted Trees

s

In [38]:
from pyspark.ml.classification import GBTClassifier

gbt_classifier = GBTClassifier().setLabelCol("label").setFeaturesCol("features").setPredictionCol("prediction")

pipeline_gbt = Pipeline(stages=[gbt_classifier])
#model = pipeline_gbt.fit(trainingData)

In [39]:
paramGrid_GBT = ParamGridBuilder(). \
    addGrid(gbt_classifier.impurity, ["variance"]). \
    addGrid(gbt_classifier.maxDepth, [5, 15, 25]). \
    addGrid(gbt_classifier.maxBins, [20, 30]). \
    addGrid(gbt_classifier.minInfoGain, [0.0]). \
    addGrid(gbt_classifier.lossType , ["logistic"]). \
    addGrid(gbt_classifier.maxIter , [10, 25, 50]). \
    addGrid(gbt_classifier.featureSubsetStrategy  , ["auto", "all"]). \
    build()

In [45]:
# Построение модели для поиска оптимального варианта
validator = TrainValidationSplit(seed=1234, estimator=pipeline_gbt, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid_GBT, trainRatio=0.8)
model_gbt = validator.fit(trainingData)

# Извлекаем RandomForestClassifier() из PipelineModel
predictions = model_gbt.transform(testData)

predictions.select("features", "label", "prediction").show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[911.270000000013...|  1.0|       1.0|
|[912.300000000012...|  1.0|       1.0|
|[912.410000000007...|  1.0|       1.0|
|[912.600000000005...|  1.0|       1.0|
|[912.790000000002...|  1.0|       1.0|
|[912.800000000002...|  1.0|       1.0|
|[913.200000000008...|  1.0|       1.0|
|[913.260000000007...|  1.0|       1.0|
|[913.320000000003...|  0.0|       0.0|
|[913.490000000008...|  1.0|       1.0|
|[913.520000000012...|  1.0|       1.0|
|[914.060000000007...|  1.0|       1.0|
|[914.070000000001...|  1.0|       1.0|
|[914.154388930716...|  0.0|       0.0|
|[914.300000000007...|  1.0|       1.0|
|[914.360000000011...|  1.0|       1.0|
|[914.400000000012...|  1.0|       0.0|
|[914.460000000007...|  1.0|       0.0|
|[914.560000000013...|  1.0|       1.0|
|[914.600000000007...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 20 rows



In [48]:
evaluator.evaluate(predictions.select("features", "label", "prediction"))

0.8302752293577982