# Машинное обучение на PySpark

## Установка PySpark

In [1]:
from pyspark.sql import SparkSession


# Start (or reuse) a Spark session running in local mode; "local[*]" requests
# one worker thread per available core.
spark = SparkSession.builder.master("local[*]").getOrCreate()

24/04/09 14:57:01 WARN Utils: Your hostname, MacBook-Pro-Vadim.local resolves to a loopback address: 127.0.0.1; using 192.168.139.65 instead (on interface en0)
24/04/09 14:57:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/09 14:57:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/09 14:57:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/09 14:57:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Загружаем набор данных

Predict survival on the Titanic
* survival - Survival	0 = No, 1 = Yes
* pclass - Ticket class	1 = 1st, 2 = 2nd, 3 = 3rd
* sex	- Sex
* Age	- Age in years
* sibsp	- # of siblings / spouses aboard the Titanic
* parch	- # of parents / children aboard the Titanic
* ticket - Ticket number
* fare -	Passenger fare
* cabin	- Cabin number
* embarked - Port of Embarkation	C = Cherbourg, Q = Queenstown, S = Southampton

In [None]:
# IPython shell escape: download the Titanic CSV into the current working directory.
!wget https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv

--2023-10-18 09:00:29--  https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/plain]
Saving to: ‘titanic.csv’


2023-10-18 09:00:29 (4.32 MB/s) - ‘titanic.csv’ saved [60302/60302]



## Практика 1. Загрузите файл titanic.csv и выведите его командой show

In [None]:
# Read the downloaded file: the first line holds column names and Spark
# infers each column's type from the data. Then print the first rows.
csv_path = 'titanic.csv'
df_titanic = spark.read.csv(csv_path, header=True, inferSchema=True)
df_titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

## Посмотрим общую статистику по нашему датафрейму
Для этого воспользуемся командой describe

In [None]:
# Print summary statistics (count, mean, ...) for every column.
df_titanic.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## Практика 2. Удалим малоинформативные колонки
Колонка с именем (Name), информация о родственниках (SibSp, Parch),номер билета (Ticket) и информация о каюте (Cabin) кажутся малоинформативными для первой версии модели. Удалим их при помощи команды select

In [None]:
# List the column names to choose which ones to keep.
df_titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [None]:
# Keep the label (Survived) and the columns used as features in the first
# model; the remaining columns are left out. Then peek at five rows.
df_main_titanic = df_titanic.select(
    ['Survived', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked']
)
df_main_titanic.take(5)

[Row(Survived=0, Pclass=3, Sex='male', Age=22.0, Fare=7.25, Embarked='S'),
 Row(Survived=1, Pclass=1, Sex='female', Age=38.0, Fare=71.2833, Embarked='C'),
 Row(Survived=1, Pclass=3, Sex='female', Age=26.0, Fare=7.925, Embarked='S'),
 Row(Survived=1, Pclass=1, Sex='female', Age=35.0, Fare=53.1, Embarked='S'),
 Row(Survived=0, Pclass=3, Sex='male', Age=35.0, Fare=8.05, Embarked='S')]

## Практика 3. Колонки Age и Embarked содержат пропущенные значения.
Пропуски надо заполнить
Для поля Age предлагается заполнить пропуски средним значением возраста (информация по нему есть в describe)
Для поля Embarked — самым частым вариантом (надо сгруппировать данные и посчитать, какой порт самый частый)
Затем при помощи команды:

```
df_filtered = df_main_titanic.na.fill({'Age': XX, 'Embarked': YY})
```

устранить пропуски

In [None]:
# Count passengers per embarkation port to find the most frequent value;
# rows with a missing port form their own (null) group.
df_port = df_titanic.groupBy('Embarked').count()
df_port.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [None]:
# Same per-port count as the previous cell, written as a single chained expression.
df_titanic.groupBy('Embarked').count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [None]:
# Re-display the summary statistics to read off the mean of Age.
df_titanic.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [None]:
# Impute missing values column by column: Age gets the rounded mean reported
# by describe (29.7), Embarked gets the most frequent port ('S').
df_filtered = df_main_titanic.fillna(
    {
        'Age': 29.7,
        'Embarked': 'S',
    }
)
df_filtered.show()

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|       0|     3|  male|22.0|   7.25|       S|
|       1|     1|female|38.0|71.2833|       C|
|       1|     3|female|26.0|  7.925|       S|
|       1|     1|female|35.0|   53.1|       S|
|       0|     3|  male|35.0|   8.05|       S|
|       0|     3|  male|29.7| 8.4583|       Q|
|       0|     1|  male|54.0|51.8625|       S|
|       0|     3|  male| 2.0| 21.075|       S|
|       1|     3|female|27.0|11.1333|       S|
|       1|     2|female|14.0|30.0708|       C|
|       1|     3|female| 4.0|   16.7|       S|
|       1|     1|female|58.0|  26.55|       S|
|       0|     3|  male|20.0|   8.05|       S|
|       0|     3|  male|39.0| 31.275|       S|
|       0|     3|female|14.0| 7.8542|       S|
|       1|     2|female|55.0|   16.0|       S|
|       0|     3|  male| 2.0| 29.125|       Q|
|       1|     2|  male|29.7|   13.0|       S|
|       0|   

In [None]:
# Check the result of the fill: every column should now report the full row count.
df_filtered.describe().show(5)

+-------+-------------------+------------------+------+------------------+-----------------+--------+
|summary|           Survived|            Pclass|   Sex|               Age|             Fare|Embarked|
+-------+-------------------+------------------+------+------------------+-----------------+--------+
|  count|                891|               891|   891|               891|              891|     891|
|   mean| 0.3838383838383838| 2.308641975308642|  null| 29.69929292929302| 32.2042079685746|    null|
| stddev|0.48659245426485753|0.8360712409770491|  null|13.002015230774303|49.69342859718089|    null|
|    min|                  0|                 1|female|              0.42|              0.0|       C|
|    max|                  1|                 3|  male|              80.0|         512.3292|       S|
+-------+-------------------+------------------+------+------------------+-----------------+--------+



## Работа со строковыми колонками
У нас осталось 2 колонки, содержащих строковые данные:
* Sex
* Embarked
Первую мы преобразуем в 0 и 1
Вторую закодируем по принципу OneHot

Для этого нам помогут специальные классы в PySpark:
* StringIndexer - https://spark.apache.org/docs/latest/ml-features#stringindexer
* OneHotEncoder - https://spark.apache.org/docs/latest/ml-features#onehotencoder

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [None]:
# Encode the string column Sex as a numeric index column SexInd:
# fit learns the label -> index mapping, transform appends the new column.
indexer = StringIndexer(inputCol='Sex', outputCol='SexInd')
indexerTrained = indexer.fit(df_filtered)
df_features = indexerTrained.transform(df_filtered)

In [None]:
# Labels learned by the fitted indexer; a label's position is its index
# (the recorded output maps 'male' to 0 and 'female' to 1).
indexerTrained.labels

['male', 'female']

In [None]:
# Inspect the frame with the added SexInd column.
df_features.show()

+--------+------+------+----+-------+--------+------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|
+--------+------+------+----+-------+--------+------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|
|       0|     3|  male|20.0|   8.05|       S|   0.0|
|       0|     3|  male|39.0| 31.275|       S|   0.0|
|       0|     3|female|14.0| 7.8542|       S|   1.0|
|       1|     2|female|55.0

## Практика 4.
Преобразуйте колонку Embarked.
Сначала надо при помощи StringIndexer преобразовать колонку Embarked в колонку EmbarkedInd с цифровыми кодами
Затем сделать OneHotEncoder, который колонку EmbarkedInd превратит в колонку EmbarkedOhe. Обратите внимание: в Spark 3 OneHotEncoder является Estimator'ом, поэтому его сначала нужно обучить методом fit, а затем применить полученную модель методом transform

In [None]:
# Index the Embarked column into EmbarkedInd. This rebinds indexer and
# indexerTrained, so the Sex indexer objects from the earlier cell are no longer
# reachable by these names.
indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedInd')
indexerTrained = indexer.fit(df_features)
df_features = indexerTrained.transform(df_features)

In [None]:
# Port labels in index order (recorded output: 'S' -> 0, 'C' -> 1, 'Q' -> 2).
indexerTrained.labels

['S', 'C', 'Q']

In [None]:
# Inspect the frame with the added EmbarkedInd column.
df_features.show()

+--------+------+------+----+-------+--------+------+-----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|
+--------+------+------+----+-------+--------+------+-----------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|        0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|        0.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|        1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|        0.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|        0.0|
|       0|

In [None]:
# One-hot encoder that will turn the numeric EmbarkedInd index into the
# vector column EmbarkedOhe; it must be fitted before it can transform.
ohe = OneHotEncoder(inputCol='EmbarkedInd', outputCol='EmbarkedOhe')

### Вариант 1

In [None]:
# Variant 1, step 1: fit the encoder on the data to obtain a model.
model = ohe.fit(df_features)

In [None]:
# Variant 1, step 2: apply the fitted model, appending the EmbarkedOhe column.
df_features = model.transform(df_features)

### Вариант 2 (альтернатива варианту 1)

In [None]:
# Variant 2: fit and apply the encoder in one chained expression.
# If Variant 1 above has already been run, df_features already contains
# 'EmbarkedOhe', and fitting the encoder on a frame that holds its output
# column fails. Drop that column first; drop() is a no-op when the column is
# absent, so this cell works whether or not Variant 1 was executed.
df_without_ohe = df_features.drop('EmbarkedOhe')
df_features = ohe.fit(df_without_ohe).transform(df_without_ohe)

In [None]:
# Inspect the one-hot result; EmbarkedOhe is displayed in sparse-vector notation.
df_features.show()

+--------+------+------+----+-------+--------+------+-----------+-------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|
+--------+------+------+----+-------+--------+------+-----------+-------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|        0.0|(2,[0],[1.0])|
|       1|     3|female|27.0|11.1333|       S|   1.0|        0.0|(2,[0],[1.0])|
|       1|     2|female|14.0|30.0708|   

## Практика 5. Дальше нужно собрать все признаки в единый вектор для работы машинного обучения.
Для этого используется VectorAssembler
https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
# Look at the current columns before assembling them into one vector.
df_features.show()

+--------+------+------+----+-------+--------+------+-----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|
+--------+------+------+----+-------+--------+------+-----------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|        0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|        0.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|        1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|        0.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|        0.0|
|       0|

In [None]:
# Column names available as inputs for the assembler.
df_features.columns

['Survived',
 'Pclass',
 'Sex',
 'Age',
 'Fare',
 'Embarked',
 'SexInd',
 'EmbarkedInd',
 'EmbarkedOhe']

In [None]:
# Combine the numeric columns and the one-hot vector into a single vector
# column named Features (the raw string columns and EmbarkedInd are not included).
assembler = VectorAssembler(inputCols=['Pclass', 'Age','Fare', 'SexInd', 'EmbarkedOhe'], outputCol='Features')

In [None]:
# Apply the assembler directly with transform (no fit call is made on it).
df_features = assembler.transform(df_features)

In [None]:
# Inspect the frame with the new Features vector column.
df_features.show()

+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,35.0,8.05,0....|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,29.7,8.4583,...|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,54.0,51

In [None]:
# Select the label plus scalar columns and show five rows.
# NOTE(review): this selection leaves out the assembled 'Features' column and
# dataset_titanic is not referenced again in the visible cells - confirm intent.
dataset_titanic = df_features.select('Survived', 'Embarked', 'Pclass', 'Age', 'Fare', 'SexInd', 'EmbarkedInd')
dataset_titanic.show(5)

+--------+--------+------+----+-------+------+-----------+
|Survived|Embarked|Pclass| Age|   Fare|SexInd|EmbarkedInd|
+--------+--------+------+----+-------+------+-----------+
|       0|       S|     3|22.0|   7.25|   0.0|        0.0|
|       1|       C|     1|38.0|71.2833|   1.0|        1.0|
|       1|       S|     3|26.0|  7.925|   1.0|        0.0|
|       1|       S|     1|35.0|   53.1|   1.0|        0.0|
|       0|       S|     3|35.0|   8.05|   0.0|        0.0|
+--------+--------+------+----+-------+------+-----------+
only showing top 5 rows



## Повторимость
Часто нам нужно все указанные шаги по подготовке признаков делать не один раз, а несколько - для предсказания каждого нового значения.
Тогда их логично объединить в Pipeline преобразования и применять как единую операцию

https://spark.apache.org/docs/latest/ml-pipeline.html



In [None]:
from pyspark.ml import Pipeline

In [None]:
# Bundle every feature-preparation step into one reusable pipeline:
# index Sex and Embarked, one-hot encode the Embarked index, then assemble
# the model inputs into the Features vector.
feature_stages = [
    StringIndexer(inputCol='Sex', outputCol='SexInd'),
    StringIndexer(inputCol='Embarked', outputCol='EmbarkedInd'),
    OneHotEncoder(inputCol='EmbarkedInd', outputCol='EmbarkedOhe'),
    VectorAssembler(
        inputCols=['Pclass', 'Age', 'Fare', 'SexInd', 'EmbarkedOhe'],
        outputCol='Features',
    ),
]
pipeline = Pipeline(stages=feature_stages)

In [None]:
# Fit all pipeline stages on the cleaned (imputed) data.
pipelineTrained = pipeline.fit(df_filtered)

In [None]:
# Preview the pipeline output; it matches the columns built step by step above.
pipelineTrained.transform(df_filtered).show()

+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,35.0,8.05,0....|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,29.7,8.4583,...|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,54.0,51

In [None]:
# Replace the manually built df_features with the pipeline's output.
df_features = pipelineTrained.transform(df_filtered)

In [None]:
# Inspect the final feature frame.
df_features.show()

+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,35.0,8.05,0....|
|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,29.7,8.4583,...|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,54.0,51

## Разобьем данные на данные для обучения и проверки

In [None]:
# Random split with weights 0.8 / 0.2 for train / test; the fixed seed is
# there to make the split repeatable.
train, test = df_features.randomSplit([0.8, 0.2], seed=12345)

In [None]:
# Inspect the training part of the split.
train.show()

+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|    Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+
|       0|     1|female| 2.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,2.0,151.55,1...|
|       0|     1|female|25.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,25.0,151.55,...|
|       0|     1|female|50.0| 28.7125|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,50.0,28.7125...|
|       0|     1|  male|18.0|   108.9|       C|   0.0|        1.0|(2,[1],[1.0])|[1.0,18.0,108.9,0...|
|       0|     1|  male|19.0|    53.1|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,19.0,53.1,0....|
|       0|     1|  male|19.0|   263.0|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,19.0,263.0,0...|
|       0|     1|  male|22.0|135.6333|       C|   0.0|        1.0|(2,[1],[1.0])|[1

## Создадим и обучим модель логистической регрессии

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression that reads the Features vector and predicts Survived;
# it is fitted on the training split only.
lr = LogisticRegression(featuresCol = 'Features', labelCol = 'Survived')
lrModel = lr.fit(train)

In [None]:
# Score both splits; per the recorded output, transform appends the
# rawPrediction, probability and prediction columns.
train_res = lrModel.transform(train)
test_res = lrModel.transform(test)

In [None]:
# Inspect the predictions on the training split.
train_res.show()

+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|    Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|       rawPrediction|         probability|prediction|
+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+--------------------+--------------------+----------+
|       0|     1|female| 2.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,2.0,151.55,1...|[-3.3101041867087...|[0.03522617821846...|       1.0|
|       0|     1|female|25.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,25.0,151.55,...|[-2.4144758542870...|[0.08207547992848...|       1.0|
|       0|     1|female|50.0| 28.7125|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,50.0,28.7125...|[-2.1747688852745...|[0.10203924364510...|       1.0|
|       0|     1|  male|18.0|   108.9|       C|   0.0|        1.0|(2,[

## Оценим качество
Для оценки качества предсказания в spark реализовано несколько классов
Если мы решаем задачу бинарной классификации (то есть классов - 2), то нам подойдет BinaryClassificationEvaluator, а если классов больше 2-х, то MulticlassClassificationEvaluator

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Evaluator for the binary task with Survived as the true label.
# No metric is specified, so the library default is used
# (presumably areaUnderROC - confirm against the PySpark API docs).
ev = BinaryClassificationEvaluator(labelCol='Survived')

In [None]:
# Metric for logistic regression on the training split.
ev.evaluate(train_res)

0.8524237589235771

In [None]:
# Metric for logistic regression on the held-out test split.
ev.evaluate(test_res)

0.8456432707244734

## Практика 6. Обучите модель дерева решений и оцените его качество
https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Decision tree classifier using the same feature and label columns as the
# logistic regression above.
tr = DecisionTreeClassifier(featuresCol='Features', labelCol='Survived')

In [None]:
# Fit the tree on the training split.
trFitted = tr.fit(train)

In [None]:
# Score both splits with the fitted tree.
train_tr_res=trFitted.transform(train)
test_tr_res=trFitted.transform(test)

In [None]:
# Inspect the tree's predictions on the training split.
train_tr_res.show()

+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+-------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|    Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|rawPrediction|         probability|prediction|
+--------+------+------+----+--------+--------+------+-----------+-------------+--------------------+-------------+--------------------+----------+
|       0|     1|female| 2.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,2.0,151.55,1...|    [1.0,0.0]|           [1.0,0.0]|       0.0|
|       0|     1|female|25.0|  151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,25.0,151.55,...|  [8.0,121.0]|[0.06201550387596...|       1.0|
|       0|     1|female|50.0| 28.7125|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,50.0,28.7125...|  [8.0,121.0]|[0.06201550387596...|       1.0|
|       0|     1|  male|18.0|   108.9|       C|   0.0|        1.0|(2,[1],[1.0])|[1.0,18.0,108.9,0...|  [59.0,33.

In [None]:
# Metric for the decision tree on the training split (same evaluator as before).
ev.evaluate(train_tr_res)

0.7435807809981247

In [None]:
# Metric for the decision tree on the held-out test split.
ev.evaluate(test_tr_res)

0.8068775596667137

## Домашнее задание
Обучите модель классификации для цветков Iris'а

Примерная последовательность действий:
1. Взять данные - https://drive.google.com/file/d/18ksAxTxBkp15LToEg46BHhwp3sPIoeUU/view?usp=sharing
2. Загрузить в pyspark
3. При помощи VectorAssembler преобразовать все колонки с признаками в одну (использовать Pipeline - опционально)
4. Разбить данные на train и test
5. Создать модель логистической регрессии или модель дерева и обучить ее
6. Воспользоваться MulticlassClassificationEvaluator для оценки качества на train и test множестве