# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [15]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(1.6473836777529094)),
 (1, np.float64(1.6473836777529094)),
 (2, np.float64(1.6473836777529094)),
 (3, np.float64(1.6473836777529094)),
 (4, np.float64(1.6473836777529094))]

## Inverted index

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('it', 18), ('what', 10)]),
 (1, [('banana', 2), ('is', 18)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.02156699686751784)),
   (None, np.float64(0.06040865057589517)),
   (None, np.float64(0.07435981345750187)),
   (None, np.float64(0.07992943805683839)),
   (None, np.float64(0.11666317728135189)),
   (None, np.float64(0.14471009828544779)),
   (None, np.float64(0.18603756389959636)),
   (None, np.float64(0.21390699995992013)),
   (None, np.float64(0.2341704067260818)),
   (None, np.float64(0.26401758603436276)),
   (None, np.float64(0.29866212060733677)),
   (None, np.float64(0.3062007074495994)),
   (None, np.float64(0.3214055405258347)),
   (None, np.float64(0.3639695304231413)),
   (None, np.float64(0.41799931195989104)),
   (None, np.float64(0.4601542553288016))]),
 (1,
  [(None, np.float64(0.5900282738578496)),
   (None, np.float64(0.6245056806045802)),
   (None, np.float64(0.6354423277211749)),
   (None, np.float64(0.6654615958405462)),
   (None, np.float64(0.7744941405923091)),
   (None, np.float64(0.7846447338419545)),
   (None, np.float64(0.78939878

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [21]:
from typing import Iterator
import numpy as np

# --- данные ---
rng = np.random.default_rng(42)
arr = rng.integers(low=-50, high=200, size=50).tolist()

n_maps = 3

def INPUTFORMAT():
    """Разбиваем вход на n_maps частей и отдаём генераторы записей."""
    def record_reader(chunk):
        for value in chunk:
            yield (0, value)
    for chunk in np.array_split(arr, n_maps):
        yield record_reader(chunk.tolist())

def MAP(_, value: int):
    # Все значения под один ключ -> глобальный максимум
    yield ("GLOBAL_MAX", value)

def REDUCE(key: str, values: Iterator[int]):
    best = None
    for v in values:
        if best is None or v > best:
            best = v
    yield (key, best)

# локально возьмём максимум на каждом mapper'е
parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=REDUCE)

parts = [(pid, list(part)) for (pid, part) in parts]
out = [kv for (pid, part) in parts for kv in part]

out, "python max =", max(arr)

3 key-value pairs were sent over a network.


([('GLOBAL_MAX', 193)], 'python max =', 193)

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [22]:
from typing import Iterator, Tuple
import numpy as np

# --- данные ---
rng = np.random.default_rng(42)
arr = rng.integers(low=-50, high=200, size=50).tolist()

n_maps = 3

def INPUTFORMAT():
    """Раздаём вход на n_maps кусков."""
    def record_reader(chunk):
        for v in chunk:
            yield (None, v)

    for chunk in np.array_split(arr, n_maps):
        yield record_reader(chunk.tolist())

def MAP(_, value: int):
    # Для среднего передаём (значение, 1)
    yield ("AVG", (value, 1))

def REDUCE(key: str, pairs: Iterator[Tuple[int, int]]):
    total = 0
    cnt = 0
    for s, c in pairs:
        total += s
        cnt += c
    yield (key, (total, cnt))

# combiner тот же: локально суммируем (sum, count), уменьшаем трафик
parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=REDUCE)

parts = [(pid, list(part)) for (pid, part) in parts]
out = [kv for (pid, part) in parts for kv in part]
total_sum = 0
total_cnt = 0
for _, (s, c) in out:
    total_sum += s
    total_cnt += c

mr_avg = total_sum / total_cnt

mr_avg, "python mean =", (sum(arr) / len(arr))

3 key-value pairs were sent over a network.


(84.76, 'python mean =', 84.76)

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [23]:
from typing import Iterable, Iterator, Tuple, List, TypeVar
from itertools import groupby
from operator import itemgetter

K = TypeVar("K")
V = TypeVar("V")

def groupByKey_sorted(pairs: Iterable[Tuple[K, V]]) -> Iterator[Tuple[K, List[V]]]:
    """
    Группирует (key, value) по key через сортировку:
    sort -> groupby -> собираем список значений.
    """
    pairs_sorted = sorted(pairs, key=itemgetter(0))  # сортируем по key

    for k, group in groupby(pairs_sorted, key=itemgetter(0)):
        yield (k, [v for _, v in group])


# ---- проверки на примерах ----
example1 = [("b", 2), ("a", 1), ("b", 3), ("a", 7), ("c", 10)]
print("input:", example1)
print("grouped:", list(groupByKey_sorted(example1)))

example2 = [("x", 1)]
print("\ninput:", example2)
print("grouped:", list(groupByKey_sorted(example2)))

example3 = []
print("\ninput:", example3)
print("grouped:", list(groupByKey_sorted(example3)))

example4 = [(2, "u"), (1, "a"), (2, "v"), (1, "b"), (3, "z")]
print("\ninput:", example4)
print("grouped:", list(groupByKey_sorted(example4)))

input: [('b', 2), ('a', 1), ('b', 3), ('a', 7), ('c', 10)]
grouped: [('a', [1, 7]), ('b', [2, 3]), ('c', [10])]

input: [('x', 1)]
grouped: [('x', [1])]

input: []
grouped: []

input: [(2, 'u'), (1, 'a'), (2, 'v'), (1, 'b'), (3, 'z')]
grouped: [(1, ['a', 'b']), (2, ['u', 'v']), (3, ['z'])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [24]:
from typing import Iterator
import numpy as np

# --- данные ---
rng = np.random.default_rng(42)
arr = rng.integers(low=0, high=10, size=30).tolist()  # специально с повторениями

n_maps = 3
n_reducers = 2

def INPUTFORMAT():
    def record_reader(chunk):
        for v in chunk:
            yield (None, v)

    for chunk in np.array_split(arr, n_maps):
        yield record_reader(chunk.tolist())

def MAP(_, value: int):
    # ключом делаем сам элемент
    yield (value, 1)

def COMBINER(key: int, values: Iterator[int]):
    # если элемент есть хотя бы раз в сплите — оставляем один
    yield (key, 1)

def REDUCE(key: int, values: Iterator[int]):
    # элемент присутствует -> выдаём его один раз
    yield key

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=COMBINER)

parts = [(pid, list(part)) for (pid, part) in parts]
out = [kv for (pid, part) in parts for kv in part]

sorted(out), "python distinct =", sorted(set(arr))

20 key-value pairs were sent over a network.


([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
 'python distinct =',
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [25]:
from typing import Iterator, Tuple, List
import math

# Отношение R: (name, age, city)
R: List[Tuple[str, int, str]] = [
    ("Irishka", 5, "Samara"),
    ("Ilya", 20, "Kazan"),
    ("Dima", 11, "Samarkant"),
]

mappers = 2

def predicate(t: Tuple[str, int, str]) -> bool:
    # условие C(t): возраст >= 10
    return int(t[1]) >= 10

def INPUTFORMAT():
    """Разбиваем R на несколько частей и отдаём генераторы записей."""
    def record_reader(chunk: List[Tuple[str, int, str]]):
        for t in chunk:
            yield (None, t)

    split_size = int(math.ceil(len(R) / mappers))
    for i in range(0, len(R), split_size):
        yield record_reader(R[i:i + split_size])

def MAP(_, t: Tuple[str, int, str]):
    # Если предикат истинен -> (t, t)
    if predicate(t):
        yield (t, t)

def REDUCE(key: Tuple[str, int, str], values: Iterator[Tuple[str, int, str]]):
    # Identity reduce: вернуть то, что пришло
    for v in values:
        yield (key, v)

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
parts = [(pid, list(part)) for (pid, part) in parts]

selected = [t for (pid, part) in parts for (t, _) in part]
selected, "python selection =", [t for t in R if predicate(t)]

2 key-value pairs were sent over a network.


([('Dima', 11, 'Samarkant'), ('Ilya', 20, 'Kazan')],
 'python selection =',
 [('Ilya', 20, 'Kazan'), ('Dima', 11, 'Samarkant')])

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [26]:
from typing import Iterator, Tuple, List
import math

# Отношение R: (name, age, city)
R: List[Tuple[str, int, str]] = [
    ("Irishka", 5, "Samara"),
    ("Ilya", 20, "Kazan"),
    ("Dima", 11, "Samarkant"),
    ("Olya", 31, "Kazan"),        # добавила, чтобы показать дубликат по городу
]

mappers = 2

# Проекция на множество атрибутов S (индексы столбцов)
S_idx = (2,)   # например, только город

def project(t: Tuple[str, int, str], idxs: Tuple[int, ...]) -> Tuple:
    return tuple(t[i] for i in idxs)

def INPUTFORMAT():
    """Разбиваем R на несколько частей и отдаём генераторы записей."""
    def record_reader(chunk: List[Tuple[str, int, str]]):
        for t in chunk:
            yield (None, t)

    split_size = int(math.ceil(len(R) / mappers))
    for i in range(0, len(R), split_size):
        yield record_reader(R[i:i + split_size])

def MAP(_, t: Tuple[str, int, str]):
    # строим t' и возвращаем (t', t')
    t2 = project(t, S_idx)
    yield (t2, t2)

def REDUCE(t2: Tuple, _values: Iterator[Tuple]):
    # для каждого ключа t' вернуть ровно одну пару (t', t')
    yield (t2, t2)

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
parts = [(pid, list(part)) for (pid, part) in parts]

projection = [t2 for (pid, part) in parts for (t2, _) in part]

# сравнение с питоном (distinct после projection)
projection_sorted = sorted(projection)
python_sorted = sorted(set(project(t, S_idx) for t in R))

projection_sorted, "python projection =", python_sorted

4 key-value pairs were sent over a network.


([('Kazan',), ('Samara',), ('Samarkant',)],
 'python projection =',
 [('Kazan',), ('Samara',), ('Samarkant',)])

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [27]:
from typing import Iterator, Tuple, List
import math

# Два отношения одинаковой схемы
R: List[Tuple[str, int, str]] = [
    ("Irishka", 5, "Samara"),
    ("Ilya", 20, "Kazan"),
    ("Dima", 11, "Samarkant"),
]

S: List[Tuple[str, int, str]] = [
    ("Ilya", 20, "Kazan"),        # пересечение с R
    ("Olga", 31, "Moscow"),
    ("Timur", 19, "Kazan"),
]

mappers = 2

# будем склеивать R и S в один поток для MR
all_rows = R + S

def INPUTFORMAT():
    def record_reader(chunk):
        for t in chunk:
            yield (None, t)

    split_size = int(math.ceil(len(all_rows) / mappers))
    for i in range(0, len(all_rows), split_size):
        yield record_reader(all_rows[i:i + split_size])

def MAP(_, t: Tuple[str, int, str]):
    # каждый кортеж t -> (t, t)
    yield (t, t)

def REDUCE(t: Tuple[str, int, str], _values: Iterator[Tuple[str, int, str]]):
    # может прийти 1 или 2 значения, но union возвращает t один раз
    yield (t, t)

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
parts = [(pid, list(part)) for (pid, part) in parts]

union_res = sorted([t for (pid, part) in parts for (t, _) in part])

# проверка питоном: union как множество
python_union = sorted(set(R).union(set(S)))

union_res, "python union =", python_union

6 key-value pairs were sent over a network.


([('Dima', 11, 'Samarkant'),
  ('Ilya', 20, 'Kazan'),
  ('Irishka', 5, 'Samara'),
  ('Olga', 31, 'Moscow'),
  ('Timur', 19, 'Kazan')],
 'python union =',
 [('Dima', 11, 'Samarkant'),
  ('Ilya', 20, 'Kazan'),
  ('Irishka', 5, 'Samara'),
  ('Olga', 31, 'Moscow'),
  ('Timur', 19, 'Kazan')])

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [28]:
from typing import Iterator, Tuple, List
import math

R: List[Tuple[str, int, str]] = [
    ("Irishka", 5, "Samara"),
    ("Ilya", 20, "Kazan"),
    ("Dima", 11, "Samarkant"),
]

S: List[Tuple[str, int, str]] = [
    ("Ilya", 20, "Kazan"),        # есть в пересечении
    ("Olga", 31, "Moscow"),
    ("Timur", 19, "Kazan"),
]

mappers = 2

# помечаем источник, чтобы intersection работал корректно даже если будут повторы
all_rows = [("R", t) for t in R] + [("S", t) for t in S]

def INPUTFORMAT():
    def record_reader(chunk):
        for src, t in chunk:
            yield (src, t)

    split_size = int(math.ceil(len(all_rows) / mappers))
    for i in range(0, len(all_rows), split_size):
        yield record_reader(all_rows[i:i + split_size])

def MAP(src: str, t: Tuple[str, int, str]):
    # ключ — сам кортеж, значение — источник
    yield (t, src)

def REDUCE(t: Tuple[str, int, str], srcs: Iterator[str]):
    seen = set()
    for s in srcs:
        seen.add(s)
        if len(seen) == 2:    # есть и R, и S
            yield (t, t)
            return

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
parts = [(pid, list(part)) for (pid, part) in parts]

inter = sorted([t for (pid, part) in parts for (t, _) in part])

python_inter = sorted(set(R).intersection(set(S)))

inter, "python intersection =", python_inter

6 key-value pairs were sent over a network.


([('Ilya', 20, 'Kazan')], 'python intersection =', [('Ilya', 20, 'Kazan')])

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [29]:
from typing import Iterator, Tuple, List
import math

R: List[Tuple[str, int]] = [
    ("a", 1),
    ("b", 2),
    ("c", 3),
]

S: List[Tuple[str, int]] = [
    ("b", 2),
    ("d", 4),
]

mappers = 2
reducers = 2

# помечаем источник
all_rows = [("R", t) for t in R] + [("S", t) for t in S]

def INPUTFORMAT():
    def record_reader(chunk):
        for src, t in chunk:
            yield (src, t)

    split_size = int(math.ceil(len(all_rows) / mappers))
    for i in range(0, len(all_rows), split_size):
        yield record_reader(all_rows[i:i + split_size])

def MAP(src: str, t: Tuple[str, int]):
    yield (t, src)

def REDUCE(t: Tuple[str, int], rels: Iterator[str]):
    rels_set = set(rels)
    # оставить только если есть в R и нет в S
    if rels_set == {"R"}:
        yield (t, t)

parts = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
parts = [(pid, list(part)) for (pid, part) in parts]

diff = sorted([t for (pid, part) in parts for (t, _) in part])

python_diff = sorted(set(R) - set(S))

diff, "python R-S =", python_diff

5 key-value pairs were sent over a network.


([('a', 1), ('c', 3)], 'python R-S =', [('a', 1), ('c', 3)])

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [30]:
from typing import Iterator, Tuple, List

# R(a, b)
R: List[Tuple[str, int]] = [
    ("x1", 1),
    ("x2", 2),
    ("x3", 1),
]

# S(b, c)
S: List[Tuple[int, str]] = [
    (1, "A"),
    (2, "B"),
    (3, "C"),
]

def RECORDREADER():
    for a, b in R:
        yield ("R", (a, b))
    for b, c in S:
        yield ("S", (b, c))

def MAP(rel, tup):
    if rel == "R":
        a, b = tup
        yield (b, ("R", a))
    else:
        b, c = tup
        yield (b, ("S", c))

def REDUCE(b, values):
    r_vals = []
    s_vals = []

    for tag, val in values:
        if tag == "R":
            r_vals.append(val)
        else:
            s_vals.append(val)

    for a in r_vals:
        for c in s_vals:
            yield (None, (a, b, c))

join_res = [v for (_, v) in MapReduce(RECORDREADER, MAP, REDUCE)]
sorted(join_res)

[('x1', 1, 'A'), ('x2', 2, 'B'), ('x3', 1, 'A')]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [31]:
from typing import Iterator, Tuple, List

# (a, b, c)
R: List[Tuple[str, int, str]] = [
    ("A", 10, "x"),
    ("B", 5, "y"),
    ("A", 7, "z"),
    ("B", 3, "k"),
    ("C", 8, "m"),
]
def RECORDREADER():
    for t in R:
        yield (None, t)

def MAP(_, t: Tuple[str, int, str]):
    a, b, _ = t
    yield (a, b)

def REDUCE(a: str, values: Iterator[int]):
    total = 0
    for v in values:
        total += v
    yield (a, total)

group_sum = sorted([kv for kv in MapReduce(RECORDREADER, MAP, REDUCE)])
group_sum

[('A', 17), ('B', 8), ('C', 8)]

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [32]:
import numpy as np
from typing import Iterator, Tuple, List

# --- данные ---
rng = np.random.default_rng(1)
I, J = 4, 5
M = rng.integers(0, 5, size=(I, J)).astype(float)
v = rng.standard_normal(J).astype(float)

# ---------- Шаг 1: join по j -> partials (i, M[i,j] * v[j]) ----------

def RECORDREADER1():
    # поток матрицы
    for i in range(I):
        for j in range(J):
            yield (("M", i, j), float(M[i, j]))
    # поток вектора
    for j in range(J):
        yield (("V", j), float(v[j]))

def MAP1(k1, val):
    if k1[0] == "M":
        _, i, j = k1
        yield (j, ("M", i, val))
    else:
        _, j = k1
        yield (j, ("V", val))

def REDUCE1(j, tagged: Iterator[Tuple]):
    vj = None
    m_list = []

    for rec in tagged:
        tag = rec[0]
        if tag == "V":
            vj = rec[1]
        else:
            # ("M", i, mij)
            m_list.append((rec[1], rec[2]))

    if vj is None:
        return

    for i, mij in m_list:
        yield (i, mij * vj)

partials = list(MapReduce(RECORDREADER1, MAP1, REDUCE1))

# ---------- Шаг 2: sum по i -> итоговый y ----------

def RECORDREADER2():
    for i, contrib in partials:
        yield (i, contrib)

def MAP2(i, contrib):
    yield (i, contrib)

def REDUCE2(i, contribs: Iterator[float]):
    s = 0.0
    for x in contribs:
        s += x
    yield (i, s)

result = list(MapReduce(RECORDREADER2, MAP2, REDUCE2))

# приводим к вектору в порядке i
result_sorted = [val for (i, val) in sorted(result, key=lambda x: x[0])]

reference = (M @ v).tolist()
print("Matches numpy:", np.allclose(result_sorted, reference))

result_sorted, reference

Matches numpy: True


([-1.7107315971207244,
  -1.403993662229911,
  -0.6630982531657752,
  0.22407526846560044],
 [-1.7107315971207244,
  -1.4039936622299112,
  -0.6630982531657752,
  0.22407526846560044])

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [33]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [34]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J)
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  for i in range(small_mat.shape[0]):
    yield ((i, k), small_mat[i, j] * w)

def REDUCE(key, values):
  (i, k) = key
  s = 0.0
  for x in values:
    s += x
  yield ((i, k), s)

reference_solution = small_mat @ big_mat

Проверьте своё решение

In [35]:
def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

sol_list = list(MapReduce(RECORDREADER, MAP, REDUCE))

print("len =", len(sol_list), "expected =", I*K)
np.allclose(reference_solution, asmatrix(sol_list))

len = 80 expected = 80


True

In [36]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [37]:
import numpy as np
from typing import Iterator, Iterable, Tuple, Any

# MapReduce model (1 machine)
def flatten(nested_iterable: Iterable[Iterable[Any]]):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable: Iterable[Tuple[Any, Any]]):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(
        map(lambda x: REDUCE(*x),
            groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))
    )

# Data
np.random.seed(0)

I = 2
J = 3
K = 4 * 10

# матрицы как "отношения": M(I,J,V) и N(J,K,W)
M = np.random.rand(I, J)
N = np.random.rand(J, K)

reference_solution = M @ N

# join по j -> partial products ((i,k), m_ij * n_jk)
def RECORDREADER1():
    # сначала матрица M: (i, j, v)
    for i in range(I):
        for j in range(J):
            yield (("M", i, j), float(M[i, j]))
    # потом матрица N: (j, k, w)
    for j in range(J):
        for k in range(K):
            yield (("N", j, k), float(N[j, k]))

def MAP1(k1, v1):
    tag = k1[0]
    if tag == "M":
        _, i, j = k1
        # ключ = j
        yield (j, ("M", i, v1))
    else:
        _, j, k = k1
        # ключ = j
        yield (j, ("N", k, v1))

def REDUCE1(j, tagged_values: Iterator[tuple]):
    left = []   # (i, m_ij)
    right = []  # (k, n_jk)

    for rec in tagged_values:
        if rec[0] == "M":
            _, i, mij = rec
            left.append((i, mij))
        else:
            _, k, njk = rec
            right.append((k, njk))

    # все пары (i,k): m_ij * n_jk
    for (i, mij) in left:
        for (k, njk) in right:
            yield ((i, k), mij * njk)

partials = list(MapReduce(RECORDREADER1, MAP1, REDUCE1))
print("partials:", len(partials), "expected:", I * J * K)

# sum по (i,k)
def RECORDREADER2():
    for (ik, val) in partials:
        yield (ik, val)

def MAP2(ik, val):
    yield (ik, val)

def REDUCE2(ik, vals: Iterator[float]):
    s = 0.0
    for x in vals:
        s += x
    yield (ik, s)

solution = list(MapReduce(RECORDREADER2, MAP2, REDUCE2))


# Проверка
def asmatrix_fixed(reduce_output, I: int, K: int):
    mat = np.zeros((I, K), dtype=float)
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat

mat_mr = asmatrix_fixed(solution, I, K)

print("Matches numpy:", np.allclose(reference_solution, mat_mr))
mat_mr, reference_solution

partials: 240 expected: 240
Matches numpy: True


(array([[0.7060119 , 0.63824768, 1.10162159, 1.03070567, 1.03432976,
         1.00623835, 1.09806791, 1.0127371 , 0.36281538, 0.27360563,
         0.83172078, 0.65022457, 1.26021813, 0.65511526, 1.0593353 ,
         0.68027906, 1.16581408, 1.10716395, 0.3554435 , 0.96219216,
         1.02270104, 0.93282535, 1.02015213, 0.87058284, 1.11305592,
         1.27026841, 1.3705311 , 0.92382741, 1.02955307, 0.60599253,
         1.06928367, 0.77491501, 1.26121579, 0.87653767, 0.8618777 ,
         0.8383055 , 1.00503923, 0.91647496, 1.02664221, 0.92337048],
        [0.66552633, 0.60120798, 1.03071682, 0.96324559, 0.87872607,
         0.90507802, 0.81337471, 1.01024534, 0.31412369, 0.23413706,
         0.66653439, 0.5739761 , 1.15696681, 0.58063984, 1.03837862,
         0.65660558, 1.00436702, 1.10531101, 0.308397  , 0.87703102,
         0.80832492, 0.92548577, 0.78343287, 0.88202985, 0.84662929,
         1.16710363, 1.11414771, 0.75808858, 0.84905803, 0.60924202,
         1.02243256, 0.76251954, 

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [38]:
from typing import Iterator, Tuple, Iterable, List
import numpy as np

# Данные
np.random.seed(123)

I, J, K = 3, 4, 6
A = np.random.randn(I, J).astype(float)
B = np.random.randn(J, K).astype(float)

reference = A @ B

n_maps = 4

def split_list(items: List, n_chunks: int) -> List[List]:
    n_chunks = max(1, int(n_chunks))
    if not items:
        return [[] for _ in range(n_chunks)]
    size = int(np.ceil(len(items) / n_chunks))
    return [items[i:i + size] for i in range(0, len(items), size)]

# Каждая матрица имеет свой собственный СЧИТЫВАТЕЛЬ записей
def RECORDREADER_A() -> Iterable[Tuple[Tuple[str, int, int], float]]:
    for i in range(I):
        for j in range(J):
            yield (("A", i, j), float(A[i, j]))

def RECORDREADER_B() -> Iterable[Tuple[Tuple[str, int, int], float]]:
    for j in range(J):
        for k in range(K):
            yield (("B", j, k), float(B[j, k]))

# Соединение по j -> частичному ((i,k), a_ij*b_jk)
def INPUTFORMAT1():
    a_items = list(RECORDREADER_A())
    b_items = list(RECORDREADER_B())

    # делим число мапов между A и B (хотя бы по 1)
    a_maps = max(1, n_maps // 2)
    b_maps = max(1, n_maps - a_maps)

    a_chunks = split_list(a_items, a_maps)
    b_chunks = split_list(b_items, b_maps)

    def record_reader(chunk):
        for (k1, v1) in chunk:
            yield (k1, v1)

    # A -> отдельно
    for ch in a_chunks:
        yield record_reader(ch)
    # B -> отдельно
    for ch in b_chunks:
        yield record_reader(ch)

def MAP1(k1, v1):
    tag = k1[0]
    if tag == "A":
        _, i, j = k1
        yield (j, ("A", i, float(v1)))
    else:
        _, j, k = k1
        yield (j, ("B", k, float(v1)))

def REDUCE1(j: int, tagged_vals: Iterator[tuple]):
    left = []   # (i, a_ij)
    right = []  # (k, b_jk)

    for rec in tagged_vals:
        if rec[0] == "A":
            _, i, aij = rec
            left.append((i, aij))
        else:
            _, k, bjk = rec
            right.append((k, bjk))

    for (i, aij) in left:
        for (k, bjk) in right:
            yield ((i, k), aij * bjk)

stage1 = MapReduceDistributed(INPUTFORMAT1, MAP1, REDUCE1)
stage1 = [(pid, list(part)) for (pid, part) in stage1]
partials = [kv for (pid, part) in stage1 for kv in part]

print("partials:", len(partials), "expected:", I * J * K)


# Суммируем по (i,k)
def INPUTFORMAT2():
    chunks = split_list(partials, n_maps)

    def record_reader(chunk):
        for (ik, val) in chunk:
            yield (ik, float(val))

    for ch in chunks:
        yield record_reader(ch)

def MAP2(ik, val: float):
    yield (ik, float(val))

def REDUCE2(ik, vals: Iterator[float]):
    s = 0.0
    for x in vals:
        s += x
    yield (ik, s)

# combiner = reduce для суммы
stage2 = MapReduceDistributed(INPUTFORMAT2, MAP2, REDUCE2, COMBINER=REDUCE2)
stage2 = [(pid, list(part)) for (pid, part) in stage2]
out = [kv for (pid, part) in stage2 for kv in part]

def asmatrix_fixed(pairs, I: int, K: int):
    mat = np.zeros((I, K), dtype=float)
    for ((i, k), v) in pairs:
        mat[i, k] = v
    return mat

solution = asmatrix_fixed(out, I, K)

print("Matches numpy:", np.allclose(reference, solution))
solution, reference

36 key-value pairs were sent over a network.
partials: 72 expected: 72
72 key-value pairs were sent over a network.
Matches numpy: True


(array([[-0.5874928 ,  5.11380282,  4.14255381,  2.6082553 , -4.76484305,
         -1.18364575],
        [ 3.94761745,  3.75539973,  0.03318505,  6.48030108, -2.87972044,
          2.84220932],
        [ 1.89320179, -0.44552041, -1.64920108, -0.80574251,  3.6109436 ,
          2.35067158]]),
 array([[-0.5874928 ,  5.11380282,  4.14255381,  2.6082553 , -4.76484305,
         -1.18364575],
        [ 3.94761745,  3.75539973,  0.03318505,  6.48030108, -2.87972044,
          2.84220932],
        [ 1.89320179, -0.44552041, -1.64920108, -0.80574251,  3.6109436 ,
          2.35067158]]))

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [39]:
import numpy as np
import math
from typing import Iterator, List, Tuple


# Перемножение матриц через MapReduceDistributed
# Случай: каждая матрица генерируется НЕ одним, а несколькими RECORDREADER-ами,
# причём каждому RECORDREADER-у достаётся СЛУЧАЙНОЕ подмножество элементов,
# но В СУММЕ покрытие ПОЛНОЕ (ничего не теряем и не дублируем).
np.random.seed(4)

I, J, K = 4, 5, 3
A = np.random.randn(I, J).astype(float)
B = np.random.randn(J, K).astype(float)

reference = A @ B

# параметры "распределённой" модели (если среда использует глобальные maps/reducers)
maps = 6
reducers = 3


# Готовим списки всех элементов матриц (полное покрытие)
# A: ключ ("A", i, j), значение a_ij
# B: ключ ("B", j, k), значение b_jk
A_entries = [(("A", i, j), float(A[i, j])) for i in range(I) for j in range(J)]
B_entries = [(("B", j, k), float(B[j, k])) for j in range(J) for k in range(K)]


# Функция: случайно перемешать элементы и порезать на n_splits частей
# потерь нет, дубликатов нет — просто случайное распределение индексов
def random_splits(entries: List[Tuple[tuple, float]], n_splits: int, seed: int):
    rng = np.random.default_rng(seed)
    idx = np.arange(len(entries))
    rng.shuffle(idx)  # случайная перестановка индексов
    split_idx = np.array_split(idx, n_splits)
    return [[entries[i] for i in part] for part in split_idx]

# Делаем несколько RECORDREADER-ов для каждой матрицы
A_splits = random_splits(A_entries, n_splits=3, seed=10)
B_splits = random_splits(B_entries, n_splits=3, seed=20)

# Выдаём несколько RECORDREADER-ов
# Сначала сплиты A, затем сплиты B (так как в условии “например, сначала одна, потом другая”)
def INPUTFORMAT1():
    def RR(split):
        for kv in split:
            yield kv

    for split in A_splits:
        yield RR(split)
    for split in B_splits:
        yield RR(split)


# Шаг 1: JOIN по j и генерация частичных произведений
# На выходе: ((i, k), a_ij * b_jk)
def MAP1(k1, val):
    tag = k1[0]
    if tag == "A":
        _, i, j = k1
        # ключ для join — j
        yield (j, ("A", i, val))
    else:
        _, j, k = k1
        # ключ для join — j
        yield (j, ("B", k, val))

def REDUCE1(j, tagged_values: Iterator[tuple]):
    left = [(i, aij) for (tag, i, aij) in tagged_values if tag == "A"]
    right = [(k, bjk) for (tag, k, bjk) in tagged_values if tag == "B"]

    # декартово произведение внутри одного j: получаем все (i,k)
    for (i, aij) in left:
        for (k, bjk) in right:
            yield ((i, k), aij * bjk)

stage1 = MapReduceDistributed(INPUTFORMAT1, MAP1, REDUCE1, COMBINER=None)
stage1 = [(pid, list(part)) for (pid, part) in stage1]
partials = [kv for (pid, part) in stage1 for kv in part]

print("partials:", len(partials), "ожидалось:", I * J * K)


# Шаг 2: суммирование по ключу (i,k)
# Можно тоже случайно раздать partials по map-сплитам
maps = 4
reducers = 3

def INPUTFORMAT2():
    rng = np.random.default_rng(30)
    idx = np.arange(len(partials))
    rng.shuffle(idx)

    split_size = int(math.ceil(len(partials) / maps)) if partials else 1

    def RR(index_chunk):
        for t in index_chunk:
            yield partials[t]

    for i0 in range(0, len(idx), split_size):
        yield RR(idx[i0:i0 + split_size])

def MAP2(key, val):
    yield (key, float(val))

def REDUCE2(key, vals: Iterator[float]):
    s = 0.0
    for x in vals:
        s += x
    yield (key, s)

# COMBINER = REDUCE2 уменьшает “трафик” (суммирует локально на mapper-е)
stage2 = MapReduceDistributed(INPUTFORMAT2, MAP2, REDUCE2, COMBINER=REDUCE2)
stage2 = [(pid, list(part)) for (pid, part) in stage2]
P = [kv for (pid, part) in stage2 for kv in part]

# Преобразуем список пар ((i,k), v) в матрицу I x K
def asmatrix(pairs, I, K):
    mat = np.zeros((I, K), dtype=float)
    for ((i, k), v) in pairs:
        mat[i, k] = v
    return mat

solution = asmatrix(P, I, K)

print("Совпадает с numpy (полное покрытие, случайные RR):", np.allclose(reference, solution))
solution, reference

35 key-value pairs were sent over a network.
partials: 60 ожидалось: 60
36 key-value pairs were sent over a network.
Совпадает с numpy (полное покрытие, случайные RR): True


(array([[-3.30359593,  2.7572802 , -0.5851318 ],
        [ 2.8464065 ,  0.29702452,  1.91922881],
        [-0.29012046,  1.87650961,  2.21872797],
        [ 2.02448215, -3.63157051,  2.6886677 ]]),
 array([[-3.30359593,  2.7572802 , -0.5851318 ],
        [ 2.8464065 ,  0.29702452,  1.91922881],
        [-0.29012046,  1.87650961,  2.21872797],
        [ 2.02448215, -3.63157051,  2.6886677 ]]))