<div style="font-size:18pt; padding-top:20px; text-align:center">СЕМИНАР 4. <b>Комбинация решающих деревьев и </b> <span style="font-weight:bold; color:green">Spark MLlib</span></div><hr>
<div style="text-align:right;">Папулин С.Ю. <span style="font-style: italic;font-weight: bold;">(papulin_hse@mail.ru)</span></div>

<a name="0"></a>
<div><span style="font-size:14pt; font-weight:bold">Содержание</span>
    <ol>
        <li><a href="#1">Подключение библиотек и создание Spark контекста</a></li>
        <li><a href="#2">Загрузка исходных данных</a></li>
        <li><a href="#3">Анализ исходных данных</a></li>
        <li><a href="#4">Преобразование категориальных признаков в числовые</a>
            <ol style = "list-style-type:lower-alpha">
                <li><a href="#4a">Заполнение ячеек с неопределенными значениями</a></li>
                <li><a href="#4b">Преобразование в натуральные числа</a></li>
                <li><a href="#4c">Преобразование в матрицу дискретных значений</a></li>
                <li><a href="#4d">Формирование вектора признаков</a></li>
            </ol>
        </li>
        <li><a href="#5">Решающие деревья и выбор модели</a>
            <ol style = "list-style-type:lower-alpha">
                <li><a href="#5a">Формирование обучающего и тестового подмножеств</a></li>
                <li><a href="#5b">Вычисление базовой отметки</a></li>
                <li><a href="#5c">Random Forest и выбор модели</a></li>
                <li><a href="#5d">Gradient-boosted tree и выбор модели</a></li>
            </ol>
        </li>
        <li><a href="#6">Завершение работы</a></li>
        <li><a href="#7">Источники</a></li>
    </ol>
</div>

<p>Подключение стилей оформления</p>

In [1]:
%%html
<link href="css/style.css" rel="stylesheet" type="text/css">

<a name="1"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">1. Подключение библиотек и создание Spark контекста</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

In [None]:
import spark_context

In [None]:
spark_context.setup_pyspark_env()

In [None]:
import pyspark

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SQLContext, Row
import matplotlib.pyplot as plt

In [None]:
sc = spark_context.get_spark_context(pyspark, appName="tree", parallelism=30)

In [None]:
sqlContext = SQLContext(sc)

<a name="2"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">2. Загрузка исходных данных</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

<p>Путь к данными</p>

In [None]:
# Для Databricks
# train_path = "/FileStore/tables/z7yasv9p1492968987236/train.csv"

# Для Azure кластера
# train_path = "file:///home/ubuntu/notebooks/Class_4/data/train.csv"
train_path = "hdfs:///data/tree/train.csv"

<div class="msg-block msg-warning">
  <p class="msg-text-warn">При использовании <span class="code-font">file:///</span> метод <span class="code-font">sqlContext.read.load()</span> может выдать ошибку</p>
</div>

In [None]:
df_purchase = sqlContext.read.load(train_path, 
                          format="com.databricks.spark.csv", 
                          header="true", 
                          inferSchema="true", sep=",")
df_purchase.show(5)

In [None]:
#df_purchase.rdd.glom().collect()

In [None]:
df_purchase.printSchema()

<a name="3"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">3. Анализ исходных данных</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

<p>Количество элементов в выборке</p>

In [None]:
sample_size = df_purchase.count()
sample_size

<p>Количество уникальных значений по столбцам</p>

In [None]:
from pyspark.sql import functions as F

In [None]:
exprs = [F.countDistinct(clmn).alias(clmn) for clmn in df_purchase.columns]
exprs

In [None]:
df_purchase.agg(*exprs).show()

<p>Уникальные значения по столбцам</p>

In [None]:
for clmn in df_purchase.columns:
    df_purchase[[clmn]].distinct().show(30)

<p>Распределение по категориям</p>

In [None]:
df_gender_local = df_purchase.groupBy("Gender").count().toPandas()
df_gender_local.head()

In [None]:
df_gender_local["share"] = df_gender_local["count"].apply(lambda x: x / float(sample_size))
df_gender_local.head()

In [None]:
df_age_local = df_purchase.groupBy(["Age"]).count().toPandas()
df_age_local["share"] = df_age_local["count"].apply(lambda x: x / float(sample_size))
df_age_local.head(20)

In [None]:
fig = plt.figure("2")

width=0.25
plt.bar(df_age_local.index-width/2.0, df_age_local["share"], width)
plt.xticks(df_age_local.index, df_age_local["Age"])
plt.grid(True)

plt.show()

<p>По нескольким столбцам</p>

In [None]:
df_gender_age_local = df_purchase.groupBy(["Gender","Age"]).count().toPandas()
df_gender_age_local["share"] = df_gender_age_local["count"].apply(lambda x: x / float(sample_size))
df_gender_age_local.head(20)

<a name="4"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">4. Преобразование категориальных признаков в числовые</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

<a name="4a"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            a. Заполнение ячеек с неопределенными значениями
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#4">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#4b">Далее</a>
            </div>
        </div>
    </div>
</div>

In [None]:
df_purchase_na = df_purchase.na.fill({"Product_Category_2": 0, "Product_Category_3": 0})
df_purchase_na.show(5)

<a name="4b"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            b. Преобразование в натуральные числа
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#4a">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#4c">Далее</a>
            </div>
        </div>
    </div>
</div>

<p><i><b>Способ 1.</b> С использованием UDF</i></p>

In [None]:
from pyspark.sql.types import FloatType, ArrayType

In [None]:
def my_funct(x):
    if x == "F":
        return 1.0
    return 0.0
  
convert_gender = F.udf(lambda x: my_funct(x), FloatType())

df_purchase_na_gender = df_purchase_na.select("*", convert_gender(df_purchase_na["Gender"]).alias("GenderIndex"))
df_purchase_na_gender.show(5)

<p><i><b>Способ 2.</b> С использованием StringIndexer</i></p>

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
df_purchase_na_gender = gender_indexer.fit(df_purchase_na).transform(df_purchase_na)
df_purchase_na_gender.show(5)

<p><b>Преобразование для столбцов Gender, Stay_In_Current_City_Years и Age</b></p>

<p>Age имеет вид диапазонов. При преобразовании важно сохранить порядок</p>

<p>Уникальные значения Age</p>

In [None]:
df_age_pn = df_purchase.select("Age").distinct().toPandas()
df_age_pn.head(10)

<p>Создание словаря, в котором каждому диапазону поставлено в соотвестие натуральное число</p>

In [None]:
age_range_list = df_age_pn.sort(columns="Age", ascending=True).to_dict("list")["Age"]
age_index = range(7)

dict_age = dict(zip(age_range_list, age_index))
dict_age

<p>Передача словаря всем executor'ам</p>

In [None]:
dict_age_brcst = sc.broadcast(dict_age)
dict_age_brcst.value

<p>UDF</p>

In [None]:
def convert_gender_funct(x):
    if x == "F":
        return 1.0
    return 0.0

def convert_stay_func(x):
    if x == "4+":
        return 5.0
    return float(x)
  
def convert_age_func(x):
    return float(dict_age_brcst.value[x])


convert_gender = F.udf(lambda x: convert_gender_funct(x), FloatType())
convert_stay = F.udf(lambda x: convert_stay_func(x), FloatType())
convert_age = F.udf(convert_age_func, FloatType())

<p>Преобразование признаков и добавление их в исходный dataframe</p>

In [None]:
df_purchase_converted = df_purchase_na.select("*", 
                                             convert_gender(df_purchase_na["Gender"]).alias("GenderIndex"),
                                             convert_stay(df_purchase_na["Stay_In_Current_City_Years"]).alias("Stay_Index"),
                                             convert_age(df_purchase_na["Age"]).alias("Age_Index"))

df_purchase_converted.persist().select("Age_Index").distinct().show(20)

<a name="4c"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            c. Преобразование в матрицу дискретных значений
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#4b">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#4d">Далее</a>
            </div>
        </div>
    </div>
</div>

In [None]:
from pyspark.ml.feature import OneHotEncoder

<p><b>Шаг 1.</b> Преобразование категориального признака в числовой с использованием StringIndexer</p>

In [None]:
city_indexer = StringIndexer(inputCol="City_Category", outputCol="City_Category_Index")
df_purchase_converted_city_indx = city_indexer.fit(df_purchase_converted).transform(df_purchase_converted)

<p><b>Шаг 2.</b> Преобразование полученного посредством StringIndexer числового признака в набор чиловых признаков с использованием OneHotEncoder</p>

In [None]:
city_encoder = OneHotEncoder(inputCol="City_Category_Index", outputCol="City_Category_Cat")
df_purchase_full_converted = city_encoder.transform(df_purchase_converted_city_indx)
df_purchase_full_converted.show()

<a name="4d"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            d. Формирование вектора признаков
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#4b">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#5">Далее</a>
            </div>
        </div>
    </div>
</div>

<p><i><b>Способ 1.</b> С использованием UDF</i></p>

<p>Преобразование столбца с разреженным представления вектора значений от OneHotEncoder в полный вид (dense)</p>

In [None]:
def convert_sparse2dense_funct(x):
    return x.toArray().tolist()

convert_sparse2dense = F.udf(lambda x: convert_sparse2dense_funct(x), ArrayType(FloatType()))

df1 = df_purchase_full_converted.select(convert_sparse2dense(df_purchase_full_converted["City_Category_Cat"]).alias("City_Category_Cat"))
df1.show(5)

<p>Формирование столбца из множества других, объединяя их значения в вектор (массив)</p>

In [None]:
selected_columns = ["GenderIndex", "Age_Index", "Marital_Status", "Stay_Index", 
                    "Product_Category_1", "Product_Category_2", "Product_Category_3"]

df2 = df_purchase_full_converted.select(F.array(selected_columns).alias("IndexArray"), 
                                        convert_sparse2dense(df_purchase_full_converted["City_Category_Cat"]).alias("City_Category_Cat"), 
                                        df_purchase_full_converted["Purchase"])
df2.show(5)

<p>Объединение столбцов</p>

In [None]:
def union_funct(x):
    return x[0] + x[1]

union = F.udf(lambda x: union_funct(x), ArrayType(FloatType()))

df_features = df2.select(union(F.array(df2["IndexArray"], df2["City_Category_Cat"])).alias("Features"), 
                         df2["Purchase"]).persist()
df_features.show(5)

<p><i><b>Способ 2.</b> С использованием VectorAssembler</i></p>

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
feature_assembler = VectorAssembler(inputCols=selected_columns+["City_Category_Cat"],
                                    outputCol="Features")

df_features_all = feature_assembler.transform(df_purchase_full_converted)
df_features_all.show(5)

In [None]:
df_features = df_features_all.select("Features", "Purchase")
df_features.show(5)

<a name="5"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">5. Решающие деревья и выбор модели</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

<a name="5a"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            a. Формирование обучающего и тестового подмножеств
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#5">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#5b">Далее</a>
            </div>
        </div>
    </div>
</div>

<p>Разделение данных на обучающее и тестовое подмножества</p>

In [None]:
df_train, df_test = df_features.randomSplit([0.8, 0.2], seed=12)
df_train.persist(); df_test.repartition(10).persist()

<a name="5b"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            b. Вычисление базовой отметки
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#5a">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#5c">Далее</a>
            </div>
        </div>
    </div>
</div>

<p>Вычисление среднего значения</p>

In [None]:
df_train_mean = df_train.select(F.mean("Purchase").alias("avg_purchase"))
df_train_mean.show()

In [None]:
train_purchase_mean = df_train_mean.collect()[0]["avg_purchase"]
train_purchase_mean

<p>Передача среднего значения всем executor'ам</p>

In [None]:
# TODO: broadcast ?

In [None]:
mean_pur_br = sc.broadcast(train_purchase_mean)
mean_pur_br.value

<p>Создание столбца со средним значением</p>

In [None]:
df_test_pred_bl = df_test.withColumn("Prediction", F.lit(mean_pur_br.value))
df_test_pred_bl.select("Prediction").show(5)

<p>Проверка на тестовом подмножестве</p>

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="Purchase", predictionCol="Prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="Purchase", predictionCol="Prediction")

In [None]:
eval_rmse.evaluate(df_test_pred_bl)

In [None]:
eval_r2.evaluate(df_test_pred_bl)

<a name="5c"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            c. Random Forest и выбор модели
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#5b">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#5d">Далее</a>
            </div>
        </div>
    </div>
</div>

In [None]:
from pyspark.ml.regression import RandomForestRegressor

<p><b>Создание модели, обучение и тестирование</b></p>

<p>Создание модели</p>

In [None]:
rf = RandomForestRegressor(featuresCol="Features", labelCol="Purchase",  predictionCol="Prediction", numTrees=10, maxDepth=10, seed=12)

<p>Обучение</p>

In [None]:
rf_model = rf.fit(df_train)
rf_model

<p>Отображение значений важности признаков</p>

In [None]:
rf_model.featureImportances

<p>Предсказание для тестового подмножества</p>

In [None]:
df_test_pred_rf = rf_model.transform(df_test)
df_test_pred_rf.show(5)

<p>Тестирование</p>

In [None]:
eval_rmse.evaluate(df_test_pred_rf)

In [None]:
eval_r2.evaluate(df_test_pred_rf)

<p><b>Выбор модели</b></p>

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

<p>Создание базовой модели</p>

In [None]:
rf = RandomForestRegressor(featuresCol="Features", labelCol="Purchase",  predictionCol="Prediction", seed=12)

<p>Формирование сетки параметров для моделей</p>

In [None]:
grid = ParamGridBuilder().addGrid(rf.numTrees, [10, 30]) \
                         .addGrid(rf.maxDepth, [10, 20]) \
                         .build()

<p>Конфигурирования исходных данных для выбора модели с использованием кросс-валидации с k-folds</p>

In [None]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=eval_rmse, numFolds=4)

<p>Запуск процесса выбора модели по заданной сетке параметров</p>

In [None]:
m_cv = cv.fit(df_train)
m_cv

<p>Отображение значений ошибок для всех моделей</p>

In [None]:
m_cv.avgMetrics

<p>Доступ к лучшей модели</p>

In [None]:
bestModel = m_cv.bestModel

<p>Тестирование лучшей модели (без повторного обучения на df_train)</p>

In [None]:
df_test_pred_rf = bestModel.transform(df_test)
eval_rmse.evaluate(df_test_pred_rf)

<a name="5d"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            d. Gradient-boosted tree и выбор модели
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#5c">Назад</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#6">Далее</a>
            </div>
        </div>
    </div>
</div>

In [None]:
from pyspark.ml.regression import GBTRegressor

<p>Создание модели</p>

In [None]:
gbt = GBTRegressor(featuresCol="Features", labelCol="Purchase",  predictionCol="Prediction", 
                   maxDepth=10, maxIter=20, stepSize=0.00001, minInstancesPerNode=10)

<p>Обучение</p>

In [None]:
gbt_model = gbt.fit(df_train)
gbt_model

<p>Отображение значений важности признаков</p>

In [None]:
gbt_model.featureImportances

<p>Предсказание для тестового подмножества</p>

In [None]:
df_test_pred_gbt = gbt_model.transform(df_test)
df_test_pred_gbt.show(5)

<p>Тестирование</p>

In [None]:
eval_rmse.evaluate(df_test_pred_gbt)

In [None]:
eval_r2.evaluate(df_test_pred_gbt)

<p><b>Выбор модели</b></p>

<p>Создание базовой модели</p>

In [None]:
gbt = GBTRegressor(featuresCol="Features", labelCol="Purchase",  predictionCol="Prediction",
                  checkpointInterval=10, stepSize=0)

<p>Формирование сетки параметров для моделей</p>

In [None]:
grid = ParamGridBuilder().addGrid(gbt.maxIter, [15, 20, 30]) \
                         .addGrid(gbt.maxDepth, [5]) \
                         .build()

<p>Конфигурирования исходных данных для выбора модели с использованием кросс-валидации с k-folds</p>

In [None]:
cv = CrossValidator(estimator=gbt, estimatorParamMaps=grid, evaluator=eval_rmse, numFolds=4)

<p>Запуск процесса выбора модели по заданной сетке параметров</p>

In [None]:
m_cv = cv.fit(df_train)
m_cv

<p>Отображение значений ошибок для всех моделей</p>

In [None]:
m_cv.avgMetrics

<p>Доступ к лучшей модели</p>

In [None]:
bestModel = m_cv.bestModel

<p>Тестирование лучшей модели (без повторного обучения на df_train)</p>

In [None]:
df_test_pred_rf = bestModel.transform(df_test)
eval_rmse.evaluate(df_test_pred_rf)

<a name="6"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">6. Завершение работы</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

<p>Завершение Spark контекста</p>

In [None]:
sc.stop()

<a name="7"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">7. Источники</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">К содержанию</a></div>
    </div>
</div>

In [None]:
https://datahack.analyticsvidhya.com/contest/black-friday/
https://www.analyticsvidhya.com/blog/2016/05/h2o-data-table-build-models-large-data-sets/