# Установка PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

In [None]:
!tar -xvf spark-2.4.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Получаем данные

In [7]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

--2019-09-02 20:07:31--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip.1’


2019-09-02 20:07:31 (4.33 MB/s) - ‘ml-latest-small.zip.1’ saved [978202/978202]



In [8]:
!unzip ml-latest-small.zip

Archive:  ml-latest-small.zip
replace ml-latest-small/links.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

# Загружаем данные

In [None]:
ratings = spark.read.csv('ml-latest-small/ratings.csv', header=True, inferSchema=True)
movies = spark.read.csv('ml-latest-small/movies.csv', header=True, inferSchema=True)

In [11]:
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [12]:
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

## Подход 1 - фильмы с наибольшим средним рейтингом и как минимум 150 отзывами

In [None]:
from pyspark.sql import functions as F

In [None]:
ratings_with_avg = ratings.groupBy('movieId').agg(F.count(ratings.rating).alias('count'), F.avg(ratings.rating).alias('avg'))

In [94]:
ratings_with_avg.show()

+-------+-----+------------------+
|movieId|count|               avg|
+-------+-----+------------------+
|   1580|  165| 3.487878787878788|
|   2366|   25|              3.64|
|   3175|   75|              3.58|
|   1088|   42| 3.369047619047619|
|  32460|    4|              4.25|
|  44022|   23| 3.217391304347826|
|  96488|    4|              4.25|
|   1238|    9| 4.055555555555555|
|   1342|   11|               2.5|
|   1591|   26|2.6346153846153846|
|   1645|   51| 3.411764705882353|
|   4519|    9|3.3333333333333335|
|   2142|   10|               2.7|
|    471|   40|              3.55|
|   3997|   12|1.8333333333333333|
|    833|    6|               2.0|
|   3918|    9|3.2777777777777777|
|   7982|    4|              3.25|
|   1959|   15|3.6666666666666665|
|  68135|   10|              3.55|
+-------+-----+------------------+
only showing top 20 rows



## Практика 1.
1. Добавьте в DataFrame название фильма
2. Оставьте в DataFrame только те фильмы, которые набрали не менее 150 оценок
3. Выведите top-25 фильмов с наивысшим средним баллом

## Подход 2 - Колобративная фильтрация

![alt text](https://upload.wikimedia.org/wikipedia/commons/thumb/5/52/Collaborative_filtering.gif/300px-Collaborative_filtering.gif)

это один из методов построения прогнозов (рекомендаций) в рекомендательных системах, использующий известные предпочтения (оценки) группы пользователей для прогнозирования неизвестных предпочтений другого пользователя. Его основное допущение состоит в следующем: те, кто одинаково оценивал какие-либо предметы в прошлом, склонны давать похожие оценки другим предметам и в будущем. 

(c) Wikipedia
https://ru.wikipedia.org/wiki/Коллаборативная_фильтрация

Для решения будем использовать алгоритм [ALS](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html)

![alt text](https://www.researchgate.net/profile/Huu_Hoa_Nguyen/publication/314071424/figure/fig1/AS:570666408529920@1513068882014/An-example-of-matrix-factorization.png)

Параметры:
* numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* rank is the number of features to use (also referred to as the number of latent factors).
* iterations is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.
* lambda specifies the regularization parameter in ALS.
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [None]:
# Удаляем не нужную колонку
ratings = ratings.drop('timestamp')

In [None]:
from pyspark.mllib.recommendation import ALS
import math

In [None]:
# Разбиваем датасет на train и test
ratings_train, ratings_test = ratings.randomSplit([0.8, 0.2])

In [16]:
ratings_test.count()

20143

In [17]:
ratings_train.count()

80693

In [None]:
iterations = 10
rank = 8

In [None]:
model = ALS.train(ratings_train, rank, iterations=iterations)

In [23]:
ratings_test.select('userId', 'movieId').show()

+------+-------+
|userId|movieId|
+------+-------+
|     1|      3|
|     1|     70|
|     1|    151|
|     1|    356|
|     1|    500|
|     1|    552|
|     1|    592|
|     1|    593|
|     1|    736|
|     1|   1024|
|     1|   1031|
|     1|   1032|
|     1|   1049|
|     1|   1214|
|     1|   1220|
|     1|   1226|
|     1|   1270|
|     1|   1282|
|     1|   1298|
|     1|   1396|
+------+-------+
only showing top 20 rows



In [None]:
predictions = model.predictAll(ratings_test.select('userId', 'movieId').rdd)

In [None]:
predictions_converted = predictions.map(lambda r: (r[0], r[1], r[2]))

In [None]:
predictions_df = predictions_converted.toDF(["userId", "movieId", "rating_pred"])

In [37]:
predictions_df.show()

+------+-------+------------------+
|userId|movieId|       rating_pred|
+------+-------+------------------+
|   140|   1084|3.6140463315545652|
|   288|   1084| 4.115574001305719|
|   372|   1084|  4.26499870042785|
|     4|   1084| 4.565801896696092|
|   156|   1084| 3.762990466888491|
|   589|   1084| 4.949622347974899|
|   607|   1084| 4.192533194136292|
|   315|   1084|3.7720387878956014|
|   318|   3702|3.3543689263929988|
|   160|   3702|3.9041483475474337|
|   332|   3702| 3.468236356121293|
|   414|   3702| 4.044239326337486|
|   182|   3702| 3.889148676629699|
|   368|   3702|3.0001768995761386|
|   312|   3702| 3.969641123162916|
|   303|   3702| 3.766323504861666|
|   603|   3702|4.1213492788738035|
|    95|   3702|3.3880086448148585|
|   438|   6754|3.8068840618218203|
|    80|   6754| 4.435867332216437|
+------+-------+------------------+
only showing top 20 rows



## Практика 2
1. Посчитайте среднее абсолютное отклонение предсказанной оценки от реальной оценки

In [None]:
from pyspark.sql.functions import abs

## Предсказание

In [59]:
USER_ID_FOR_PREDICTION = 320
data_for_prediction = ratings_train[ratings_train['userId']==USER_ID_FOR_PREDICTION]
data_for_prediction.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   320|   1013|   3.5|
|   320|   2058|   4.0|
|   320|   3404|   0.5|
|   320|   3534|   4.0|
|   320|   3578|   3.5|
|   320|   3703|   3.0|
|   320|   3745|   3.5|
|   320|   4387|   4.0|
|   320|   5785|   4.0|
|   320|   6283|   3.5|
|   320|   6857|   3.5|
|   320|  31184|   4.0|
|   320|  42718|   3.5|
|   320|  59315|   4.0|
|   320|  59900|   3.0|
|   320|  62999|   3.5|
|   320|  68358|   4.0|
|   320|  71535|   4.0|
+------+-------+------+



In [61]:
data_for_prediction.join(movies, on='movieId').show()

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1013|   320|   3.5|Parent Trap, The ...|Children|Comedy|R...|
|   2058|   320|   4.0|Negotiator, The (...|Action|Crime|Dram...|
|   3404|   320|   0.5|      Titanic (1953)|        Action|Drama|
|   3534|   320|   4.0|      28 Days (2000)|               Drama|
|   3578|   320|   3.5|    Gladiator (2000)|Action|Adventure|...|
|   3703|   320|   3.0|Road Warrior, The...|Action|Adventure|...|
|   3745|   320|   3.5|   Titan A.E. (2000)|Action|Adventure|...|
|   4387|   320|   4.0|Kiss of the Drago...|              Action|
|   5785|   320|   4.0|Jackass: The Movi...|Action|Comedy|Doc...|
|   6283|   320|   3.5|Cowboy Bebop: The...|Action|Animation|...|
|   6857|   320|   3.5|Ninja Scroll (Jûb...|Action|Adventure|...|
|  31184|   320|   4.0|Appleseed (Appuru...|Action|Animation|...|
|  42718| 

In [None]:
predicted_user_watched = data_for_prediction.rdd.map(lambda r: r[1])

In [68]:
predicted_user_watched.take(20)

[1013,
 2058,
 3404,
 3534,
 3578,
 3703,
 3745,
 4387,
 5785,
 6283,
 6857,
 31184,
 42718,
 59315,
 59900,
 62999,
 68358,
 71535]

In [None]:
predicted_user_watched = predicted_user_watched.take(20)

In [None]:
prediction_user_unrated = movies.rdd.filter(lambda m: m[0] not in predicted_user_watched).map(lambda m: (USER_ID_FOR_PREDICTION, m[0]))

In [78]:
prediction_user_unrated.take(10)

[(320, 1),
 (320, 2),
 (320, 3),
 (320, 4),
 (320, 5),
 (320, 6),
 (320, 7),
 (320, 8),
 (320, 9),
 (320, 10)]

## Практика 3
1. Предскажите значения рейтингов для тех фильмов, которые пользователь еще не ввидел (prediction_user_unrated)
2. На основе предсказанных значений постройте top-10 фильмов для пользователя

##  Индивидуальное предсказание

In [89]:
model.predict(320, 48322)

6.857201129306809

## Сохранение модели

In [None]:
model.save(spark.sparkContext, 'model')

## Домашнее задание
2 варианта.
1. Вариант легкий: Решите задачу классификации цветков ирисов с использованием PySpark
2. Вариант сложный: Решите задачу классификации пассажиров титаника с использованием PySpark (https://www.kaggle.com/c/titanic)

**При выполнении ДЗ не разрешается:**
1. Использовать библиотеку pandas
2. Использовать библиотеку sklearn

Полезные импорты:
1. from pyspark.ml.classification import LogisticRegression
2. from pyspark.ml.evaluation import MulticlassClassificationEvaluator - для оценки качества работы алгоритма
3. from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler - для предобработки признаков

Полезные ссылки:
1. https://spark.apache.org/docs/latest/ml-classification-regression.html#classification - алгоримты классификации в pyspark
2. https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa - пример решения задачи классификации на pyspark