# Блок 1: Spark appliction
## Запуск Spark application

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
!hdfs dfsadmin -safemode leave

2023-12-15 12:05:29 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Safe mode is OFF


In [10]:
conf = SparkConf()\
    .set("spark.executor.instances", "2")\
    .set("spark.executor.cores", "1")\
    .set("spark.executor.memory", "1g")

In [5]:
sc = SparkContext(appName="fedotova_spark", master="yarn")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/15 12:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/15 12:05:37 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## Работа с movielens датасететом
### Загрузка датасета

In [6]:
!hdfs dfs -rm -r ml-latest-small

2023-12-15 12:05:49 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `ml-latest-small': No such file or directory


In [7]:
!hdfs dfs -put ml-latest-small .

2023-12-15 12:05:50 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Чтение датасета

In [8]:
rdd_ratings = sc.textFile("ml-latest-small/ratings.csv")

In [9]:
rdd_tags = sc.textFile("ml-latest-small/tags.csv")

### Подсчет количества строк

In [10]:
rdd_ratings.count()

                                                                                

100837

In [12]:
rdd_tags.count()

3684

Посмотрим на содержимое файлов

In [14]:
rdd_ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

In [15]:
rdd_tags.take(5)

['userId,movieId,tag,timestamp',
 '2,60756,funny,1445714994',
 '2,60756,Highly quotable,1445714996',
 '2,60756,will ferrell,1445714992',
 '2,89774,Boxing story,1445715207']

В каждом из файлов на первой строке расположены названия колонок таблицы, поэтому, чтобы найти число строк с контентом, нужно вычесть единицу из полученных значений: 100836 и 3683 строки (само удаление первых строк из таблиц будет ниже).

# Блок 2: Работа с данными

## Обработка данных
Удалим первую строку с названием колонок и посплитим данные для удобного обращения к каждому элементу строки

In [18]:
ratings_parsed = rdd_ratings\
    .filter(lambda row: 'userId' not in row)\
    .map(lambda row: row.split(','))

In [20]:
ratings_parsed.take(5)

[['1', '1', '4.0', '964982703'],
 ['1', '3', '4.0', '964981247'],
 ['1', '6', '4.0', '964982224'],
 ['1', '47', '5.0', '964983815'],
 ['1', '50', '5.0', '964982931']]

In [21]:
tags_parsed = rdd_tags\
    .filter(lambda row: 'userId' not in row)\
    .map(lambda row: row.split(','))

In [22]:
tags_parsed.take(5)

[['2', '60756', 'funny', '1445714994'],
 ['2', '60756', 'Highly quotable', '1445714996'],
 ['2', '60756', 'will ferrell', '1445714992'],
 ['2', '89774', 'Boxing story', '1445715207'],
 ['2', '89774', 'MMA', '1445715200']]

## Задачи

### 1. Количество уникальных фильмов и уникальных юзеров в таблице “ratings”
Для нахождения числа уникальных фильмов выделим значения колонки *movieId*, сгруппируем по ключу и посчитаем количество получившихся групп. Аналогично для нахождения числа пользователей, только там будем использовать ключ *userId*.

#### Фильмы

In [25]:
movie_ids = ratings_parsed.map(lambda row: row[1])

In [26]:
movie_idsuser_ids

['1', '3', '6', '47', '50']

In [28]:
grouped_by_movie_ids = movie_ids.groupBy(lambda movie_id: movie_id)

In [29]:
grouped_by_movie_ids.take(5)

[('4973', <pyspark.resultiterable.ResultIterable at 0xffff6beb7970>),
 ('5060', <pyspark.resultiterable.ResultIterable at 0xffff6beb79a0>),
 ('5218', <pyspark.resultiterable.ResultIterable at 0xffff6beb7a00>),
 ('5299', <pyspark.resultiterable.ResultIterable at 0xffff6beb7a60>),
 ('5349', <pyspark.resultiterable.ResultIterable at 0xffff6beb7ac0>)]

Ответ

In [32]:
print(f'Unique movies count: {grouped_by_movie_ids.count()}')

Unique movies count: 9724


#### Пользователи

In [33]:
user_ids = ratings_parsed.map(lambda row: row[0])

In [34]:
user_ids.take(5)

['1', '1', '1', '1', '1']

In [35]:
grouped_by_user_id = user_ids.groupBy(lambda user_id: user_id)

In [36]:
grouped_by_user_id.take(5)

[('328', <pyspark.resultiterable.ResultIterable at 0xffff6bcbbbb0>),
 ('330', <pyspark.resultiterable.ResultIterable at 0xffff6bc79e10>),
 ('331', <pyspark.resultiterable.ResultIterable at 0xffff6bc7ae90>),
 ('333', <pyspark.resultiterable.ResultIterable at 0xffff6bc61840>),
 ('334', <pyspark.resultiterable.ResultIterable at 0xffff6bc4a050>)]

Ответ

In [37]:
print(f'Unique users count: {grouped_by_user_id.count()}')

Unique users count: 610


### 2. Сколько было поставлено оценок >= 4.0
Тк числовой рейтинг фильмов присутствует только в таблице 'ratings', опять будем работать с ней. Отфлильтруем строки, оставив только те, в которых рейтинг >= 4.0, и посчитаем число оставшихся строк.

In [43]:
filtered_ratings = ratings_parsed.filter(lambda row: float(row[2]) >= 4.0)

Ради проверки посмотрим на оставшиеся значения поля ratings

In [44]:
filtered_ragins_values = filtered_ratings.map(lambda row: row[2])

In [45]:
filtered_ragins_values.take(5)

['4.0', '4.0', '4.0', '5.0', '5.0']

Ответ

In [46]:
print(f'High ratings (>= 4.0) count: {filtered_ratings.count()}')

High ratings (>= 4.0) count: 48580


### 3. Топ-100 фильмов с самым высоким рейтингом
Тк числовой рейтинг фильмов присутствует только в таблице 'ratings', опять будем работать с ней. Посчитаем средний рейтинг для каждого фильма и выведем топ-100 фильмов. Если в задании предполагалось для каждого фильма посмотреть только самую высокую оценку, которую ему когда-либо ставили, такое решение отличается от текущего одной строчкой map'а и написано в комментарии в одной из ячеек решения.

Итого: нам потребуются только колонки *movieId* и *rating*, оставим только их и заодно приведем колонку *ratings* к формату float. Сгруппируем строки по *movieId*, посчитаем средний рейтинг для каждого фильма и отсортируем их в порядке убывания этой оценки. Возьмем первые 100 значений.

Все это можно записать в одну строку через точки, но для наглядности будем делать take и смотреть на промежуточные результаты.

Оставляем: (movieId, rating)

In [97]:
movie_rating = ratings_parsed.map(lambda row: [row[1], float(row[2])])
movie_rating.take(5)

[['1', 4.0], ['3', 4.0], ['6', 4.0], ['47', 5.0], ['50', 5.0]]

Группируем по userId, кастуем результат groupBy: из pyspark.resultiterable.ResultIterable в list

In [98]:
# grouping by 'movieId'
# casting grouped elements from pyspark.resultiterable.ResultIterable to the list

grouped_movie_rating = movie_rating\
    .groupBy(lambda row: row[0])\
    .mapValues(list)

grouped_movie_rating.take(2)

[('1',
  [['1', 4.0],
   ['1', 4.0],
   ['1', 4.5],
   ['1', 2.5],
   ['1', 4.5],
   ['1', 3.5],
   ['1', 4.0],
   ['1', 3.5],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 2.5],
   ['1', 5.0],
   ['1', 4.5],
   ['1', 0.5],
   ['1', 4.0],
   ['1', 2.5],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 4.5],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 3.5],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 2.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 3.5],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 2.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 4.5],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 3.5],
   ['1', 4.5],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 4.0],
   

In [99]:
movie_ratings = grouped_movie_rating\
    .mapValues(lambda movie_rating_group: [movie_rating_pair[1] for movie_rating_pair in movie_rating_group])

# если требуется сортировать фильмы по максимальной оценке, которую дал им какой-либо из пользователей, 
# достаточно будет использовать такой map:
# .map(lambda movie_group: (movie_group[0], max(movie_group[1])[1]))\
# -- выбираем пару с максимальной оценкой для данного фильма, оставляем только оценку, убирая номер movieId

movie_ratings.take(2)

[('4973',
  [4.5,
   4.0,
   4.0,
   4.0,
   5.0,
   5.0,
   4.0,
   3.0,
   5.0,
   3.0,
   4.5,
   5.0,
   1.0,
   4.5,
   4.5,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   4.0,
   3.5,
   4.0,
   4.0,
   3.5,
   5.0,
   4.0,
   5.0,
   1.5,
   4.0,
   5.0,
   4.5,
   4.5,
   5.0,
   3.0,
   5.0,
   5.0,
   4.0,
   3.0,
   4.0,
   4.0,
   3.0,
   5.0,
   5.0,
   4.5,
   4.5,
   5.0,
   3.5,
   4.0,
   4.5,
   3.5,
   3.5,
   4.0,
   5.0,
   4.0,
   4.5,
   4.0,
   4.0,
   4.0,
   3.0,
   3.5,
   3.5,
   4.0,
   4.0,
   4.5,
   5.0,
   2.0,
   5.0,
   5.0,
   2.5,
   5.0,
   5.0,
   2.0,
   5.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   0.5,
   4.0,
   3.5,
   4.0,
   5.0,
   4.0,
   5.0,
   3.5,
   5.0,
   4.5,
   5.0,
   4.0,
   2.5,
   5.0,
   4.0,
   4.5,
   5.0,
   3.0,
   5.0,
   4.0,
   5.0,
   4.0,
   5.0,
   4.5,
   4.0,
   3.5,
   4.0,
   5.0,
   4.0,
   5.0,
   5.0,
   4.5,
   3.0,
   4.0,
   4.0,
   5.0,
   5.0,
   4.0,
   4.5,
   4.0]),
 ('5060',
  [4.0,
   4.0,
  

In [143]:
movie_top_rating = movie_ratings\
    .mapValues(lambda movie_ratings_list: sum(movie_ratings_list) / len(movie_ratings_list))

movie_top_rating.take(5)

[('4973', 4.183333333333334),
 ('5060', 3.9347826086956523),
 ('5218', 3.6882352941176473),
 ('5299', 3.2666666666666666),
 ('5349', 3.540983606557377)]

Отсортируем сперва по оценке, затем по *movieId* (сортировка по убыванию обоих параметров)

In [150]:
top_rated_movies = movie_top_rating\
    .sortBy(lambda row: [row[1], row[0]], ascending=False)

In [151]:
TOP_N = 100
top_rated_movies.take(TOP_N)

[('99636', 5.0),
 ('99', 5.0),
 ('96935', 5.0),
 ('96832', 5.0),
 ('96608', 5.0),
 ('96430', 5.0),
 ('95843', 5.0),
 ('95311', 5.0),
 ('95175', 5.0),
 ('95149', 5.0),
 ('94810', 5.0),
 ('93320', 5.0),
 ('93022', 5.0),
 ('93008', 5.0),
 ('92494', 5.0),
 ('91386', 5.0),
 ('91355', 5.0),
 ('90943', 5.0),
 ('8911', 5.0),
 ('88448', 5.0),
 ('8804', 5.0),
 ('87834', 5.0),
 ('876', 5.0),
 ('8738', 5.0),
 ('86721', 5.0),
 ('86668', 5.0),
 ('86237', 5.0),
 ('8580', 5.0),
 ('85295', 5.0),
 ('84512', 5.0),
 ('84273', 5.0),
 ('83969', 5.0),
 ('82744', 5.0),
 ('8238', 5.0),
 ('80124', 5.0),
 ('79897', 5.0),
 ('78836', 5.0),
 ('7815', 5.0),
 ('77846', 5.0),
 ('76091', 5.0),
 ('74226', 5.0),
 ('73822', 5.0),
 ('72692', 5.0),
 ('72142', 5.0),
 ('71268', 5.0),
 ('7122', 5.0),
 ('7096', 5.0),
 ('7071', 5.0),
 ('70451', 5.0),
 ('69860', 5.0),
 ('6983', 5.0),
 ('69469', 5.0),
 ('69211', 5.0),
 ('6835', 5.0),
 ('6818', 5.0),
 ('67618', 5.0),
 ('6611', 5.0),
 ('64501', 5.0),
 ('64499', 5.0),
 ('6442', 5.0),

Чтобы проверить, что не все фильмы имеют рейтинг 5.0, посмотрим топ-500:

In [152]:
TOP_N = 500
top_rated_movies.take(TOP_N)

[('99636', 5.0),
 ('99', 5.0),
 ('96935', 5.0),
 ('96832', 5.0),
 ('96608', 5.0),
 ('96430', 5.0),
 ('95843', 5.0),
 ('95311', 5.0),
 ('95175', 5.0),
 ('95149', 5.0),
 ('94810', 5.0),
 ('93320', 5.0),
 ('93022', 5.0),
 ('93008', 5.0),
 ('92494', 5.0),
 ('91386', 5.0),
 ('91355', 5.0),
 ('90943', 5.0),
 ('8911', 5.0),
 ('88448', 5.0),
 ('8804', 5.0),
 ('87834', 5.0),
 ('876', 5.0),
 ('8738', 5.0),
 ('86721', 5.0),
 ('86668', 5.0),
 ('86237', 5.0),
 ('8580', 5.0),
 ('85295', 5.0),
 ('84512', 5.0),
 ('84273', 5.0),
 ('83969', 5.0),
 ('82744', 5.0),
 ('8238', 5.0),
 ('80124', 5.0),
 ('79897', 5.0),
 ('78836', 5.0),
 ('7815', 5.0),
 ('77846', 5.0),
 ('76091', 5.0),
 ('74226', 5.0),
 ('73822', 5.0),
 ('72692', 5.0),
 ('72142', 5.0),
 ('71268', 5.0),
 ('7122', 5.0),
 ('7096', 5.0),
 ('7071', 5.0),
 ('70451', 5.0),
 ('69860', 5.0),
 ('6983', 5.0),
 ('69469', 5.0),
 ('69211', 5.0),
 ('6835', 5.0),
 ('6818', 5.0),
 ('67618', 5.0),
 ('6611', 5.0),
 ('64501', 5.0),
 ('64499', 5.0),
 ('6442', 5.0),

### 4. Разница во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму
Тк временные метки, когда пользователь поставил тег данного фильма и когда он поставил оценку фильму, лежат в разных таблицах *ratings* и *tags*, сделаем их join по ключу (*userId*, *movieId*): нам важен и пользователь, и конкретный фильм, который он оценивал. Перед этим можем удалить из таблиц все остальные данные, оставив только *userId*, *movieId* и *timestamp*. Посчитаем разницу временных меток для каждой пары (*userId*, *movieId*). В качестве ответа выведем среднюю дельту по времени.

Вспомним, как в каком порядке идут колонки в исходных таблицах:

In [104]:
rdd_ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

In [105]:
rdd_tags.take(5)

['userId,movieId,tag,timestamp',
 '2,60756,funny,1445714994',
 '2,60756,Highly quotable,1445714996',
 '2,60756,will ferrell,1445714992',
 '2,89774,Boxing story,1445715207']

Приводим обе таблицы к виду: ((userId, movieId), timestamp). Timestamp'ы скастуем из str ко float.

In [113]:
rating_timestamps = ratings_parsed\
    .map(lambda row: ((row[0], row[1]), float(row[3])))

rating_timestamps.take(5)

[(('1', '1'), 964982703.0),
 (('1', '3'), 964981247.0),
 (('1', '6'), 964982224.0),
 (('1', '47'), 964983815.0),
 (('1', '50'), 964982931.0)]

In [114]:
tag_timestamps = tags_parsed\
    .map(lambda row: ((row[0], row[1]), float(row[3])))

tag_timestamps.take(5)

[(('2', '60756'), 1445714994.0),
 (('2', '60756'), 1445714996.0),
 (('2', '60756'), 1445714992.0),
 (('2', '89774'), 1445715207.0),
 (('2', '89774'), 1445715200.0)]

Джоиним полученные таблицы

In [115]:
joined_rating_tag_timestamps = rating_timestamps.join(tag_timestamps)

joined_rating_tag_timestamps.take(5)

[(('336', '1'), (1122227329.0, 1139045764.0)),
 (('336', '37729'), (1139047287.0, 1139047294.0)),
 (('356', '2146'), (1229142976.0, 1229142995.0)),
 (('356', '37384'), (1229142463.0, 1229142458.0)),
 (('356', '61323'), (1228073421.0, 1228073481.0))]

Считаем разницу между временными метками

In [129]:
delta_rating_tag_timestamps = joined_rating_tag_timestamps\
    .mapValues(lambda timestamps_pair: abs(timestamps_pair[1] - timestamps_pair[0]))

delta_rating_tag_timestamps.take(5)

[(('2', '106782'), 88.0),
 (('2', '106782'), 85.0),
 (('2', '106782'), 90.0),
 (('18', '44665'), 1898413.0),
 (('18', '52604'), 47.0)]

Создаем список только из временных меток

In [123]:
timestamps_deltas = delta_rating_tag_timestamps.values()

timestamps_deltas.take(5)

[16818435.0, 7.0, 19.0, 5.0, 60.0]

Считаем среднюю дельту по всем временным меткам. Ответ:

In [126]:
avg_timestamps_delta = timestamps_deltas.mean()
print(f'Average delta rating and taging timestamps: {avg_timestamps_delta}')

Average delta rating and taging timestamps: 29203715.56846949


### 5. Средняя оценка от каждого пользователя

Тк числовой рейтинг фильмов присутствует только в таблице 'ratings', опять будем работать с ней. Делаем те же шаги, что и в задаче 3, где считали среднюю оценку для каждого фильма, только теперь считаем для каждого пользователя.

Шаги: нам потребуются колонки *userId* и *rating*, оставим только их и заодно приведем колонку *ratings* к формату float. Сгруппируем строки по *userId*, посчитаем средний рейтинг для каждого пользователя. В качестве ответа выведем среднее от всех усредненных оценок всех пользователей.

Вспоминаем порядок столбцов

In [132]:
rdd_ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

Оставляем: (userId, rating)

In [130]:
user_rating = ratings_parsed.map(lambda row: [row[0], float(row[2])])
user_rating.take(5)

[['1', 4.0], ['1', 4.0], ['1', 4.0], ['1', 5.0], ['1', 5.0]]

Группируем по userId, кастуем результат groupBy: из pyspark.resultiterable.ResultIterable в list

In [138]:
grouped_user_rating = user_rating\
    .groupBy(lambda row: row[0])\
    .mapValues(list)

grouped_user_rating.take(2)

[('1',
  [['1', 4.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 3.0],
   ['1', 3.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 3.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 5.0],
   ['1', 4.0],
   ['1', 5.0],
   ['1', 5.0],
   

Каждое value у нас сейчас является списком пар: (userId, rating). Уберем из них userId, оставив только список рейтингов.

In [141]:
user_ratings = grouped_user_rating\
    .mapValues(lambda user_rating_group: [user_rating_pair[1] for user_rating_pair in user_rating_group])

user_ratings.take(1)

[('1',
  [4.0,
   4.0,
   4.0,
   5.0,
   5.0,
   3.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   5.0,
   4.0,
   5.0,
   3.0,
   3.0,
   5.0,
   4.0,
   4.0,
   5.0,
   4.0,
   3.0,
   4.0,
   5.0,
   4.0,
   3.0,
   5.0,
   4.0,
   4.0,
   5.0,
   4.0,
   4.0,
   4.0,
   5.0,
   5.0,
   3.0,
   5.0,
   3.0,
   4.0,
   3.0,
   3.0,
   4.0,
   5.0,
   5.0,
   5.0,
   4.0,
   5.0,
   3.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   5.0,
   5.0,
   4.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   4.0,
   2.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   4.0,
   3.0,
   3.0,
   3.0,
   3.0,
   4.0,
   4.0,
   5.0,
   4.0,
   5.0,
   3.0,
   5.0,
   5.0,
   4.0,
   5.0,
   3.0,
   3.0,
   5.0,
   4.0,
   4.0,
   5.0,
   4.0,
   4.0,
   5.0,
   5.0,
   4.0,
   4.0,
   5.0,
   4.0,
   5.0,
   4.0,
   5.0,
   4.0,
 

Посчитаем средний рейтинг для каждого пользователя

In [142]:
avg_user_ratings = user_ratings\
    .mapValues(lambda user_ratings_list: sum(user_ratings_list) / len(user_ratings_list))

avg_user_ratings.take(5)

[('1', 4.366379310344827),
 ('4', 3.5555555555555554),
 ('8', 3.574468085106383),
 ('9', 3.260869565217391),
 ('10', 3.2785714285714285)]

Выведем среднее значение оценки по всем пользователям:

In [154]:
avg_total_user_rating = avg_user_ratings.values().mean()
print(f'Average user rating: {avg_total_user_rating}')

Average user rating: 3.6572223377474


# Блок 3. UDF
Хотим обучить модель, которая по тегам предсказывала бы рейтинг фильма.

### Настройка Spark сессии

In [131]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [132]:
spark = SparkSession.builder\
    .appName('fedotova_spark')\
    .config(conf=conf)\
    .master(master="yarn").getOrCreate()

### Обработка данных
Создадим DataFrame'ы с данными.
Datframe рейтингов:

In [133]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

In [134]:
%%time
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("ml-latest-small/ratings.csv")

CPU times: user 3.48 ms, sys: 2.24 ms, total: 5.72 ms
Wall time: 33.7 ms


In [135]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



                                                                                

DataFrame тегов:

In [136]:
tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

In [137]:
%%time
tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("ml-latest-small/tags.csv")

CPU times: user 2.48 ms, sys: 3.36 ms, total: 5.84 ms
Wall time: 29.3 ms


In [138]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



Join'им таблицы:

In [176]:
tags_with_ratings = tags_df.alias("r").join(ratings_df.alias("t"), on=["userId", "movieId"], how="inner")

In [177]:
tags_with_ratings.show(5)

+------+-------+---------------+----------+------+----------+
|userId|movieId|            tag| timestamp|rating| timestamp|
+------+-------+---------------+----------+------+----------+
|     2|  60756|   will ferrell|1445714992|   5.0|1445714980|
|     2|  60756|Highly quotable|1445714996|   5.0|1445714980|
|     2|  60756|          funny|1445714994|   5.0|1445714980|
|     2|  89774|      Tom Hardy|1445715205|   5.0|1445715189|
|     2|  89774|            MMA|1445715200|   5.0|1445715189|
+------+-------+---------------+----------+------+----------+
only showing top 5 rows



Оставляем только нужные колонки 'tags' и 'ratings':

In [182]:
tags_ratings_df = tags_with_ratings.select('tag', 'rating')

In [183]:
tags_ratings_df.show(5)

+---------------+------+
|            tag|rating|
+---------------+------+
|   will ferrell|   5.0|
|Highly quotable|   5.0|
|          funny|   5.0|
|      Tom Hardy|   5.0|
|            MMA|   5.0|
+---------------+------+
only showing top 5 rows



Сконвертируем датасеты в pandas

In [180]:
tags_ratings_pdf = tags_ratings_df.toPandas()

In [181]:
tags_ratings_pdf.head(5)

Unnamed: 0,rating,tag
0,5.0,will ferrell
1,5.0,Highly quotable
2,5.0,funny
3,5.0,Tom Hardy
4,5.0,MMA


### Обучение модели
Обучим TfidfVectorizer на колонке “tag”:

In [145]:
from sklearn.feature_extraction.text import TfidfVectorizer

In [146]:
tfidf_vectorizer = TfidfVectorizer()
tfidf_vectorizer.fit(tags_ratings_pdf['tag'])

Получим численные признаки transform-ом от tfidf на той же колонке “tag”:

In [147]:
tags_features = tfidf_vectorizer.transform(tags_ratings_pdf['tag'])

In [148]:
print(tfidf_vectorizer.get_feature_names_out())
print(tag_features.shape)

['06' '1900s' '1920s' ... 'zombie' 'zombies' 'zooey']
(3476, 1708)


Обучим SGDRegressor на новых численных признаках от TfidfVectorizer-а с лейблом “rating”:

In [149]:
from sklearn.linear_model import SGDRegressor

In [150]:
sgd = SGDRegressor()

In [151]:
sgd.fit(X=tags_features, y=tags_ratings_pdf['rating'])

### UDF: предсказание рейтинга по столбцу “tag”
Сначала делаем transform от TfidfVectorizer, затем predict от SGDRegressor на полученных признаках из 1-го этапа

In [173]:
@f.udf(DoubleType())
def predict_rating_from_tag(tags):
    tags_features = tfidf_vectorizer.transform([tags])
    predicted_ratings = sgd.predict(tags_features)
    # преобразуем numpy ndarray в стандартный python scalar
    return predicted_ratings.item()

Применим UDF к spark dataframe-у и убедимся, что udf работает, вызвав action show(50)):

In [174]:
tags_ratings_with_preds = tags_ratings_df\
    .withColumn('predicted_rating', predict_rating_from_tag(f.col('tag')))

In [175]:
tags_ratings_with_preds.show(50)

                                                                                

+------+--------------------+------------------+
|rating|                 tag|  predicted_rating|
+------+--------------------+------------------+
|   5.0|        will ferrell|4.0338854132540805|
|   5.0|     Highly quotable| 3.945725021929871|
|   5.0|               funny| 4.367294947333176|
|   5.0|           Tom Hardy|3.8414958875151717|
|   5.0|                 MMA| 3.419073066217332|
|   5.0|        Boxing story|3.9249024610120773|
|   5.0|     Martin Scorsese|3.8479621567156994|
|   5.0|   Leonardo DiCaprio| 4.234165809560408|
|   5.0|               drugs| 4.283169097967513|
|   1.0|        way too long| 3.427078467741662|
|   4.0|               mafia| 4.159233805653512|
|   4.0|            gangster| 3.620002030163171|
|   4.0|           Al Pacino|3.8543008558504526|
|   5.0|               Mafia| 4.159233805653512|
|   5.0|           Al Pacino|3.8543008558504526|
|   4.5|          true story|3.8507721191734943|
|   4.5|           holocaust|3.9732221163643957|
|   4.5|        twis

### RMSE между предсказанным и истинным значением рейтинга
Формула RMSE (корень средней суммы квадратов разностей между предсказанным и истинным значением рейтинга): 
$$ RMSE = \sqrt{\frac{\sum_{i=1}^{n} (y\_pred_{i} - y\_true_{i})^2}{n}} $$
Будем использовать pyspark.sql

In [163]:
tags_ratings_with_preds\
.select((f.col('rating') - f.col('predicted_rating')).alias('delta_y'))\
.select(f.pow(f.col('delta_y'), 2).alias('delta_y_square'))\
.select(f.avg(f.col('delta_y_square')).alias('avg_squares'))\
.select(f.sqrt(f.col('avg_squares')).alias('RMSE'))\
.show()

[Stage 77:>                                                         (0 + 1) / 1]

+----------------+
|            RMSE|
+----------------+
|0.87426504306081|
+----------------+



                                                                                