# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple  # requires python 3.6+
from typing import Iterator

from pandas.core.common import random_state

In [2]:
def MAP(_, row: NamedTuple):
    if row.gender == 'female':
        yield row.age, row


def REDUCE(age: str, rows: Iterator[NamedTuple]):
    sum = 0
    count = 0
    for row in rows:
        sum += row.social_contacts
        count += 1
    if count > 0:
        yield age, sum / count
    else:
        yield age, 0

Модель элемента данных

In [3]:
class User(NamedTuple):
    id: int
    age: str
    social_contacts: int
    gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
    return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output)  # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element


def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()


def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [14]:
from typing import NamedTuple  # requires python 3.6+
from typing import Iterator


class User(NamedTuple):
    id: int
    age: str
    social_contacts: int
    gender: str


input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]


def MAP(_, row: NamedTuple):
    if row.gender == 'female':
        yield row.age, row


def REDUCE(age: str, rows: Iterator[NamedTuple]):
    sum = 0
    count = 0
    for row in rows:
        sum += row.social_contacts
        count += 1
    if count > 0:
        yield age, sum / count
    else:
        yield age, 0


def RECORDREADER():
    return [(u.id, u) for u in input_collection]


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [15]:
from typing import Iterator
import numpy as np

arr = np.ones((5, 4))
vec = np.random.rand(4)  # in-memory vector in all map tasks


def MAP(coordinates: (int, int), value: int):
    i, j = coordinates
    yield i, value * vec[j]


def REDUCE(i: int, products: Iterator[NamedTuple]):
    sum = 0
    for p in products:
        sum += p
    yield i, sum


def RECORDREADER():
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            yield (i, j), arr[i, j]


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(2.41767400409039)),
 (1, np.float64(2.41767400409039)),
 (2, np.float64(2.41767400409039)),
 (3, np.float64(2.41767400409039)),
 (4, np.float64(2.41767400409039))]

## Inverted index 

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]


def RECORDREADER():
    for (docid, document) in enumerate(documents):
        yield "{}".format(docid), document


def MAP(docId: str, body: str):
    for word in set(body.split(' ')):
        yield word, docId


def REDUCE(word: str, docIds: Iterator[str]):
    yield word, sorted(docIds)


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]


def RECORDREADER():
    for (docid, document) in enumerate(documents):
        for (lineid, line) in enumerate(document.split('\n')):
            yield "{}:{}".format(docid, lineid), line


def MAP(docId: str, line: str):
    for word in line.split(" "):
        yield word, 1


def REDUCE(word: str, counts: Iterator[int]):
    sum = 0
    for c in counts:
        sum += c
    yield word, sum


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element


def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()


def groupbykey_distributed(map_partitions, PARTITIONER):
    global reducers
    partitions = [dict() for _ in range(reducers)]
    for map_partition in map_partitions:
        for (k2, v2) in map_partition:
            p = partitions[PARTITIONER(k2)]
            p[k2] = p.get(k2, []) + [v2]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in
            enumerate(partitions)]


def PARTITIONER(obj):
    global reducers
    return hash(obj) % reducers


def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
    map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
    if COMBINER is not None:
        map_partitions = map(
            lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions
        )
    reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER)  # shuffle
    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(
        map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1])
    )), reduce_partitions)

    print("{} key-value pairs were sent over a network.".format(
        sum([len(vs) for (k, vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
    return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2


def INPUTFORMAT():
    global maps

    def RECORDREADER(split):
        for (docid, document) in enumerate(split):
            for (lineid, line) in enumerate(document.split('\n')):
                yield "{}:{}".format(docid, lineid), line

    split_size = int(np.ceil(len(documents) / maps))
    for i in range(0, len(documents), split_size):
        yield RECORDREADER(documents[i:i + split_size])


def MAP(docId: str, line: str):
    for word in line.split(" "):
        yield word, 1


def REDUCE(word: str, counts: Iterator[int]):
    sum = 0
    for c in counts:
        sum += c
    yield word, sum


# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('it', 18)]),
 (1, [('is', 18), ('what', 10)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0


def INPUTFORMAT():
    global maps

    def RECORDREADER(split):
        for value in split:
            yield value, None

    split_size = int(np.ceil(len(input_values) / maps))
    for i in range(0, len(input_values), split_size):
        yield RECORDREADER(input_values[i:i + split_size])


def MAP(value: int, _):
    yield value, None


def PARTITIONER(key):
    global reducers
    global max_value
    global min_value
    bucket_size = (max_value - min_value) / reducers
    bucket_id = 0
    while (key > (bucket_id + 1) * bucket_size) and ((bucket_id + 1) * bucket_size < max_value):
        bucket_id += 1
    return bucket_id


def REDUCE(value: int, _):
    yield None, value


partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.04085989470788898)),
   (None, np.float64(0.061300549793170456)),
   (None, np.float64(0.08456556383735792)),
   (None, np.float64(0.09461575367097019)),
   (None, np.float64(0.16419925522891865)),
   (None, np.float64(0.1935274101918124)),
   (None, np.float64(0.19893910739772314)),
   (None, np.float64(0.22485499616355165)),
   (None, np.float64(0.2564642973646254)),
   (None, np.float64(0.30727003383590856)),
   (None, np.float64(0.3584183220294139)),
   (None, np.float64(0.3661575740584725)),
   (None, np.float64(0.37437078838249116)),
   (None, np.float64(0.40236534554748893))]),
 (1,
  [(None, np.float64(0.5030258410518511)),
   (None, np.float64(0.5331002853802773)),
   (None, np.float64(0.5489509051254893)),
   (None, np.float64(0.5723215671966612)),
   (None, np.float64(0.619846767773708)),
   (None, np.float64(0.6369531039552078)),
   (None, np.float64(0.6643016551832278)),
   (None, np.float64(0.6858738926082301)),
   (None, np.float64(0.761429579

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [21]:
arr = np.random.randint(0, 10, 10)
print("Vector", arr)

def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

def MAP(_, value: int):
    yield 0, value


def REDUCE(_, arr):
    maximum = -np.inf
    for element in arr:
        maximum = max(maximum, element)

    yield 0, maximum


def RECORDREADER():
    for i in range(arr.shape[0]):
        yield i, arr[i]


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

Vector [8 5 8 4 9 4 1 3 0 1]


[(0, np.int32(9))]

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [22]:
def MAP(_, value: int):
    yield 0, value

def REDUCE(_, arr):
    summa = 0
    for element in arr:
        summa += element

    yield 0, summa / len(arr)

def RECORDREADER():
    for i in range(arr.shape[0]):
        yield i, arr[i]


output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(4.3))]

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [23]:
def groupbykey_sorted(iterable):
    """GroupByKey реализация на основе сортировки"""
    # Сортируем по ключу
    sorted_iterable = sorted(iterable, key=lambda x: x[0])

    # Группируем последовательные элементы с одинаковым ключом
    result = {}
    for k, v in sorted_iterable:
        if k not in result:
            result[k] = []
        result[k].append(v)

    return result.items()


# Проверка работы
test_data = [(2, 'a'), (1, 'b'), (2, 'c'), (1, 'd'), (3, 'e')]
grouped = list(groupbykey_sorted(test_data))
print(f"GroupByKey sorted result: {grouped}")
# Ожидаемый результат: [(1, ['b', 'd']), (2, ['a', 'c']), (3, ['e'])]

GroupByKey sorted result: [(1, ['b', 'd']), (2, ['a', 'c']), (3, ['e'])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [24]:
def RECORDREADER():
    """Генерирует данные с дубликатами"""
    data = [1, 2, 2, 3, 1, 4, 3, 5, 2]
    for i, val in enumerate(data):
        yield i, val


def MAP(key, value):
    """MAP для удаления дубликатов"""
    # Ключ - само значение, значение - None (или любой маркер)
    yield value, None


def REDUCE(key, values):
    """REDUCE для удаления дубликатов"""
    # Возвращаем ключ только один раз независимо от количества значений
    yield key, None


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Unique values: {[k for k, v in output]}")
# Ожидаемый результат: [1, 2, 3, 4, 5]

Unique values: [1, 2, 3, 4, 5]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [25]:
def RECORDREADER():
    """Пример данных для выборки"""
    users = [
        User(id=0, age=25, gender='female', social_contacts=240),
        User(id=1, age=55, gender='male', social_contacts=20),
        User(id=2, age=30, gender='female', social_contacts=500),
    ]
    for i, u in enumerate(users):
        yield i, u


def MAP(key, record):
    """Выборка: фильтр по предикату (например, gender == 'female')"""
    if record.gender == 'female':
        yield record, record  # (ключ, значение) = (кортеж, кортеж)


def REDUCE(key, values):
    """Identity функция - возвращает то же значение"""
    yield key, key


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Selection (female users): {[u.id for u, _ in output]}")
# Ожидаемый результат: [0, 2]

Selection (female users): [0, 2]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [26]:
def MAP(key, record):
    """Проекция на атрибуты (age, gender)"""
    # Создаём кортеж только с нужными атрибутами
    projected = (record.age, record.gender)
    yield projected, projected


def REDUCE(key, values):
    """Возвращаем одну пару для каждого уникального ключа"""
    yield key, key


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Projection (age, gender): {output}")

Projection (age, gender): [((25, 'female'), (25, 'female')), ((55, 'male'), (55, 'male')), ((30, 'female'), (30, 'female'))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [27]:
def RECORDREADER():
    """Два набора данных для объединения"""
    set1 = [1, 2, 3]
    set2 = [3, 4, 5]
    for i, v in enumerate(set1 + set2):
        yield i, v


def MAP(key, value):
    """Превращаем каждый кортеж в пару (t, t)"""
    yield value, value


def REDUCE(key, values):
    """Возвращаем (t, t) независимо от количества значений (1 или 2)"""
    yield key, key


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Union result: {[k for k, v in output]}")
# Ожидаемый результат: [1, 2, 3, 4, 5]

Union result: [1, 2, 3, 4, 5]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [28]:
def RECORDREADER():
    """Два набора данных с метками источника"""
    set1 = [1, 2, 3, 4]
    set2 = [3, 4, 5, 6]
    for v in set1:
        yield v, ('R', v)  # Метка источника R
    for v in set2:
        yield v, ('S', v)  # Метка источника S


def MAP(key, value):
    """Превращаем каждый кортеж в пары (t, источник)"""
    yield key, value[0]  # (значение, метка_источника)


def REDUCE(key, sources):
    """Если есть оба источника [R, S] - создаём пару (t, t)"""
    sources_list = list(sources)
    if 'R' in sources_list and 'S' in sources_list:
        yield key, key


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Intersection result: {[k for k, v in output]}")
# Ожидаемый результат: [3, 4]

Intersection result: [3, 4]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [29]:
def RECORDREADER():
    """Два набора данных для разницы"""
    setR = [1, 2, 3, 4]
    setS = [3, 4, 5, 6]
    for v in setR:
        yield v, 'R'
    for v in setS:
        yield v, 'S'


def MAP(key, value):
    """Создаём пару (t, источник)"""
    yield key, value


def REDUCE(key, sources):
    """Если только [R] - создаём пару (t, t), иначе ничего"""
    sources_list = list(sources)
    if sources_list == ['R']:
        yield key, key


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Difference R-S result: {[k for k, v in output]}")
# Ожидаемый результат: [1, 2]

Difference R-S result: [1, 2]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [30]:
def RECORDREADER():
    """Два отношения для natural join: R(a,b) и S(b,c)"""
    # R(a, b)
    R = [(1, 10), (2, 20), (3, 30)]
    # S(b, c)
    S = [(10, 100), (20, 200), (40, 400)]

    for a, b in R:
        yield (a, b), ('R', a)
    for b, c in S:
        yield (b, c), ('S', c)


def MAP(key, value):
    """Ключ - общий атрибут b, значение - (источник, остальное)"""
    if value[0] == 'R':
        b = key[1]  # общий атрибут из R
        yield b, ('R', key[0])  # (b, (R, a))
    else:
        b = key[0]  # общий атрибут из S
        yield b, ('S', key[1])  # (b, (S, c))


def REDUCE(key, values):
    """Создаём все пары (R, a) и (S, c) для каждого ключа b"""
    r_values = []
    s_values = []
    for v in values:
        if v[0] == 'R':
            r_values.append(v[1])
        else:
            s_values.append(v[1])

    # Декартово произведение R и S для этого ключа
    for a in r_values:
        for c in s_values:
            yield None, (a, key, c)  # (a, b, c)


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Natural Join result: {output}")
# Ожидаемый результат: [(None, (1, 10, 100)), (None, (2, 20, 200))]

Natural Join result: [(None, (1, 10, 100)), (None, (2, 20, 200))]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [31]:
def RECORDREADER():
    """Данные для группировки: (a, b, c)"""
    data = [
        (1, 10, 100),
        (1, 20, 200),
        (2, 30, 300),
        (1, 40, 400),
        (2, 50, 500),
    ]
    for i, (a, b, c) in enumerate(data):
        yield i, (a, b, c)


def MAP(key, record):
    """Группировка по атрибуту a, значение - b"""
    a, b, c = record
    yield a, b  # (ключ группы, значение для агрегации)


def REDUCE(key, values):
    """Агрегация SUM по группе"""
    total = sum(values)
    yield key, total


# Проверка
output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(f"Grouping and Aggregation (SUM by a): {output}")
# Ожидаемый результат: [(1, 70), (2, 80)]

Grouping and Aggregation (SUM by a): [(1, 70), (2, 80)]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [32]:
I = 2
J = 3
K = 4 * 10

def RECORDREADER():
    """Генерирует элементы матрицы и вектора"""
    # Элементы матрицы M[i,j]
    for i in range(I):
        for j in range(J):
            yield ('M', i, j), small_mat[i, j]

    # Элементы вектора V[j]
    for j in range(J):
        yield ('V', j), vec_large[j]


def MAP(key, value):
    """Распределяем элементы для умножения"""
    if key[0] == 'M':
        _, i, j = key
        # Отправляем элемент матрицы с ключом j (для соединения с вектором)
        yield j, ('M', i, value)
    else:
        _, j = key
        # Отправляем элемент вектора с ключом j
        yield j, ('V', value)


def REDUCE(key, values):
    """Выполняем умножение и суммирование"""
    m_values = []
    v_value = None

    for v in values:
        if v[0] == 'M':
            m_values.append((v[1], v[2]))  # (i, m_ij)
        else:
            v_value = v[1]  # v_j

    if v_value is not None:
        for i, m_ij in m_values:
            yield (i, key), m_ij * v_value  # (i, j), product


# Для финальной агрегации по i
def REDUCE_final(key, values):
    """Суммируем произведения для каждой строки i"""
    total = sum(values)
    yield key, total


# Проверка
small_mat = np.random.rand(I, J)  # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J, K)
vec_large = np.random.rand(J)
output = list(MapReduce(RECORDREADER, MAP, REDUCE))

def RECORDREADER_intermediate():
    for item in output:
        yield item[0], item[1]

def MAP_identity(key, value):
    yield key, value

def REDUCE_final(key, values):
    yield key, sum(values)

final_output = list(MapReduce(RECORDREADER_intermediate, MAP_identity, REDUCE_final))
print(f"Matrix-Vector result: {sorted(final_output)}")

Matrix-Vector result: [((0, 0), np.float64(0.025644470492028473)), ((0, 1), np.float64(0.009214066508754355)), ((0, 2), np.float64(0.7975867538706364)), ((1, 0), np.float64(0.10606463602107585)), ((1, 1), np.float64(0.021946718286689135)), ((1, 2), np.float64(0.01041978677605562))]


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [33]:
# MapReduce model
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element


def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()


def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [34]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)  # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J, K)


def MAP(key, value):
    (j, k) = key
    w = value
    for i in range(I):
        m_ij = small_mat[i, j]
        yield (i, k), m_ij * w

def REDUCE(key, values):
    yield key, sum(values)

def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield (j, k), big_mat[j, k]

Проверьте своё решение

In [35]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)


def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), vw) in reduce_output) + 1
    K = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.empty(shape=(I, K))
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat


np.allclose(reference_solution, asmatrix(solution))  # should return true

True

In [36]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i, k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [37]:
def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I_out = max(i for ((i, k), vw) in reduce_output) + 1
    K_out = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.empty(shape=(I_out, K_out))
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat

def RECORDREADER():
    """Генерирует элементы обеих матриц"""
    # Матрица M(I, J)
    for i in range(I):
        for j in range(J):
            yield ('M', i, j), small_mat[i, j]

    # Матрица N(J, K)
    for j in range(J):
        for k in range(K):
            yield ('N', j, k), big_mat[j, k]


def MAP(key, value):
    """MAP для умножения когда обе матрицы из RECORDREADER"""
    if key[0] == 'M':
        _, i, j = key
        # Ключ для join: j, значение: (M, i, value)
        yield j, ('M', i, value)
    else:
        _, j, k = key
        # Ключ для join: j, значение: (N, k, value)
        yield j, ('N', k, value)


def REDUCE(key, values):
    """Join по j, затем умножение и группировка по (i, k)"""
    m_values = []  # (i, m_ij)
    n_values = []  # (k, n_jk)

    for v in values:
        if v[0] == 'M':
            m_values.append((v[1], v[2]))
        else:
            n_values.append((v[1], v[2]))

    # Декартово произведение для этого j
    for i, m_ij in m_values:
        for k, n_jk in n_values:
            yield (i, k), m_ij * n_jk


# Для финальной агрегации
def REDUCE_final(key, values):
    """Суммируем все произведения для позиции (i, k)"""
    yield key, sum(values)


# Двухэтапный MapReduce
intermediate = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вторая стадия для суммирования
def RECORDREADER_intermediate():
    for item in intermediate:
        yield item[0], item[1]

def MAP_identity(key, value):
    yield key, value

final_solution = MapReduce(RECORDREADER_intermediate, MAP_identity, REDUCE_final)
print(f"Two-stage matrix multiplication check: {np.allclose(reference_solution, asmatrix(final_solution))}")

Two-stage matrix multiplication check: True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [38]:
def COMBINER(key, values):
    for v in values:
        yield key, v

def RECORDREADER():
    """Генерирует элементы обеих матриц"""
    # Матрица M(I, J)
    for i in range(I):
        for j in range(J):
            yield ('M', i, j), small_mat[i, j]

    # Матрица N(J, K)
    for j in range(J):
        for k in range(K):
            yield ('N', j, k), big_mat[j, k]


def MAP(key, value):
    """MAP для умножения когда обе матрицы из RECORDREADER"""
    if key[0] == 'M':
        _, i, j = key
        # Ключ для join: j, значение: (M, i, value)
        yield j, ('M', i, value)
    else:
        _, j, k = key
        # Ключ для join: j, значение: (N, k, value)
        yield j, ('N', k, value)


def REDUCE(key, values):
    """Join по j, затем умножение и группировка по (i, k)"""
    m_values = []  # (i, m_ij)
    n_values = []  # (k, n_jk)

    for v in values:
        if v[0] == 'M':
            m_values.append((v[1], v[2]))
        else:
            n_values.append((v[1], v[2]))

    # Декартово произведение для этого j
    for i, m_ij in m_values:
        for k, n_jk in n_values:
            yield (i, k), m_ij * n_jk

def PARTITIONER(key):
    global reducers
    if isinstance(key, tuple):
        i, k = key
        return (i + k) % reducers
    else:
        return hash(key) % reducers

# Для финальной агрегации
def REDUCE_final(key, values):
    """Суммируем все произведения для позиции (i, k)"""
    yield key, sum(values)


# Двухэтапный MapReduce
intermediate = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вторая стадия для суммирования
def RECORDREADER_intermediate():
    for item in intermediate:
        yield item[0], item[1]

def MAP_identity(key, value):
    yield key, value

final_solution = MapReduce(RECORDREADER_intermediate, MAP_identity, REDUCE_final)
print(f"Two-stage matrix multiplication check: {np.allclose(reference_solution, asmatrix(final_solution))}")

Two-stage matrix multiplication check: True


Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [39]:
# Для распределённой версии используем MapReduceDistributed
# с соответствующими INPUTFORMAT, PARTITIONER и COMBINER

def INPUTFORMAT():
    """INPUTFORMAT для распределённого умножения матриц"""
    global maps

    def RECORDREADER_M(split):
        for i, j in split:
            yield ('M', i, j), small_mat[i, j]

    def RECORDREADER_N(split):
        for j, k in split:
            yield ('N', j, k), big_mat[j, k]

    # Разделяем данные для map задач
    m_indices = [(i, j) for i in range(I) for j in range(J)]
    n_indices = [(j, k) for j in range(J) for k in range(K)]

    split_size_m = int(np.ceil(len(m_indices) / maps))
    for i in range(0, len(m_indices), split_size_m):
        yield RECORDREADER_M(m_indices[i:i + split_size_m])

    split_size_n = int(np.ceil(len(n_indices) / maps))
    for i in range(0, len(n_indices), split_size_n):
        yield RECORDREADER_N(n_indices[i:i + split_size_n])


def PARTITIONER(key):
    global reducers
    if isinstance(key, tuple):
        i, k = key
        return (i + k) % reducers
    else:
        return hash(key) % reducers

def COMBINER(key, values):
    for v in values:
        yield key, v

def MAP(key, value):
    if key[0] == 'M':
        _, i, j = key
        yield j, ('M', i, value)
    else:
        _, j, k = key
        yield j, ('N', k, value)


# Запуск распределённого MapReduce
partitioned_output = MapReduceDistributed(
    INPUTFORMAT,
    MAP,
    REDUCE,
    PARTITIONER=PARTITIONER,
    COMBINER=None
)

partitioned_output = [(pid, list(part)) for (pid, part) in partitioned_output]
print(f"Distributed matrix multiplication partitions: {len(partitioned_output)}")

126 key-value pairs were sent over a network.
Distributed matrix multiplication partitions: 2
