## Конфигурация

Сначала зададим параметры приложения.

Ниже примеры конфигурации для AWS EMR и для локального использования.

Конфигурация для AWS EMR.

1. Добавим пакет `catboost-spark` с нужными версиями Spark, Scala и версией самого пакета.
2. `deployMode` - `cluster`, потому что на driverе тоже делается нетривиальная работа, требующая ресурсы.
3. Число vCpu на машинах `executor` ов = `spark.executor.cores` = `spark.tasks.cpus`, чтобы workerы CatBoost не мешали друг другу в одном процессе
4. Нужно добавить не-JVM памяти (* .memoryOverhead) для нативного кода CatBoost.

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "ai.catboost:catboost-spark_3.0_2.12:0.25",
        "spark.submit.deployMode": "cluster",
        "spark.driver.cores": 2,
        "spark.driver.memory": "2g",
        "spark.driver.memoryOverhead": "2g",
        "spark.executor.memory": "2g",
        "spark.executor.memoryOverhead": "2g",
        "spark.executor.cores": 2,
        "spark.tasks.cpus": 2
    }
}

Конфигурация для локального кластера.

1. Добавим пакет `catboost-spark` с нужными версиями Spark, Scala и версией самого пакета.
2. Число vCpu на машинах `executor` ов = `spark.executor.cores` = `spark.tasks.cpus`, чтобы workerы CatBoost не мешали друг другу в одном процессе

In [3]:
%env PYSPARK_PYTHON=python3.6
%env PYSPARK_DRIVER_PYTHON=python3.6

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.jars.packages", "ai.catboost:catboost-spark_3.0_2.12:0.25")
    .config("spark.executor.cores", "2")
    .config("spark.task.cpus", "2")     
    .appName("CatBoost_Spark_3")
    .getOrCreate()
)

env: PYSPARK_PYTHON=python3.6
env: PYSPARK_DRIVER_PYTHON=python3.6


ModuleNotFoundError: No module named 'pyspark'

Проверим SparkSession объект

In [4]:
spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f04336c2150>

Импортируем `tempfile` - пригодится для создания временных директорий при обучении

In [5]:
import tempfile

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Импорты нужных объектов из PySpark

In [6]:
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.sql import Row
from pyspark.sql.types import *

from pyspark.ml.linalg import *
from pyspark.ml.param import *
from pyspark.sql import *
from pyspark.sql.types import *


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Импортируем `catboost_spark`

In [7]:
import catboost_spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Задание схемы данных


Функция `createSchema` поможет создать нужную схему с метаданными для `DataFrames`, которые будет использовать CatBoost, в частности позволяет задать имена признаков и специфицировать категориальные признаки (если есть)

Метаданные задаются словарем, передаваемым в параметр metadata конструктора `StructField`.
Для численных признаков заполняем поле `numeric`, для категориальных - `nominal`.
Для категориальных можно либо просто задать число уникальных значений (`num_vals`) либо еще указать их строковые представления (`vals`) (если они были получены из исходно строковых данных)


In [8]:
def createSchema(
    schemaDesc, #Seq[(String,DataType)],
    featureNames, #Seq[String],
    addFeatureNamesMetadata = True, # Boolean = true,
    nullableFields = [], # Seq[String] = Seq(),
    catFeaturesNumValues = {}, # Map[String,Int] = Map[String,Int](),
    catFeaturesValues = {} #: Map[String,Seq[String]] = Map[String,Seq[String]]()
): #: Seq[StructField] = {
    result = []
    for name, dataType in schemaDesc:
        if (addFeatureNamesMetadata and (name == "features")):
            numericAttrs = []
            nominalAttrs = []

            for i, fname in enumerate(featureNames):
                if fname in catFeaturesNumValues:
                    nominalAttrs.append({"num_vals": catFeaturesNumValues[fname], "idx": i, "name": fname})
                if fname in catFeaturesValues:
                    nominalAttrs.append({"vals": catFeaturesValues[fname], "idx": i, "name": fname})
                else:
                    numericAttrs.append({"idx": i, "name": fname})

            attrs = {}
            if numericAttrs:
                attrs["numeric"] = numericAttrs
            if nominalAttrs:
                attrs["nominal"] = nominalAttrs

            metadata = {"ml_attr": {"attrs": attrs, "num_attrs": len(featureNames)}}

            result.append(
                StructField(name, dataType, name in nullableFields, metadata)
            )
        else:
            result.append(StructField(name, dataType, name in nullableFields))
    return result


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Простое обучение

Создадим данные программно

In [9]:
featureNames = ["f1", "f2", "f3"]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
srcDataSchema = createSchema(
    [
        ("features", VectorUDT()),
        ("label", DoubleType())
    ],
    featureNames,
    addFeatureNamesMetadata=True
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
srcData = [
    Row(Vectors.dense(0.1, 0.2, 0.11), 1.0),
    Row(Vectors.dense(0.97, 0.82, 0.33), 2.0),
    Row(Vectors.dense(0.13, 0.22, 0.23), 2.0),
    Row(Vectors.dense(0.14, 0.18, 0.1), 1.0),
    Row(Vectors.dense(0.9, 0.67, 0.17), 2.0),
    Row(Vectors.dense(0.66, 0.1, 0.31), 1.0)
]


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
df = spark.createDataFrame(spark.sparkContext.parallelize(srcData), StructType(srcDataSchema))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-----+
|        features|label|
+----------------+-----+
|  [0.1,0.2,0.11]|  1.0|
|[0.97,0.82,0.33]|  2.0|
|[0.13,0.22,0.23]|  2.0|
| [0.14,0.18,0.1]|  1.0|
| [0.9,0.67,0.17]|  2.0|
| [0.66,0.1,0.31]|  1.0|
+----------------+-----+

Создадим простой классификатор с 20 итерациями

In [15]:
classifier = (catboost_spark.CatBoostClassifier()
      .setIterations(20)
      .setTrainDir(tempfile.mkdtemp(prefix='test1')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



Обучим его, получим модель

In [16]:
model = classifier.fit(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Модель можно применить на `DataFrame` с помощью функции `transform`. В результате получим `DataFrame` - копию исходного с дополнительными колонками.
По умолчанию для классификатора будут доступны `rawPrediction`, `prediction` и `probability`, но можно выбрать необходимые с помощью методов `setRawPredictionCol`, `setPredictionCol`, `setProbabilityCol`.

In [17]:
predDf = model.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
predDf.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(features=DenseVector([0.1, 0.2, 0.11]), label=1.0, rawPrediction=DenseVector([0.0841, -0.0841]), probability=DenseVector([0.542, 0.458]), prediction=0.0)

In [15]:
model.setRawPredictionCol("rawPrediction").setProbabilityCol("").setPredictionCol("")
predDf = model.transform(df)
predDf.head()

Row(features=DenseVector([0.1, 0.2, 0.11]), label=1.0, rawPrediction=DenseVector([0.0841, -0.0841]))

Параметры можно также менять у уже готового объекта-Predictor.

In [19]:
classifier.setLossFunction("Logloss").setRsm(0.2)
model = classifier.fit(df)
predDf = model.transform(df)
predDf.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(features=DenseVector([0.1, 0.2, 0.11]), label=1.0, rawPrediction=DenseVector([0.0159, -0.0159]), probability=DenseVector([0.5079, 0.4921]), prediction=0.0)

## Сериализация

Сам объект - `Predictor` (`CatBoostClassifier` или `CatBoostRegressor`) можно сериализовывать.
Это удобно для сохранения настроек обучения.

Сохраним объект на AWS в хранилище S3

In [20]:
classifier.write().overwrite().save("s3a://aws-emr-resources-957875663655-us-east-2/my_classifier")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Для локального режима используем простой путь

In [18]:
classifier.write().overwrite().save("my_classifier")

Загрузим сохраненный объект из S3.

In [21]:
loaded_classifier = catboost_spark.CatBoostClassifier.load("s3a://aws-emr-resources-957875663655-us-east-2/my_classifier")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

CatBoostMLReader._java_loader_class.  ai.catboost.spark.CatBoostClassifier

Загрузим локально сохраненный объект

In [19]:
loaded_classifier = catboost_spark.CatBoostClassifier.load("my_classifier")

CatBoostMLReader._java_loader_class.  ai.catboost.spark.CatBoostClassifier


Обучим с помощью загруженного классификатора.

In [47]:
model_from_loaded = loaded_classifier.fit(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Модель тоже, естественно, можно сериализовать. Притом формат совместим с локальным CatBoost. Можно скачать сохраненную модель и использовать в применении уже с локальными библиотеками CatBoost. 

С использованием AWS S3:

In [22]:
model.write().overwrite().save("s3a://aws-emr-resources-957875663655-us-east-2/my_binclass_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
loaded_model = catboost_spark.CatBoostClassificationModel.load("s3a://aws-emr-resources-957875663655-us-east-2/my_binclass_model")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

CatBoostMLReader._java_loader_class.  ai.catboost.spark.CatBoostClassificationModel

В локальном случае:

In [20]:
model.write().overwrite().save("my_binclass_model")

In [21]:
loaded_model = catboost_spark.CatBoostClassificationModel.load("my_binclass_model")

CatBoostMLReader._java_loader_class.  ai.catboost.spark.CatBoostClassificationModel


Проверим что загруженная модель работает и результат тот же что и был на исходной

In [24]:
predDf = loaded_model.transform(df)
predDf.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(features=DenseVector([0.1, 0.2, 0.11]), label=1.0, rawPrediction=DenseVector([0.0159, -0.0159]), probability=DenseVector([0.5079, 0.4921]), prediction=0.0)

## Тип Pool

CatBoost у удобно иметь дополнительный тип датасета по сравнению и обычным DataFrame.
Он является надстройкой над DataFrame.

Зачем он нужен?
1. Позволяет добавить в датасет дополнительные данные, такие как пары.
2. Позволяет хранить преквантованные данные (об этом ниже).
3. Удобнее для внутренного использования (API), хотя это не так важно пользователям.

Создать Pool из DataFrame очень легко:

In [25]:
trainPool = catboost_spark.Pool(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Исходный DataFrame хранится в атрибуте `data`:

In [26]:
trainPool.data

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[features: vector, label: double]

## Обучение при наличии валидационных датасетов

Теперь рассмотрим обучение при наличии валидационных датасетов. Их может быть несколько, на чаще всего один. На нем можно отслеживать целевые метрики и использовать overfitting detector.

Использование валидационных датасетов пока возможно только когда датасеты имеют тип Pool. Для этого нужно передать их список вторым параметром функции `fit`.


In [30]:
srcEvalData = [
  Row(Vectors.dense(0.0, 0.33, 1.1), 1.0),
  Row(Vectors.dense(0.02, 0.0, 0.38), 2.0),
  Row(Vectors.dense(0.86, 0.54, 0.9), 1.0)
]
evalDf = spark.createDataFrame(spark.sparkContext.parallelize(srcEvalData), StructType(srcDataSchema))
evalPool = catboost_spark.Pool(evalDf)
model = classifier.fit(trainPool, [evalPool])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
model.transform(evalPool.data).head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(features=DenseVector([0.0, 0.33, 1.1]), label=1.0, rawPrediction=DenseVector([0.0061, -0.0061]), probability=DenseVector([0.5031, 0.4969]), prediction=0.0)

Особенностью архитектуры распределенного CatBoost на данный момент является то, что валидационные датасеты не распределяются по workerам кластера, а после квантования хранятся целиком на мастере.

Это надо учитывать и поэтому не стоит использовать большие валидационные датасеты, а также надо увеличивать `spark.driver.memoryOverhead` примерно на `1 * число_признаков * число_объектов_валидационного_датасета`. 1 тут - это потому что одно значение признака в квантованном датасете типично занимает 1 байт.


## Обучение с категориальными признаками

В Spark MLLib значения категориальных признаков стандартно так же лежат в DataFrame в том же массиве признаков типа `ml.linalg.Vector` как и численные и имеют целочисленные значения (хотя хранятся как double, так как `ml.linalg.Vector` может хранить только такие).

Вышеописанная функция `createSchema` позволит нам правильно задать атрибуты в метаданных, так что нужные признаки будут распознаны как категориальные.

Для разнообразия обучим на этот раз регрессионную, а не классификационную модель.

In [32]:
srcDataSchema = createSchema(
    [
      ("features", VectorUDT()),
      ("label", DoubleType()),
      ("weight", FloatType())
    ],
    featureNames=["c1", "c2", "c3"],
    catFeaturesNumValues={"c1": 2, "c2": 4, "c3": 6},
    addFeatureNamesMetadata=True
)

srcData = [
  Row(Vectors.dense(0, 0, 0), 0.34, 1.0),
  Row(Vectors.dense(1, 1, 0), 0.12, 0.12),
  Row(Vectors.dense(0, 2, 1), 0.22, 0.18),
  Row(Vectors.dense(1, 2, 2), 0.01, 1.0),
  Row(Vectors.dense(0, 0, 3), 0.0, 2.0),
  Row(Vectors.dense(0, 0, 4), 0.42, 0.45),
  Row(Vectors.dense(1, 3, 5), 0.1, 1.0)
]

dfWithCatFeatures = spark.createDataFrame(spark.sparkContext.parallelize(srcData), StructType(srcDataSchema))

regressor = catboost_spark.CatBoostRegressor(iterations=100)
model_with_cat_features = regressor.fit(dfWithCatFeatures)
prediction_with_cat_features = model_with_cat_features.transform(dfWithCatFeatures)
prediction_with_cat_features.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-31:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 190, in cell_monitor
    update_job_content(job_info_box, job_data)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/updatewidgets.py", line 38, in update_job_content
    update_job_progress_bar(job_progress_bar_holder, job_data)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/updatewidgets.py", line 48, in update_job_progress_bar
    progress_bar.max = new_max
  File "/mnt/notebook-env

Row(features=DenseVector([0.0, 0.0, 0.0]), label=0.34, weight=1.0, prediction=0.2433638796338237)

Разумеется, возможны комбинации числовых и категориальных признаков в одном датасете.

In [33]:
srcDataSchema = createSchema(
    [
      ("features", VectorUDT()),
      ("label", DoubleType()),
      ("weight", FloatType())
    ],
    featureNames=["f1", "c2", "c3"],
    catFeaturesNumValues={"c2": 4, "c3": 6},
    addFeatureNamesMetadata=True
)

srcData = [
  Row(Vectors.dense(0.12, 0, 0), 0.34, 1.0),
  Row(Vectors.dense(0.1, 1, 0), 0.12, 0.12),
  Row(Vectors.dense(0.0, 2, 1), 0.22, 0.18),
  Row(Vectors.dense(0.33, 2, 2), 0.01, 1.0),
  Row(Vectors.dense(0.8, 0, 3), 0.0, 2.0),
  Row(Vectors.dense(0.0, 0, 4), 0.42, 0.45),
  Row(Vectors.dense(0.01, 3, 5), 0.1, 1.0)
]

dfWithNumAndCatFeatures = spark.createDataFrame(spark.sparkContext.parallelize(srcData), StructType(srcDataSchema))

regressor = catboost_spark.CatBoostRegressor(iterations=100)
model_with_num_and_cat_features = regressor.fit(dfWithNumAndCatFeatures)
prediction_with_num_and_cat_features = model_with_num_and_cat_features.transform(dfWithNumAndCatFeatures)
prediction_with_num_and_cat_features.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(features=DenseVector([0.12, 0.0, 0.0]), label=0.34, weight=1.0, prediction=0.3002270728946602)

Exception in thread cell_monitor-32:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1406

