# Hadoop and MapReduce

## Основы MapReduce

В своей сути MapRedcue это очень простая парадигма. Допустим у нас есть датасет

In [None]:
! curl https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_1.csv > tweets_1.csv

In [8]:
! head -n 2 tweets_1.csv





Мы хотим в этом датасете что-нибудь найти. Например (сюрприз-сюрприз), посчитать количество уникальных слов. Мы могли бы сделать что-то такое:

#### Вариант 1 
Используем исключительно питон и наивный алгоритм

In [5]:
%%time

from collections import Counter
import csv
import re
import sys

counter = Counter()
pattern = re.compile(r"[a-z]+")

with open('tweets_1.csv', 'r') as f:
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            counter[word] += 1

for word, count in counter.most_common(10):
    print(f"{word}\t{count}")

t	268703
co	250375
https	221366
the	69350
to	55972
a	43420
in	37099
s	36085
of	33579
http	28661
CPU times: user 4.42 s, sys: 59.8 ms, total: 4.48 s
Wall time: 4.48 s


Такое сработает только если у нас не очень много данных и они все вмещаются в оперативную память

In [4]:
! du -h tweets_1.csv

91M	tweets_1.csv


#### Вариант 2
Используем парадигму Map Reduce

В этом примере у нас всего 90 мегабайт данных, и на моем компьютере они обрабатываются за примерно 30 секунд с помощью питона. Теперь представим (это достаточно несложно), что у нас приходит новых данных приходит _десятки терабайт_ в сутки. Такое уже не поместится ни в один сервер, поэтому нам нужно придумать что-нибудь похитрее.

MapReduce как раз является парадигмой, помогающей обрабатывать большие объемы данных, за счет простоты своего устройства.

Приятная новость - для того, чтобы понять и научиться программировать программы в парадигме MapReduce вам потребуется... **5 секунд!**

<img src="https://raw.githubusercontent.com/ADKosm/lsml-2021-public/main/imgs/you-know-mapreduce.png" width="400">

Все потому что вы уже прошли семинар по Bash и научились составлять большие программы в виде компоновки небольших  программ, соединенных пайпами. По своей сути программа на MapReduce - это хорошо отмасштабированная программа вида

```bash
cat data.txt | map | sort | reduce
```

Сортировку за вас выполняет сам фреймворк (и ее вы можете дополнительно настроить точно такое как и команду sort). А также он сам разбивает данные на части и параллельно запускает операции map и reduce. 

Таким образом на самом деле Hadoop - это всего лишь гигантская машина сортировки, которая дополнительно дает вам некоторые гарантии:

* Для всех данных параллельно будет применена операция map
* Данные будут отсортированы по указанному вами ключу
* Каждый ключ будет целиком передан на один и только один reduce

Программисту остается реализовать программу, которая состоит из двух компонент: `map` и `reduce`. 

Операция `map` -- это просто функция из одного элемента в другой элемент, у которого есть первичный ключ. 

Операция `reduce` -- это коммутативная и ассоциативная агрегация всех элементов по ключу. Чтобы эти операции совершить, надо разбить весь вход на куски данных и отправить их на машины, чтобы они выполнялись в параллель, а весь выход операции map идёт в операцию shuffle, которая по одним и тем же ключам определяет записи на одинаковые хосты. 

В итоге получается, что мы можем спокойно увеличивать количество worker'ов для map операций и с увеличением количества данных мы лишь будем линейно утилизировать количество машин, то же самое с операцией reduce -- мы можем добавлять машины с ростом увеличения количества ключей линейно, не боясь того, что мы не можем позволить на одной какой-то машине больше памяти или диска.

Давайте напишем маппер и редьюсер на питоне для этой задачи:

In [6]:
! sudo pip3 install tqdm



In [7]:
%%writefile wordcount.py
import sys
import csv
import re


def mapper():
    pattern = re.compile(r"[a-z]+")
    for row in csv.reader(iter(sys.stdin.readline, '')):
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            print("{}\t{}".format(word, 1))


def reducer():
    word, number = next(sys.stdin).split('\t')
    number = int(number)
    for line in sys.stdin:
        current_word, current_number = line.split('\t')
        current_number = int(current_number)
        if current_word != word:
            print("{}\t{}".format(word, number))
            word = current_word
            number = current_number
        else:
            number += current_number
    print("{}\t{}".format(word, number))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting wordcount.py


Важно еще удалить голову у таблицы, иначе подсчеты могут быть некоректными

In [9]:
! sed -i -e '1'd tweets_1.csv

In [11]:
! head -n 2 tweets_1.csv





In [13]:
! cat tweets_1.csv | python wordcount.py map | sort -k1,1 | head

a	1
a	1
a	1
a	1
a	1
a	1
a	1
a	1
a	1
a	1
sort: write failed: 'standard output': Broken pipe
sort: write error


In [14]:
%%time

! cat tweets_1.csv | \
    tqdm --total $(cat tweets_1.csv | wc -l)| \
    python wordcount.py map | \
    sort -k1,1 | \
    python wordcount.py reduce > result.txt

100%|████████████████████████████████| 243891/243891 [00:09<00:00, 25475.62it/s]
CPU times: user 253 ms, sys: 23.5 ms, total: 276 ms
Wall time: 13.4 s


In [15]:
! head result.txt

a	43420
aa	151
aaa	13
aaaaaa	1
aaaaaaaaaaaaand	1
aaaaaaaaaall	1
aaaaaaaamen	1
aaaaaaaand	2
aaaaaaargh	1
aaaaaand	2


Отлично! Слова есть, осталось только найти top-10.

In [16]:
%%writefile top10.py
import sys


def _rewind_stream(stream):
    for _ in stream:
        pass


def mapper():
    for row in sys.stdin:
        key, value = row.split('\t')
        print("{}+{}\t".format(key, value.strip()))


def reducer():
    for _ in range(10):
        key, _ = next(sys.stdin).split('\t')
        word, count = key.split("+")
        print("{}\t{}".format(word, count))
    _rewind_stream(sys.stdin)

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting top10.py


In [17]:
! cat result.txt | \
    tqdm --total $(cat result.txt | wc -l) | \
    python top10.py map | \
    sort -t'+' -k2,2nr -k1,1 | \
    python top10.py reduce > top-10.txt

100%|███████████████████████████████| 346613/346613 [00:00<00:00, 596413.03it/s]


In [18]:
! cat top-10.txt

t	268703
co	250375
https	221366
the	69350
to	55972
a	43420
in	37099
s	36085
of	33579
http	28661


На MapReduce мы задачу переписали, однако быстрее работать она пока не стала. Все дело в том, что мы это еще не на кластере запускали! Время запускать все на настоящем кластере!

### Загружаем данные в HDFS

При работе с HDFS нужно понимать, что есть два места, где хранятся данные

1. На локальных жестких дисках машин кластера - это деволтная система, на нее можно посмотреть через `hdfs dfs -ls /`

2. В Object Storage - для работы с ней, нужно указывать путь до бакета - `hdfs dfs -ls s3a://lsml2022alexius/`

In [19]:
! curl -O https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_{`seq -s , 1 13`}.csv


[1/13]: https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_1.csv --> IRAhandle_tweets_1.csv
--_curl_--https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_1.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 89.9M  100 89.9M    0     0  17.2M      0  0:00:05  0:00:05 --:--:-- 22.4M

[2/13]: https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_2.csv --> IRAhandle_tweets_2.csv
--_curl_--https://raw.githubusercontent.com/fivethirtyeight/russian-troll-tweets/master/IRAhandle_tweets_2.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 90.0M  100 90.0M    0     0  20.7M      0  0:00:04  0:00:04 --:--:-- 27.0M

[3/13]: https://raw.githubuse

Подформатируем

In [20]:
! for i in {1..13}; do sed IRAhandle_tweets_$i.csv -i -e '1'd && echo "Finish $i" ; done

Finish 1
Finish 2
Finish 3
Finish 4
Finish 5
Finish 6
Finish 7
Finish 8
Finish 9
Finish 10
Finish 11
Finish 12
Finish 13


Создадим отдельную папку для этих данных в HDFS

In [21]:
! hdfs dfs -ls /

Found 5 items
drwx------   - mapred hadoop          0 2023-01-06 08:41 /hadoop
drwxr-xr-x   - hdfs   hadoop          0 2023-01-21 15:13 /system
drwxrwxrwt   - hdfs   hadoop          0 2023-01-06 08:40 /tmp
drwxrwxrwt   - hdfs   hadoop          0 2023-01-22 18:09 /user
drwxrwxrwt   - hdfs   hadoop          0 2023-01-06 08:41 /var


In [22]:
! hdfs dfs -rm -r /user/tweets/data
! hdfs dfs -mkdir -p /user/tweets/data

Deleted /user/tweets/data


Note: все команды для hdfs смотреть здесь - https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html

Заливаем данные

In [23]:
! hdfs dfs -put IRAhandle_tweets_* /user/tweets/data/

In [24]:
! sudo mkdir -p /usr/lib/hadoop/logs
! sudo chmod 0777 /usr/lib/hadoop/logs
! sudo -u hdfs hdfs balancer 

2023-01-24 15:47:52,085 INFO balancer.Balancer: namenodes  = [hdfs://rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net:8020]
2023-01-24 15:47:52,090 INFO balancer.Balancer: parameters = Balancer.BalancerParameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, #excluded nodes = 0, #included nodes = 0, #source nodes = 0, #blockpools = 0, run during upgrade = false]
2023-01-24 15:47:52,090 INFO balancer.Balancer: included nodes = []
2023-01-24 15:47:52,090 INFO balancer.Balancer: excluded nodes = []
2023-01-24 15:47:52,090 INFO balancer.Balancer: source nodes = []
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved  NameNode
2023-01-24 15:47:52,094 INFO balancer.NameNodeConnector: getBlocks calls for hdfs://rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net:8020 will be rate-limited to 20 per second
2023-01-24 15:47:53,166 INFO balancer.Balancer: dfs.namenode.get-blocks.max-qps = 20 (default=20)
2023-01-24 15:47:53,166 IN

In [25]:
! sudo -u hdfs hdfs dfsadmin -report

Configured Capacity: 1585661300736 (1.44 TB)
Present Capacity: 1483057369088 (1.35 TB)
DFS Remaining: 1419503308800 (1.29 TB)
DFS Used: 63554060288 (59.19 GB)
DFS Used%: 4.29%
Replicated Blocks:
	Under replicated blocks: 0
	Blocks with corrupt replicas: 0
	Missing blocks: 0
	Missing blocks (with replication factor 1): 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0
Erasure Coded Block Groups: 
	Low redundancy block groups: 0
	Block groups with corrupt internal blocks: 0
	Missing block groups: 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 10.128.0.27:9866 (rc1a-dataproc-d-1jipfldazn1nw9u1.mdb.yandexcloud.net)
Hostname: rc1a-dataproc-d-1jipfldazn1nw9u1.mdb.yandexcloud.net
Decommission Status : Normal
Configured Capacity: 528553766912 (492.25 GB)
DFS Used: 28756549468 (26.78 GB)
Non DFS Used: 1265958928

In [None]:
! sudo -u hdfs hdfs balancer -threshold 1

In [27]:
! hdfs dfs -ls /user/tweets/data/

Found 13 items
-rw-r--r--   1 ubuntu hadoop   94371561 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_1.csv
-rw-r--r--   1 ubuntu hadoop   94371615 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_10.csv
-rw-r--r--   1 ubuntu hadoop   94371552 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_11.csv
-rw-r--r--   1 ubuntu hadoop   94371703 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_12.csv
-rw-r--r--   1 ubuntu hadoop    8238864 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_13.csv
-rw-r--r--   1 ubuntu hadoop   94371748 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_2.csv
-rw-r--r--   1 ubuntu hadoop   94371796 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_3.csv
-rw-r--r--   1 ubuntu hadoop   94371606 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_4.csv
-rw-r--r--   1 ubuntu hadoop   94371616 2023-01-24 15:46 /user/tweets/data/IRAhandle_tweets_5.csv
-rw-r--r--   1 ubuntu hadoop   94371646 2023-01-24 15:46 /user/tweets/data/IRAhandle_twee

### Запускаем MapReduce

Проверяем, что скрипты на головной машине

In [28]:
! cat wordcount.py

import sys
import csv
import re


def mapper():
    pattern = re.compile(r"[a-z]+")
    for row in csv.reader(iter(sys.stdin.readline, '')):
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            print("{}\t{}".format(word, 1))


def reducer():
    word, number = next(sys.stdin).split('\t')
    number = int(number)
    for line in sys.stdin:
        current_word, current_number = line.split('\t')
        current_number = int(current_number)
        if current_word != word:
            print("{}\t{}".format(word, number))
            word = current_word
            number = current_number
        else:
            number += current_number
    print("{}\t{}".format(word, number))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()


In [29]:
! cat top10.py

import sys


def _rewind_stream(stream):
    for _ in stream:
        pass


def mapper():
    for row in sys.stdin:
        key, value = row.split('\t')
        print("{}+{}\t".format(key, value.strip()))


def reducer():
    for _ in range(10):
        key, _ = next(sys.stdin).split('\t')
        word, count = key.split("+")
        print("{}\t{}".format(word, count))
    _rewind_stream(sys.stdin)

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()


Собираем команду на запуск

In [None]:
! find /usr/ -name hadoop-streaming.jar 

In [30]:
! file /usr/lib/hadoop-mapreduce/hadoop-streaming.jar

/usr/lib/hadoop-mapreduce/hadoop-streaming.jar: symbolic link to hadoop-streaming-3.2.2.jar


In [32]:
%%time

! hdfs dfs -rm -r /user/tweets/result || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="word-count" \
-D mapreduce.job.reduces=3 \
-files ~/wordcount.py \
-mapper "python3 wordcount.py map" \
-reducer "python3 wordcount.py reduce" \
-input /user/tweets/data/ \
-output /user/tweets/result/

Deleted /user/tweets/result
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob7667778047204893517.jar tmpDir=null
2023-01-24 16:00:55,046 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:00:55,266 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:00:55,305 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:00:55,306 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:00:55,506 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0002
2023-01-24 16:00:55,866 INFO mapred.FileInputFormat: Total input files t

Помимо того, что можно следить здесь в терминале, за выполнением можно наблюдать через UI-proxy в интерфейсе облака

Смотрим результат


In [33]:
! hdfs dfs -ls /user/tweets/result

Found 4 items
-rw-r--r--   1 ubuntu hadoop          0 2023-01-24 16:02 /user/tweets/result/_SUCCESS
-rw-r--r--   1 ubuntu hadoop   10107405 2023-01-24 16:02 /user/tweets/result/part-00000
-rw-r--r--   1 ubuntu hadoop   10134121 2023-01-24 16:02 /user/tweets/result/part-00001
-rw-r--r--   1 ubuntu hadoop   10118293 2023-01-24 16:02 /user/tweets/result/part-00002


In [34]:
! hdfs dfs -cat /user/tweets/result/part-* | head

aa	1726
aaaaa	7
aaaaaaaaaaaaaa	3
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaannnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnddddddddddddddddddddddddddddddddddddd	1
aaaaaaaaaaaaaaaaaaah	1
aaaaaaaaaaaaand	1
aaaaaaaaannnnnnnnnnnddddddddddddd	1
aaaaaaaah	1
aaaaaaaamen	1
aaaaaaagh	2
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.


Запустим вторую задачу 

In [35]:
%%time

! hdfs dfs -rm -r /user/tweets/top10/
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="top-10" \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-D mapreduce.map.output.key.field.separator='+' \
-files top10.py \
-mapper "python top10.py map" \
-reducer "python top10.py reduce" \
-input /user/tweets/result/ \
-output /user/tweets/top10/

Deleted /user/tweets/top10
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob9174466269628992396.jar tmpDir=null
2023-01-24 16:08:34,627 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:08:34,855 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:08:34,897 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:08:34,898 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:08:35,129 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0003
2023-01-24 16:08:35,831 INFO mapred.FileInputFormat: Total input files to

In [36]:
! hdfs dfs -ls /user/tweets/top10

Found 2 items
-rw-r--r--   1 ubuntu hadoop          0 2023-01-24 16:09 /user/tweets/top10/_SUCCESS
-rw-r--r--   1 ubuntu hadoop        106 2023-01-24 16:09 /user/tweets/top10/part-00000


In [37]:
! hdfs dfs -cat /user/tweets/top10/part-*

t	3015051
co	2833375
https	2454132
the	591885
to	589004
in	457433
a	412888
s	397889
http	375299
of	350983


### Distributed cache

Помимо самого скрипта, мы можем положить в MapReduce любой другой файл, который может пригодиться для работы программы. Например при подсчете количества слов мы бы хотели выкинуть "стоп-слова". Их количество скорее всего не очень большое поэтому смело может передавать их обычным файлом. Hadoop гарантирует, что доставит все файлы ко всем машинам.

In [38]:
! hdfs dfs -cat /user/tweets/top10/part-* > stop-words.txt

#### Хозяйке на заметку

В питоне уже есть хорошая стандартная библиотека, которая позволяет вам гораздо удобнее работать с такимим стримовыми данными. Давайте напишем новую задачу со стоп словами, чтобы они смотрелись поприличнее.

In [40]:
%%writefile wordcount2.py

import sys
import csv
import re
from itertools import groupby


def csv_stream():
    return csv.reader(iter(sys.stdin.readline, ''))

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)


def mapper():
    pattern = re.compile(r"[a-z]+")
    for row in csv_stream():
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            print("{}\t{}".format(word, 1))


def reducer():
    for key, group in groupby(kv_stream(), lambda x: x[0]):
        word = key
        number = sum(int(x) for _, x in group)
        print("{}\t{}".format(word, number))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting wordcount2.py


In [41]:
%%writefile top10-2.py

import sys
import collections
from itertools import islice

def build_stop_words():
    with open('stop-words.txt', 'r') as f:
        stop_words = {x.split('\t')[0] for x in f}
    return stop_words

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)

def rewind():
    collections.deque(sys.stdin, maxlen=0)

def mapper():
    for key, value in kv_stream():
        print("{}+{}\t".format(key, value.strip()))

def reducer():
    stop_words = build_stop_words()
    first_10_stream = islice(filter(lambda x: x[0] not in stop_words, kv_stream('+')), 10)
    
    for word, count in first_10_stream:
        print("{}\t{}".format(word, count.strip()))
    rewind()

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting top10-2.py


In [42]:
! cat stop-words.txt

t	3015051
co	2833375
https	2454132
the	591885
to	589004
in	457433
a	412888
s	397889
http	375299
of	350983


In [43]:
%%time

! hdfs dfs -rm -r /user/tweets/top10-stop-words/ || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="top-10-stop-words" \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-D mapreduce.map.output.key.field.separator='+' \
-files top10-2.py,stop-words.txt \
-mapper "python top10-2.py map" \
-reducer "python top10-2.py reduce" \
-input /user/tweets/result/ \
-output /user/tweets/top10-stop-words/

Deleted /user/tweets/top10-stop-words
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob2658700216274696416.jar tmpDir=null
2023-01-24 16:19:32,776 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:19:33,004 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:19:33,047 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:19:33,048 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:19:33,239 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0004
2023-01-24 16:19:33,597 INFO mapred.FileInputFormat: Total inp

In [44]:
! hdfs dfs -cat /user/tweets/top10-stop-words/*

i	287232
for	272995
and	247749
is	246856
on	210172
you	196950
trump	169520
news	156101
it	152816
with	134178


### Ускоряем вычисления 

Несмотря на все оптимизации внутри Hadoop, самое узкое место - это передача данных от mapper к reducer. Таким образом если у нас получиться ускорить передачи данных в этом месте, мы сможем сильно ускорить весь процесс .

In [45]:
%%writefile wordcount3.py

import sys
import csv
import re
from itertools import groupby
from collections import Counter


def csv_stream():
    return csv.reader(iter(sys.stdin.readline, ''))

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)


def mapper():
    counter = Counter()
    pattern = re.compile(r"[a-z]+")
    for row in csv_stream():
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            counter[word] += 1
    
    for word, number in counter.items():
        print("{}\t{}".format(word, number))


def reducer():
    for key, group in groupby(kv_stream(), lambda x: x[0]):
        word = key
        number = sum(int(x) for _, x in group)
        print("{}\t{}".format(word, number))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting wordcount3.py


In [46]:
%%time

! hdfs dfs -rm -r /user/tweets/result-fast1 || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="word-count" \
-D mapreduce.job.reduces=3 \
-files ~/wordcount3.py \
-mapper "python3 wordcount3.py map" \
-reducer "python3 wordcount3.py reduce" \
-input /user/tweets/data/ \
-output /user/tweets/result-fast1/

Deleted /user/tweets/result-fast1
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob5383784256749372209.jar tmpDir=null
2023-01-24 16:26:29,085 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:26:29,322 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:26:29,357 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:26:29,358 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:26:29,568 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0005
2023-01-24 16:26:29,872 INFO mapred.FileInputFormat: Total input f

In [47]:
! hdfs dfs -cat /user/tweets/result-fast1/* | head

aa	1726
aaaaa	7
aaaaaaaaaaaaaa	3
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaannnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnddddddddddddddddddddddddddddddddddddd	1
aaaaaaaaaaaaaaaaaaah	1
aaaaaaaaaaaaand	1
aaaaaaaaannnnnnnnnnnddddddddddddd	1
aaaaaaaah	1
aaaaaaaamen	1
aaaaaaagh	2
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.


Однако у этого решения есть **очень большой минус** - сложность по памяти **O(n)**. Это означает, что вычисление может упасть если данные попадутся неудачные. 

Важный принцип работы с большими данными - все алгоритмы должны работать меньше чем за O(n). Это относится не только к MapReduce, а в целом почти к любым инструментам обработки больших данных.

In [None]:
! hdfs dfs -ls /user/tweets

### Используем комбайнер

Чтобы побороться с этой бедой, воспользуемся дополнительным инструментом в Hadoop - Combiner. По сути это маленький Reduce, который запускается после маппера. Это позволяет уменьшить количество выходных данных с Map стадии.

<img src="https://habrastorage.org/getpro/habr/post_images/587/2d2/dfe/5872d2dfe12643665370708d225bc1d4.jpg">

In [48]:
%%time

! hdfs dfs -rm -r /user/tweets/result-fast2 || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="word-count" \
-D mapreduce.job.reduces=3 \
-files ~/wordcount2.py \
-mapper "python3 wordcount2.py map" \
-combiner "python3 wordcount2.py reduce" \
-reducer "python3 wordcount2.py reduce" \
-input /user/tweets/data/ \
-output /user/tweets/result-fast2/

Deleted /user/tweets/result-fast2
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob2036465758215084331.jar tmpDir=null
2023-01-24 16:30:44,361 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:30:44,589 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:30:44,629 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:30:44,630 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:30:44,863 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0006
2023-01-24 16:30:45,190 INFO mapred.FileInputFormat: Total input f

In [49]:
! hdfs dfs -cat /user/tweets/result-fast1/* | head

aa	1726
aaaaa	7
aaaaaaaaaaaaaa	3
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaannnnnnnnnnnnnnnnnnnnnnnnnnnnnnnnddddddddddddddddddddddddddddddddddddd	1
aaaaaaaaaaaaaaaaaaah	1
aaaaaaaaaaaaand	1
aaaaaaaaannnnnnnnnnnddddddddddddd	1
aaaaaaaah	1
aaaaaaaamen	1
aaaaaaagh	2
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.


Часто combiner может просто совпадать с reducer однако это не всегда так по следующей причине - combiner не имеет права менять формат вывода после стадии map.

Hadoop самостоятельно опеределяет целесообразность запуска combiner и может его не запускать вовсе.
Или например задача может вообще не подходить под такую модель запуска. Если мы ищем среднее, то нельзя заранее подсчитывать среднее на стадии combiner - макмимум, что мы там можем запустить - это подсчет количество и суммы.

In [50]:
%%writefile top10-3.py

import sys
import collections
from itertools import islice

def build_stop_words():
    with open('stop-words.txt', 'r') as f:
        stop_words = {x.split('\t')[0] for x in f}
    return stop_words

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)

def rewind():
    collections.deque(sys.stdin, maxlen=0)

def mapper():
    for key, value in kv_stream():
        print("{}+{}\t".format(key, value.strip()))

def reducer():
    stop_words = build_stop_words()
    first_10_stream = islice(filter(lambda x: x[0] not in stop_words, kv_stream('+')), 10)
    
    for word, count in first_10_stream:
        print("{}\t{}".format(word, count.strip()))
    rewind()
    
def combiner():
    stop_words = build_stop_words()
    first_10_stream = islice(filter(lambda x: x[0] not in stop_words, kv_stream('+')), 10)
    
    for word, count in first_10_stream:
        print("{}+{}\t".format(word, count.strip()))
    rewind()

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer,
        'combiner': combiner
    }[mr_command]()

Overwriting top10-3.py


In [51]:
%%time

! hdfs dfs -rm -r /user/tweets/top10-fast || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="word-count" \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-D mapreduce.map.output.key.field.separator='+' \
-files ~/top10-3.py,stop-words.txt \
-mapper "python3 top10-3.py map" \
-combiner "python3 top10-3.py combiner" \
-reducer "python3 top10-3.py reduce" \
-input /user/tweets/result-fast1 \
-output /user/tweets/top10-fast/

Deleted /user/tweets/top10-fast
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob2343487677005611575.jar tmpDir=null
2023-01-24 16:33:22,355 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:33:22,566 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:33:22,604 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:33:22,606 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:33:22,828 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0007
2023-01-24 16:33:23,584 INFO mapred.FileInputFormat: Total input fil

In [52]:
! hdfs dfs -cat /user/tweets/top10-fast/* 

i	287232
for	272995
and	247749
is	246856
on	210172
you	196950
trump	169520
news	156101
it	152816
with	134178


### Кастомный Partitioner

В Hadoop MapReduce можно указывать свой кастомный партишенер, который будет определять, как разбивать данные по редюсерам.

Это бывает важно, когда вы используете сложный ключ и много редюсеров - вполне возможно вы захотите, чтобы такие ключи определенным образом распределялись по редюс-задачам.

Для наглядности давайте решим такую задачу - для каждого пользователя подсчитаем, на каких языках он писал твиты и в каком количестве

In [53]:
%%writefile lang-distribution.py

import sys
import csv
from collections import Counter
from itertools import groupby

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)

def csv_stream():
    return csv.reader(iter(sys.stdin.readline, ''))

def mapper():
    for row in csv_stream():
        author, lang = row[1], row[4]
        print("{}+{}\t1".format(author.strip(), lang.strip()))

def reducer():
    for author, records in groupby(kv_stream('+'), lambda x: x[0]):
        langs_stream = (x.split('\t') for _, x in records)
        for lang, group in groupby(langs_stream, lambda x: x[0]):
            count = sum(int(x) for _, x in group)
            print("{}+{}\t{}".format(author, lang, count))

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer,
    }[mr_command]()

Overwriting lang-distribution.py


Параметры

```
-D mapred.partitioner.class=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapreduce.partition.keypartitioner.options="-k1,1" \
```

Указывают на то, что разделятся на редюсеры записи должны не по полному ключу, а только по первой его части

In [54]:
! hdfs dfs -rm -r /user/tweets/lang-dist || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="lang-dist" \
-D mapreduce.job.reduces=3 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k1,1 -k2,2" \
-D mapreduce.map.output.key.field.separator='+' \
-D mapred.partitioner.class=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapreduce.partition.keypartitioner.options="-k1,1" \
-files ~/lang-distribution.py \
-mapper "python3 lang-distribution.py map" \
-reducer "python3 lang-distribution.py reduce" \
-input /user/tweets/data/ \
-output /user/tweets/lang-dist

Deleted /user/tweets/lang-dist
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob1335278815778382587.jar tmpDir=null
2023-01-24 16:40:15,086 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:40:15,310 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:40:15,345 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:40:15,346 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:40:15,561 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0008
2023-01-24 16:40:16,277 INFO mapred.FileInputFormat: Total input file

In [55]:
! hdfs dfs -cat /user/tweets/lang-dist/* | head 

1488REASONS+Russian	50
1488REASONS+Serbian	1
1488REASONS+Ukrainian	1
1D_NICOLE_+Albanian	1
1D_NICOLE_+English	41
1D_NICOLE_+Tagalog (Filipino)	2
1ERIK_LEE+English	2
459JISALGE+Russian	1
4MYSQUAD+Arabic	5
4MYSQUAD+Catalan	1
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.


### Как дебажить ошибки

Через yarn можно выводить логи приложения

In [56]:
%%writefile mistake.py

import sys
import csv
import re
from itertools import groupby


def csv_stream():
    return csv.reader(iter(sys.stdin.readline, ''))

def kv_stream(sep="\t"):
    return map(lambda x: x.split(sep), sys.stdin)


def mapper():
    pattern = re.compile(r"[a-z]+")
    for row in csv_stream():
        content = row[2]
        for match in pattern.finditer(content.lower()):
            word = match.group(0)
            strange_number = 1.0 / (len(word) - 1)
            print("{}\t{}".format(word, strange_number))


def reducer():
    for key, group in groupby(kv_stream(), lambda x: x[0]):
        word = key
        number = sum(float(x) for _, x in group)
        print("{}\t{}".format(word, number))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting mistake.py


In [57]:
! hdfs dfs -rm -r /user/tweets/mistake || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="mistake" \
-D mapreduce.job.reduces=3 \
-files ~/mistake.py \
-mapper "python3 mistake.py map" \
-reducer "python3 mistake.py reduce" \
-input /user/tweets/data/ \
-output /user/tweets/mistake

Deleted /user/tweets/mistake
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob472224327129279597.jar tmpDir=null
2023-01-24 16:41:58,494 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:41:58,725 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:41:58,768 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:41:58,769 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
2023-01-24 16:41:58,965 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1674572529553_0009
2023-01-24 16:41:59,714 INFO mapred.FileInputFormat: Total input files t

In [58]:
! yarn logs -applicationId application_1674572529553_0009 -log_files stderr | head -n 50

2023-01-24 16:43:07,268 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:43:07,551 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
Container: container_1674572529553_0009_01_000015 on rc1a-dataproc-d-1jipfldazn1nw9u1.mdb.yandexcloud.net_33887
LogAggregationType: AGGREGATED
LogType:stderr
LogLastModifiedTime:Tue Jan 24 16:42:38 +0000 2023
LogLength:208
LogContents:
Traceback (most recent call last):
  File "mistake.py", line 34, in <module>
    {
  File "mistake.py", line 21, in mapper
    strange_number = 1.0 / (len(word) - 1)
ZeroDivisionError: float division by zero

End of LogType:stderr
***********************************************************************


End of LogType:stderr
***********************************************************************

Container: container_1674572529553_0009_01_000011 on rc

Идентификатор можно найти логах запуска, в интерфейсе или посмотреть список всех и найти там

In [59]:
! yarn application -list 

2023-01-24 16:43:30,487 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:8032
2023-01-24 16:43:30,702 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-ymnek55ntqs3neie.mdb.yandexcloud.net/10.128.0.25:10200
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):0
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL


Чтобы заранее прикончить задачу, можно также использовать yarn

```bash
yarn application -kill <application_id>
```

### Переносим результаты

Все данные, с которыми мы работали только что, лежат на жестких дисках кластера и доступны только через hdfs

Если мы готовы презентовать наш результат миру, нужно его переместить в s3

In [60]:
! hdfs dfs -ls /user/tweets/top10-stop-words

Found 2 items
-rw-r--r--   1 ubuntu hadoop          0 2023-01-24 16:20 /user/tweets/top10-stop-words/_SUCCESS
-rw-r--r--   1 ubuntu hadoop        109 2023-01-24 16:20 /user/tweets/top10-stop-words/part-00000


In [61]:
! hdfs dfs -cp /user/tweets/top10-stop-words/part-00000 s3a://lsml-kosmos/top10.txt

2023-01-24 16:46:10,285 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-01-24 16:46:10,369 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-01-24 16:46:10,369 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
cp: `s3a://lsml-kosmos/top10.txt': File exists
2023-01-24 16:46:13,241 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2023-01-24 16:46:13,242 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2023-01-24 16:46:13,243 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


Можно проверять через интерфейс - файл оказался в бакете

### Hadoop жжет бабло

<img src="http://vostokovod.ru/assets/images/blog/2013/000333.png">

Полноценный кластер - весьма дорогое удовольствие, поэтому отлючайте его после использования. 