# MapReduce

Основная идея данной парадигмы вычисления заключается в разделении обработки данных на 2 этапа: map и reduce. На первом этапе каждая запись обарабатывается независимо. На втором этапе записи сортируются по выбранному ключу и все записи, принадлежащие одному ключу, обрабатываются в рамках одного процесса.

Для примера будем использовать подход hadoop streaming, при котором в качестве map и reduce могут выступать любые исполняемые файлы.

-------------------------------------------------------------------------------------------------------------------------

Посчитаем среднюю корректность ответов для content_id


датасет "Kaggle Riiid Answers Correctness Prediction"
* row_id: (int64) идентификатор записи
* timestamp: (int64) время от начала взаимодействия пользователя до завершения задания
* user_id: (int32) идентификатор пользователя
* content_id: (int16) идентификатор типа взаимодействия
* content_type_id: (int8) 0 - вопрос, 1 - просмотр лекции
* task_container_id: (int16) ID набора вопросов
* user_answer: (int8) ответ пользователя. -1 для лекция
* answered_correctly: (int8) маркер правильного овтета. -1 для лекций
* prior_question_elapsed_time: (float32) время, затраченное пользователей на ответ 
после прослушивания лекции
* prior_question_had_explanation: (bool) пользователю показали правильный ответ, 
после ответа на вопросы

Данильченко Вадим

In [1]:
%%file mapper.py
#!/usr/bin/env python3
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    row = line.split(',')
    print("{}\t{}".format(row[3], row[7]))

Overwriting mapper.py


In [2]:
!chmod +x mapper.py

In [3]:
%%file reducer.py
#!/usr/bin/env python3
import sys

current_content_id = None
answered_correctly_count = 0
answered_correctly_sum = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    content_id, answered_correctly = line.split('\t', 1)
    try:
        content_id = int(content_id)
        answered_correctly = int(answered_correctly)
        if answered_correctly==-1:
            continue
    except ValueError:
        continue
    
    if content_id == current_content_id:
        answered_correctly_sum += answered_correctly
        answered_correctly_count += 1
    else:
        if current_content_id is not None:
            print("{}\t{}".format(content_id, answered_correctly_sum / answered_correctly_count))
        current_content_id = content_id
        answered_correctly_count = 1
        answered_correctly_sum = answered_correctly

if current_content_id == content_id and answered_correctly_count != 0:
    print("{}\t{}".format(content_id, answered_correctly_sum / answered_correctly_count))

Overwriting reducer.py


In [4]:
!chmod +x reducer.py

пример shell строки, которая производит аналогичные hadoop MapReduce операции.

Для тестирования будем использовать сокращенную выборку из 1000 строк.

In [5]:
!head -n 1000 ~/data/train_10m.csv | ./mapper.py | sort | ./reducer.py > result.csv

In [6]:
!head result.csv

100	1.0
10046	1.0
10047	0.0
10048	1.0
10049	1.0
10070	0.0
10071	0.0
10072	1.0
10073	1.0
10090	1.0


## Запуск MapReduce

In [7]:
import random
random_dir = "mapred_output_{}".format(random.randint(0, 100))
random_dir

'mapred_output_63'

In [8]:
!hdfs dfs -rm -r $random_dir
!hadoop jar /usr/local/hadoop-2.10.0/share/hadoop/tools/lib/hadoop-streaming.jar \
-files mapper.py,reducer.py \
-mapper mapper.py \
-reducer reducer.py \
-input train_10m.csv  \
-output $random_dir

rm: `mapred_output_63': No such file or directory
packageJobJar: [/tmp/hadoop-unjar7596701997786816959/] [] /tmp/streamjob2581672004335687599.jar tmpDir=null
21/08/10 17:59:56 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/08/10 17:59:56 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/08/10 17:59:56 INFO mapred.FileInputFormat: Total input files to process : 1
21/08/10 17:59:56 INFO mapreduce.JobSubmitter: number of splits:5
21/08/10 17:59:56 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
21/08/10 17:59:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1628598864856_0007
21/08/10 17:59:57 INFO conf.Configuration: resource-types.xml not found
21/08/10 17:59:57 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/08/10 17:59:57 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, t

In [9]:
!hdfs dfs -cat $random_dir/* | head -n 20
!hdfs dfs -rm -r $random_dir

1	0.9088277858176556
10	0.8939393939393939
100	0.6849534809111325
1000	0.35821872953503603
10000	0.7222946544980443
10001	0.540785498489426
10002	0.43529411764705883
10003	0.5970664365832614
10004	0.4175152749490835
10009	0.4227467811158798
1001	0.9297872340425531
10010	0.6476306196840826
10011	0.7851063829787234
10012	0.9319148936170213
10013	0.8702127659574468
10014	0.8954954954954955
10015	0.7351351351351352
10016	0.9261261261261261
10017	0.6324324324324324
10018	0.3902439024390244
cat: Unable to write to output stream.
Deleted mapred_output_63
