# Машинное обучение на PySpark

## Установка PySpark

In [2]:
!apt-get update #libraries update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # java installation
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3-scala2.13.tgz #download actual spark version
!tar -xvf spark-3.3.1-bin-hadoop3-scala2.13.tgz # unzip spark 
!pip install -q findspark # install python findspark module

import os
# define environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3-scala2.13"

"""Find spark home, and initialize by adding pyspark to sys.path.

If SPARK_HOME is defined, it will be used to put pyspark on sys.path.
Otherwise, common locations for spark will be searched."""

import findspark
# initialize findspark
findspark.init()
from pyspark.sql import SparkSession

# master. Sets the Spark master URL to connect to, 
#such as "local" to run locally, "local[4]" to run locally with 4 cores, 
#or "spark://master:7077" to run on a Spark standalone cluster.
spark = SparkSession.builder.master("local[*]").getOrCreate()

"""
Main entry point for Spark functionality. 
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, 
accumulators and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. 
You must stop() the active SparkContext before creating a new one.
"""

#sc = spark.sparkContext

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to cloud.r-project.o                                                                               Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
                                                                               Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to cloud.r-project.org] [Waiting for heade0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to cloud.r-proje                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to cloud.r-proje                                                                               Hit:5 https://developer.dow

'\nMain entry point for Spark functionality. \nA SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, \naccumulators and broadcast variables on that cluster.\nOnly one SparkContext may be active per JVM. \nYou must stop() the active SparkContext before creating a new one.\n'

## Загружаем набор данных

Predict survival on the Titanic
* survival - Survival	0 = No, 1 = Yes
* pclass - Ticket class	1 = 1st, 2 = 2nd, 3 = 3rd
* sex	- Sex	
* Age	- Age in years	
* sibsp	- # of siblings / spouses aboard the Titanic	
* parch	- # of parents / children aboard the Titanic	
* ticket - Ticket number	
* fare -	Passenger fare	
* cabin	- Cabin number	
* embarked - Port of Embarkation	C = Cherbourg, Q = Queenstown, S = Southampton

In [3]:
!wget https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv

--2022-11-09 16:47:12--  https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/plain]
Saving to: ‘titanic.csv’


2022-11-09 16:47:12 (52.4 MB/s) - ‘titanic.csv’ saved [60302/60302]



## Практика 1. Загрузите файл titanic.csv и выведите его командой show

In [4]:
df = spark.read.csv('titanic.csv', inferSchema=True, header=True)
df.count()

891

In [5]:
df.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## Посмотрим общую статистику по нашему датафрейму
Для этого воспользуемся командой describe

In [6]:
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## Практика 2. Удалим малоинформативные колонки
Колонка с идентификатором пассажира (PassengerId), именем (Name), информация о родственниках (SibSp, Parch),номер билета (Ticket) и информация о каюте (Cabin) кажутся малоинформативными для первой версии модели. Удалим их при помощи команды select

In [None]:
df = df.select('Survived', 'Pclass', 'Sex', 'Age','Fare','Embarked')
df.show()

NameError: ignored

## Практика 3. Колонки Age и Embarked содержат пропущенные значения.
Пропуски надо заполнить
Для поля Age предлагается заполнить пропуски средним значением возвраста (информация по нему есть в describe)
Для поля Embarked самым частым вариантом (надо сгруппировать данные и посчитать какой порт самый частый)
Затем при помощи команды:

```
df_filtered = df_filtered.na.fill({'Age': XX, 'Embarked': YY})
```

устранить пропуски

## Работа со строковыми колонками
У нас осталось 2 колонки, содержащих строковые данные:
* Sex
* Embarked
Первую мы преобразуем в 0 и 1
Вторую закодируем по принципу OneHot

Для этого нам помогут специальные классы в PySpark:
* StringIndexer - https://spark.apache.org/docs/latest/ml-features#stringindexer
* OneHotEncoderEstimator - https://spark.apache.org/docs/latest/ml-features#onehotencoder

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [None]:
indexer = StringIndexer(inputCol='Sex', outputCol='SexInd')
indexerTrained = indexer.fit(df_filtered)
df_features = indexerTrained.transform(df_filtered)

In [None]:
indexerTrained.labels

['male', 'female']

In [None]:
df_features.show()

+-----------+--------+------+------+----+-------+--------+------+
|PassengerId|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|
+-----------+--------+------+------+----+-------+--------+------+
|          1|       0|     3|  male|22.0|   7.25|       S|   0.0|
|          2|       1|     1|female|38.0|71.2833|       C|   1.0|
|          3|       1|     3|female|26.0|  7.925|       S|   1.0|
|          4|       1|     1|female|35.0|   53.1|       S|   1.0|
|          5|       0|     3|  male|35.0|   8.05|       S|   0.0|
|          6|       0|     3|  male|29.7| 8.4583|       Q|   0.0|
|          7|       0|     1|  male|54.0|51.8625|       S|   0.0|
|          8|       0|     3|  male| 2.0| 21.075|       S|   0.0|
|          9|       1|     3|female|27.0|11.1333|       S|   1.0|
|         10|       1|     2|female|14.0|30.0708|       C|   1.0|
|         11|       1|     3|female| 4.0|   16.7|       S|   1.0|
|         12|       1|     1|female|58.0|  26.55|       S|   1.0|
|         

## Практика 4.
Преобразуйте колонку Embarked.
Сначала надо при помощи StringIndexer преобразовать колонку Embarked в колонку EmbarkedInd с цифровыми кодами
Затем сделать OneHotEncoder, который колонку EmbarkedInd превратит в колонку EmbarkedOhe. Обратите внимание у OneHotEncoder'а нет метода fit. Его можно сразу применять методом transform

## Практика 5. Дальше нужно собрать все признаки в единый вектор для работы машинного обучения.
Для этого используется VectorAssembler
https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

## Повторимость
Часто нам нужно все указанные шаги по подготовке признаков делать не один раз, а несколько - для предсказания каждого нового значения.
Тогда их логично объединить в Pipeline преобразования и применять как единую операцию

https://spark.apache.org/docs/latest/ml-pipeline.html



In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages = 
[
  StringIndexer(inputCol='Sex', outputCol='SexInd'),
  StringIndexer(inputCol='Embarked', outputCol='EmbarkedInd'),
  OneHotEncoder(inputCol='EmbarkedInd', outputCol = 'EmbarkedOhe'),
  VectorAssembler(inputCols=['Pclass', 'Age', 'Fare', 'SexInd', 'EmbarkedOhe'], outputCol='Features')
]
)

In [None]:
pipelineTrained = pipeline.fit(df_filtered)

In [None]:
pipelineTrained.transform(df_filtered).show()

+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|PassengerId|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|          1|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|          2|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|          3|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|          4|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|          5|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,35.0,8.05,0....|
|          6|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,

In [None]:
df_features = pipelineTrained.transform(df_filtered)

## Разобьем данные на данные для обучения и проверки

In [None]:
train, test = df_features.randomSplit([0.8, 0.2], seed=12345)

In [None]:
train.show()

+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|PassengerId|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|
+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|          1|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|          2|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|          3|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|          4|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|          6|       0|     3|  male|29.7| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,29.7,8.4583,...|
|          7|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,

## Создадим и обучим модель логистической регрессии

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression(featuresCol = 'Features', labelCol = 'Survived')
lrModel = lr.fit(train)

In [None]:
train_res = lrModel.transform(train)
test_res = lrModel.transform(test)

In [None]:
train_res.show()

+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+--------------------+--------------------+----------+
|PassengerId|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOhe|            Features|       rawPrediction|         probability|prediction|
+-----------+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+--------------------+--------------------+----------+
|          1|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|[2.28851114991778...|[0.90792105716095...|       0.0|
|          2|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|[-2.7781883655360...|[0.05851427934696...|       1.0|
|          3|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|[-0.2278559064623...|[0.44328120664877...|       1.0|
|   

## Оценим качество
Для оценки качества предсказания в spark реализованно несколько классов
Если мы решаем задачу бинарной классификации (то есть классов - 2), то нам подойдет BinaryCLassificationEvaluator, а если классов больше 2-х, то MulticlassClassificationEvaluator

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
ev = BinaryClassificationEvaluator(labelCol='Survived')

In [None]:
ev.evaluate(train_res)

0.8603479853479851

In [None]:
ev.evaluate(test_res)

0.8184473654645547

## Практика 6. Обучите модель дерева решений и оцените его качество
https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier

## Домашнее задание
Обучите модель классификации для цветков Iris'а

Примерная последовательность действий:
1. Взять данные - https://drive.google.com/file/d/18ksAxTxBkp15LToEg46BHhwp3sPIoeUU/view?usp=sharing
2. Загрузить в pyspark
3. При помощи VectorAssembler преобразовать все колонки с признаками в одну (использовать PipeLine - опционально)
4. Разбить данные на train и test
5. Создать модель логистической регресии или модель дерева и обучить ее
6. Воспользоваться MulticlassClassificationEvaluator для оценки качества на train и test множестве