# 3.1 Парадигма MapReduce

Модель вычислений. Алгоритмы могут быть реализованы с помощью парадигмы MapReduce (распределенные вычисления). Если на машине не хвататет памяти при вычислениях, то обращаются к распределенным вычислениям.

2 шага:
- `map`: обработка данных (применения функции ко всем элементам)
- `reduce`: свертка данных (применение функции ко сбору всех элементов)

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-02.png" title="Схема MapReduce" width="500" height="500"/>

Обработка происходит с помощью worker-ов:
- Map phase == Mapper
- Reduce phase == Reducer

Файл или файлы разбиваются на разные части (split-ы). Каждый split может быть обработан своим worker-ом. ОБработка происходит параллельно и независимо. Worker-ы не могут общаться между собой. Worker-ы могут быть запущены не одновременно.

Каждый mapper после обработки записывает себе данные на диск. 

После того, как все mapper-ы отработают происходит переход к reduce фазе.

Данные reducer-у могут приходить от разных mapper-ов в формате **key -> value**. На repucer-е образуется этот **key** и массив всех значений, которые этому ключу соответствуют.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-03.png" title="Входные данные" width="500" height="500"/>

Все данные разделяются на независимые split-ы и обрабатываются своим worker-ом. Если данные зависимы, то мы не сможем обработать такие связи, потому что их нет.

Обычно, split==block в HDFS. Но, к примеру, храня файл архива `.gz` прослеживается явная зависимость блоков, так как по отдельности worker-ы не смогут разархивировать файл. В таком случае отправляем все блоки такого файла на один worker, хоть это и не оптимальное решение задачи.

**Data locality** - стараются запускать worker ровно на той машине, на которой лежат данные. С диска читать быстрее, чем передавать по сети.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-04.png" title="Передача данных между Map и Reduce" width="500" height="500"/>

Промежуточные данные mapper-ы хранят на локальных дисках, а не в HDFS, потому что в HDFS нужно производить репликацию данных. Мы теряем надежность данных, если произойдет ошибка. В таком случае MapReduce перезапускает такой mapper.

**Shuffle** - процесс копирования с mapper-ов на reducer-ы. Обычно этот процесс запускается после того, как все mapper-ы завершились. Но можно и такую конфигурацию настроить, что как только mapper завершился, данные начинают копироваться на reducer.

**Вопрос**: как правильно разбивать файл на split-ы, чтобы одна и та же строка не оказалась в разных блоках?
***Ответ**: лезем в реализацию класса `TextInputFormat`. Как это работает:
- Есть функция, которая вычисляет сплиты. Она ничего не знает про концы строк. Она просто возвращает для каждого сплита название файла и сдвиг в нем. Сдвиг равен размеру блока. Т.е. обычно попадает на середину строки.
- Есть функция, которая возвращает записи из данного сплита. Она берет файл, делает в нем нужный сдвиг, пропускает первую строку (т.е. делает `getLine()`) и начинает возвращать последовательно все следующие строки, пока текущая позиция не будет заходить за нужный сплит. При этом, **внимание!**, последняя строка (которая не вся влезла в этот сплит) считывается до конца, т.е. формально из другого сплита (и блока HDFS, если говорить про более низкий уровень). Именно поэтому, сначала первая строка пропускается, т.к. она уже попала в предыдущий сплит (конечно, для первого блока в файле не нужно пропускать строку). Для программы нет никаких сложностей считать строку, часть которой находится в одном блока, а часть в другой. Просто делаешь `getLine()` и получаешь нужную строку. На уровне HDFS при этом часть строки будет считано с одного блока (как правило локального), а остаток будет прислан по сети из другого блока.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-05.png" title="Результат Map-Reduce задачи" width="500" height="500"/>

Разные reduce-ры (процессы) не могут писать в один и тот же файл. Выходные файлы хранятся в HDFS и реплицируются.

Оптимально, когда каждый reducer обрабатывает от 500 мб до гб.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-06.png" title="Map-Reduce без Reduce" width="500" height="500"/>

Частный случай задачи без Reduce фазы. Результат работы каждого mapper-а пишется сразу в output файл, которые хранятся в HDFS. 

Долгие задачи mapper-ов можно писать сразу  в HDFS, минуя локальное сохранение.

**Задача**:  
Число mapper-ов определяет система, а число reduce-ов в MapReduce задаче задается пользователем при описании MapReduce задачи.

# 3.2 Фреймворк MapReduce

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-09.png" title="Схема работы демонов в  MapReduce" width="500" height="500"/>

- Hadoop основан на HDFS. 
- На нашем кластере есть сервера (slave node) на которых запущены демоны datanode (непосредственно демоны файловой системы HDFS), которые работают поверх файловой системы Linux и обеспечивают работу с файловой системой HDFS.
- Есть отдельный сервер, на котором запущен демон namenode. Он не хранит никаких данных, но отвечает за хранение мета-информации.
- В MapReduce есть 2 типа демонов:
    - Jobtracker - это процесс, который в целом отвечает за запуск задачи (job).
    - Tasktracker запущен непосредственно на каждой машине кластера, на которой запущен datanode. Отвечает непосредственно за запуск worker-ов на node. Ничего не мешает запускать tasktracker и datanode на разных машинах, но тогда придется постоянно данные по сети.

### JobTracker

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-10.png" title="JobTracker" width="500" height="500"/>



### TaskTracker

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-11.png" title="TaskTracker" width="500" height="500"/>

### Система слотов

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-12.png" title="Система слотов" width="500" height="500"/>

Данная система слотов встречается в более ранних версиях Hadoop. В более новых - более новая, о ней дальше по курсу.

Число слотов определяется из ресурсов сервера. Ресурсы сервера - это ЦПУ и Память оперативная. Обычно, каждый слот - одно ядро процессора и часть оперативной памяти.

Слоты делятся на 2 типа: слоты для mapper-ов и слоты для reduce-ов.

### Кейс

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-13.png" title="Кейс" width="500" height="500"/>

Стоит задача: посчитать сколько раз встречается каждый символ в тексте.  
Текст представляется в виде файлов `k1:v1, ... , k6:v6`. Так как у нас файл, то в качестве ключа номер строки, а в качестве значения сама строка. Далее каждый mapper считает сколько раз встретился каждый символ. Далее начинается фаза reduce.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-14.png" title="Кейс" width="500" height="500"/>

Перед началом фазы reduce мы должны каким-то образом скопировать данные из mapper-ов в reduce-ры. Этот этап называется **shuffle**. После этапа **shuffle** происходит этап **sort**.

На один reduce-р попадают данные с одинаковым ключом от разных mapper-ов. Т.о. на одном reduce-ре мы для каждого ключа будем иметь все данные. 

Код на reduce-ре довольно простой. Мы просто сворачиваем в сумму.

<img src="slides/LE03/le03-MapReduce/le03-MapReduce-15.png" title="Кейс" width="500" height="500"/>

Схема может немного усложниться, если мы в середину добавим 2 этапа:
- **combine** - это фактически локальный reduce-р. Делает свертку (агрегацию), но только для локальных данных. Обычно используют такой же код в combine, как и в reduce. К примеру, если на одном mapper-е у нас есть значения с одинаковым ключом, то мы их можем агрегировать.
- **partition** определяет по ключу в какой reduce-р его нужно отправить.

### Основные функции

- `map(k1,v1) -> list(k2, v2)`
- `reduce(k2, list(v2*)) -> list(k3, v3)`

### Опциональные функции

- `partition(k2,v2, |reducers| ) -> № of reducer` 
    - распределяет ключи по reducer-ам
    - часто просто хеш от key: `hash(k2) mod n`
- `combine(k2, v2) -> list(k2,v2)`
    - мини-reducer-ы, которые выполняются после завершения фазы map
    - используется в качестве оптимизации для снижения сетевого трафика на reduce
    - (!) не должен менять тип ключа и значения

# Java API. Введение

## Типы API

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-02.png" title="Типы API." width="500" height="500"/>

## Класс Job

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-03.png" title="Класс Job" width="500" height="500"/>

## Класс Mapper

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-04.png" title="Класс Mapper" width="500" height="500"/>

В отличии от класса job, который создает объект, класс mapper должен наследоваться в пользовательском классе. Плюс, в пользовательском классе должны быть реализованы некоторые функции: 
- `map`, которая будет вызываться для каждого из split-а. Так, же будет передан `context`, из которого можно получить информацию о всей задаче. А так же, может передавать некоторую статистику в Jobtracker.
- `setup` запускается, чтобы что-то проинициализировать.
- `cleanup` зеркальная `setup` запускается в конце работы mapper-а и закрывает те ресурсы, которые были выделены.

## Класс Reducer/Combiner

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-05.png" title="Класс Reducer/Combiner" width="500" height="500"/>

По аналогии с классом Mapper.

## Класс Partitioner

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-06.png" title="Класс Partitioner" width="500" height="500"/>

# 3.4 Java API. Продолжение

## Reducer в качестве Combiner

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-19.png" title="Reducer в качестве Combiner" width="500" height="500"/>

В общем случае, если каку-то операцию можно делать частично (сумму от части данных, а потом сумму от итогового результата), то тогда combiner применять можно, иначе применять нельзя.

MapReduce **не гарантирует** вызов combiner-а, он служит только для некоторой оптимизации, чтобы данных передавалось меньше между mapper-ом и reducer-ом. Поэтому нет смысла закладывать некую логику внего, тк он может быть и не вызван.

## Типы данных в Hadoop

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-20.png" title="Типы данных в Hadoop" width="500" height="500"/>

Для передачи данных между mapper-ом и reducer-ом не используются стандартные типы данных, такие как int, flor или строки. Для этого внутренние типы должны иметь внутренние методы для сериализации и десериализации данных.

## Класс InputSplit

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-23.png" title="Класс InputSplit" width="500" height="500"/>

## Класс InputFormat

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-24.png" title="Класс InputFormat" width="500" height="500"/>

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-25.png" title="Готовые классы InputFormat" width="500" height="500"/>

## Класс OutputFormat

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-26.png" title="Класс OutputFormat" width="500" height="500"/>

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-27.png" title="Класс OutputFormat" width="500" height="500"/>

## Shuffle и Sort в Hadoop

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-29.png" title="Shuffle и Sort в Hadoop" width="500" height="500"/>

Важно понимать, как происходит процесс передачи данных от mapper-а к reducer-у. При большом объему данных время, затраченное на передачу данных, существенно. И нужно понимать на какие процессы уходит время.

Те данные, которые которые пишет mapper попадает в **цикличный буффер (circ buff)**. Размер его ограничен, он задается в файле конфигурации. Когда буффер заполняется он сбрасывает их в файлы spill-ы на диске. По окончанию работы mapper-а таких файлов может оказаться много и мы должны объединить их в один файл. Все эти файлы зачитываются, сортируются и merg-атся в один файл. Причем может работать combiner (локальный reducer), который агрегирует непосредственно на mapper-е. При подготовке итогового файла (он не просто монолитный, он разбит на разные части) каждая часть соответствует определенному reducer-у. То каким образом разбивается файл определяется partition-ом. Каждая часть этого файла будет отсортирована (это довольно важно для дальнейшей работы reducer-а). 

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-30.png" title="Shuffle и Sort в Hadoop" width="500" height="500"/>

После того, как на каждом mapper-е образовался файл, разбитый на части. Каждая его часть будет скопирована на соответствующий reducer. При этом другие части пойдут на другие reducer-ы. Этот процесс копирования файлов называется **shuffle**. На одном конкретном reducer-е мы получаем много частей файлов с разных mapper-ов. Далее производим так называемый процесс сортировки. Мы помним, что данные в каждой части уже как-то отсортированы. Задача упрощается и мы получаем отсортированный поток, который отправляется в reducer. 

## Запуск и отладка MapReduce программы

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-31.png" title="Запуск MapReduce программы" width="500" height="500"/>

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-32.png" title="Отладка MapReduce программы" width="500" height="500"/>

## Hadoop Streaming

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-34.png" title="Hadoop Streaming" width="500" height="500"/>

Hadoop streaming - инструмент для запуска команд на другом языке.

Map: `countMap.py`
```python
#!/usr/bin/python
import sys

for line in sys.stdin:
    for token in line.strip().split(" "):
        if token: print(token + "\t1")
```

Reduce: `countReduce.py`
```python
#!/usr/bin/python
import sys

(lastKey, sum) = (None, 0)

for line in sys.stdin:
    (key, value) = line.strip().split("\t")
    if lastKey and lastKey != key:
        print(lastKey + "\t" + str(sum))
        (lastKey, sum) = (key, int(value))
    else:
        (lastKey, sum) = (key, sum + int(value))
        
if lastKey:
    print(lastKey + "\t" + str(sum))
```

## Запуск и отладка

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-38.png" title="Запуск и отладка" width="500" height="500"/>

Сначала запускаем и проверяем корректность работы локально.

<img src="slides/LE03/le03-MapReduce_API/le03-MapReduce_API-39.png" title="Запуск и отладка" width="500" height="500"/>

Запускаем с помощью Hadoop streaming.