<center>
    <img src="https://upload.wikimedia.org/wikipedia/commons/a/a8/%D0%9B%D0%9E%D0%93%D0%9E_%D0%A8%D0%90%D0%94.png" width=500px/>
    <font>Python 2023</font><br/>
    <br/>
    <br/>
    <b style="font-size: 2em">MapReduce (в т. ч. в Python)</b><br/>
    <br/>
    <font>Золотарёв Ярослав, по материалам Максима Ахмедова 🫡</font><br/>
</center>

### План лекции

**Первая часть: Map-Reduce (50m)**
1. Map-Reduce
2. Модель данных "распределённая система"

**Вторая часть: Стриминг (30m)** 
1. Стриминг в обычной жизни
2. Модель данных "стриминг"

**Третья часть: Выдача домашки (<1h)**
1. `diesel_power`
2. `compgraph` 

# Map-Reduce

## Что такое Map-Reduce

* Вычислительная парадигма, придуманная в Google
* Представлена широкой публике в статье 2004 году
* В 2006 году появился свой прототип в Яндексе
* В 2007 году появился Apache Hadoop -- первая открытая реализация
* Используется и по сей день в хоть сколько-то больших вычислительных кластерах

### Табличная модель вычислений
* Графы, деревья, другие содержательные структуры данных -- слишком сложно для внешней памяти распределенных систем
* Таблицы со строками -- просто и удобно
* $\sf{table} := \sf{sequence\;of\;rows}$
* $\sf{row} := \sf{dictionary}:\; \{\sf{column\;name} \rightarrow \sf{field} \}$

In [1]:
try:
    import polars as pl
except ImportError:
    !pip install polars
    import polars as pl
    
_EX1 = [
    {"name": "Max", "age": 24, "height": 183},
    {"name": "Peter", "age": 30, "height": 162},
    {"name": "Maria", "age": 20, "height": 200}
]

_DF_EX1 = pl.DataFrame(_EX1)

In [2]:
print('Как мы видим базу данных: ')
print(_DF_EX1)
print('Как она может быть представлена логически: ')
for row in _EX1:
    print(row)

Как мы видим базу данных: 
shape: (3, 3)
┌───────┬─────┬────────┐
│ name  ┆ age ┆ height │
│ ---   ┆ --- ┆ ---    │
│ str   ┆ i64 ┆ i64    │
╞═══════╪═════╪════════╡
│ Max   ┆ 24  ┆ 183    │
│ Peter ┆ 30  ┆ 162    │
│ Maria ┆ 20  ┆ 200    │
└───────┴─────┴────────┘
Как она может быть представлена логически: 
{'name': 'Max', 'age': 24, 'height': 183}
{'name': 'Peter', 'age': 30, 'height': 162}
{'name': 'Maria', 'age': 20, 'height': 200}


### Табличная модель вычислений на практике

* Физически таблица может находиться на одной машине, а может быть нарезана на произвольное число кусков, находящихся на разных машинах.
* Замечательная черта парадигмы Map-Reduce: она хорошо масштабируется в распределённых системах, позволяя мыслить таблицами и не задумываться о физическом аспекте хранения.

In [3]:
_EX2 = [
    {"ts": 14211307, "query_text": "Где скачать Death Stranding?", "device": "PC"},
    {"ts": 14211396, "query_text": "Где растёт кокос", "device": "Android"},
    {"ts": 14211402, "query_text": "дора слушать онлайн", "device": "Android"}
]

_DF_EX2 = pl.DataFrame(_EX2)

### Другой пример - дневной лог запросов к поиску Яндекса

Размер таблицы с подобным дневным логом продакшн-сервиса может достигать петабайт данных.

In [4]:
print(_DF_EX2)

shape: (3, 3)
┌──────────┬──────────────────────────────┬─────────┐
│ ts       ┆ query_text                   ┆ device  │
│ ---      ┆ ---                          ┆ ---     │
│ i64      ┆ str                          ┆ str     │
╞══════════╪══════════════════════════════╪═════════╡
│ 14211307 ┆ Где скачать Death Stranding? ┆ PC      │
│ 14211396 ┆ Где растёт кокос             ┆ Android │
│ 14211402 ┆ дора слушать онлайн          ┆ Android │
└──────────┴──────────────────────────────┴─────────┘


### Word Count
* Классическая задача, на которой можно проиллюстрировать парадигму Map-Reduce, это Word Count.
* Дан текст, представленный в виде набора строковых значений в столбце (потенциально большой) таблицы.
* Посчитать количество вхождений каждого слова в текст.
* Иными словами, сформировать таблицу из пар $(word_i,\,count_i)$.

### Map
* Первая операция в нашей модели -- ${\sf Map}_M(T)$, которая применяет **маппер** $M$ к таблице $T$.
* Маппер -- правило, по которому одну строку таблицы можно преобразовать в одну или несколько (возможно ноль) строк новой таблицы.
* $M: {\sf row} \rightarrow {\sf row}^*$.
* Операция ${\sf Map}_M(T)$ применяет $M$ ко всем строкам входной таблицы и составляет выходную таблицу из объединения результатов.
* ${\sf Map}_M : table \rightarrow table$.

### Примеры мапперов:
* Project: оставить одно и то же подмножество столбцов в каждой строке.
* StripPunctuation: избавить строковое значение от символов пунктуации в конкретном столбце.
* ToLowercase: привести к нижнему регистру строковые значения в конкретном столбце.
            
Все эти мапперы биективны, то есть они переводят строку таблицы ровно в одну строку таблицы.  

Подействуем по очереди мап-операциями с описанными мапперами на таблицу из предыдущего примера.

### Word Count, шаг 1
Пусть $T_0$ -- наша исходная база данных. 

Тогда $T_1 = {\sf Map}_{\sf Project}(T_0)$ (проецируем на колонку $query\_text$):

In [5]:
print('T_0:')
print(_DF_EX2)
ex2_query = _DF_EX2.select(pl.col('query_text'))
print('T_1:')
print(ex2_query)

T_0:
shape: (3, 3)
┌──────────┬──────────────────────────────┬─────────┐
│ ts       ┆ query_text                   ┆ device  │
│ ---      ┆ ---                          ┆ ---     │
│ i64      ┆ str                          ┆ str     │
╞══════════╪══════════════════════════════╪═════════╡
│ 14211307 ┆ Где скачать Death Stranding? ┆ PC      │
│ 14211396 ┆ Где растёт кокос             ┆ Android │
│ 14211402 ┆ дора слушать онлайн          ┆ Android │
└──────────┴──────────────────────────────┴─────────┘
T_1:
shape: (3, 1)
┌──────────────────────────────┐
│ query_text                   │
│ ---                          │
│ str                          │
╞══════════════════════════════╡
│ Где скачать Death Stranding? │
│ Где растёт кокос             │
│ дора слушать онлайн          │
└──────────────────────────────┘


### Word Count, шаг 2
$T_2 = {\sf Map}_{StripPunctuation}(T_1)$

In [6]:
from functools import partial
_PUNCT = r'!"#$%&\'()*+,-./:;<=>?@[\]^_`{|}~'
print('T_1:')
print(ex2_query)
ex2_stripped = ex2_query.select(
    pl.col('query_text').map_elements(
        lambda x: str.translate(x, str.maketrans('', '', _PUNCT))
    )
)
print('T_2:')
print(ex2_stripped)

T_1:
shape: (3, 1)
┌──────────────────────────────┐
│ query_text                   │
│ ---                          │
│ str                          │
╞══════════════════════════════╡
│ Где скачать Death Stranding? │
│ Где растёт кокос             │
│ дора слушать онлайн          │
└──────────────────────────────┘
T_2:
shape: (3, 1)
┌─────────────────────────────┐
│ query_text                  │
│ ---                         │
│ str                         │
╞═════════════════════════════╡
│ Где скачать Death Stranding │
│ Где растёт кокос            │
│ дора слушать онлайн         │
└─────────────────────────────┘


### Word Count, шаг 3
$T_3 = {\sf Map}_{ToLowercase}(T_2)$

In [7]:
print('T_2:')
print(ex2_stripped)
ex2_lower = ex2_stripped.select(pl.col('query_text').map_elements(str.lower))
print('T_3:')
print(ex2_lower)

T_2:
shape: (3, 1)
┌─────────────────────────────┐
│ query_text                  │
│ ---                         │
│ str                         │
╞═════════════════════════════╡
│ Где скачать Death Stranding │
│ Где растёт кокос            │
│ дора слушать онлайн         │
└─────────────────────────────┘
T_3:
shape: (3, 1)
┌─────────────────────────────┐
│ query_text                  │
│ ---                         │
│ str                         │
╞═════════════════════════════╡
│ где скачать death stranding │
│ где растёт кокос            │
│ дора слушать онлайн         │
└─────────────────────────────┘


### Word Count, шаг 4
* Split: взять строковое значение из строки и породить по одной строке на каждое слово в этом строковом значении
* Split - уже не биективный маппер!

$T_4 = {\sf Map}_{Split}(T_3)$

In [8]:
# print('T_3:')
# print(ex2_lower)
ex2_split = ex2_lower.select(pl.col('query_text').map_elements(str.split).explode())
print('T_4:')
ex2_split

T_4:


query_text
str
"""где"""
"""скачать"""
"""death"""
"""stranding"""
"""где"""
"""растёт"""
"""кокос"""
"""дора"""
"""слушать"""
"""онлайн"""


### Reduce

* Вторая операция модели -- Reduce.
* ${\sf Reduce}_{R,cols}(T)$ логически состоит из двух шагов:
     * разбить строки таблицы по группам согласно кортежу значений в наборе колонок $cols$;
     * применить редьюсер $R$ к каждой подобной группе.
* Таким образом, $R$ должен быть функцией $R: {\sf row}^* \rightarrow {\sf row}^*$.
* ${\sf Reduce}_{R,cols}: {\sf table} \rightarrow {\sf table}$.
* $cols$ называется **ключом** редьюса.

### Считающий редьюсер

* ${\sf Count}$: простейший пример редьюсера, считающего количество строк в каждой группе.
* Формально, пусть ${\sf rows}$ -- это группа строк с одинаковым значением `row["word"]`. Тогда:     
${\sf Count}({\sf rows})$ = `[{'word': rows[0]['word'], 'count': len(rows)}]`
* Это агрегирующий редьюсер, он принимает пачку строк и возвращает одну агрегированную строку.

### Word Count, шаг 5
$T_5 = {\sf Reduce}_{Count,\{"{\sf word}"\}}(T_4)$

In [9]:
# print('T_4:')
# print(ex2_split)
ex2_count = ex2_split.group_by('query_text').count()
print('T_5:')
ex2_count

T_5:


query_text,count
str,u32
"""скачать""",1
"""онлайн""",1
"""слушать""",1
"""растёт""",1
"""кокос""",1
"""где""",2
"""stranding""",1
"""death""",1
"""дора""",1


### Парадигма Map-Reduce
* Резюмируем.
* Мы сформулировали две логические операции:
    * ${\sf Map}_M: {\sf table} \rightarrow {\sf table}$
    * ${\sf Reduce}_{R,cols}: {\sf table} \rightarrow {\sf table}$
* Простая и понятная абстрактная модель
* Чем же она хороша?

## Модель данных "распределённая система"

### Хранение таблиц

* setup1:
    * есть одна машина c $HDD = 4TB$ и $CPU = 20$ cores
    * таблица размером $1TB$ хранится на HDD;
    * маппер $M$ применяется к таблице в 10 потоков
    
* setup2:
    * есть 10 машин в той же конфигурации
    * таблица размером $1TB$ разбита на 10 кусков по $100GB$ на HDD на каждой машине
    * маппер $M$ применяется к каждому куску на своей машине независимо в один поток

**Вопрос**: какой setup отработает быстрее?

**Ответ**: setup2, т.к. IO сильно медленнее CPU

### Распределённый Map

* Устроен предельно легко
* Независимо обрабатываем порции таблицы на каждой машине в отдельности

### Распределённый Reduce

* Всё сложнее
* Группа строк с одинаковым значением ключа может быть разбросана по произвольному набор машин
* **Вопрос**: что же делать?

### Сортировка
    
* Расширим нашу модель третьей операцией: ${\sf Sort}$
* ${\sf Sort}_{cols}: {\sf table} \rightarrow {\sf table}$
* После сортировки строки с одним значением ключа $cols$ окажутся рядом.

### Sorted Reduce
    
* Получаем первый вариант стратегии для Reduce: SortedReduce.
* SortedReduce реализует ${\sf Reduce}_{R, cols}(T)$ при выполнении **предусловия**: таблица $T$ сортирована по набору колонок $cols$.
* Типичный паттерн действий: ${\sf Map}, {\sf Sort}, {\sf Reduce}$.

### Shuffle
    
* Опишем альтернативный путь
* Вместо сортировки часто используется примитив ${\sf Shuffle}$ (иногда называется партицирование)
* ${\sf Shuffle}$ раскладывает строки по корзинам согласно некоторой функции от ключа $key$, типично -- $hash(key) \bmod bucketCount$
* Каждая корзина целиком живёт на одной машине, значит одинаковые ключи снова оказались "рядышком"
* Обрабатываем каждую корзину независимо

### Распределённые Sort и Shuffle
  
* **Вопрос**: как устроены?
* **Подумайте** сами на досуге :)

# Стриминг

## Стриминг в обычной жизни и Python

### Shell
    
* Утилиты командной строки, превращащие вход (из stdin) в выход (в stdout):

| ? | ? | ? |
|---------|---------|---------|
| `cat` | `rev` | `sort` |
| `sed -s 's/foo/bar/g'` | `grep 'foo'` | `tac` |
| `wc -l` | `tail -n 10` | `shuf` |
| `cut -f2` | `uniq -c`  
| `md5sum` |  
| `head -c 1024` 
| `tail -c 1024` 
| `gzip -f` 
| `head -n 10`

**Вопрос**: по какому принципу сгруппированы утилиты?

### Pipe'ы
     
* Программы можно сцеплять друг за дружкой посредством пайпов:
* `cut -f2 | sed -s 's/foo/bar/g' | md5sum`
* Если все программы являются потоковыми, то комбинация через пайпы -- тоже!
* Ядро ОС будет перекладывать небольшие кусочки данных из выхода одной программы во вход следующей, следя, чтобы в буферах пайпов не накапливалось слишком много необработанных данных
* Простейший пример парадигмы потоковой обработки aka "streaming".

### Итераторы и генераторы в Python
    
* Основные примитивы для потока данных в Python -- итераторы и генераторы.
* `range(n)` -- поток чисел до $n$
* `open("file.txt", "r")` -- поток строк из файла `file.txt`
* `(x * x for x in iterable)` -- generator expression
* Предыдущее -- частный случай генератора (функции с yield'ами).

### range
    
* **Вопрос**: что содержится в состоянии у `range(n)`?

* Несколько интов: текущее число, граница, шаг. 
* $O(1)$ state.

### open
    
* **Вопрос**: что содержится в состоянии у `open("file.txt", "r")`?

* Номер файлового дескриптора (идентификатор открытого файла в ядре ОС)
* Буфер чтения (порядка 64 KiB, чтобы не делать random read на каждую новую строку)

### Материализация
    
* Рассмотрим функцию, которая используется следующим образом:

In [10]:
def filter_even(seq):
    result = []
    for item in seq:
        if item % 2 == 0:
            result.append(item)
    return result

def print_even(seq):
    for even_item in filter_even(seq):
        print(even_item)

**Вопрос**: какое у неё потребление памяти?

* Линейное, она материализует ответ в список.

### Генератор
    
* Рассмотрим альтернативную реализацию:

In [11]:
def xfilter_even(seq):
    for item in seq:
        if item % 2 == 0:
            yield item

**Вопрос**: какое у неё потребление памяти?

* $O(1)$

### Устройство генератора
    
* Можно думать про генератор как про функцию, "замороженную" в процессе исполнения.

In [12]:
def xprint_even(seq):
    gen = xfilter_even(seq)
    for i in gen:
        print(i)

* Хронология одной итерации:
    * `for` зовёт `next(gen)`
    * функция генератора "просыпается"
    * функция дорабатывает до следующего `yield smth`
    * функция "засыпает"
    * `smth` возвращается из `next(gen)`

### Состояние генератора
    
* Резюмируем, из чего состоит объект генератора:
    * служебная структура, указывающая на байт-код функции-генератора в интерпретаторе
    * позиция в коде, на которой "спит" функция
    * состояния локальных переменных в момент "засыпания" функции

* В примере выше -- $O(1)$ state!
* Эквивалентная форма записи: 
`(item for item in seq if item % 2 == 0)`

### Эффективность
    
Сравним две реализации.
* `xfilter_even`:
    * $O(1)$ памяти - отлично для интерпретатора;
    * context switch на каждом `next` - может быть bottleneck'ом.

* `filter_even`:
    * линейные затраты по памяти;
    * тривиальная имплементация и ни одного `context switch`;
    * на больших входах может замедлять интерпретатор питона или даже приводить к OOM.
    
**Вопрос**: что же выбрать?
Подумайте, как взять лучшее от обоих подходов.

## Модель данных "стриминг" в Python

### Табличный поток данных
    
* Рассмотрим модель данных, схожую с использовавшейся в MR
* ${\sf stream := iterator\, of\, rows}$.
* ${\sf row}$ -- как и раньше, словарь.
* Рассматриваемые нами операции будут теперь иметь сигнатуру ${\sf operation}: {\sf stream} \rightarrow {\sf stream}$.
* Критерий "хорошести" операции: она работает потоковым образом, то есть обладает небольшим (ограниченным) state'ом.
* Потоковая операция может "бесконечно" сидеть на входном потоке, порождая выходной, и не OOMиться.
* Поток данных можно мыслить как "бесконечную вниз" таблицу.

### Streaming Map
    
* ${\sf Map}_M: {\sf stream} \rightarrow {\sf stream}$.
* Устроен очень просто.
    * вытаскиваем из потока строку;
    * применяем к ней $M$;
    * кладём результат в выходной поток.

### Streaming Reduce
    
* ${\sf Reduce}_{R, cols}: {\sf stream} \rightarrow {\sf stream}$.
* Как и в обычном MR, тут всё немного сложнее.
* Потребуем выполнения двух предусловий:
    * входной поток отсортирован;
    * размер каждой группы редьюса небольшой.
* Тогда понятно, как делать потоковый ${\sf Reduce}$.
    * накапливаем группу, пока ключ совпадает с предыдущим;
    * когда ключ переключился, вызываем $R$ от накопленной группы
    * кладём результат в выходной поток.

### Join
    
* Обсудим небольшое (и очень удобное) расширение нашей модели операций.
* В этом месте Макс устал техать таблички, я тоже не хочу их техать, но их за меня нарисует Python

In [13]:
_EX3_users = [
    {"user_id": 15, "user_name": "Alex"},
    {"user_id": 17, "user_name": "Marci"},
    {"user_id": 42, "user_name": "Max"}
]

_EX3_orders = [
    {"user_id": 15, "product_name": "bread"},
    {"user_id": 17, "product_name": "orchid"},
    {"user_id": 30, "product_name": "water"},
    {"user_id": 15, "product_name": "milk"},
    {"user_id": 50, "product_name": "hammer"}
]

_DF_EX3_users = pl.DataFrame(_EX3_users)
_DF_EX3_orders = pl.DataFrame(_EX3_orders)
print(_DF_EX3_users)
print(_DF_EX3_orders)

shape: (3, 2)
┌─────────┬───────────┐
│ user_id ┆ user_name │
│ ---     ┆ ---       │
│ i64     ┆ str       │
╞═════════╪═══════════╡
│ 15      ┆ Alex      │
│ 17      ┆ Marci     │
│ 42      ┆ Max       │
└─────────┴───────────┘
shape: (5, 2)
┌─────────┬──────────────┐
│ user_id ┆ product_name │
│ ---     ┆ ---          │
│ i64     ┆ str          │
╞═════════╪══════════════╡
│ 15      ┆ bread        │
│ 17      ┆ orchid       │
│ 30      ┆ water        │
│ 15      ┆ milk         │
│ 50      ┆ hammer       │
└─────────┴──────────────┘


In [14]:
print(
    _DF_EX3_users.join(_DF_EX3_orders, on='user_id', how='left') # 'inner', left', 'outer'  - no right join in polars :( 
)

shape: (4, 3)
┌─────────┬───────────┬──────────────┐
│ user_id ┆ user_name ┆ product_name │
│ ---     ┆ ---       ┆ ---          │
│ i64     ┆ str       ┆ str          │
╞═════════╪═══════════╪══════════════╡
│ 15      ┆ Alex      ┆ bread        │
│ 15      ┆ Alex      ┆ milk         │
│ 17      ┆ Marci     ┆ orchid       │
│ 42      ┆ Max       ┆ null         │
└─────────┴───────────┴──────────────┘


### Join
    
* Подчеркнём один момент -- формально говоря, Join не является потоковой операцией согласно предыдущему определению, так как он обрабатывает два потока одновременно.
* Join'ы слишком полезные, чтобы их выкинуть, поэтому придётся исправить определение :)
* ${\sf operation}: {\sf stream}^* \rightarrow {\sf stream}$
* **Вопрос**: а можно ли сказать, что потоковая операция порождает больше одного потока?

`... 5 5 5 7 7 9 10 10 11 ...`

`... 4 5 8 8 8 9 10 10 10 12 ...`

### Tee
    
* Рассмотрим классическую операцию, которая порождает два потока -- это Tee.
* ${\sf Tee}: {\sf stream} \rightarrow {\sf stream}  \times {\sf stream}$.
* Логически устроена так: любая запись из входного потока перекладывается в оба выходных потока. 
* **Вопрос**: является ли эта операция потоковой?

* Нет. Один выходной поток может дальше обрабатываться сильно медленнее, чем другой. В таком случае Tee должен помнить материализованное "окно" между позициями двух потоков.

### Sort
    
* Как и в MR, остаётся интересный и нераскрытый **вопрос**: как может быть устроен потоковый ${\sf Sort}$?

* Честно -- никак, это невозможно :) 
* В промышленности стриминг используется на временных окнах ограниченного размера, скажем, 15 секунд.
* Соответственно, поток данных становится "локально сортированным".
* Применения ${\sf Reduce}$ и ${\sf Join}$ к этому обычно готовы, так как их смысл зачастую это "агрегация" (подсчёт суммарной статистики, разбиение на субпотоки по корзинам), которую можно делать порциями произвольного размера.

# Конец :)

## Спасибо за внимание, перекатываемся в свободное обсуждение домашки