# Введение в MapReduce модель на Python


In [127]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [128]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [129]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [130]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [131]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [132]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [133]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [134]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [135]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [136]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [137]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [138]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [139]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [140]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [141]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.2390201069521387),
 (1, 2.2390201069521387),
 (2, 2.2390201069521387),
 (3, 2.2390201069521387),
 (4, 2.2390201069521387)]

## Inverted index

In [142]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('is', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('it', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [143]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [144]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [145]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('it', 18)]),
 (1, [('what', 10)])]

## TeraSort

In [146]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.0277167149038966),
   (None, 0.05164138631726711),
   (None, 0.07806733414444411),
   (None, 0.0868746852394855),
   (None, 0.10963790865421541),
   (None, 0.12509444987337293),
   (None, 0.1252461961690794),
   (None, 0.14277830723162677),
   (None, 0.1524135440537695),
   (None, 0.2564489432383714),
   (None, 0.2777872033545221),
   (None, 0.3399793769937639),
   (None, 0.3968427932893184),
   (None, 0.4013940164824189),
   (None, 0.40220143866892266),
   (None, 0.48898052986751794)]),
 (1,
  [(None, 0.5048234727507128),
   (None, 0.5362745606370134),
   (None, 0.5434483727632927),
   (None, 0.5906366310773838),
   (None, 0.6095875898335479),
   (None, 0.6350565846949412),
   (None, 0.6491140504509058),
   (None, 0.6579013867763354),
   (None, 0.736563128518598),
   (None, 0.8452959071528013),
   (None, 0.8522152908071275),
   (None, 0.8623077639550152),
   (None, 0.8914039836949257),
   (None, 0.9746593815657957)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [147]:
import random
import numpy as np

def MAP(num_list):
    return max(num_list)

def REDUCE(num_list):
    return max(num_list)

def RECORDREADER(count):
    return [random.randint(0, 100) for i in range(count)]


record = RECORDREADER(100)
print("Сгенерированный список чисел:",record)
parts = 5

record_partitional = np.array_split(record, len(record) // parts if len(record) % parts == 0 else len(record) // parts + 1)
record_partitional = [list(arr) for arr in record_partitional]
print("максимальное число:",max(map(max, record_partitional)))


Сгенерированный список чисел: [89, 64, 34, 88, 86, 48, 26, 97, 87, 31, 66, 77, 35, 48, 7, 63, 87, 34, 10, 15, 39, 42, 82, 32, 10, 67, 30, 100, 5, 15, 81, 63, 57, 7, 69, 9, 96, 63, 92, 86, 37, 73, 35, 22, 43, 37, 68, 70, 49, 78, 54, 37, 3, 96, 4, 57, 59, 92, 100, 58, 42, 45, 43, 63, 61, 30, 77, 2, 6, 32, 6, 25, 79, 70, 17, 87, 94, 37, 100, 17, 96, 15, 15, 93, 24, 3, 88, 67, 50, 88, 39, 6, 35, 44, 44, 94, 85, 35, 57, 57]
максимальное число: 100


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [148]:
from typing import Iterator, NamedTuple
import random

def RECORDREADER(count):
    return [random.randint(0, 100) for _ in range(count)]

def MAP(num):
    return (1, num)

def REDUCE(_, numbers: Iterator[NamedTuple]):
    total_sum = 0
    item_count = 0
    for number in numbers:
        total_sum += number
        item_count += 1
    if item_count > 0:
        yield ('AVG', total_sum / item_count)
    else:
        yield ('AVG', 0)

def flatten(list_of_lists):
    for sublist in list_of_lists:
        yield from sublist

record = RECORDREADER(100)

map_output = [MAP(x) for x in record]
reduced_results = [REDUCE(1, (item[1] for item in map_output))]
output = list(flatten(reduced_results))
print(output)


[('AVG', 52.35)]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [149]:
from typing import Iterator, NamedTuple
import random
from itertools import groupby
from operator import itemgetter

def group_by_key(iterable):
    return [(key, [item[1] for item in group]) for key, group in groupby(sorted(iterable, key=itemgetter(0)), key=itemgetter(0))]

def MAP(num):
    return (1, num)

def REDUCE(_, numbers: Iterator[NamedTuple]):
    total_sum = 0
    item_count = 0
    for number in numbers:
      total_sum+=number
      item_count+=1
    if item_count > 0:
      yield('AVG', total_sum/item_count)
    else:
      yield('AVG', 0)

def RECORDREADER(count):
    return [random.randint(0, 100) for _ in range(count)]

def flatten(list_of_lists):
    for sublist in list_of_lists:
        yield from sublist

map_output = list(map(lambda x: MAP(x), RECORDREADER(100)))
shuffle_output = group_by_key(map_output)
print(shuffle_output)

output = list(flatten(map(lambda x: REDUCE(*x), shuffle_output)))
print(output)


[(1, [19, 8, 7, 46, 12, 20, 81, 92, 85, 68, 88, 16, 33, 45, 15, 91, 97, 95, 4, 25, 11, 72, 7, 84, 7, 13, 37, 74, 90, 92, 96, 13, 66, 92, 59, 94, 80, 43, 98, 75, 51, 61, 38, 58, 10, 4, 21, 73, 67, 98, 52, 65, 79, 8, 17, 9, 15, 57, 84, 24, 38, 37, 23, 100, 32, 69, 57, 35, 28, 21, 27, 77, 32, 56, 33, 22, 33, 22, 48, 93, 21, 5, 96, 56, 24, 23, 53, 63, 58, 42, 23, 40, 64, 79, 19, 15, 28, 56, 4, 15])]
[('AVG', 47.08)]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [150]:
from itertools import chain

reducer_count = 2

def input_data_source():
    global mapper_count
    data_points = ["dog", "cat", "dog", "bird", "cat", "fish", "fish", "dog"]

    def data_reader(chunk):
        for item in chunk:
            yield (item, None)

    chunk_size = max(1, len(data_points) // mapper_count)
    for index in range(0, len(data_points), chunk_size):
        yield data_reader(data_points[index:index+chunk_size])

def map_task(key, dummy_value):
    yield (key, None)

def partition_task(key):
    global reducer_count
    return hash(key) % reducer_count

def reduce_task(key, dummy_value):
    yield (key, None)


mapper_count = 2
distinct_items = MapReduceDistributed(input_data_source, map_task, reduce_task, partition_task)

distinct_items = [key for (_, partition) in distinct_items for (key, _) in partition]
print("Unique values:", distinct_items)


8 key-value pairs were sent over a network.
Unique values: ['bird', 'cat', 'dog', 'fish']


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [151]:
def RECORDREADER():
    dataset = [(1, 24), (2, 30), (3, 18), (4, 27), (5, 22)]
    for t in dataset:
        yield (t, None)

def MAP(t, _):
    if (sum(t) % 2) == 0:
        yield (t, t)

def REDUCE(_, values):
    for value in values:
        yield value

result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Выбранные кортежи по условию деления суммы элементов кортежа на 2 без остатка:", result)


Выбранные кортежи по условию деления суммы элементов кортежа на 2 без остатка: [(2, 30)]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [152]:
S = {1, 5, 10}

def MAP(t, _):
    """Фильтруем только нужные атрибуты."""
    filtered_result = dict(filter(lambda item: item[0] in S, t.items()))
    key = tuple(sorted(filtered_result.items()))
    yield (key, key)


def REDUCE(_, values):
    """Удаляем повторяющиеся значения."""
    yield values[0]

def RECORDREADER():
    """Генератор данных."""
    yield ({1: "cat", 2: "dog", 5: "rabbit"}, None)
    yield ({1: "lion", 3: "tiger", 10: "elephant"}, None)
    yield ({5: "bear", 7: "fox", 10: "giraffe"}, None)


result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Проекция по атрибутам:", result)


Проекция по атрибутам: [((1, 'cat'), (5, 'rabbit')), ((1, 'lion'), (10, 'elephant')), ((5, 'bear'), (10, 'giraffe'))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [153]:

def MAP(t, _):
    """Каждый входной элемент превращается в пару (t, t)."""
    yield (t, t)

def REDUCE(t, values):
    """Удаляет дубликаты и возвращает (t, t)."""
    yield (t, t)
def RECORDREADER():
    """Объединяем два множества."""
    set1 = [(1, None), (2, None), (3, None)]
    set2 = [(2, None), (4, None), (5, None)]
    return list(chain(set1, set2))


result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Объединенное множество:", result)


Объединенное множество: [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [154]:
def MAP(t, _):
    """Каждый входной элемент превращается в пару (t, t)."""
    yield (t, t)

def REDUCE(t, values):
    """Если для ключа t есть список из двух элементов [t, t], создаем пару (t, t)."""
    values_list = list(values)
    if len(values_list) == 2 and values_list[0] == t and values_list[1] == t:
        yield (t, t)

def RECORDREADER():
    """Пересечение двух множеств: {1, 2, 3, 4} и {2, 3, 5, 6}."""
    set1 = [1, 2, 3, 4]
    set2 = [2, 3, 5, 6]
    return [(t, None) for t in set1] + [(t, None) for t in set2]  # Объединяем элементы

result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Пересечение множеств:", result)


Пересечение множеств: [(2, 2), (3, 3)]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [155]:
def MAP(t, source):
    """Помечаем каждый элемент его источником (1 для R и 0 для S)."""
    yield (t, source)

def REDUCE(t, values):
    """Добавляем элемент в результат, если он есть только в R (т.е. значение 1)."""
    if values == ["R"]:
        yield (t, t)

def RECORDREADER():
    """Задаем два множества: R и S."""
    R = [(1, "R"), (2, "R"), (3, "R"), (4, "R")]
    S = [(2, "S"), (3, "S"), (5, "S"), (6, "S")]
    return R + S


result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Разность R - S:", result)


Разность R - S: [(1, 1), (4, 4)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [156]:
def MAP(record, source):
    """Формирует пары (b, (source, value)) для дальнейшего соединения."""
    if source == "R":
        a, b = record
        yield (b, ("R", a))
    elif source == "S":
        b, c = record
        yield (b, ("S", c))

def REDUCE(b, values):
    """Создает пары (a, b, c) для каждого подходящего соединения."""
    r_values = [a for src, a in values if src == "R"]
    s_values = [c for src, c in values if src == "S"]

    for a in r_values:
        for c in s_values:
            yield (a, b, c)

def RECORDREADER():
    """Данные для соединения: R(a, b) и S(b, c)."""
    R = [("Bear", 1), ("Lion", 2), ("Tiger", 3)]
    S = [(1, "Northern"), (2, "African"), (3, "Indian")]
    return [(r, "R") for r in R] + [(s, "S") for s in S]

result = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("Natural Join:", result)


Natural Join: [('Bear', 1, 'Northern'), ('Lion', 2, 'African'), ('Tiger', 3, 'Indian')]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [157]:

joined_data = [
    ("Leo", 1, "Safari Park"),
    ("Ella", 1, "Safari Park"),
    ("Zara", 2, "Ocean World"),
    ("Tina", 2, "Ocean World"),
    ("Max", 3, "Mountain Zoo"),
]


def MAP_GROUP(zoo_id, animal, zoo):
    yield (zoo_id, animal)

def REDUCE_GROUP(zoo_id, animals):
    yield f"Zoo with id={zoo_id} has {len(animals)} animal(s)"

def RECORDREADER():
    return [(zoo_id, animal, zoo) for animal, zoo_id, zoo in joined_data]


def MapReduce(reader, mapper, reducer):
    intermediate = {}

    for zoo_id, animal, zoo in reader():
        for key, value in mapper(zoo_id, animal, zoo):
            if key not in intermediate:
                intermediate[key] = []
            intermediate[key].append(value)


    for zoo_id, animals in intermediate.items():
        yield from reducer(zoo_id, animals)

output = MapReduce(RECORDREADER, MAP_GROUP, REDUCE_GROUP)
output = list(output)


for line in output:
    print(line)


Zoo with id=1 has 2 animal(s)
Zoo with id=2 has 2 animal(s)
Zoo with id=3 has 1 animal(s)


#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [158]:
from typing import List, Tuple, Dict
from collections import defaultdict

NUM_REDUCERS = 2
CHUNK_SIZE = 2

class MatrixRow:
    def __init__(self, row, col, value):
        self.row = row
        self.col = col
        self.value = value

class VectorElement:
    def __init__(self, index, value):
        self.index = index
        self.value = value

def map_function(data, data_type):
    if data_type == "matrix":
        return (data.col % NUM_REDUCERS, ("M", data))
    elif data_type == "vector":
        return (data.index % NUM_REDUCERS, ("V", data))

def reduce_function(reducer_id, data):
    matrix_parts = []
    vector_parts = []
    for tag, item in data:
        if tag == "M":
            matrix_parts.append(item)
        else:
            vector_parts.append(item)
    results = []
    for row in matrix_parts:
        for el in vector_parts:
            if row.col == el.index:
                results.append((row.row, row.value * el.value))
    return results

def map_reduce_algorithm(matrix, vector):
    mapped_matrix = [map_function(row, "matrix") for row in matrix]
    mapped_vector = [map_function(el, "vector") for el in vector]

    grouped_data = defaultdict(list)
    for reducer_id, data_pair in mapped_matrix + mapped_vector:
        grouped_data[reducer_id].append(data_pair)

    partial_results = []
    for reducer_id, data in grouped_data.items():
        partial_results.extend(reduce_function(reducer_id, data))

    final_result = defaultdict(float)
    for row, value in partial_results:
        final_result[row] += value
    return dict(final_result)


matrix = [MatrixRow(0, 0, 1.0), MatrixRow(0, 1, 2.0), MatrixRow(1, 0, 3.0), MatrixRow(1, 1, 4.0)]
vector = [VectorElement(0, 0.5), VectorElement(1, 0.7)]

result = map_reduce_algorithm(matrix, vector)
print("Matrix-Vector Multiplication Result:", result)


Matrix-Vector Multiplication Result: {0: 1.9, 1: 4.3}


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [159]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [160]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])

def MAP(k1, v1):
    (j, k) = k1
    w = v1
    for i in range(small_mat.shape[0]):
        yield ((i, k), (small_mat[i, j] * w))

def REDUCE(key, values):
    (i, k) = key
    total = sum(value for value in values)
    yield (key, total)

mapped_data = defaultdict(list)
for key, value in RECORDREADER():
    for intermediate_key, intermediate_value in MAP(key, value):
        mapped_data[intermediate_key].append(intermediate_value)

reduced_data = {}
for key, values in mapped_data.items():
    for reduced_key, reduced_value in REDUCE(key, values):
        reduced_data[reduced_key] = reduced_value

result_matrix = np.zeros((I, K))
for (i, k), value in reduced_data.items():
    result_matrix[i, k] = value

print("Result Matrix:\n", result_matrix)

numpy_result = np.dot(small_mat, big_mat)
print("\nVerification (numpy's result):\n", numpy_result)

tolerance = 1e-9
print("\nAre results equal ?", np.allclose(result_matrix, numpy_result, atol=tolerance))

from collections import defaultdict


Result Matrix:
 [[0.2387498  0.73768695 0.34144946 0.63410413 0.73626738 0.47973512
  0.35706934 0.20778721 0.23262856 0.63345307 0.70202327 0.64395208
  0.04869614 0.70063181 0.5394256  0.6641238  0.61893322 0.3885193
  0.49084229 0.03331122 0.50482203 0.15128972 0.1088326  0.20921799
  0.39338204 0.34787741 0.2157395  0.59773472 0.06376904 0.24614382
  0.61006832 0.41197023 0.1857571  0.21343573 0.6857339  0.2313315
  0.1952692  0.61579394 0.46982202 0.35978113]
 [0.13100452 0.2981312  0.2896083  0.35655639 0.33033127 0.40331069
  0.16558728 0.12493558 0.1887602  0.21038057 0.34620787 0.40207098
  0.0740998  0.46786309 0.20664786 0.21178192 0.46318601 0.18682726
  0.33290506 0.06507121 0.28724003 0.11586466 0.29166146 0.11232671
  0.18050214 0.34350507 0.10332023 0.34825474 0.08070948 0.2794385
  0.28623544 0.25124805 0.21893814 0.20015266 0.3174245  0.28836595
  0.18762901 0.29369515 0.27263741 0.21908481]]

Verification (numpy's result):
 [[0.2387498  0.73768695 0.34144946 0.634104

Проверьте своё решение

In [161]:
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [162]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [163]:
import numpy as np
from collections import defaultdict

I = 2
J = 3
K = 4


matrix_a = np.random.rand(I, J)
matrix_b = np.random.rand(J, K)


def RECORDREADER():

    for i in range(matrix_a.shape[0]):
        for j in range(matrix_a.shape[1]):
            yield ("A", (i, j), matrix_a[i, j])

    for j in range(matrix_b.shape[0]):
        for k in range(matrix_b.shape[1]):
            yield ("B", (j, k), matrix_b[j, k])

def MAP(key, value):
    matrix_type, index, element = key, value[0], value[1]
    if matrix_type == "A":
        for k in range(K):
            yield (("result", index[0], k), (index[1], element))
    elif matrix_type == "B":
        for i in range(I):
            yield (("result", i, index[1]), (index[0], element))

def REDUCE(key, values):
    result_index = key
    values_dict = defaultdict(list)
    for col_index, value in values:
        values_dict[col_index].append(value)
    total_sum = 0
    for col_index in values_dict:
        total_sum += values_dict[col_index][0] * values_dict[col_index][1]

    yield (result_index, total_sum)

mapped_data = defaultdict(list)
for key, value in enumerate(RECORDREADER()):
    for intermediate_key, intermediate_value in MAP(value[0], (value[1], value[2])):
        mapped_data[intermediate_key].append(intermediate_value)

reduced_data = {}
for key, values in mapped_data.items():
    for reduced_key, reduced_value in REDUCE(key, values):
        reduced_data[reduced_key] = reduced_value

result_matrix = np.zeros((I, K))
for (result_type, i, k), value in reduced_data.items():
    result_matrix[i, k] = value

print("Result Matrix:\n", result_matrix)


numpy_result = np.dot(matrix_a, matrix_b)
print("\nVerification (NumPy):\n", numpy_result)
tolerance = 1e-9
print("\nAre results equal?", np.allclose(result_matrix, numpy_result, atol=tolerance))


Result Matrix:
 [[0.55960439 1.57538797 1.08169458 1.31447357]
 [0.79088263 2.0586298  1.35818621 1.83666706]]

Verification (NumPy):
 [[0.55960439 1.57538797 1.08169458 1.31447357]
 [0.79088263 2.0586298  1.35818621 1.83666706]]

Are results equal? True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [164]:
import numpy as np
from collections import defaultdict

NUM_MAPPERS = 2
NUM_REDUCERS = 2
ROWS_A = 100
COLS_A = 50
COLS_B = 40


A = np.random.rand(ROWS_A, COLS_A)
B = np.random.rand(COLS_A, COLS_B)

def generate_chunks():
    chunk_size = int(np.ceil(ROWS_A / NUM_MAPPERS))
    for i_start in range(0, ROWS_A, chunk_size):
        yield (i_start, min(i_start + chunk_size, ROWS_A))


def map_function(chunk_start, chunk_end):
    intermediate_results = defaultdict(list)
    for i in range(chunk_start, chunk_end):
        for j in range(COLS_A):
            for k in range(COLS_B):
                intermediate_results[(i, k)].append(A[i, j] * B[j, k])
    return intermediate_results


def partition_function(key, value):

    i, k = key
    return i % NUM_REDUCERS, (key, value)


def reduce_function(reducer_id, intermediate_results):
    reduced_results = {}
    for key, values in intermediate_results.items():
        reduced_results[key] = sum(values)
    return reduced_results


mapper_results = [map_function(chunk_start, chunk_end) for chunk_start, chunk_end in generate_chunks()]


partitioned_data = defaultdict(list)
for mapper_output in mapper_results:
    for key, value in mapper_output.items():
        reducer_id, data_item = partition_function(key, value)
        partitioned_data[reducer_id].append(data_item)


reducer_results = [reduce_function(reducer_id, dict(partitioned_data[reducer_id])) for reducer_id in partitioned_data]

final_result = {}
for reducer_output in reducer_results:
    final_result.update(reducer_output)

print("Matrix Multiplication Result (partial):\n", dict(final_result))

result_matrix = np.zeros((ROWS_A, COLS_B))
for (i, k), value in final_result.items():
    result_matrix[i, k] = value


print("\nResult Matrix (NumPy):\n", result_matrix)

numpy_result = np.dot(A, B)
print("\nVerification (NumPy):\n", numpy_result)

tolerance = 1e-9
print("\nAre results equal ?", np.allclose(result_matrix, numpy_result, atol=tolerance))



Matrix Multiplication Result (partial):
 {(0, 0): 13.841859349084276, (0, 1): 13.838594434554, (0, 2): 15.278158746354574, (0, 3): 11.701865036873603, (0, 4): 11.412915472664148, (0, 5): 13.055154796572808, (0, 6): 12.370028761162892, (0, 7): 13.825246951160327, (0, 8): 12.499917046241901, (0, 9): 14.720816231837544, (0, 10): 11.33602530759061, (0, 11): 11.864507632164342, (0, 12): 13.565135304986166, (0, 13): 13.984740343723617, (0, 14): 11.719433334807892, (0, 15): 14.169741523699047, (0, 16): 13.671685835423407, (0, 17): 12.305296777190211, (0, 18): 14.218429447982428, (0, 19): 13.74407226559767, (0, 20): 12.134147976456592, (0, 21): 13.220563391819258, (0, 22): 13.417462312948935, (0, 23): 14.57396515329611, (0, 24): 11.834669538992394, (0, 25): 11.631287000339329, (0, 26): 13.342200496148442, (0, 27): 14.832910266437205, (0, 28): 12.240501301390514, (0, 29): 14.965339053789645, (0, 30): 12.689044185136297, (0, 31): 13.262830295815302, (0, 32): 13.2569478594105, (0, 33): 13.9593755

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [165]:
import numpy as np
I = 2
J = 3
K = 40
small_mat = np.random.rand(I,J)
big_mat = np.random.rand(J,K)
reference_solution = np.matmul(small_mat, big_mat)

def INPUTFORMAT():
  first_mat = []
  for i in range(small_mat.shape[0]):
    for j in range(small_mat.shape[1]):
      first_mat.append(((0, i, j), small_mat[i,j]))

  global maps
  split_size =  int(np.ceil(len(first_mat)/maps))
  for i in range(0, len(first_mat), split_size):
    yield first_mat[i:i+split_size]

  second_mat = []
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      second_mat.append(((1, j, k), big_mat[j,k]))

  split_size =  int(np.ceil(len(second_mat)/maps))
  for i in range(0, len(second_mat), split_size):
    yield second_mat[i:i+split_size]


def MAP_JOIN(k1, v1):
  (mat_num, i, j) = k1
  w = v1
  if mat_num == 0:
    yield (j, (mat_num, i, w))
  else:
    yield (i, (mat_num, j, w))


def REDUCE_JOIN(key, values):
  from_first_mat = [v for v in values if v[0] == 0]
  from_second_mat = [v for v in values if v[0] == 1]
  for f in from_first_mat:
    for s in from_second_mat:
      yield ((f[1], s[1]), f[2] * s[2])


def GET_JOINED():
  for j in joined:
    print("aa", j)
    yield j[1]


def MAP_MUL(k1, v1):
  yield (k1, v1)


def REDUCE_MUL(key, values):
  res_val = 0
  for v in values:
    res_val += v
  yield (key, res_val)

maps = 3
reducers = 2
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP_JOIN, REDUCE_JOIN, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]

mul_output = MapReduceDistributed(GET_JOINED, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]

solution = []
for p in pre_result:
  for v in p[1]:
    solution.append(v)

print(solution)
np.allclose(reference_solution, asmatrix(solution)) # should return true

126 key-value pairs were sent over a network.
aa (0, [((0, 0), 0.055458024903016406), ((0, 1), 0.22390413915081236), ((0, 2), 0.11461390711581826), ((0, 3), 0.1918076547212709), ((0, 4), 0.05391043915487261), ((0, 5), 0.20454401455115634), ((0, 6), 0.07413571277488959), ((0, 7), 0.2010867729384999), ((0, 8), 0.05859892573818361), ((0, 9), 0.03519598090766663), ((0, 10), 0.1728516393704133), ((0, 11), 0.24616569488008191), ((0, 12), 0.07208831569991092), ((0, 13), 0.05387651077626188), ((0, 14), 0.02241471795957654), ((0, 15), 0.1855224048902564), ((0, 16), 0.24162796537241993), ((0, 17), 0.22675813117772617), ((0, 18), 0.046687356389127126), ((0, 19), 0.05850866599787), ((0, 20), 0.23813080023398284), ((0, 21), 0.25691086351982767), ((0, 22), 0.07105136095850333), ((0, 23), 0.25553946732833865), ((0, 24), 0.08227127950610123), ((0, 25), 0.12418971340412448), ((0, 26), 0.12932222664096898), ((0, 27), 0.25366552250602914), ((0, 28), 0.08560387989591307), ((0, 29), 0.15762193327071516), (

True