<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# Домашнее задание

В лекциях мы обсуждали меру Жаккара и то, как ее эффективно считать на MapReduce.

Вам предлагается посчитать меру Жаккара на Spark для поиска похожих исполнителей во всем наборе данных и ответить на следующие вопросы:
1. **Сколько исполнителей остаются в рассмотрении после применения всех фильтров из описания задания?**
2. **Для скольких пар исполнителей удалось насчитать ненулевую похожесть по Жаккару? Здесь учитываются всевозможные пары (a, b) и (b, a), а также (a, a), для проверки корректности.**
3. **Найдите 5 самых похожих исполнителей на "Maroon 5" по посчитанной мере Жаккара. В результат запишите имена 5 исполнителей отличных от "Maroon 5".**

Несколько напутственных слов:
- Используйте данные, загруженные в разделе <a href="#Загружаем-данные">Загружаем данные</a>.
- Пользователи, прослушавшие $N$ исполнителей внесут вклад в похожесть $N^2$ пар артистов. Поэтому редкие очень активные пользователи будут сильно замедлять наш алгоритм. Для таких пользователей на практике берут подмножество прослушиваний, например, 1000. Мы поступим проще и будем учитывать только прослушивания, где $plays > 2$, таким образом оставим только наиболее уверенные предпочтения пользователя.
- Чтобы похожести были более уверенными, будем считать их только для тех исполнителей, которых прослушали строго больше 50 человек (с учетом предыдущего фильтра по прослушиваниям).
- Для отладки алгоритма на меньшем объеме данных можно использовать трансформацию <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sample">events.sample(False, 0.01)</a>, чтобы не ждать долго отладочных запусков.
- Можно считать, что данные об исполнителях (например, их популярность) поместятся в памяти каждой машины. Просто нет так много исполнителей в мире, чтобы не поместиться.
- Если какой-то шаг выполняется очень долго, можно увеличить степень параллелизма, например, 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey">groupByKey(numPartitions=100)</a>, чтобы увидеть более гранулярный прогресс выполнения.
- Иногда посчитанный результат имеет смысл сохранить в HDFS, чтобы не пересчитывать его заново каждый раз, когда он нужен.
- При работе с большими данными требуется терпение, авторское решение работает около 10 минут.
- Эту задачу можно решить и на Spark SQL, если вам он нравится больше.

Решение сохраните в файл `result.json`. Пример содержимого файла:
```json
{
    "q1": 123,
    "q2": 456,
    "q3": [
        "artistName1",
        "artistName2",
        "artistName3",
        "artistName4",
        "artistName5"
    ]
}
```

In [1]:
! hadoop fs -copyFromLocal yandex_music /

copyFromLocal: `/yandex_music/README.txt': File exists
copyFromLocal: `/yandex_music/artists.jsonl': File exists
copyFromLocal: `/yandex_music/events.csv': File exists


In [2]:
! hadoop fs -ls -h /yandex_music

Found 3 items
-rw-r--r--   1 jovyan supergroup        254 2021-08-16 09:59 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2021-08-16 09:59 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2021-08-16 09:59 /yandex_music/events.csv


In [3]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

In [4]:
artists = se.read.json("hdfs:///yandex_music/artists.jsonl")
artists.registerTempTable("artists")
artists.limit(5).toPandas()

Unnamed: 0,artistId,artistName
0,0,Mack Gordon
1,1,Kenny Dorham
2,2,Max Roach
3,3,Francis Rossi
4,4,Status Quo


In [5]:
events = se.read.csv("hdfs:///yandex_music/events.csv", header=True, 
                     schema='userId bigint, artistId bigint, plays INT, skips INT')
events.registerTempTable("events")
events.limit(5).toPandas()

Unnamed: 0,userId,artistId,plays,skips
0,0,335,1,0
1,0,708,1,0
2,0,710,2,1
3,0,815,1,1
4,0,880,1,1


In [6]:
# на каждого пользователя приходится меньше 1000 исполнителей с plays > 2
se.sql(
"""
select 
    userId,
    count(*) as cnt
from events
where plays > 2
group by userId
order by cnt desc
""").show(5)

+------+---+
|userId|cnt|
+------+---+
|  4689|644|
|  3121|577|
|  4575|572|
|  2266|519|
|  4217|512|
+------+---+
only showing top 5 rows



In [7]:
# ТУТ ВАШ КОД

In [8]:
table = se.sql(
"""
select userId, artistId
from(
    select userId, events.artistId, plays, popularity
    from   (
            select *
            from(
                select artistId, count(*) as popularity
                from(
                    select distinct userId, artistId
                    from 
                        events 
                    where plays > 2
                    )
                group by artistId
                )
            where popularity > 50
             )as s join events on s.artistId = events.artistId
    where plays > 2
    order by userId, artistId
    )
group by userId, artistId
order by userId, artistId
""")
# table.show()

In [9]:
table.cache()
table.count()

498589

In [46]:
from pyspark.sql import functions as F
df = table.groupBy('userId').agg(F.collect_list("artistId").alias('pairs'))

In [47]:
def f(list_):
    lis = []
    for i in list_:
        for j in list_:
#             if j>=i:
                lis.append('{}_{}'.format(i, j))
    return lis
f([1, 2, 3])

['1_1', '1_2', '1_3', '2_1', '2_2', '2_3', '3_1', '3_2', '3_3']

In [48]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

udf_f = udf(f, ArrayType(StringType()))
pairs = df.withColumn("pairs", udf_f(col("pairs"))).select('pairs')
big_col = pairs.select(explode('pairs'))
cnt_col = big_col.groupBy('col').count().sort(desc('count'))
split = cnt_col.withColumn('A', split(cnt_col['col'], '_').getItem(0)) \
       .withColumn('B', split(cnt_col['col'], '_').getItem(1))

In [49]:
listenings = table.groupby('artistId').count().sort(desc('count')).toPandas().set_index('artistId')
def artist_to_listenings(id_):
    return int(listenings.loc[int(id_)].values[0])

AtoL = udf(artist_to_listenings, IntegerType())

In [50]:
%%time
result = split.withColumn("jaccard", (col("count")/(AtoL(col("A"))+AtoL(col("B"))-col("count")))).drop('col')
result.cache()
result.count()

CPU times: user 129 ms, sys: 33.8 ms, total: 163 ms
Wall time: 15min 20s


In [51]:
result.select('jaccard').filter('jaccard is not null').count()

6838579

In [21]:
artists.select('*').where('artistName = "Maroon 5"').show()

+--------+----------+
|artistId|artistName|
+--------+----------+
|   14803|  Maroon 5|
+--------+----------+



In [58]:
result.select("*").where('A = 14803').sort(desc('jaccard')).limit(6).show()

+-----+-----+-----+-------------------+
|count|    A|    B|            jaccard|
+-----+-----+-----+-------------------+
|  919|14803|14803|                1.0|
|  489|14803| 3568| 0.3319755600814664|
|  610|14803| 3629|0.31266017426960535|
|  537|14803|  259|0.29184782608695653|
|  318|14803|22629| 0.2867448151487827|
|  464|14803|59783| 0.2858903265557609|
+-----+-----+-----+-------------------+



In [63]:
artists.select('*').where('artistId = 59783').show()

+--------+-------------+
|artistId|   artistName|
+--------+-------------+
|   59783|Calvin Harris|
+--------+-------------+



In [64]:
%%file result.json
{
    "q1": 2889,
    "q2": 6838579,
    "q3": [
        "OneRepublic",
        "Sia",
        "David Guetta",
        "Bruno Mars",
        "Calvin Harris"
    ]
}

Overwriting result.json


In [65]:
!cat result.json

{
    "q1": 2889,
    "q2": 6838579,
    "q3": [
        "OneRepublic",
        "Sia",
        "David Guetta",
        "Bruno Mars",
        "Calvin Harris"
    ]
}

In [66]:
# останавливаем Spark (и YARN приложение)
# sc.stop()