# Advanced Spark

Кластер точно такой же, поэтому не буду еще раз указывать terraform.

Сегодня пройдемся по каким-то аспектам фреймворка Spark, которые не затрагивали в предыдущий раз, но которые могут оказаться очень полезными.

Для начала скачаем датасет. На этот раз поработает с базой данных IMDB - сайта про кино. Данные доступны в формате tsv - вся информация на официальном сайте https://www.imdb.com/interfaces/ .

In [6]:
%%bash

hdfs dfs -rm -r /imdb/data || true
hdfs dfs -mkdir -p /imdb/data

Deleted /imdb/data


21/02/06 13:56:09 WARN azure.AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
21/02/06 13:56:10 INFO azure.AzureFileSystemThreadPoolExecutor: Time taken for Delete operation is: 50 ms with threads: 0


Загружаем данные в HDFS. Можно отметить, что если в ключе `-put` указать `-` то команда будет считывать данные с stdin. Таким образом можно переливать данные сразу в облако, минуя жесткий диск текущей машины.

In [7]:
%%bash

curl https://datasets.imdbws.com/name.basics.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/name.basics.tsv
curl https://datasets.imdbws.com/title.akas.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.akas.tsv
curl https://datasets.imdbws.com/title.basics.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.basics.tsv
curl https://datasets.imdbws.com/title.crew.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.crew.tsv
curl https://datasets.imdbws.com/title.episode.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.episode.tsv
curl https://datasets.imdbws.com/title.principals.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.principals.tsv
curl https://datasets.imdbws.com/title.ratings.tsv.gz | gunzip | hdfs dfs -put - /imdb/data/title.ratings.tsv

echo "DONE"

DONE


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0  198M    0 95944    0     0  16684      0  3:28:13  0:00:05  3:28:08 16888  1  198M    1 3504k    0     0   577k      0  0:05:52  0:00:06  0:05:46  583k  4  198M    4 9155k    0     0  1293k      0  0:02:37  0:00:07  0:02:30 1306k 10  198M   10 20.1M    0     0  2560k      0  0:01:19  0:00:08  0:01:11 2582k 15  198M   15 31.7M    0     0  3580k      0  0:00:56  0:00:09  0:00:47 3608k 22  198M   22 44.2M    0     0  4499k      0  0:00:45  0:00:10  0:00:35 10.2M 28  198M   28 56.5M    0     0  5224k      0  0:00:38  0:00:11  0:00:27 10.6M 35  198M   35 70.7M    0     0  6001k      0  0:00:33  0:00:12  0:00:21 12.3M 42  198M   42 84.6M    0     0  6631k      0  0:00

In [9]:
%%bash

hdfs dfs -ls /imdb/data

Found 7 items
-rw-r--r--   1 spark supergroup  638363053 2021-02-06 14:00 /imdb/data/name.basics.tsv
-rw-r--r--   1 spark supergroup 1242207063 2021-02-06 14:00 /imdb/data/title.akas.tsv
-rw-r--r--   1 spark supergroup  647652320 2021-02-06 14:01 /imdb/data/title.basics.tsv
-rw-r--r--   1 spark supergroup  244554663 2021-02-06 14:01 /imdb/data/title.crew.tsv
-rw-r--r--   1 spark supergroup  141120617 2021-02-06 14:01 /imdb/data/title.episode.tsv
-rw-r--r--   1 spark supergroup 1888973678 2021-02-06 14:02 /imdb/data/title.principals.tsv
-rw-r--r--   1 spark supergroup   19127479 2021-02-06 14:02 /imdb/data/title.ratings.tsv


In [1]:
sc = spark.sparkContext 

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1612687723003_0008,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.

In [2]:
from pyspark.sql import SparkSession, Row

In [3]:
se = SparkSession(sc)

`TSV` - это точно такой же формат как и `CSV`, только с другим разделителем. Так что можем использовать стандартный парсер таблицы формата csv.

In [4]:
name_basics = spark.read.option("delimiter", "\t").csv('/imdb/data/name.basics.tsv', header=True, inferSchema=True)
akas = spark.read.option("delimiter", "\t").csv('/imdb/data/title.akas.tsv', header=True, inferSchema=True)
basics = spark.read.option("delimiter", "\t").csv('/imdb/data/title.basics.tsv', header=True, inferSchema=True)
crew = spark.read.option("delimiter", "\t").csv('/imdb/data/title.crew.tsv', header=True, inferSchema=True)
episode = spark.read.option("delimiter", "\t").csv('/imdb/data/title.episode.tsv', header=True, inferSchema=True)
principals = spark.read.option("delimiter", "\t").csv('/imdb/data/title.principals.tsv', header=True, inferSchema=True)
rating = spark.read.option("delimiter", "\t").csv('/imdb/data/title.ratings.tsv', header=True, inferSchema=True)

In [19]:
name_basics.printSchema()
akas.printSchema()
basics.printSchema()
crew.printSchema()
episode.printSchema()
principals.printSchema()
rating.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)

root
 |-- titleId: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- types: string (nullable = true)
 |-- attributes: string (nullable = true)
 |-- isOriginalTitle: string (nullable = true)

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- tconst: string (nullab

#### Улучшаем формат записи

Сейчас мы храним записи в сыров формате в виде просто строк. Такой формат удобен для чтения человеков, однако машине приходится прикладывать усилия, чтобы разобрать такой формат.

Существует ряд форматов, поддерживаемых в Spark, которые гораздо лучше подходят для обработки. Один из таких форматов - Parquet. Подробнее можно почитать про него на официальном сайте - https://parquet.apache.org/

In [41]:
principals.count()

42913666

In [43]:
principals.show()

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                  \N|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|            \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|            \N|
|tt0000002|       1|nm0721526|       director|                  \N|            \N|
|tt0000002|       2|nm1335271|       composer|                  \N|            \N|
|tt0000003|       1|nm0721526|       director|                  \N|            \N|
|tt0000003|       2|nm1770680|       producer|            producer|            \N|
|tt0000003|       3|nm1335271|       composer|                  \N|            \N|
|tt0000003|       4|nm5442200|         editor|                  \N|            \N|
|tt0

Посчитаем например среднее квадрата значения в колонке ordering

In [44]:
principals.rdd.map(lambda x: x.ordering ** 2).mean()

28.670035111891885

4.17 минуты на несложный вывод для одного файла. Пробуем сконвертировать в паркет.

In [33]:
%%bash

hdfs dfs -rm -r /imdb/parquet || true
hdfs dfs -mkdir -p /imdb/parquet

rm: `/imdb/parquet': No such file or directory


In [34]:
principals.write.parquet("/imdb/parquet/principals.parquet")

In [37]:
principals_parquet = spark.read.parquet("/imdb/parquet/principals.parquet")

In [39]:
principals_parquet.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)

Можно уже заметить, что ordering стал integer, а не просто string. 

Попробуем вопрорить запрос выше.

In [45]:
principals_parquet.rdd.map(lambda x: x.ordering ** 2).mean()

28.670035111891885

3.50 минуты. Не фантастика, но лишние полминусы срезали. Переконвертируем сразу все в этот формат и дальше будем работать уже с ним.

In [46]:
name_basics.write.parquet("/imdb/parquet/name_basics.parquet")
akas.write.parquet("/imdb/parquet/akas.parquet")
basics.write.parquet("/imdb/parquet/basics.parquet")
crew.write.parquet("/imdb/parquet/crew.parquet")
episode.write.parquet("/imdb/parquet/episode.parquet")
rating.write.parquet("/imdb/parquet/rating.parquet")

In [10]:
name_basics = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/name_basics.parquet')
akas = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/akas.parquet')
basics = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/basics.parquet')
crew = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/crew.parquet')
episode = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/episode.parquet')
principals = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/principals.parquet')
rating = spark.read.option("delimiter", "\t").parquet('/imdb/parquet/rating.parquet')

In [11]:
name_basics.registerTempTable("name_basics")
akas.registerTempTable("akas")
basics.registerTempTable("basics")
crew.registerTempTable("crew")
episode.registerTempTable("episode")
principals.registerTempTable("principals")
rating.registerTempTable("rating")

#### Поисследуем таблички

In [12]:
spark.sql("""
    SELECT * 
    FROM rating
    LIMIT 10
""").show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1677|
|tt0000002|          6.0|     207|
|tt0000003|          6.5|    1410|
|tt0000004|          6.1|     122|
|tt0000005|          6.2|    2204|
|tt0000006|          5.3|     120|
|tt0000007|          5.4|     674|
|tt0000008|          5.4|    1852|
|tt0000009|          6.0|     156|
|tt0000010|          6.9|    6196|
+---------+-------------+--------+

In [13]:
spark.sql("""
    SELECT *
    FROM basics
    LIMIT 10
""").show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

In [14]:
spark.sql("""
    SELECT *
    FROM akas
    LIMIT 10
""").show()

+----------+--------+------------------+------+--------+-----+----------+---------------+
|   titleId|ordering|             title|region|language|types|attributes|isOriginalTitle|
+----------+--------+------------------+------+--------+-----+----------+---------------+
|tt12514792|       3|     Folge #1.2483|    DE|      de|   \N|        \N|              0|
|tt12514792|       4|エピソード #1.2483|    JP|      ja|   \N|        \N|              0|
|tt12514792|       5|  Episódio #1.2483|    PT|      pt|   \N|        \N|              0|
|tt12514792|       6|  Episodio #1.2483|    IT|      it|   \N|        \N|              0|
|tt12514792|       7|  Episodio #1.2483|    ES|      es|   \N|        \N|              0|
|tt12514794|       1|    एपिसोड #1.2484|    IN|      hi|   \N|        \N|              0|
|tt12514794|       2|   Épisode #1.2484|    FR|      fr|   \N|        \N|              0|
|tt12514794|       3|     Folge #1.2484|    DE|      de|   \N|        \N|              0|
|tt12514794|   

In [15]:
spark.sql("""
    SELECT *
    FROM crew
    WHERE directors <> '\\\\N'
    LIMIT 10
""").show()

+---------+--------------------+--------------------+
|   tconst|           directors|             writers|
+---------+--------------------+--------------------+
|tt1910269|           nm2350741| nm2350741,nm4559026|
|tt1910272|nm5228743,nm10584...|nm2438224,nm30151...|
|tt1910275|           nm1380454|           nm3475978|
|tt1910277|           nm0003962|nm3929970,nm05055...|
|tt1910278|           nm0730355|           nm2656280|
|tt1910283|           nm1488792| nm2327939,nm1488792|
|tt1910286|           nm4425225|                  \N|
|tt1910287| nm1847623,nm1046377|           nm1293423|
|tt1910288| nm1046377,nm1847623|           nm1293423|
|tt1910289| nm1046377,nm1847623|           nm1293423|
+---------+--------------------+--------------------+

Здесь информация записана не совсем в табличном виде - в качестве значений колонок используется список из строк, разделенных запятой. 

В SQL есть готовые функции для манипуляции с данными. Весь список можно найти здесь - https://spark.apache.org/docs/2.3.0/api/sql/index.html . Конкретно сейчас воспользуемся split, которая превращает строку в массив строк, разбивая ее по указанному символу.

In [16]:
spark.sql("""
    SELECT tconst, split(directors, ',')
    FROM crew
    WHERE directors <> '\\\\N'
    LIMIT 10
""").show()

+---------+--------------------+
|   tconst| split(directors, ,)|
+---------+--------------------+
|tt0000001|         [nm0005690]|
|tt0000002|         [nm0721526]|
|tt0000003|         [nm0721526]|
|tt0000004|         [nm0721526]|
|tt0000005|         [nm0005690]|
|tt0000006|         [nm0005690]|
|tt0000007|[nm0005690, nm037...|
|tt0000008|         [nm0005690]|
|tt0000009|         [nm0085156]|
|tt0000010|         [nm0525910]|
+---------+--------------------+

`explode` распупочивает список в отдельные записи в таблице

In [17]:
spark.sql("""
    SELECT tconst, explode(split(directors, ',')) as director
    FROM crew
    WHERE directors <> '\\\\N'
    LIMIT 10
""").show()

+---------+---------+
|   tconst| director|
+---------+---------+
|tt1910269|nm2350741|
|tt1910272|nm5228743|
|tt1910272|nm1058480|
|tt1910272|nm1159969|
|tt1910272|nm3079478|
|tt1910272|nm7114004|
|tt1910272|nm2459245|
|tt1910272|nm4823781|
|tt1910272|nm0766272|
|tt1910272|nm5117369|
+---------+---------+

In [18]:
spark.sql("""
    SELECT *
    FROM name_basics
    LIMIT 10
""").show()

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm1678237|  Natalia López|       \N|       \N|costume_designer,...| tt0363365,tt0401933|
|nm1678238|    Tomás López|       \N|       \N|   camera_department| tt0289032,tt0823369|
|nm1678239| Matt MacDonald|       \N|       \N|editorial_department|           tt0271295|
|nm1678240|   Russell Mack|       \N|       \N|       miscellaneous|           tt0414751|
|nm1678241|Rexanne Mancini|       \N|       \N|    music_department|           tt0181305|
|nm1678242|Andrea Martínez|       \N|       \N|writer,miscellaneous|           tt0328789|
|nm1678243|  John Matthews|       \N|       \N|       miscellaneous| tt0349683,tt0449845|
|nm1678244| Kevin McCarthy|       \N|       \N|director,writer,c...| tt0423092,tt0104242|
|nm1678246

Чтобы не плодить лишних таблиц, можно делать подзапросы. Распупочим директоров и сразу джойним их с информацией про людей.

In [19]:
spark.sql("""
    WITH crew_full AS (
        SELECT tconst, explode(split(directors, ',')) as director_nconst
        FROM crew 
        WHERE directors <> '\\\\N'
        LIMIT 10
    )
    SELECT *
    FROM crew_full cf
        join name_basics nb on nb.nconst = director_nconst
    LIMIT 10
""").show()

+---------+---------------+---------+--------------------+---------+---------+--------------------+--------------------+
|   tconst|director_nconst|   nconst|         primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+--------------------+---------+---------+--------------------+--------------------+
|tt0000008|      nm0005690|nm0005690|William K.L. Dickson|     1860|     1935|cinematographer,d...|tt1428455,tt02195...|
|tt0000007|      nm0005690|nm0005690|William K.L. Dickson|     1860|     1935|cinematographer,d...|tt1428455,tt02195...|
|tt0000006|      nm0005690|nm0005690|William K.L. Dickson|     1860|     1935|cinematographer,d...|tt1428455,tt02195...|
|tt0000005|      nm0005690|nm0005690|William K.L. Dickson|     1860|     1935|cinematographer,d...|tt1428455,tt02195...|
|tt0000001|      nm0005690|nm0005690|William K.L. Dickson|     1860|     1935|cinematographer,d...|tt1428455,tt02195...|
|tt0000009|      nm0085156|nm008

In [20]:
spark.sql("""
    SELECT *
    FROM episode
    LIMIT 10
""").show()

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt5011052|   tt4996262|           1|            3|
|tt5011054|   tt4996262|           1|            4|
|tt5011058|   tt4996262|           1|            7|
|tt5011060|   tt4996262|           1|           10|
|tt5011062|   tt4996262|           1|            2|
|tt5011064|   tt4996262|           1|            8|
|tt5011066|   tt4996262|           1|           12|
|tt5011068|   tt4996262|           1|            9|
|tt5011070|   tt4996262|           1|            5|
|tt5011072|   tt4996262|           1|            1|
+---------+------------+------------+-------------+

In [21]:
spark.sql("""
    SELECT *
    FROM principals
    LIMIT 10
""").show()

+---------+--------+---------+---------------+----------+----------+
|   tconst|ordering|   nconst|       category|       job|characters|
+---------+--------+---------+---------------+----------+----------+
|tt1787816|       4|nm0124094|archive_footage|        \N|  ["Self"]|
|tt1787816|       5|nm1683671|       director|        \N|        \N|
|tt1787816|       6|nm0722503|         writer|written by|        \N|
|tt1787816|       7|nm1538319|       producer|  producer|        \N|
|tt1787816|       8|nm2441456|       producer|  producer|        \N|
|tt1787816|       9|nm0706420|       composer|        \N|        \N|
|tt1787817|       1|nm2816360|       director|        \N|        \N|
|tt1787817|       2|nm0366055|       director|        \N|        \N|
|tt1787817|       3|nm0111771|       producer|  producer|        \N|
|tt1787817|       4|nm4205789|       composer|        \N|        \N|
+---------+--------+---------+---------------+----------+----------+

In [22]:
spark.sql("""
    SELECT *
    FROM rating
    LIMIT 10
""").show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1677|
|tt0000002|          6.0|     207|
|tt0000003|          6.5|    1410|
|tt0000004|          6.1|     122|
|tt0000005|          6.2|    2204|
|tt0000006|          5.3|     120|
|tt0000007|          5.4|     674|
|tt0000008|          5.4|    1852|
|tt0000009|          6.0|     156|
|tt0000010|          6.9|    6196|
+---------+-------------+--------+

Чтож, давайте немного позанимается машинным обучением. Задача будет такая - предсказываем количество оценок для фильма.

Давайте придумывать признаки.

Из основной таблицы можем взять `isAdult`, `startYear` и `runtimeMinutes` прямо как есть.

In [23]:
spark.sql("""
    SELECT tconst, cast(isAdult as int) as f_is_adult, cast(startYear as int) as f_start_year, cast(runtimeMinutes as int) as f_runtime_minutes
    FROM basics
    LIMIT 10
""").show()

+---------+----------+------------+-----------------+
|   tconst|f_is_adult|f_start_year|f_runtime_minutes|
+---------+----------+------------+-----------------+
|tt4502952|         0|        2017|             null|
|tt4502954|         0|        2015|             null|
|tt4502956|         0|        2012|             null|
|tt4502958|         0|        2015|             null|
|tt4502960|         0|        2011|               12|
|tt4502962|         0|        2011|                3|
|tt4502964|         0|        2010|             null|
|tt4502966|         0|        2012|                6|
|tt4502968|         0|        2015|              100|
|tt4502970|         0|        2009|                3|
+---------+----------+------------+-----------------+

In [24]:
spark.sql("""
    SELECT tconst, cast(isAdult as int) as f_is_adult, cast(startYear as int) as f_start_year, cast(runtimeMinutes as int) as f_runtime_minutes
    FROM basics
""").registerTempTable('basics_features')

А вот с жанрами нужно немного поколдовать - вначале нужно понять, сколько вообще есть жанров, а они склеены в строку.

In [25]:
spark.sql("""
    SELECT distinct(explode(split(genres, ',')))
    FROM basics
""").show()

+-----------+
|        col|
+-----------+
|      Crime|
|    Romance|
|   Thriller|
|  Adventure|
|         \N|
|      Drama|
|        War|
|Documentary|
| Reality-TV|
|     Family|
|    Fantasy|
|  Game-Show|
|      Adult|
|    History|
|    Mystery|
|    Musical|
|  Animation|
|      Music|
|  Film-Noir|
|      Short|
+-----------+
only showing top 20 rows

In [26]:
spark.sql("""
    SELECT distinct(explode(split(genres, ',')))
    FROM basics
""").count()

29

#### Программно создаваемые запросы через DataFrame API

Чтобы более гибко контролировать запросы, можно использовать не тестовый способ запуска SQL, а программный через DataFrame API.

Подробнее про то, как составлять такие запросы и какие готовые фукнции уже существуют - на официальном сайте https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

In [27]:
from pyspark.sql import functions as F

In [28]:
basics.select(['tconst', 'titleType', 'genres']).limit(10).show()

+---------+---------+--------------------+
|   tconst|titleType|              genres|
+---------+---------+--------------------+
|tt0000001|    short|   Documentary,Short|
|tt0000002|    short|     Animation,Short|
|tt0000003|    short|Animation,Comedy,...|
|tt0000004|    short|     Animation,Short|
|tt0000005|    short|        Comedy,Short|
|tt0000006|    short|               Short|
|tt0000007|    short|         Short,Sport|
|tt0000008|    short|   Documentary,Short|
|tt0000009|    short|       Romance,Short|
|tt0000010|    short|   Documentary,Short|
+---------+---------+--------------------+

In [29]:
basics.select(['tconst', 'titleType', 'genres']).where(F.col('tconst') == 'tt0000001').limit(10).show()

+---------+---------+-----------------+
|   tconst|titleType|           genres|
+---------+---------+-----------------+
|tt0000001|    short|Documentary,Short|
+---------+---------+-----------------+

In [30]:
genres_c = spark.sql("""
    SELECT distinct(explode(split(genres, ',')))
    FROM basics
""").rdd.map(lambda x: x.col).collect()

In [31]:
genres_c

['Crime', 'Romance', 'Thriller', 'Adventure', '\\N', 'Drama', 'War', 'Documentary', 'Reality-TV', 'Family', 'Game-Show', 'Fantasy', 'Adult', 'History', 'Mystery', 'Musical', 'Animation', 'Music', 'Film-Noir', 'Short', 'Horror', 'Western', 'Biography', 'Comedy', 'Action', 'Sport', 'Talk-Show', 'Sci-Fi', 'News']

`F.when` проверяет условие и если оно верно то выставляет указанное значение, если не верно, то значение указанное в `otherwise`. 

Таким образом добавим сколько запросов, сколько есть категорий и для каждой проверим, если ли в она в списке. 

In [32]:
exprs = [
    F.when(
        F.array_contains(F.split(F.col('genres'), ','), category),
        1
    )
    .otherwise(0).alias("f_genres_" + category) 
    for category in genres_c
]

In [33]:
basics.select('tconst', *exprs).limit(5).show()

+---------+--------------+----------------+-----------------+------------------+-----------+--------------+------------+--------------------+-------------------+---------------+------------------+----------------+--------------+----------------+----------------+----------------+------------------+--------------+------------------+--------------+---------------+----------------+------------------+---------------+---------------+--------------+------------------+---------------+-------------+
|   tconst|f_genres_Crime|f_genres_Romance|f_genres_Thriller|f_genres_Adventure|f_genres_\N|f_genres_Drama|f_genres_War|f_genres_Documentary|f_genres_Reality-TV|f_genres_Family|f_genres_Game-Show|f_genres_Fantasy|f_genres_Adult|f_genres_History|f_genres_Mystery|f_genres_Musical|f_genres_Animation|f_genres_Music|f_genres_Film-Noir|f_genres_Short|f_genres_Horror|f_genres_Western|f_genres_Biography|f_genres_Comedy|f_genres_Action|f_genres_Sport|f_genres_Talk-Show|f_genres_Sci-Fi|f_genres_News|
+-------

In [34]:
basics.select('tconst', *exprs).limit(1).collect()

[Row(tconst='tt0000001', f_genres_Crime=0, f_genres_Romance=0, f_genres_Thriller=0, f_genres_Adventure=0, f_genres_\N=0, f_genres_Drama=0, f_genres_War=0, f_genres_Documentary=1, f_genres_Reality-TV=0, f_genres_Family=0, f_genres_Game-Show=0, f_genres_Fantasy=0, f_genres_Adult=0, f_genres_History=0, f_genres_Mystery=0, f_genres_Musical=0, f_genres_Animation=0, f_genres_Music=0, f_genres_Film-Noir=0, f_genres_Short=1, f_genres_Horror=0, f_genres_Western=0, f_genres_Biography=0, f_genres_Comedy=0, f_genres_Action=0, f_genres_Sport=0, f_genres_Talk-Show=0, f_genres_Sci-Fi=0, f_genres_News=0)]

In [35]:
basics.select('tconst', *exprs).registerTempTable("genres_features")

In [36]:
regions = akas.select('region').distinct().rdd.map(lambda x: x.region).collect()

In [37]:
regions = [x for x in regions if x is not None]

`collect_list` собриает значения по аггрегированному ключу в список.

In [38]:
spark.sql("""
    SELECT collect_list(region)
    FROM akas
    GROUP BY titleId
    LIMIT 10
""").show()

+--------------------+
|collect_list(region)|
+--------------------+
|                [FR]|
|                [FR]|
|        [US, \N, FR]|
|    [\N, ES, GB, ES]|
|                [US]|
|                [US]|
|    [\N, US, DK, US]|
|                [US]|
|                [DK]|
|    [\N, RU, US, NL]|
+--------------------+

In [39]:
exprs = [
    F.when(
        F.array_contains('regions', category),
        1
    )
    .otherwise(0).alias("f_region_" + category) 
    for category in regions
]

In [40]:
akas.groupby("titleId").agg(F.collect_list("region").alias("regions")).select('titleId', *exprs).limit(10).show()

+---------+-----------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------

In [41]:
x = akas.groupby("titleId").agg(F.collect_list("region").alias("regions")).select('titleId', *exprs).limit(1).collect()

In [42]:
x[0]

Row(titleId='tt0000658', f_region_LT=0, f_region_DZ=0, f_region_MM=0, f_region_CI=0, f_region_YUCS=0, f_region_TC=0, f_region_FI=0, f_region_AZ=0, f_region_SC=0, f_region_UA=0, f_region_RO=0, f_region_ZM=0, f_region_KI=0, f_region_SL=0, f_region_NL=0, f_region_LA=0, f_region_SB=0, f_region_DDDE=0, f_region_MN=0, f_region_BS=0, f_region_BW=0, f_region_VDVN=0, f_region_PL=0, f_region_AM=0, f_region_PS=0, f_region_RE=0, f_region_MK=0, f_region_MX=0, f_region_PF=0, f_region_XSA=0, f_region_XKO=0, f_region_GL=0, f_region_TV=0, f_region_EE=0, f_region_VG=0, f_region_SM=0, f_region_CN=0, f_region_AT=0, f_region_RU=1, f_region_IQ=0, f_region_NA=0, f_region_\N=1, f_region_CG=0, f_region_AD=0, f_region_HR=0, f_region_LI=0, f_region_SV=0, f_region_CZ=0, f_region_NP=0, f_region_VA=0, f_region_PT=0, f_region_SO=0, f_region_BUMM=0, f_region_PG=0, f_region_KY=0, f_region_GH=0, f_region_HK=0, f_region_CV=0, f_region_BN=0, f_region_LR=0, f_region_TW=0, f_region_BD=0, f_region_PY=0, f_region_LB=0, f_reg

In [43]:
[f for f, v in x[0].asDict().items() if v == 1]

['f_region_FR', 'f_region_XWW', 'f_region_US', 'f_region_RU', 'f_region_\\N']

In [44]:
spark.sql("""
    SELECT * 
    FROM akas
    WHERE titleId = 'tt0000658'
""").show()

+---------+--------+--------------------+------+--------+-----------+----------+---------------+
|  titleId|ordering|               title|region|language|      types|attributes|isOriginalTitle|
+---------+--------+--------------------+------+--------+-----------+----------+---------------+
|tt0000658|       1|Le cauchemar de F...|    FR|      \N|         \N|        \N|              0|
|tt0000658|       2|   Living Blackboard|   XWW|      en|         \N|        \N|              0|
|tt0000658|       3|Le cauchemar de F...|    \N|      \N|   original|        \N|              1|
|tt0000658|       4|   Кошмар у петрушки|    RU|      \N|imdbDisplay|        \N|              0|
|tt0000658|       5|The Puppet's Nigh...|    US|      \N|         \N|        \N|              0|
+---------+--------+--------------------+------+--------+-----------+----------+---------------+

In [45]:
(
    akas
    .groupby("titleId")
    .agg(F.collect_list("region").alias("regions"))
    .select('titleId', *exprs)
).registerTempTable('regions_features')

Делаем точно тоже самое для языков

In [47]:
languages = akas.select('language').distinct().rdd.map(lambda x: x.language).collect()
languages.remove(None)

exprs = [
    F.when(
        F.array_contains('languages', category),
        1
    )
    .otherwise(0).alias("f_languages_" + category) 
    for category in languages
]

(
    akas
    .groupby("titleId")
    .agg(F.collect_list("language").alias("languages"))
    .select('titleId', *exprs)
).registerTempTable('languages_features')

#### Used defined functions

В crew можем подсчитать количество людей, которое работало над фильмом. Чтобы это было удобно делать на SQL может написать собственную функицю, которую потом можно будет использовать внутри SQL запросов.

In [48]:
def count_persons(raw_string):
    return len(raw_string.split(','))

In [49]:
spark.udf.register("count_persons", count_persons, "integer")

<function count_persons at 0x7fad330efb70>

In [50]:
spark.sql("""
    SELECT tconst, directors, count_persons(directors), count_persons(writers)
    FROM crew
    WHERE directors <> '\\\\N'
    LIMIT 10
""").show()

+---------+--------------------+------------------------+----------------------+
|   tconst|           directors|count_persons(directors)|count_persons(writers)|
+---------+--------------------+------------------------+----------------------+
|tt1910269|           nm2350741|                       1|                     2|
|tt1910272|nm5228743,nm10584...|                      12|                     4|
|tt1910275|           nm1380454|                       1|                     1|
|tt1910277|           nm0003962|                       1|                     4|
|tt1910278|           nm0730355|                       1|                     1|
|tt1910283|           nm1488792|                       1|                     2|
|tt1910286|           nm4425225|                       1|                     1|
|tt1910287| nm1847623,nm1046377|                       2|                     1|
|tt1910288| nm1046377,nm1847623|                       2|                     1|
|tt1910289| nm1046377,nm1847

In [51]:
spark.sql("""
    SELECT tconst, count_persons(directors) as f_directors, count_persons(writers) as f_writers
    FROM crew
""").registerTempTable("crew_features")

In [52]:
bias = basics.select('tconst', F.when(F.col('tconst') == F.col('tconst'), 1).alias('f_bias'))

In [53]:
bias.limit(10).show()

+---------+------+
|   tconst|f_bias|
+---------+------+
|tt4502952|     1|
|tt4502954|     1|
|tt4502956|     1|
|tt4502958|     1|
|tt4502960|     1|
|tt4502962|     1|
|tt4502964|     1|
|tt4502966|     1|
|tt4502968|     1|
|tt4502970|     1|
+---------+------+

In [54]:
bias.registerTempTable("bias")

Чтож, для нашего примера должно быть достаточно. Соберем итоговый датасет.

In [55]:
spark.sql("""
    SELECT numVotes as target, *
    FROM 
        basics_features bf
        join genres_features gf on gf.tconst = bf.tconst
        join regions_features rf on rf.titleId = bf.tconst
        join languages_features lf on lf.titleId = bf.tconst
        join crew_features cf on cf.tconst = bf.tconst
        join bias b on b.tconst = bf.tconst
        join rating r on r.tconst = bf.tconst
    LIMIT 10
""").show()

+------+---------+----------+------------+-----------------+---------+--------------+----------------+-----------------+------------------+-----------+--------------+------------+--------------------+-------------------+---------------+------------------+----------------+--------------+----------------+----------------+----------------+------------------+--------------+------------------+--------------+---------------+----------------+------------------+---------------+---------------+--------------+------------------+---------------+-------------+---------+-----------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+-----------+-----------+-----------+-----------+--------

In [56]:
x = spark.sql("""
    SELECT numVotes as target, *
    FROM 
        basics_features bf
        join genres_features gf on gf.tconst = bf.tconst
        join regions_features rf on rf.titleId = bf.tconst
        join languages_features lf on lf.titleId = bf.tconst
        join crew_features cf on cf.tconst = bf.tconst
        join bias b on b.tconst = bf.tconst
        join rating r on r.tconst = bf.tconst
""")

In [57]:
%%bash 

hdfs dfs -rm -r /imdb/parquet/dataset.parquet || true

Deleted /imdb/parquet/dataset.parquet


21/02/07 09:47:59 WARN azure.AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
21/02/07 09:48:00 INFO azure.AzureFileSystemThreadPoolExecutor: Time taken for Delete operation is: 1500 ms with threads: 0


In [58]:
x.drop("tconst", "titleid", "averageRating", "numVotes").write.parquet("/imdb/parquet/dataset.parquet")

In [75]:
dataset_fd = spark.read.parquet('/imdb/parquet/dataset.parquet')

In [76]:
train_df, test_df = dataset_fd.randomSplit([0.8, 0.2], 422)

In [77]:
train = train_df.rdd.cache()
test = test_df.rdd.cache()

In [78]:
example = train.first()

In [79]:
example

Row(target=5, f_is_adult=0, f_start_year=1896, f_runtime_minutes=None, f_genres_Crime=0, f_genres_Romance=0, f_genres_Thriller=0, f_genres_Adventure=0, f_genres_\N=0, f_genres_Drama=0, f_genres_War=0, f_genres_Documentary=1, f_genres_Reality-TV=0, f_genres_Family=0, f_genres_Game-Show=0, f_genres_Fantasy=0, f_genres_Adult=0, f_genres_History=0, f_genres_Mystery=0, f_genres_Musical=0, f_genres_Animation=0, f_genres_Music=0, f_genres_Film-Noir=0, f_genres_Short=1, f_genres_Horror=0, f_genres_Western=0, f_genres_Biography=0, f_genres_Comedy=0, f_genres_Action=0, f_genres_Sport=0, f_genres_Talk-Show=0, f_genres_Sci-Fi=0, f_genres_News=0, f_region_LT=0, f_region_DZ=0, f_region_MM=0, f_region_CI=0, f_region_YUCS=0, f_region_TC=0, f_region_FI=0, f_region_AZ=0, f_region_SC=0, f_region_UA=0, f_region_RO=0, f_region_ZM=0, f_region_KI=0, f_region_SL=0, f_region_NL=0, f_region_LA=0, f_region_SB=0, f_region_DDDE=0, f_region_MN=0, f_region_BS=0, f_region_BW=0, f_region_VDVN=0, f_region_PL=0, f_regio

In [81]:
features_num = len(example.asDict()) - 1

In [82]:
features_num

387

In [None]:
import numpy as np
from functools import partial

#### Broadcasts & Accumulators

Обучать будем обычную линейную регрессию c MSE и L2 регуляризацией, 

Вектор весов может быть достаточно больши

In [90]:
def compute_gradient(weights_broadcast, loss, example):
    # достаем целевую переменную и признаки из наблюдения
    gradient = np.zeros(len(weights_broadcast.value))
    data = example.asDict()
    
    y = data['target']
    data.pop('target')

    # признаки сортируем по названию для того, чтобы позиции точно не разъезались
    x = np.array([v or 0 for k, v in sorted(data.items(), key=lambda x: x[0])])

    # делаем предсказание с текущими весами
    prediction = x.dot(weights_broadcast.value)

    # считаем градиент на объекте
    gradient = x * (prediction - y) * 2
    
    # считаем потери
    loss.add((prediction - y) ** 2)
    
    return gradient

In [103]:

# Параметры
learning_rate = 0.00000005
epochs = 10
l2_lambda = 0.0000001

np.random.seed(4220)

# Изначальные веса инициализируем случайно
weights = np.random.random(features_num)

N = train.count()

# Цикл по эпохам
for i in range(epochs):
    weights_broadcast = sc.broadcast(weights)  # Эту переменную будет бродкастить на всех воркеров
    loss = sc.accumulator(0.0) # В эту переменную будет частичные лоссы на объектах
    
    # Считаем средний градиент
    gradient = (
        train
        .map(partial(compute_gradient, weights_broadcast, loss))
        .mean()
    )
    
    gradient += 2 * l2_lambda * weights  # L2 регуляризация
    
    weights -= learning_rate * gradient
    weights_broadcast.destroy()
    
    print("epoch:", i, "loss:", loss.value / N)

epoch: 0 loss: 350144397.56
epoch: 1 loss: 349753383.691
epoch: 2 loss: 349611709.959
epoch: 3 loss: 349559979.895
epoch: 4 loss: 349540695.313
epoch: 5 loss: 349533115.215
epoch: 6 loss: 349529757.818
epoch: 7 loss: 349527924.261
epoch: 8 loss: 349526640.998
epoch: 9 loss: 349525556.846

In [105]:
weights

array([  2.82936695e-02,   5.02481632e-01,   3.45414310e-02,
         6.92461036e-01,   8.73373019e-01,   3.36667109e-01,
         6.03257781e-01,   4.99814704e-01,   2.43982593e-01,
         6.05345844e-01,   9.02140846e-01,   6.62616462e-01,
         8.24869593e-01,   4.52660141e-01,   4.58598281e-01,
         5.79623377e-01,   3.84207868e-01,   9.27836256e-01,
         5.79446562e-01,   5.99301045e-01,   5.60140428e-01,
         3.45427105e-01,   9.44093960e-02,   4.69331999e-01,
         1.84159241e-01,   3.95884688e-01,   8.60640078e-01,
         8.71167192e-01,   3.52176983e-01,   3.66173515e-01,
         1.21826206e-01,   2.19023667e-01,   9.64019004e-01,
         4.03368365e-01,   4.42996625e-01,   1.37225591e-01,
         5.06916519e-01,   4.92027474e-01,   3.45975460e-01,
         5.98080570e-01,   9.50662823e-01,   9.17109859e-01,
         1.45316459e-01,   8.25316252e-01,   4.50927389e-04,
         7.32678350e-01,   2.85980348e-02,   3.44011950e-01,
         3.35813107e-01,

In [107]:
def calc_ss_res(weights_broadcast, example):
    # достаем целевую переменную и признаки из наблюдения
    gradient = np.zeros(len(weights_broadcast.value))
    data = example.asDict()
    
    y = data['target']
    data.pop('target')

    # признаки сортируем по названию для того, чтобы позиции точно не разъезались
    x = np.array([v or 0 for k, v in sorted(data.items(), key=lambda x: x[0])])

    # делаем предсказание с текущими весами
    prediction = x.dot(weights_broadcast.value)
    return (y - prediction) ** 2

In [108]:
weights_broadcast = sc.broadcast(weights)

y_avg = train.map(lambda x: x.target).mean()
ss_tot = train.map(lambda x: (x.target - y_avg) ** 2).sum()
ss_res = train.map(partial(calc_ss_res, weights_broadcast)).sum()

r2_score = 1 - ss_res / ss_tot
print(r2_score)

0.000354713643406

Ну что можно сказать 

<img src="https://i.pinimg.com/originals/bb/0e/c3/bb0ec3d5987cff5c6bf8699b0d4e53b2.jpg" width="300">

Качество конечно ужасное, ибо задача поставлена достаточно бестолково да и признаки в целом взяты наобум. Однако!

* Лосс падает!
* Решение довольно неплохо масштабируется - фактически мы можем обработать датасет произвольного размера
* Мы попробовали кучу всякий крутых примочек, которые есть в Spark

Задачи, которые больше похожи на правду, мы будем стараться решать уже в следующий раз

### Вы жжете бабло

<img src="https://grizzle.com/wp-content/uploads/2020/01/money-cash-fire-1200x900.png" width="300">

Напоминаю, выключайте ресурсы. Они жрут ваши деньги.