<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# Стек Hadoop. Практическая работа

## Цель практической работы

Научиться использовать Hadoop MapReduce на практике.

## Что входит в работу

* Загрузка данных в HDFS.
* Получение данных из HDFS.
* Реализация парадигмы MapReduce с применением Hadoop Streaming.

## Формат сдачи

Отправьте в форме сдачи следующие файлы:
- файл с результатом result.json;
- ноутбук с кодом (все команды и функции, которые использовались для решения задач).

# Практическое задание

Будем использовать логи сессий прослушивания музыкальных исполнителей в сервисе Spotify, сокращённую версию.

https://www.aicrowd.com/challenges/spotify-sequential-skip-prediction-challenge/dataset_files

Файл `spotify/log_mini.csv` содержит записи вида `ID сессии, номер в сессии, длинна сессии, id трека, skip_1, skip_2, ...`:
```csv
session_id,session_position,session_length,track_id_clean,skip_1,skip_2,skip_3,not_skipped,context_switch,no_pause_before_play,short_pause_before_play,long_pause_before_play,hist_user_behavior_n_seekfwd,hist_user_behavior_n_seekback,hist_user_behavior_is_shuffle,hour_of_day,date,premium,context_type,hist_user_behavior_reason_start,hist_user_behavior_reason_end
0_00006f66-33e5-4de7-a324-2d18e439fc1e,1,20,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,false,false,false,true,0,0,0,0,0,0,true,16,2018-07-15,true,editorial_playlist,trackdone,trackdone
0_00006f66-33e5-4de7-a324-2d18e439fc1e,2,20,t_9099cd7b-c238-47b7-9381-f23f2c1d1043,false,false,false,true,0,1,0,0,0,0,true,16,2018-07-15,true,editorial_playlist,trackdone,trackdone
```

Вам нужно:
1. **Посчитать для каждого трека количество его прослушиваний. Выведите два самых прослушиваемых трека.**
2. **Вывести долю популярных треков: тех, что имеют больше 100 прослушиваний.**

Для решения задачи:
1. Скопируйте файлы в HDFS.
2. Реализуйте подсчёт прослушиваний отдельным MapReduce, в файлы результата сохраните пары <track_id, listen_count>.
3. С помощью команды `hdfs dfs -cat <YOUR-MAPRED-RESULT/*> | python stream_processor.py` решите три подзадачи:
    1. Подсчитайте количество уникальных треков.
    2. Посчитайте количество треков с количеством прослушиваний больше 20.
    3. Найдите два самых популярных по listen_count.
    
    `stream_processor.py` — скрипт, читающий с потока ввода, необходимо реализовать самостоятельно.
4. Сохраните результат работы скрипта выше в файл `result.json`, формат описан ниже.

Реализуйте решение с использованием Hadoop MapReduce Streaming, для написания mapper и reducer используйте Python.

Решение сохраните в локальный файл `result.json`, где по ключу q1
 запишите ответ на первый вопрос, по ключу q2 — на второй и по ключу q3 — на третий.


## Критерии проверки

1. Корректно реализован алгоритм подсчёта прослушиваний — mapper.py, reducer.py (без сохранения всех данных в память, работа с потоком).
2. mapper.py и reducer.py протестированы локально.
3. Данные ( `spotify/log_mini.csv` ) загружены в HDFS.
4. Корректно запущен процесс Hadoop MapReduce Streaming с использованием mapper.py и reducer.py на данных.
5. Корректно реализован `stream_processor.py` (без сохранения всех данных в память, работа с потоком).
6. Результат записан в файл `result.json` и совпадает с эталонным.

Пример содержимого файла `result.json`:

```json
{
    "q1": ["id1", "id2"],
    "q2": 0.13
}
```

In [24]:
# Пример содержимого файла
!head -n 5 spotify/log_mini.csv

session_id,session_position,session_length,track_id_clean,skip_1,skip_2,skip_3,not_skipped,context_switch,no_pause_before_play,short_pause_before_play,long_pause_before_play,hist_user_behavior_n_seekfwd,hist_user_behavior_n_seekback,hist_user_behavior_is_shuffle,hour_of_day,date,premium,context_type,hist_user_behavior_reason_start,hist_user_behavior_reason_end
0_00006f66-33e5-4de7-a324-2d18e439fc1e,1,20,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,false,false,false,true,0,0,0,0,0,0,true,16,2018-07-15,true,editorial_playlist,trackdone,trackdone
0_00006f66-33e5-4de7-a324-2d18e439fc1e,2,20,t_9099cd7b-c238-47b7-9381-f23f2c1d1043,false,false,false,true,0,1,0,0,0,0,true,16,2018-07-15,true,editorial_playlist,trackdone,trackdone
0_00006f66-33e5-4de7-a324-2d18e439fc1e,3,20,t_fc5df5ba-5396-49a7-8b29-35d0d28249e0,false,false,false,true,0,1,0,0,0,0,true,16,2018-07-15,true,editorial_playlist,trackdone,trackdone
0_00006f66-33e5-4de7-a324-2d18e439fc1e,4,20,t_23cff8d6-d874-4b20-83dc-94e450e8aa20,false,f

In [25]:
!ls spotify/

log_mini.csv


In [26]:
!whoami

jovyan


In [31]:
!hdfs dfs -ls /

Found 1 items
drwxrwx---   - root supergroup          0 2024-05-17 13:06 /tmp


In [32]:
!hdfs dfs -mkdir /spotify

In [33]:
# Копируем файлы в HDFS
!hdfs dfs -copyFromLocal spotify/log_mini.csv /spotify/log_mini.csv # допишите команду

In [34]:
!hdfs dfs -ls /

Found 2 items
drwxr-xr-x   - jovyan supergroup          0 2024-05-17 16:20 /spotify
drwxrwx---   - root   supergroup          0 2024-05-17 13:06 /tmp


### Разработка

In [38]:
import sys
import pandas as pd

COUNT_SKIPPED = True

if sys.version_info[0] < 3: 
    from StringIO import StringIO
else:
    from io import StringIO

In [39]:
pd.read_csv('spotify/log_mini.csv').iloc[0]

session_id                         0_00006f66-33e5-4de7-a324-2d18e439fc1e
session_position                                                        1
session_length                                                         20
track_id_clean                     t_0479f24c-27d2-46d6-a00c-7ec928f2b539
skip_1                                                              False
skip_2                                                              False
skip_3                                                              False
not_skipped                                                          True
context_switch                                                          0
no_pause_before_play                                                    0
short_pause_before_play                                                 0
long_pause_before_play                                                  0
hist_user_behavior_n_seekfwd                                            0
hist_user_behavior_n_seekback         

In [40]:
columns = [
    'session_id',
    'session_position',
    'session_length',
    'track_id_clean',
    'skip_1',
    'skip_2',
    'skip_3',
    'not_skipped',
    'context_switch',
    'no_pause_before_play',
    'short_pause_before_play',
    'long_pause_before_play',
    'hist_user_behavior_n_seekfwd',
    'hist_user_behavior_n_seekback',
    'hist_user_behavior_is_shuffle',
    'hour_of_day',
    'date',
    'premium',
    'context_type',
    'hist_user_behavior_reason_start',
    'hist_user_behavior_reason_end',
]

head = ','.join(columns)

def read_line(line, names, sep=',', **kwargs):
    return pd.read_table(StringIO(line), sep=sep, names=names, **kwargs).iloc[0]

read_line(head, names=columns, usecols=['track_id_clean'])

track_id_clean    track_id_clean
Name: 0, dtype: object

In [41]:
%%writefile mapper.py

import sys
import pandas as pd

COUNT_SKIPPED = True

if sys.version_info[0] < 3: 
    from StringIO import StringIO
else:
    from io import StringIO

columns = [
    'session_id',
    'session_position',
    'session_length',
    'track_id_clean',
    'skip_1',
    'skip_2',
    'skip_3',
    'not_skipped',
    'context_switch',
    'no_pause_before_play',
    'short_pause_before_play',
    'long_pause_before_play',
    'hist_user_behavior_n_seekfwd',
    'hist_user_behavior_n_seekback',
    'hist_user_behavior_is_shuffle',
    'hour_of_day',
    'date',
    'premium',
    'context_type',
    'hist_user_behavior_reason_start',
    'hist_user_behavior_reason_end',
]

head = ','.join(columns)

def read_line(line, names, sep=',', **kwargs):
    return pd.read_table(StringIO(line), sep=sep, names=names, **kwargs).iloc[0]


for line in sys.stdin:
    # line - строки из файла spotify/log_mini.csv
    
    if line.startswith(head):
        continue
    
    line = read_line(line, names=columns, usecols=['track_id_clean', 'not_skipped'])
    
    if COUNT_SKIPPED:
        count = 1
    elif line['not_skipped']:
        count = 1
    else:
        count = 0
    
    print('{track_id}\t{count}'.format(
        track_id=line['track_id_clean'],
        count=count
    )) # Выведите строки на поток вывода: <track_id>\t1

Overwriting mapper.py


In [42]:
# Протестируйте mapper локально
! head -n 5 spotify/log_mini.csv | python mapper.py

t_0479f24c-27d2-46d6-a00c-7ec928f2b539	1
t_9099cd7b-c238-47b7-9381-f23f2c1d1043	1
t_fc5df5ba-5396-49a7-8b29-35d0d28249e0	1
t_23cff8d6-d874-4b20-83dc-94e450e8aa20	1


In [43]:
%%writefile reducer.py
# Реализуйте reducer
import sys

current_track_id = None
current_count = 0
track_id = None

for line in sys.stdin:
    # line - группа строк из выхода mapper.py
    try:
        track_id, count = line.strip().split('\t', 1)
    except ValueError:
        continue
    
    try:
        count = int(count)
    except ValueError:
        continue
    
    if current_track_id == track_id:
        current_count += count
    else:
        if current_track_id:
            print(f'{current_track_id}\t{current_count}')
            
        current_count = count
        current_track_id = track_id
    
if current_track_id == track_id:
    print(f'{current_track_id}\t{current_count}')

Writing reducer.py


In [44]:
# Протестируйте reducer локально
! python -c "print('\n'.join([f'{x}\t1' for x in (['aa'] * 10 + ['bb'] * 5)]))" | python reducer.py

aa	10
bb	5


In [45]:
# Запустите MapReduce Streaming
! mapred streaming \
  -input /spotify/log_mini.csv \
  -output /track-count \
  -mapper "/opt/conda/bin/python mapper.py" \
  -reducer "/opt/conda/bin/python reducer.py" \
  -file mapper.py \
  -file reducer.py

2024-05-17 16:21:08,444 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar] /tmp/streamjob5069320952616127519.jar tmpDir=null
2024-05-17 16:21:09,289 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-05-17 16:21:09,444 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-05-17 16:21:09,735 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1715961919982_0001
2024-05-17 16:21:10,468 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-17 16:21:10,497 INFO net.NetworkTopology: Adding a new node: /default-rack/127.0.0.1:9866
2024-05-17 16:21:10,535 INFO mapreduce.JobSubmitter: number of splits:2
2024-05-17 16:21:11,271 INFO mapreduce.JobSubmitter: Submitting toke

In [48]:
!hdfs dfs -ls /track-count

Found 2 items
-rw-r--r--   1 jovyan supergroup          0 2024-05-17 16:24 /track-count/_SUCCESS
-rw-r--r--   1 jovyan supergroup    2081227 2024-05-17 16:24 /track-count/part-00000


In [69]:
%%writefile stream_processor.py
# Реализуйте код обработки результата MapReduce
import sys, json

uniq_tracks_count = 0
uniq_tracks = dict()
popular_tracks_count = 0

top_2_tracks = [None, None]


for line in sys.stdin:
    # Парсинг результата работы MapReduce (финальный формат от reducer.py - <track_id>\t<N>)
    track_id, count = line.strip().split('\t')
    count = int(count)
    
    # Подсчет уникальных треков
    if not (track_id in uniq_tracks):
        uniq_tracks[track_id] = 0
    count_before = uniq_tracks[track_id]
    uniq_tracks[track_id] += count
    
    # Подсчет популярных треков
    if (uniq_tracks[track_id] > 20) and (count_before <= 20):
        popular_tracks_count += 1
    
    # Топ 2 трека
    total_count = uniq_tracks[track_id]
    if top_2_tracks[0] is None:
        top_2_tracks[0] = track_id, total_count
    elif (top_2_tracks[1] is None) and (top_2_tracks[0][1] >= total_count):
        top_2_tracks[1] = track_id, total_count
    elif top_2_tracks[0][1] < total_count:
        top_2_tracks[1] = top_2_tracks[0]
        top_2_tracks[0] = track_id, total_count
        
uniq_tracks_count = len(uniq_tracks)

data = {
    'q1': uniq_tracks_count,
    'q2': popular_tracks_count,
    'q3': top_2_tracks,
}

with open('result.json', 'w') as f:
    f.write(json.dumps(data))


Overwriting stream_processor.py


In [51]:
!python -c "print('\n'.join([f'{x}\t1' for x in (['aa'] * 10 + ['bb'] * 50 + ['dd'] * 2 + ['cc'] * 30 + ['aa'] * 4)]))" | python reducer.py | python stream_processor.py
!cat result.json

{"q1": 10, "q2": 2, "q3": [["bb", 50], ["aa", 10]]}

In [54]:
# %%time
# !cat spotify/log_mini.csv | python mapper.py | python reducer.py | python stream_processor.py
# !cat result.json

In [70]:
# Обработайте данные из HDFS с помощью stream_processor.py
!hdfs dfs -cat /track-count/* | python stream_processor.py
!cat result.json

{"q1": 50704, "q2": 874, "q3": [["t_bacf06d3-9185-4183-84ea-ff0db51475ce", 1427], ["t_5718ab08-3a15-4d3f-9e63-42b2f6805e31", 915]]}