# Инфраструктура для моделей машинного обучения. Практическая работа

# Цель практической работы

Потренироваться в использовании библиотек PySpark SQL и PySpark ML для предобработки данных и обучения моделей.

# Что входит в практическую работу

1. Инициализация спарк-сессии.
2. Загрузка данных.
3. Ознакомление с данными.
4. Преобразование типов столбцов.
5. Очистка данных.
6. Feature-инжиниринг.
7. Векторизация фичей.
8. Создание и обучение модели.
9. Выбор лучшей модели.
10. Обратная связь.


# Что оценивается

- Пройдены все этапы работы.
- Спарк-сессия успешно запущена.
- Данные прочитаны.
- Все колонки с числовыми значениями преобразованы в числовые типы данных (Int или Double).
- Отфильтрованы все строки с Null-значениями.
- Созданы новые фичи.
- Все категориальные колонки преобразованы в числовой вид, выполнены все этапы векторизации признаков.
- Выборка разделена на обучающую и тестовую.
- Создано три объекта: модель, сетка гиперпараметров и evaluator.
- Создан объект класса CrossValidator и обучен на обучающей выборке.
- Выбрана лучшая модель, посчитана метрика качества лучшей модели.


# Задача

Используя данные о клиентах телекоммуникационной компании, обучите модель, предсказывающую их отток.

Описание данных, с которыми вы будете работать:

* **CustomerID**: ID клиента.
* **Gender**: пол клиента.
* **SeniorCitizen**: пенсионер ли клиент (1 — да, 0 — нет).
* **Partner**: есть у клиента партнёр (жена, муж) или нет (Yes/No).
* **Dependents**: есть ли у клиента инждивенцы, например дети (Yes/No).
* **Tenure**: как много месяцев клиент оставался в компании.
* **PhoneService**: подключена ли у клиента телефонная служба (Yes/No).
* **MultipleLines**: подключено ли несколько телефонных линий (Yes, No, No phone service).
* **InternetService**: интернет-провайдер клиента (DSL, Fiber optic, No).
* **OnlineSecurity**: подключена ли у клиента услуга онлайн-безопасности (Yes, No, No internet service)
* **OnlineBackup**: подключена ли услуга резервного копирования онлайн (Yes, No, No internet service).
* **DeviceProtection**: подключена ли услуга защиты устройства (Yes, No, No internet service)
* **TechSupport**: есть ли у клиента техническая поддержка (Yes, No, No internet service).
* **StreamingTV**: подключена ли услуга потокового телевидения (Yes, No, No internet service).
* **StreamingMovies**: подключена ли услуга стримингового воспроизведения фильмов (Yes, No, No internet service).
* **Contract**: тип контракта клиента (Month-to-month, One year, Two year).
* **PaperlessBilling**: есть ли безбумажный счёт.
* **PaymentMethod**: способ оплаты услуг (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic)).
* **MonthlyCharges**: сумма, которая списывается ежемесячно.
* **TotalCharges**: сумма, списанная за всё время.
* **Churn**: ушёл ли клиент (Yes/No). Это целевая переменная, которую нужно предсказать.


# 1. Инициализация спарк-сессии

Инициализируйте спарк-сессию.

Эта ячейка нужна для того, чтобы заргузить необходимые библиотеки и настроить окружение Google Colab для работы со Spark.

Просто запустите её перед выполением задания :)

In [1]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet
!apt install openjdk-8-jdk-headless &> /dev/null

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')


In [2]:
from pyspark.sql import SparkSession
from google.colab import drive

drive.mount("/content/drive")


### Ваш код здесь ###

Mounted at /content/drive


In [116]:
spark = SparkSession.builder.appName("CSV Loading Data")\
.config("spark.executor.memory", "2g")\
.config("spark.driver.memory", "2g")\
.getOrCreate()


# 2. Загрузка данных
Загрузите данные, сохраните их в переменную типа sparkDataframe, используя метод read.csv (не забывайте про header и delimiter).

In [117]:
### Ваш код здесь ###
data = spark.read\
.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.option("delimiter", ",")\
.csv("/content/drive/MyDrive/WA_Fn-UseC_-Telco-Customer-Churn.csv")

# 3. Ознакомление с данными
1. Выведите на экран первые несколько строк датафрейма.


In [118]:
### Ваш код здесь ###
data.show(5, 8)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|  7590-...|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|             No|Month...|             Yes| 


2. Выведите общее количество строк датафрейма.



In [119]:
### Ваш код здесь ###
row_count = data.count()
row_count

7043

3. Выведите структуру (схему) датафрейма.

In [120]:
### Ваш код здесь ###
data.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



# 4. Преобразование типов столбцов
Преобразуйте тип столбцов у числовых признаков (Int — если признак целочисленный, Double — если признак не целочисленный). Сохраните преобразованный датафрейм в новую переменную.

## Совет

Если вам сложно выполнить это задание, изучите дополнительные материалы: [об операторе Cast](https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/), [об операторе Select](https://sparkbyexamples.com/pyspark/select-columns-from-pyspark-dataframe/).



In [121]:
from pyspark.sql.functions import expr, col, sum, avg, max, count, countDistinct, isnan, isnull

### Ваш код здесь ###
data_formatted = data.select(
    col("gender").cast("String"),
    col("SeniorCitizen").cast("Int"),
    col("Partner").cast("String"),
    col("Dependents").cast("String"),
    col("Tenure").cast("Int"),
    col("PhoneService").cast("String"),
    col("MultipleLines").cast("String"),
    col("InternetService").cast("String"),
    col("OnlineSecurity").cast("String"),
    col("OnlineBackup").cast("String"),
    col("DeviceProtection").cast("String"),
    col("TechSupport").cast("String"),
    col("StreamingTV").cast("String"),
    col("StreamingMovies").cast("String"),
    col("Contract").cast("String"),
    col("PaperlessBilling").cast("String"),
    col("PaymentMethod").cast("String"),
    col("MonthlyCharges").cast("Double"),
    col("TotalCharges").cast("Double"),
    col("Churn").cast("String")
)
data_formatted.printSchema()

root
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [122]:
data_formatted.show(5, 8)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|             No|Month...|             Yes|     Elect...|         29.85|       29.85|   

In [123]:
data_formatted.count()

7043

In [124]:
data_formatted.groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|Female| 3488|
|  Male| 3555|
+------+-----+



In [125]:
data_formatted.groupBy("SeniorCitizen").count().show()

+-------------+-----+
|SeniorCitizen|count|
+-------------+-----+
|            1| 1142|
|            0| 5901|
+-------------+-----+



In [126]:
data_formatted.groupBy("Partner").count().show()

+-------+-----+
|Partner|count|
+-------+-----+
|     No| 3641|
|    Yes| 3402|
+-------+-----+



In [127]:
data_formatted.groupBy("Dependents").count().show()

+----------+-----+
|Dependents|count|
+----------+-----+
|        No| 4933|
|       Yes| 2110|
+----------+-----+



In [128]:
data_formatted.groupBy("Tenure").count().show()

+------+-----+
|Tenure|count|
+------+-----+
|    31|   65|
|    65|   76|
|    53|   70|
|    34|   65|
|    28|   57|
|    27|   72|
|    26|   79|
|    44|   51|
|    12|  117|
|    22|   90|
|    47|   68|
|     1|  613|
|    52|   80|
|    13|  109|
|    16|   80|
|     6|  110|
|     3|  200|
|    20|   71|
|    40|   64|
|    57|   65|
+------+-----+
only showing top 20 rows



In [129]:
data_formatted.groupBy("TotalCharges").count().show()

+------------+-----+
|TotalCharges|count|
+------------+-----+
|     1874.45|    1|
|     1752.65|    1|
|       244.8|    1|
|      673.25|    1|
|      4326.8|    1|
|       229.6|    1|
|       936.7|    1|
|     5388.15|    1|
|       168.5|    1|
|        49.8|    2|
|        74.5|    2|
|      779.25|    1|
|     3888.65|    1|
|      414.95|    1|
|     5586.45|    1|
|        69.8|    2|
|       405.7|    1|
|      5812.6|    1|
|      8443.7|    1|
|     4863.85|    1|
+------------+-----+
only showing top 20 rows



# 5. Очистка данных
Проверьте, есть ли в какой-либо колонке Null-значения. Для этого можно использовать your_dataframe.filter(col("colname")).isNull()).

Выведите на экран несколько строк с Null-значениями в одной из колонок.

Сохраните очищенный от строк с Null-значениями датафрейм в новую переменную. Для фильтрации этих значений можно использовать метод .isNotNull().

Колонок в датафрейме много, проверять каждую неудобно и долго. Подумайте, как упроситить эту работу, если использовать, например, перебор с циклом for.

[Примеры использования операторов isNull() и isNotNull()](https://sparkbyexamples.com/pyspark/pyspark-isnull/).


In [130]:
### Ваш код здесь ###
cols = ["gender", "SeniorCitizen", "Partner", "Dependents",
        "Tenure", "PhoneService", "MultipleLines", "InternetService",
        "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport",
        "StreamingTV", "StreamingMovies", "Contract", "PaperlessBilling",
        "PaymentMethod", "MonthlyCharges", "TotalCharges", "Churn"]

In [131]:
for column in cols:
  null_count = data_formatted.filter(col(column).isNull()).count()
  print(f"Столбец '{column}': {null_count} пропусков")

Столбец 'gender': 0 пропусков
Столбец 'SeniorCitizen': 0 пропусков
Столбец 'Partner': 0 пропусков
Столбец 'Dependents': 0 пропусков
Столбец 'Tenure': 0 пропусков
Столбец 'PhoneService': 0 пропусков
Столбец 'MultipleLines': 0 пропусков
Столбец 'InternetService': 0 пропусков
Столбец 'OnlineSecurity': 0 пропусков
Столбец 'OnlineBackup': 0 пропусков
Столбец 'DeviceProtection': 0 пропусков
Столбец 'TechSupport': 0 пропусков
Столбец 'StreamingTV': 0 пропусков
Столбец 'StreamingMovies': 0 пропусков
Столбец 'Contract': 0 пропусков
Столбец 'PaperlessBilling': 0 пропусков
Столбец 'PaymentMethod': 0 пропусков
Столбец 'MonthlyCharges': 0 пропусков
Столбец 'TotalCharges': 11 пропусков
Столбец 'Churn': 0 пропусков


In [132]:
data_cleaned = data_formatted.filter(col("TotalCharges").isNotNull())
data_cleaned.show(5, 8)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|             No|Month...|             Yes|     Elect...|         29.85|       29.85|   

In [133]:
data_cleaned.count()

7032

# 6. Feature-инжиниринг
Добавьте в датафрейм одну или несколько новых фичей. Удалите колонки, которые, как вам кажется, нужно убрать из фичей. Обоснуйте свои решения.

In [134]:
from pyspark.sql.functions import when

### Ваш код здесь ###
def average_price(TotalCharges, Tenure):
  av_price = TotalCharges / Tenure
  return av_price

In [135]:
data_cleaned_1 = data_cleaned.withColumn("AverageCharges", average_price(col("TotalCharges"), col("Tenure")))
data_cleaned_1.show(5, 8)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|AverageCharges|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+
|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|             No|Month...|             Yes|

In [136]:
data_cleaned_2 = data_cleaned_1.withColumn("AloneParent", when(((col("Partner") == "No") & (col("Dependents") == "Yes")), 1).otherwise(0))
data_cleaned_2.show(5, 8)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+-----------+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|AverageCharges|AloneParent|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+-----------+
|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|              No|         No|         No|      

In [137]:
data_cleaned_2.groupBy("AloneParent").count().show()

+-----------+-----+
|AloneParent|count|
+-----------+-----+
|          1|  359|
|          0| 6673|
+-----------+-----+



In [138]:
data_cleaned_2 = data_cleaned_2.withColumn("averageCharge", col("TotalCharges") / col("Tenure"))
data_cleaned_2.show(5, 8)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+-----------+-------------+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|AverageCharges|AloneParent|averageCharge|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+--------------+-----------+-------------+
|Female|            0|    Yes|        No|     1|          No|     No ph...|            DSL|            No|         Yes|     

In [139]:
data_cleaned_2 = data_cleaned_2.withColumn("amount_of_services",
                                           (when(col("PhoneService") == "Yes", 1).otherwise(0) +
                                            when(col("MultipleLines") == "Yes", 1).otherwise(0) +
                                            when(col("OnlineSecurity") == "Yes", 1).otherwise(0) +
                                            when(col("OnlineBackup") == "Yes", 1).otherwise(0) +
                                            when(col("DeviceProtection") == "Yes", 1).otherwise(0) +
                                            when(col("TechSupport") == "Yes", 1).otherwise(0) +
                                            when(col("StreamingTV") == "Yes", 1).otherwise(0) +
                                            when(col("StreamingMovies") == "Yes", 1).otherwise(0))
                                           )

In [140]:
data_cleaned_2.show(5)

+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+-----------------+-----------+-----------------+------------------+
|gender|SeniorCitizen|Partner|Dependents|Tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|   AverageCharges|AloneParent|    averageCharge|amount_of_services|
+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+-----------------+-----------+-----------------+------------------

#7. Векторизация фичей
Подготовьте данные к обучению:





1. Преобразуйте текстовые колонки в числа, используя StringIndexer.
Удалите столбцы со старыми (непреобразованными) признаками. Выведите на экран структуру получившегося датафрейма. Не забывайте о столбце Churn. Хоть он и выступает в задаче как таргет, он имеет текстовый тип, поэтому тоже должен быть закодирован числовыми значениями.

Чтобы использовать StringIndexer для всех категориальных признаков сразу, а не для каждого отдельно, можно применить сущность pipeline.

**Пример кода:**

##### #Задаём список текстовых колонок:
text_columns = ["text_col_1", "text_col_2", "text_col_3"]

##### #Задаём список StringIndexer'ов — сущностей, каждая из которых будет кодировать одну текстовую колонку числами. Имена преобразованных колонок будут заканчиваться на _index:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index",).fit(<ваш датасет>) for column in text_columns]

##### #Создаём Pipeline из StringIndexer'ов:
pipeline = Pipeline(stages=indexers)

##### #Скармливаем нашему pipeline датафрейм, удаляя старые колонки:
new_dataframe = pipeline.fit(<ваш датасет>).transform(<ваш датасет>).drop(*text_columns)


In [141]:
from pyspark import mllib
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

#список колонок с текстовым типом
text_cols = ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines", "InternetService", \
                "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", \
                "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod", "Churn"]


### Ваш код здесь ###
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index",).fit(data_cleaned) for column in text_cols]

#pipeline = Pipeline(stages=indexers)

#new_dataframe = pipeline.fit(data_cleaned).transform(data_cleaned).drop(*text_cols)
#new_dataframe.show(5)

2. Векторизуйте категориальные признаки, используя OneHotEncoder.
Удалите столбцы со старыми (непреобразованными) признаками.
Выведите на экран структуру получившегося после преобразований датафрейма.


In [142]:
from pyspark.ml.feature import OneHotEncoder

#список категориальных колонок
features_inp  = ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines", "InternetService", \
                "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", \
                "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod"]


### Ваш код здесь ###
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_encoded") for col in features_inp]

pipeline = Pipeline(stages=indexers + encoders)

new_dataframe = pipeline.fit(data_cleaned_2).transform(data_cleaned_2).drop(*text_cols)
new_dataframe.show(5)

+-------------+------+--------------+------------+-----------------+-----------+-----------------+------------------+------------+-------------+----------------+------------------+-------------------+---------------------+--------------------+------------------+----------------------+-----------------+-----------------+---------------------+--------------+----------------------+-------------------+-----------+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+
|SeniorCitizen|Tenure|MonthlyCharges|TotalCharges|   AverageCharges|AloneParent|    averageCharge|amount_of_services|gender_index|Partner_index|Dependents_index|PhoneService_index|MultipleLines_index|InternetService_index|OnlineSecurity_index|OnlineBackup_index|DeviceProt

3. Объедините колонки фичей в один вектор, используя VectorAssembler.
Удалите столбцы со старыми (непреобразованными) признаками.
Выведите на экран первые несколько строк и структуру получившегося датафрейма.

In [143]:
from pyspark.ml.feature import VectorAssembler


### Ваш код здесь ###
features = list(new_dataframe.drop("Churn_index").columns)
target = "Churn_index"

In [144]:
vectorizer = VectorAssembler(inputCols=features, outputCol="features_vec")

final_df = vectorizer.transform(new_dataframe)

final_df.select("features_vec", "Churn_index").show(5, 100)

+----------------------------------------------------------------------------------------------------+-----------+
|                                                                                        features_vec|Churn_index|
+----------------------------------------------------------------------------------------------------+-----------+
|(49,[1,2,3,4,6,7,8,9,11,12,13,15,25,30,31,34,35,37,39,41,43,45,46],[1.0,29.85,29.85,29.85,29.85,1...|        0.0|
|(49,[1,2,3,4,6,7,13,14,16,20,21,22,23,24,25,26,27,30,32,33,36,37,39,41,47],[34.0,56.95,1889.5,55....|        0.0|
|(49,[1,2,3,4,6,7,13,14,15,22,23,24,25,26,27,30,32,34,35,37,39,41,43,45,47],[2.0,53.85,108.15,54.0...|        1.0|
|(49,[1,2,3,4,6,7,11,12,13,14,16,17,20,21,22,23,24,25,30,32,33,36,38,39,41,48],[45.0,42.3,1840.75,...|        0.0|
|(49,[1,2,3,4,6,7,8,24,25,26,27,29,31,33,35,37,39,41,43,45,46],[2.0,70.7,151.65,75.825,75.825,1.0,...|        1.0|
+-------------------------------------------------------------------------------

#8. Создание и обучение модели

1. Создайте модель — логистическую регрессию (используя LogisticRegression). В качестве параметров класса LogisticRegression укажите колонку фичей (параметр featuresCol), колонку-таргет (параметр labelCol) из датафрейма и имя колонки, в которую будут записываться предсказания (параметр predictionCol).

In [145]:
from pyspark.ml.classification import LogisticRegression

### Ваш код здесь ###
model = LogisticRegression(featuresCol="features_vec", labelCol=target)

2. Разделите датафрейм на обучающую и тестовую выборку.

In [146]:

### Ваш код здесь ###
df_train, df_test = final_df.randomSplit([0.8, 0.2])


3. Создайте объекты — сетки гиперпараметров для каждой модели, используя ParamGridBuilder. Так же, как и в ноутбуке из последнего видео, в сетку гиперпараметров можно добавить значения параметров regParam и elasticNetParam.

Вы можете ознакомиться [с документацией объекта LogisticRegression в PySpark](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html) и добавить в сетку больше параметров.


In [147]:
from pyspark.ml.tuning import ParamGridBuilder

### Ваш код здесь ###
grid = ParamGridBuilder().addGrid(model.regParam, [0.1, 0.2, 0.5, 0.7, 1, 1.5, 2, 2.5, 3, 4, 5, 10, 20])\
                                  .addGrid(model.elasticNetParam, [0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.5, 0.75]).build()

4. Создайте объект evaluator, который будет отвечать за метрику качества при обучении. Для этого используйте класс BinaryClassificationEvaluator со следующими параметрами: rawPredictionCol — колонка с предсказаниями, labelCol — колонка с таргетом.

У вас, возможно, возник вопрос, какую метрику качества берёт по умолчанию BinaryClassificationEvaluator. По умолчанию BinaryClassificationEvaluator будет рассчитывать areaUnderROC. Это метрика оценки площади под кривой ROC (Receiver Operating Characteristic), которая служит графической интерпретацией производительности модели. Эта метрика качества находится в пределах от 0 до 1. Чем выше метрика, тем более качественные предсказания делает модель.

In [148]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Ваш код здесь ###
eval = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol=target)

5. Создайте объект CrossValidator, в качестве параметров укажите уже созданные вами модель, сетку гиперпараметров и evaluator.

In [149]:
from pyspark.ml.tuning import CrossValidator

### Ваш код здесь ###
cv = CrossValidator(estimator=model, estimatorParamMaps=grid, evaluator=eval)



6. Запустите обучение модели на тренировочной выборке. Сохраните обученную модель в новую переменную.

In [150]:

### Ваш код здесь ###
cv_model = cv.fit(df_train)

#9. Выбор лучшей модели

1. Выберите лучшую модель, сохраните её в отдельную переменную, отобразите её параметры.

Вывод параметров модели в PySpark можно сделать, используя метод extractParamMap().

In [151]:

### Ваш код здесь ###
best_model = cv_model.bestModel

best_model.extractParamMap()

{Param(parent='LogisticRegression_26513a98370f', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_26513a98370f', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.01,
 Param(parent='LogisticRegression_26513a98370f', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_26513a98370f', name='featuresCol', doc='features column name.'): 'features_vec',
 Param(parent='LogisticRegression_26513a98370f', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_26513a98370f', name='labelCol', doc='label column name.'): 'Churn_index',
 Param(parent='LogisticRegression_26513a98370f', name='maxBlockSizeInMB', doc='maximum memory 

2. Запустите лучшую модель в режиме предсказания на тестовой выборке. Сохраните предсказания в отдельную переменную. Выведите первые несколько строк датафрейма с предсказаниями на экран.

Запуск модели в режиме предсказания выполняется при помощи метода .transform(<тестовая выборка>).

In [152]:
from pyspark.sql.functions import monotonically_increasing_id
### Ваш код здесь ###
preds = best_model.transform(df_test)
df_test = df_test.withColumn("temp_id", monotonically_increasing_id())
preds = preds.withColumn("temp_id", monotonically_increasing_id())
result_df = df_test.join(preds.select("temp_id", "prediction"), "temp_id").drop("temp_id")

In [153]:
result_df.show(5)

+-------------+------+--------------+------------+--------------+-----------+-------------+------------------+------------+-------------+----------------+------------------+-------------------+---------------------+--------------------+------------------+----------------------+-----------------+-----------------+---------------------+--------------+----------------------+-------------------+-----------+--------------+---------------+------------------+--------------------+---------------------+-----------------------+----------------------+--------------------+------------------------+-------------------+-------------------+-----------------------+----------------+------------------------+---------------------+--------------------+----------+
|SeniorCitizen|Tenure|MonthlyCharges|TotalCharges|AverageCharges|AloneParent|averageCharge|amount_of_services|gender_index|Partner_index|Dependents_index|PhoneService_index|MultipleLines_index|InternetService_index|OnlineSecurity_index|OnlineBacku

3. Получите метрику качества модели. Для этого примените к объекту evaluator метод .evaluate(<ваш датафрейм с предсказаниями>).



In [154]:

### Ваш код здесь ###
eval.evaluate(preds)

0.690492179411847

#10. Обратная связь
Вы ознакомились с возможностями двух мощных библиотек: PySpark SQL для предобработки данных и PySpark ML для машинного обучения.

Поделитесь впечатлениями от работы с новыми библиотеками. В чём они более удобны, чем уже знакомые вам Pandas и Sklearn, а в чём нет.

# Как отправить работу на проверку

Загрузите файл с заданиями, откройте его через Jupyter Notebook в Google Colab. Скачайте файл с датасетом и загрузите его в Colab. Выполните задачи, сохраните изменения: воспользуйтесь опцией Save and Checkpoint из вкладки меню File или кнопкой Save and Checkpoint на панели инструментов. Отправьте через форму ниже итоговый файл Jupyter Notebook (.ipynb) или ссылку на него.