# Введение в MapReduce модель на Python


In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(1.4301556115830296)),
 (1, np.float64(1.4301556115830296)),
 (2, np.float64(1.4301556115830296)),
 (3, np.float64(1.4301556115830296)),
 (4, np.float64(1.4301556115830296))]

## Inverted index

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('is', 18), ('it', 18)]),
 (1, [('a', 2), ('banana', 2), ('what', 10)])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.16387426782667824)),
   (None, np.float64(0.18845887427504737)),
   (None, np.float64(0.20865295884442214)),
   (None, np.float64(0.2466962331149979)),
   (None, np.float64(0.34717837748901237)),
   (None, np.float64(0.3935875923751957)),
   (None, np.float64(0.4148675618660935)),
   (None, np.float64(0.43919350711925)),
   (None, np.float64(0.46954896844457117)),
   (None, np.float64(0.4917233055224872))]),
 (1,
  [(None, np.float64(0.5227129817885263)),
   (None, np.float64(0.5427586856593501)),
   (None, np.float64(0.5640152948953823)),
   (None, np.float64(0.5785930371558655)),
   (None, np.float64(0.5796915174273558)),
   (None, np.float64(0.5798643725808812)),
   (None, np.float64(0.5800158494428916)),
   (None, np.float64(0.5973573018437456)),
   (None, np.float64(0.6003707218546139)),
   (None, np.float64(0.6221024492227969)),
   (None, np.float64(0.6290301192365333)),
   (None, np.float64(0.6408649632584192)),
   (None, np.float64(0.6816832116468637

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [None]:
import random

# Находит максимум в своём подсписке
def MAP(list):
  return max(list)

# Находит максимум среди значений, полученных от MAP
def REDUCE(list):
  return max(list)

# Генерация списка случайных чисел
def RECORDREADER(n):
  return [random.randint(0, 1000) for i in range(n)]

record = RECORDREADER(1000)

# Разбиваем список на части
parts = 10
record_partitional = []
for i in range(0, len(record), parts):
  record_partitional.append(record[i:i+parts])

print(REDUCE(list(map(lambda x: MAP(x), record_partitional))))
print(record_partitional)

997
[[236, 955, 777, 866, 3, 679, 399, 412, 430, 700], [316, 802, 612, 913, 492, 719, 73, 866, 172, 125], [187, 393, 981, 164, 479, 162, 439, 814, 772, 747], [79, 552, 2, 664, 528, 926, 628, 535, 786, 164], [41, 306, 572, 580, 67, 754, 870, 584, 634, 544], [430, 411, 822, 161, 860, 95, 484, 794, 338, 869], [712, 656, 869, 509, 669, 407, 658, 523, 459, 827], [645, 574, 949, 588, 703, 649, 331, 680, 606, 446], [924, 311, 301, 518, 64, 899, 881, 793, 173, 385], [794, 60, 164, 12, 609, 170, 289, 799, 920, 816], [191, 893, 589, 252, 131, 329, 644, 909, 383, 759], [862, 564, 249, 574, 58, 815, 253, 956, 996, 823], [742, 284, 951, 99, 496, 165, 189, 332, 474, 669], [438, 903, 31, 450, 264, 636, 185, 481, 259, 803], [450, 247, 873, 584, 483, 960, 211, 317, 76, 3], [707, 592, 785, 440, 81, 196, 462, 218, 673, 896], [613, 657, 883, 428, 137, 372, 599, 260, 99, 997], [453, 539, 773, 900, 145, 109, 890, 996, 363, 692], [671, 410, 257, 404, 408, 949, 921, 593, 519, 84], [96, 977, 73, 293, 810, 582,

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [None]:
from typing import Iterator, Tuple
import random
from itertools import groupby
from collections import defaultdict

def RECORDREADER(amount: int):
    return [random.randint(0, 50) for _ in range(amount)]

def MAP(value: int) -> Tuple[int, int]:
    return 1, value

def REDUCE(_, values: Iterator[int]):
    total, count = 0, 0
    for val in values:
        total += val
        count += 1
    yield ('AVG', total / count)

# Генерация данных
random_numbers = RECORDREADER(10)

# Применение отображения
mapped_data = list(map(MAP, random_numbers))

# Группировка по ключу
shuffled_data = list(groupbykey(mapped_data))

# Применение редукции
result = list(flatten(map(lambda pair: REDUCE(*pair), shuffled_data)))

print(result)
print(mapped_data)


[('AVG', 20.3)]
[(1, 23), (1, 22), (1, 11), (1, 28), (1, 27), (1, 25), (1, 6), (1, 30), (1, 12), (1, 19)]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [None]:
from typing import Iterator, Tuple
import random

# Функция для группировки данных по ключу
def group_by_key(iterable):
    grouped = {}
    for key, value in iterable:
        grouped.setdefault(key, []).append(value)
    return list(grouped.items())

# Функция отображения (Map) - присваивает числу ключ 1
def MAP(num):
    return 1, num

# Функция редукции (Reduce) - вычисляет среднее значение
def REDUCE(_, numbers: Iterator[int]):
    total, count = sum(numbers), len(numbers)
    yield ('AVG', total / count) if count > 0 else ('AVG', 0)

# Функция для генерации случайных чисел в заданном диапазоне
def RECORDREADER(count):
    return [random.randint(0, 5) for _ in range(count)]

# Генерация данных и применение функции отображения
map_output = list(map(MAP, RECORDREADER(3)))
print(map_output)

# Группировка данных по ключу
shuffled_output = group_by_key(map_output)
print(shuffled_output)

# Применение редукции и вывод результата
result = list(flatten(map(lambda pair: REDUCE(*pair), shuffled_output)))
print(result)


[(1, 5), (1, 1), (1, 3)]
[(1, [5, 1, 3])]
[('AVG', 3.0)]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [None]:
from typing import Iterator
import numpy as np

# Исходные документы
d1 = """
what is it
what is it"""
d2 = """
it is what it is
it is what it is
it is what it is"""
d3 = """
it is a banana"""

documents = [d1, d2, d3, d1, d2, d3]

# Количество мапперов и редьюсеров
maps = 3
reducers = 2

# Функция входного формата
def INPUTFORMAT():
    global maps

    def RECORDREADER(split):
        for docid, document in enumerate(split):
            for lineid, line in enumerate(document.split('\n')):
                yield ("{}:{}".format(docid, lineid), line)

    split_size = int(np.ceil(len(documents) / maps))
    for i in range(0, len(documents), split_size):
        yield RECORDREADER(documents[i:i + split_size])

# Функция отображения (Map) - разбивает строки на слова и создает пары (слово, слово)
def MAP(docId: str, line: str):
    for word in line.split(" "):
        yield (word, word)

# Функция редукции (Reduce) - возвращает ключ (уникальные слова)
def REDUCE(key: str, value: Iterator[str]):
    yield key

# Запуск распределенного MapReduce
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
print(partitioned_output)


56 key-value pairs were sent over a network.
[(0, ['', 'is', 'it']), (1, ['a', 'banana', 'what'])]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Функция Map для отбора строк, содержащих слово "female"
def map_select(_: None, row: NamedTuple) -> Iterator[tuple[NamedTuple, NamedTuple]]:
    if row.gender == "female":
        yield (row, row)

# Функция Reduce для группировки выбранных строк
def reduce_select(row: NamedTuple, rows: Iterator[NamedTuple]) -> Iterator[tuple[NamedTuple, list[NamedTuple]]]:
    yield (row, list(rows))

# Функция для чтения записей из коллекции данных
def record_reader() -> list[tuple[int, NamedTuple]]:
    return [(entry.id, entry) for entry in input_collection]  # input_collection должен быть определен ранее

# Функция для группировки данных по ключу
def group_by_key(iterable: list[tuple[NamedTuple, NamedTuple]]) -> list[tuple[NamedTuple, list[NamedTuple]]]:
    grouped_data = defaultdict(list)
    for key, value in iterable:
        grouped_data[key].append(value)
    return list(grouped_data.items())

# Функция для распаковки вложенных списков
def flatten(nested_list: list[list]) -> list:
    return [item for sublist in nested_list for item in sublist]

# Основная функция MapReduce
def map_reduce(data: list[tuple[int, NamedTuple]]) -> list[tuple[NamedTuple, list[NamedTuple]]]:
    # Шаг Map: отбираем строки по критерию
    mapped_data = list(flatten(map(lambda x: list(map_select(None, x[1])), data)))

    # Шаг Shuffle: группируем по ключу
    shuffled_data = group_by_key(mapped_data)

    # Шаг Reduce: собираем итоговые группы
    reduced_data = list(flatten(map(lambda x: list(reduce_select(x[0], iter(x[1]))), shuffled_data)))

    return reduced_data

# Запуск процесса MapReduce
output = map_reduce(record_reader())
print(output)


[(User(id=1, age=25, social_contacts=240, gender='female'), [User(id=1, age=25, social_contacts=240, gender='female')]), (User(id=2, age=25, social_contacts=500, gender='female'), [User(id=2, age=25, social_contacts=500, gender='female')]), (User(id=3, age=33, social_contacts=800, gender='female'), [User(id=3, age=33, social_contacts=800, gender='female')])]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Исходные данные
input_collection_2 = [
    dict(id=10, age=45, gender="male", social_contacts=15),
    dict(id=11, age=34, gender="female", social_contacts=320),
    dict(id=12, age=29, gender="undefined", social_contacts=512),
    dict(id=13, age=19, gender="male", social_contacts=89),
    dict(id=14, age=40, gender="undefined", social_contacts=44),
    dict(id=15, age=25, gender="female", social_contacts=276),
]

# Функция Map: фильтрует данные по полу и удаляет поле gender у "undefined"
def MAP_PROJECTION(_, row: dict):
    valid_genders = ["male", "female"]
    if row["gender"] in valid_genders:
        yield (row["id"], row)
    else:
        modified_row = row.copy()
        del modified_row["gender"]  # Удаляем поле gender
        yield (modified_row["id"], modified_row)

# Функция Reduce: группировка данных по идентификатору
def REDUCE_PROJECTION(row: str, rows: Iterator[NamedTuple]):
    yield (row, list(rows))

# Функция чтения данных
def RECORDREADER():
    return [(u["id"], u) for u in input_collection_2]

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP_PROJECTION, REDUCE_PROJECTION)
output = list(output)
output


[(10, [{'id': 10, 'age': 45, 'gender': 'male', 'social_contacts': 15}]),
 (11, [{'id': 11, 'age': 34, 'gender': 'female', 'social_contacts': 320}]),
 (12, [{'id': 12, 'age': 29, 'social_contacts': 512}]),
 (13, [{'id': 13, 'age': 19, 'gender': 'male', 'social_contacts': 89}]),
 (14, [{'id': 14, 'age': 40, 'social_contacts': 44}]),
 (15, [{'id': 15, 'age': 25, 'gender': 'female', 'social_contacts': 276}])]

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Исходные данные (обновленные)
input_collection_a = [
    dict(id=101, age=50, gender="male", social_contacts=200),
    dict(id=102, age=23, gender="female", social_contacts=150),
    dict(id=103, age=35, gender="male", social_contacts=80),
    dict(id=104, age=27, gender="male", social_contacts=500),
    dict(id=105, age=42, gender="female", social_contacts=300),
]

input_collection_b = [
    dict(id=105, age=39, gender="female", social_contacts=275),
    dict(id=106, age=65, gender="male", social_contacts=4500),
    dict(id=107, age=30, gender="female", social_contacts=50),
]

# Функция Map: группирует пользователей по id
def MAP_UNION(_, row: dict):
    yield (row["id"], row)

# Функция Reduce: объединяет записи по id
# Выходное значение равно (t, t) независимо от присутствия пользователя в одной или обеих выборках
def REDUCE_UNION(row: str, rows: Iterator[NamedTuple]):
    row_list = list(rows)
    yield (row_list[0], row_list[0])

# Функция чтения данных
def RECORDREADER():
    return [(u["id"], u) for u in input_collection_a + input_collection_b]

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP_UNION, REDUCE_UNION)
output = list(output)
output


[({'id': 101, 'age': 50, 'gender': 'male', 'social_contacts': 200},
  {'id': 101, 'age': 50, 'gender': 'male', 'social_contacts': 200}),
 ({'id': 102, 'age': 23, 'gender': 'female', 'social_contacts': 150},
  {'id': 102, 'age': 23, 'gender': 'female', 'social_contacts': 150}),
 ({'id': 103, 'age': 35, 'gender': 'male', 'social_contacts': 80},
  {'id': 103, 'age': 35, 'gender': 'male', 'social_contacts': 80}),
 ({'id': 104, 'age': 27, 'gender': 'male', 'social_contacts': 500},
  {'id': 104, 'age': 27, 'gender': 'male', 'social_contacts': 500}),
 ({'id': 105, 'age': 42, 'gender': 'female', 'social_contacts': 300},
  {'id': 105, 'age': 42, 'gender': 'female', 'social_contacts': 300}),
 ({'id': 106, 'age': 65, 'gender': 'male', 'social_contacts': 4500},
  {'id': 106, 'age': 65, 'gender': 'male', 'social_contacts': 4500}),
 ({'id': 107, 'age': 30, 'gender': 'female', 'social_contacts': 50},
  {'id': 107, 'age': 30, 'gender': 'female', 'social_contacts': 50})]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Исходные данные (обновленные)
input_collection_a = [
    dict(id=101, age=50, gender="male", social_contacts=200),
    dict(id=102, age=23, gender="female", social_contacts=150),
    dict(id=103, age=35, gender="male", social_contacts=80),
    dict(id=104, age=27, gender="male", social_contacts=500),
    dict(id=105, age=42, gender="female", social_contacts=300),
]

input_collection_b = [
    dict(id=105, age=39, gender="female", social_contacts=275),
    dict(id=106, age=65, gender="male", social_contacts=4500),
    dict(id=107, age=30, gender="female", social_contacts=50),
]

# Функция Map: группирует пользователей по id
def MAP_INTERSECTION(_, row: dict):
    yield (row["id"], row)

# Функция Reduce: выбирает только тех пользователей, которые есть в обоих списках
def REDUCE_INTERSECTION(row_id: int, rows: Iterator[NamedTuple]):
    row_list = list(rows)
    if len(row_list) == 2:
        yield tuple(row_list)

# Функция чтения данных
def RECORDREADER():
    return [(u["id"], u) for u in input_collection_a + input_collection_b]

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP_INTERSECTION, REDUCE_INTERSECTION)
output = list(output)
output


[({'id': 105, 'age': 42, 'gender': 'female', 'social_contacts': 300},
  {'id': 105, 'age': 39, 'gender': 'female', 'social_contacts': 275})]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Исходные данные (обновленные)
input_collection_a = [
    dict(id=101, age=50, gender="male", social_contacts=200),
    dict(id=102, age=23, gender="female", social_contacts=150),
    dict(id=103, age=35, gender="male", social_contacts=80),
    dict(id=104, age=27, gender="male", social_contacts=500),
    dict(id=105, age=42, gender="female", social_contacts=300),
]

input_collection_b = [
    dict(id=105, age=39, gender="female", social_contacts=275),
    dict(id=106, age=65, gender="male", social_contacts=4500),
    dict(id=107, age=30, gender="female", social_contacts=50),
]

# Функция Map: сопоставляет пользователей с номером коллекции
def MAP_DIFFERENCE(collection_id, user):
    yield (user["id"], collection_id)

# Функция Reduce: выбирает пользователей, которые есть только в первой выборке
def REDUCE_DIFFERENCE(user_id, collections):
    collection_list = list(collections)
    if collection_list == [0]:
        yield next(user for user in input_collection_a if user["id"] == user_id)

# Функция чтения данных (добавляем идентификатор выборки)
def RECORDREADER():
    return [(0, a) for a in input_collection_a] + [(1, b) for b in input_collection_b]

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP_DIFFERENCE, REDUCE_DIFFERENCE)
output = list(output)
output

[{'id': 101, 'age': 50, 'gender': 'male', 'social_contacts': 200},
 {'id': 102, 'age': 23, 'gender': 'female', 'social_contacts': 150},
 {'id': 103, 'age': 35, 'gender': 'male', 'social_contacts': 80},
 {'id': 104, 'age': 27, 'gender': 'male', 'social_contacts': 500}]

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [None]:
from collections import defaultdict
from typing import Iterator, NamedTuple

# Определение структуры пользователя
class User(NamedTuple):
    id: int
    age: int
    gender: str
    social_contacts: int
    city_id: int

# Коллекция пользователей
users_collection = [
    User(id=1, age=45, gender="male", social_contacts=100, city_id=1),
    User(id=2, age=25, gender="female", social_contacts=200, city_id=2),
    User(id=3, age=30, gender="male", social_contacts=150, city_id=2),
    User(id=4, age=20, gender="male", social_contacts=300, city_id=3),
    User(id=5, age=50, gender="female", social_contacts=250, city_id=4),
]

# Определение структуры города
class City(NamedTuple):
    id: int
    name: str

# Коллекция городов
cities_collection = [
    City(id=1, name="New York"),
    City(id=2, name="Los Angeles"),
    City(id=3, name="Chicago"),
    City(id=4, name="Houston"),
    City(id=5, name="Phoenix"),
]

# Функция MAP_JOIN сопоставляет пользователей и города по city_id
def MAP_JOIN(city_id, row):
    yield (city_id, row)

# Функция REDUCE_JOIN объединяет пользователей с их городами
def REDUCE_JOIN(city_id, rows):
    users = []
    city = None

    # Разделяем данные: определяем пользователей и город
    for row in rows:
        if isinstance(row, User):
            users.append(row)
        else:
            city = row

    # Объединяем пользователей с городом
    for user in users:
        yield (user, city)

# Функция RECORDREADER собирает данные для обработки
def RECORDREADER():
    return [(user.city_id, user) for user in users_collection] + [
        (city.id, city) for city in cities_collection
    ]

# Выполнение MapReduce
output = MapReduce(RECORDREADER, MAP_JOIN, REDUCE_JOIN)
output = list(output)

# Вывод объединенных данных
for user, city in output:
    print(f"User {user.id} ({user.gender}, {user.age} y.o.) from {city.name}, Social Contacts: {user.social_contacts}")


User 1 (male, 45 y.o.) from New York, Social Contacts: 100
User 2 (female, 25 y.o.) from Los Angeles, Social Contacts: 200
User 3 (male, 30 y.o.) from Los Angeles, Social Contacts: 150
User 4 (male, 20 y.o.) from Chicago, Social Contacts: 300
User 5 (female, 50 y.o.) from Houston, Social Contacts: 250


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [None]:
# Готовим объединенные данные (users_collection + cities_collection)
joined_data = [
    (user.city_id, user, next(city for city in cities_collection if city.id == user.city_id))
    for user in users_collection
]

# Функция MAP_GROUP: группирует пользователей по городу
def MAP_GROUP(city_id, user, city):
    yield (city_id, user)

# Функция REDUCE_GROUP: считает количество пользователей в каждом городе
def REDUCE_GROUP(city_id, rows):
    user_count = sum(1 for _ in rows)  # Подсчет количества пользователей
    yield f"City with id={city_id} has {user_count} user(s)"

# Функция RECORDREADER: подготавливает данные из объединенного набора
def RECORDREADER():
    return [(city_id, user, city) for city_id, user, city in joined_data]

# Выполнение MapReduce
output = MapReduce(RECORDREADER, MAP_GROUP, REDUCE_GROUP)
output = list(output)

# Вывод результатов
for result in output:
    print(result)


City with id=1 has 1 user(s)
City with id=2 has 2 user(s)
City with id=3 has 1 user(s)
City with id=4 has 1 user(s)


#

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [None]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [None]:
import numpy as np

I = 2
J = 3
K = 2 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)


def record_reader():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])


def mapper(k1, v1):
    j, k = k1
    w = v1

    for i in range(I):
        k2 = (i, k)
        v2 = small_mat[i, j] * w
        yield (k2, v2)


def reducer(key, values):
    i, k = key

    k3 = (i, k)

    v3 = sum(values)  # Используем встроенную функцию sum()

    yield (k3, v3)


def group_by_key(items):
    groups = {}
    for key, value in items:
        if key not in groups:
            groups[key] = []
        groups[key].append(value)
    return groups.items()


def flatten(list_of_lists):
    return [item for sublist in list_of_lists for item in sublist]


# Map phase
map_output = list(flatten(map(lambda item: list(mapper(item[0], item[1])), record_reader())))

# Shuffle phase
shuffle_output = list(group_by_key(map_output))

# Reduce phase
reduce_output = list(flatten(map(lambda item: list(reducer(item[0], item[1])), shuffle_output)))

# Create result matrix
result_matrix = np.zeros((I, K))
for (i, k), v3 in reduce_output:
    result_matrix[i, k] = v3

print(result_matrix)

[[0.25957597 0.60542209 0.30375568 0.15146427 0.06601426 0.33869877
  0.66559417 0.47387563 0.58981713 0.62016206 0.25209548 0.16989033
  0.07349851 0.14975212 0.12235994 0.48272147 0.55422638 0.7021007
  0.66389154 0.1661727 ]
 [0.69667733 0.84780583 0.51557569 0.27865386 0.27043745 0.66584971
  1.16242397 1.04813269 1.10254335 0.730176   0.45075421 0.471518
  0.26757373 0.40080417 0.47899779 0.89692968 0.80790844 1.17073135
  0.92595263 0.46279965]]


Проверьте своё решение

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(record_reader, mapper, reducer)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [None]:
reduce_output = list(MapReduce(record_reader, mapper, reducer))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [None]:
import numpy as np

I = 2
J = 3
K = 2 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def record_reader():
    """Генерация пар элементов матриц для MapReduce."""
    for i in range(I):
        for j in range(J):
            for k in range(K):
                yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

def mapper(element1, element2):
    """Функция Map: перемножение элементов и генерация пар (ключ, значение)."""
    (i, j), v1 = element1
    (j, k), v2 = element2
    yield ((i, k), v1 * v2)

def reducer(key, values):
    """Функция Reduce: суммирование произведений для получения элемента матрицы."""
    i, k = key
    yield ((i, k), sum(values))  # Используем встроенную функцию sum()

def group_by_key(items):
    """Группировка элементов по ключу."""
    groups = {}
    for key, value in items:
        if key not in groups:
            groups[key] = []
        groups[key].append(value)
    return groups.items()

def flatten(list_of_lists):
    """Функция для "расплющивания" списка списков."""
    return [item for sublist in list_of_lists for item in sublist]

def map_reduce():
    """Функция для запуска MapReduce."""
    # Map phase
    map_output = list(flatten(map(lambda item: list(mapper(item[0], item[1])), record_reader())))

    # Shuffle phase
    shuffle_output = list(group_by_key(map_output))

    # Reduce phase
    reduce_output = list(flatten(map(lambda item: list(reducer(item[0], item[1])), shuffle_output)))

    # Создание результирующей матрицы
    result_matrix = np.zeros((I, K))
    for (i, k), v3 in reduce_output:
        result_matrix[i, k] = v3
    return result_matrix

# Запуск MapReduce и вывод результата
result_matrix = map_reduce()
print(result_matrix)

[[0.51882168 0.48776475 0.22051046 0.41309776 0.18489695 0.42828332
  0.27010834 0.45731299 0.47079521 0.4322203  0.34910875 0.27672734
  0.46484685 0.50064569 0.34245799 0.51364973 0.41301552 0.29721517
  0.45704712 0.16250683]
 [1.14576629 0.98884563 0.36494206 0.83840648 0.54456904 0.8729619
  0.52396695 0.88735779 0.95533658 0.77256422 0.70093297 0.62511793
  0.8188235  0.95530588 0.7624936  0.98856169 0.93854463 0.36720571
  0.9317227  0.61586449]]


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [None]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)
maps = 2
reducers = 2

def input_format():
    """Генерация данных для MapReduce с разделением на узлы."""
    split_size = int(np.ceil(I / maps))
    for i in range(0, I, split_size):
        yield record_reader(range(i, min(i + split_size, I)))  # Ограничиваем верхнюю границу

def record_reader(i_range):
    """Генерация пар элементов матриц для MapReduce."""
    for i in i_range:
        for j in range(J):
            for k in range(K):
                yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

def mapper(element1, element2):
    """Функция Map: перемножение элементов и генерация пар (ключ, значение)."""
    (i, j), v1 = element1
    (j, k), v2 = element2
    yield ((i, k), v1 * v2)

def reducer(key, values):
    """Функция Reduce: суммирование произведений для получения элемента матрицы."""
    i, k = key
    yield ((i, k), sum(values))  # Используем встроенную функцию sum()

def group_by_key(items):
    """Группировка элементов по ключу."""
    groups = {}
    for key, value in items:
        if key not in groups:
            groups[key] = []
        groups[key].append(value)
    return groups.items()

def flatten(list_of_lists):
    """Функция для "расплющивания" списка списков."""
    return [item for sublist in list_of_lists for item in sublist]

def map_reduce_distributed(input_format_func, map_func, reduce_func, combiner_func=None):
    """Функция для запуска распределенного MapReduce."""
    partitioned_output = []
    for partition_id, record_reader_instance in enumerate(input_format_func()):
        map_output = list(flatten(map(lambda item: list(map_func(item[0], item[1])), record_reader_instance)))

        if combiner_func:
            shuffle_output = list(group_by_key(map_output))
            combined_output = list(flatten(map(lambda item: list(combiner_func(item[0], item[1])), shuffle_output)))
            partitioned_output.append((partition_id, combined_output))
        else:
            partitioned_output.append((partition_id, map_output))  # Не используем combiner

    # Shuffle phase (имитация для распределенной обработки)
    all_map_output = list(flatten([partition[1] for partition in partitioned_output]))
    shuffle_output = list(group_by_key(all_map_output))

    reduce_output = list(flatten(map(lambda item: list(reduce_func(item[0], item[1])), shuffle_output)))

    return reduce_output


# Запуск MapReduce и вывод результата
partitioned_output = map_reduce_distributed(input_format, mapper, reducer)

result_matrix = np.zeros((I, K))
for (i, k), v3 in partitioned_output:
    result_matrix[i, k] = v3

print(result_matrix)
# np.allclose(reference_solution, result_matrix)  # Раскомментируйте для проверки, если reference_solution определен

[[0.16150383 0.43036958 0.55106414 0.30310462 0.14225387 0.72768905
  0.41367753 0.92533078 0.46181544 1.10530562 0.533248   1.10562692
  0.7836155  1.11172309 0.23129407 0.9537107  0.52783614 0.87434615
  0.82557539 0.4675654  0.36765892 0.66681253 0.81818434 1.22260025
  0.90499732 0.92077103 0.99543778 0.4727498  0.72729067 0.13874458
  0.96312485 0.75862489 0.83743455 0.2763682  0.87778914 0.49581093
  0.90770596 0.93043768 0.86518936 0.9772203 ]
 [0.19096835 0.52868689 0.68946905 0.76540498 0.32484028 0.96884059
  1.06474605 1.29855507 0.55611016 1.09968373 0.84561197 1.19172161
  1.16082916 1.48587422 0.53385601 0.93213836 0.63049438 0.98396795
  1.39500335 1.0068992  0.81292114 0.93959187 0.99272255 1.20183567
  1.067778   1.10657626 1.32838243 0.79175819 0.75639767 0.29571478
  1.21635452 1.21940647 1.14764172 0.2966223  1.08474408 0.58468024
  1.29999478 1.37126907 1.31546861 1.56595769]]


Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [None]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)
reference_solution = np.matmul(small_mat, big_mat)
maps = 3
reducers = 2


def input_format():
    """Генерация данных для MapReduce с разделением на узлы."""
    first_mat = []
    for i in range(small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            first_mat.append(((0, i, j), small_mat[i, j]))  # Первая матрица

    split_size = int(np.ceil(len(first_mat) / maps))
    for i in range(0, len(first_mat), split_size):
        yield first_mat[i:i + split_size]

    second_mat = []
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            second_mat.append(((1, j, k), big_mat[j, k]))  # Вторая матрица

    split_size = int(np.ceil(len(second_mat) / maps))
    for i in range(0, len(second_mat), split_size):
        yield second_mat[i:i + split_size]


def map_join(k1, v1):
    """Функция Map для соединения матриц."""
    mat_num, i, j = k1
    w = v1
    if mat_num == 0:
        yield (j, (mat_num, i, w))
    else:
        yield (i, (mat_num, j, w))


def reduce_join(key, values):
    """Функция Reduce для соединения матриц."""
    from_first_mat = [v for v in values if v[0] == 0]
    from_second_mat = [v for v in values if v[0] == 1]
    for f in from_first_mat:
        for s in from_second_mat:
            yield ((f[1], s[1]), f[2] * s[2])


def map_mul(k1, v1):
    """Функция Map для умножения."""
    yield (k1, v1)


def reduce_mul(key, values):
    """Функция Reduce для умножения."""
    yield (key, sum(values))


def group_by_key(items):
    """Группировка элементов по ключу."""
    groups = {}
    for key, value in items:
        if key not in groups:
            groups[key] = []
        groups[key].append(value)
    return groups.items()


def flatten(list_of_lists):
    """Функция для "расплющивания" списка списков."""
    return [item for sublist in list_of_lists for item in sublist]


def map_reduce_distributed(input_format_func, map_func, reduce_func, combiner_func=None):
    """Функция для запуска распределенного MapReduce."""
    partitioned_output = []
    for partition_id, data_partition in enumerate(input_format_func()):
        map_output = list(flatten(map(lambda item: list(map_func(item[0], item[1])), data_partition)))

        if combiner_func:
            shuffle_output = list(group_by_key(map_output))
            combined_output = list(flatten(map(lambda item: list(combiner_func(item[0], item[1])), shuffle_output)))
            partitioned_output.append((partition_id, combined_output))
        else:
            partitioned_output.append((partition_id, map_output))

    # Shuffle phase (имитация для распределенной обработки)
    all_map_output = list(flatten([partition[1] for partition in partitioned_output]))
    shuffle_output = list(group_by_key(all_map_output))

    reduce_output = list(flatten(map(lambda item: list(reduce_func(item[0], item[1])), shuffle_output)))

    return reduce_output


# Запуск MapReduce и вывод результата
joined_data = map_reduce_distributed(input_format, map_join, reduce_join)
mul_output = map_reduce_distributed(lambda: [joined_data], map_mul, reduce_mul) # Передаем joined_data как входные данные

result = {}
for k, v in mul_output:
    result[k] = v

result_matrix = np.zeros((I, K))
for (i, k), v in result.items():
    result_matrix[i, k] = v

print(result_matrix)
print(np.allclose(reference_solution, result_matrix))

[[0.7626427  0.53040791 0.80305148 0.92657528 0.28768083 0.93808842
  0.42503866 0.5917943  0.64270552 1.20450879 1.37169996 0.36882698
  0.97869383 0.93991269 0.58508511 1.03941925 0.35775455 1.0428118
  0.5911072  0.79745878 0.76735568 0.65551232 0.63727241 1.11297087
  0.62065019 0.44366153 0.26136054 0.49640792 0.75324228 0.18503575
  0.40857195 0.17825842 1.08806206 0.70189171 1.3221965  1.2451538
  0.84305541 0.54405127 1.01334847 0.53356929]
 [0.62276253 0.38295977 0.34198775 0.57472228 0.21032484 0.56626087
  0.29301776 0.21730583 0.47661489 0.77648783 0.89408016 0.11333013
  0.71738335 0.68550657 0.37112358 0.76521653 0.15667554 0.62188232
  0.31097048 0.57097491 0.54822801 0.36806249 0.4060519  0.64118229
  0.31433953 0.29200433 0.11191645 0.24623611 0.59936408 0.1144166
  0.14363589 0.06579956 0.83876799 0.51303067 0.89169877 0.73207859
  0.50232306 0.3056413  0.51982977 0.17513977]]
True
