# Введение в MapReduce модель на Python


In [3]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [4]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [5]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [6]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [7]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [8]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [10]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [11]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [12]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [13]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [14]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [15]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [16]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
    
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)
 
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [17]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])
 
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])
      
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(2.9113572290351506)),
 (1, np.float64(2.9113572290351506)),
 (2, np.float64(2.9113572290351506)),
 (3, np.float64(2.9113572290351506)),
 (4, np.float64(2.9113572290351506))]

## Inverted index 

In [18]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)
      
def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)
 
def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [19]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [20]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()
      
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]
 
def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers
  
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
  
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [21]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)
      
  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)
  
# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) 
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('it', 18), ('what', 10)]),
 (1, [('banana', 2), ('is', 18)])]

## TeraSort

In [22]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for value in split:
        yield (value, None)
      
  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])
    
def MAP(value:int, _):
  yield (value, None)
  
def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)
  
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.07478367368734973)),
   (None, np.float64(0.11337306177959283)),
   (None, np.float64(0.11662826676369475)),
   (None, np.float64(0.11968667840103009)),
   (None, np.float64(0.15038177472421244)),
   (None, np.float64(0.24607903695813182)),
   (None, np.float64(0.26913885281189054)),
   (None, np.float64(0.3368293833024293)),
   (None, np.float64(0.3610099933195998)),
   (None, np.float64(0.3908962891705128)),
   (None, np.float64(0.4270694002267995)),
   (None, np.float64(0.4301817015929481)),
   (None, np.float64(0.431075661588334)),
   (None, np.float64(0.4445609092790369)),
   (None, np.float64(0.4483134201929728)),
   (None, np.float64(0.48066840474872097)),
   (None, np.float64(0.491452396172673)),
   (None, np.float64(0.4985938035088059)),
   (None, np.float64(0.4991432228114192))]),
 (1,
  [(None, np.float64(0.7367095656758074)),
   (None, np.float64(0.7484499909924545)),
   (None, np.float64(0.7569797952848971)),
   (None, np.float64(0.7619052116070

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [29]:
import numpy as np

# Функция, которая генерирует пары (ключ, значение) из входных чисел
def RECORDREADER():
    for value in input_values:
        yield None, value  # Используем постоянный ключ (None) для объединения значений
        
def MAP(_, value):
    """Сопоставляет каждое число с фиксированным ключом (None)."""
    yield None, value

def REDUCE(_, values):
    """Находит максимальное значение из списка чисел."""
    yield None, max(values)

input_values = np.random.rand(30)
max_value_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(max(input_values), max_value_output[0][1], max(input_values) == max_value_output[0][1])

0.9961336836376687 0.9961336836376687 True


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [None]:
import numpy as np

# Функция, которая генерирует пары (ключ, значение) из входных чисел
def RECORDREADER():
    for value in input_values:
        yield None, value  # Используем постоянный ключ (None) для объединения значений
        
def MAP(_, value):
    """Сопоставляет каждое число с фиксированным ключом (None)."""
    yield None, value

def REDUCE(_, values):
    """Находит среднее значение из списка чисел."""
    yield None, sum(values)/len(values)

input_values = np.random.rand(30)
avg_value_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(np.average(input_values), max_value_output[0][1], np.average(input_values) == avg_value_output[0][1])

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [32]:
def groupByKey(data_iterable):
    """
    Группирует элементы (key, value) по ключу, используя сортировку.
    """
    # Сортируем входные данные по ключу (первый элемент каждой пары)
    sorted_data = sorted(data_iterable, key=lambda item: item[0])
    
    # Инициализируем список для хранения сгруппированных результатов
    categorized_result = []

    # Устанавливаем первый ключ и создаём пустую коллекцию для значений
    current_identifier, current_collection = sorted_data[0][0], []
    
    # Проходим по отсортированным данным
    for identifier, value in sorted_data:
        # Если текущий ключ совпадает с предыдущим, добавляем значение в коллекцию
        if identifier == current_identifier:
            current_collection.append(value)  # Добавляем значение в текущую коллекцию
        else:
            # Если ключ изменился, сохраняем текущую коллекцию в результат
            categorized_result.append((current_identifier, current_collection))
            # Обновляем текущий ключ и начинаем новую коллекцию с новым значением
            current_identifier, current_collection = identifier, [value]

    # Добавляем последнюю коллекцию в результат после завершения цикла
    categorized_result.append((current_identifier, current_collection))
    
    return categorized_result

# Тестовый пример
test_input = [
    ("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5), ("d", 6)
]

output = groupByKey(test_input)
output 


[('a', [1, 4]), ('b', [2, 5]), ('c', [3]), ('d', [6])]

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [41]:
def INPUT_FORMAT():
    global num_maps
    input_items = ["python", "c++", "python", "java", "python", "golang"]

    def RECORD_READER(chunk):
        for item in chunk:
            yield item, None  # Формат (элемент, None)

    chunk_size = max(1, len(input_items) // num_maps)
    for i in range(0, len(input_items), chunk_size):
        yield RECORD_READER(input_items[i:i + chunk_size])

def MAP_FUNCTION(item, _):
    yield item, None  # Просто передаем ключ, без значения

def PARTITION_FUNCTION(key):
    global num_reducers
    return hash(key) % num_reducers  # Определяет, какой редьюсер будет обрабатывать ключ

def REDUCE_FUNCTION(item, _):
    yield item, None  # Возвращаем только одно значение

num_maps = 2  # Число мапперов
num_reducers = 2  # Число редьюсеров
unique_items_output = MapReduceDistributed(INPUT_FORMAT, MAP_FUNCTION, REDUCE_FUNCTION, PARTITION_FUNCTION)

unique_items_output = [item for (_, partition) in unique_items_output for (item, _) in partition]
unique_items_output


6 key-value pairs were sent over a network.


['java', 'c++', 'golang', 'python']

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [44]:
def C(pair):
    """Предикат: проверяет, делится ли сумма элементов кортежа на 2 без остатка."""
    return (pair[0] + pair[1]) % 2 == 0

def RECORDREADER():
    """Генерирует кортежи из набора данных."""
    data_set = [(0, 5), (2, 2), (1, 11)]
    for pair in data_set:
        yield pair, None  # Постоянный ключ для обработки MapReduce

def MAP(pair, _):
    """Применяет предикат к каждому элементу и передаёт в Reduce, если условие выполняется."""
    if C(pair):
        yield pair, pair  # Ключ и значение совпадают с отобранным элементом

def REDUCE(_, values):
    """Функция тождественности — возвращает полученные значения."""
    yield from values

result = list(MapReduce(RECORDREADER, MAP, REDUCE))
result


[(2, 2), (1, 11)]

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [45]:
def MAP_PROJECTION(_, entry: dict):
    """
    Функция MAP_PROJECTION обрабатывает каждую запись и проверяет значение поля 'gender'.
    Если значение 'gender' равно 'male' или 'female', запись остается без изменений.
    В противном случае поле 'gender' удаляется из записи.
    """
    GENDER_VALUES = ["male", "female"]
    if entry["gender"] in GENDER_VALUES:
        yield entry["id"], entry
    else:
        # Удаляем поле gender из полученного словаря
        updated_entry = entry.copy()
        del updated_entry["gender"]
        yield updated_entry["id"], updated_entry

def REDUCE_PROJECTION(key: str, entries: Iterator[NamedTuple]):
    """
    Функция REDUCE_PROJECTION объединяет все записи с одинаковым ключом.
    Она принимает ключ и итератор записей, и возвращает ключ и итератор записей.
    """
    yield (key, entries)

def RECORDREADER():
    """
    Функция RECORDREADER генерирует пары (id, запись) из входного набора данных.
    """
    return [(user["id"], user) for user in input_collection]

# Пример входного набора данных
input_collection = [
    dict(id=0, age=54, gender="male", social_contacts=20),
    dict(id=0, age=25, gender="female", social_contacts=240),
    dict(id=1, age=27, gender="undefined", social_contacts=642),
    dict(id=2, age=16, gender="male", social_contacts=123),
    dict(id=2, age=35, gender="undefined", social_contacts=247),
    dict(id=3, age=31, gender="male", social_contacts=521),
    dict(id=3, age=32, gender="female", social_contacts=753),
]

# Выполнение MapReduce и получение результата
output = MapReduce(RECORDREADER, MAP_PROJECTION, REDUCE_PROJECTION)
output = list(output)
output


[(0,
  [{'id': 0, 'age': 54, 'gender': 'male', 'social_contacts': 20},
   {'id': 0, 'age': 25, 'gender': 'female', 'social_contacts': 240}]),
 (1, [{'id': 1, 'age': 27, 'social_contacts': 642}]),
 (2,
  [{'id': 2, 'age': 16, 'gender': 'male', 'social_contacts': 123},
   {'id': 2, 'age': 35, 'social_contacts': 247}]),
 (3,
  [{'id': 3, 'age': 31, 'gender': 'male', 'social_contacts': 521},
   {'id': 3, 'age': 32, 'gender': 'female', 'social_contacts': 753}])]

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [49]:
# Группировка пользователей по идентификатору
def MAP_UNION(_, record: NamedTuple):
    yield (record.id, record)

# Выходное значение равно (record, record) независимо от присутствия пользователя в только одной или обеих выборках
def REDUCE_UNION(key: str, records: Iterator[NamedTuple]):
    yield (records[0], records[0])

def RECORDREADER():
    return [(user.id, user) for user in input_collection_a + input_collection_b]



input_collection_a = [
    User(id=1, age=54, gender="male", social_contacts=20),
    User(id=2, age=25, gender="female", social_contacts=240),
    User(id=3, age=27, gender="male", social_contacts=642),
    User(id=4, age=16, gender="male", social_contacts=123),
    User(id=5, age=35, gender="female", social_contacts=247),
]

input_collection_b = [
    User(id=5, age=35, gender="female", social_contacts=247),
    User(id=6, age=31, gender="male", social_contacts=521),
    User(id=7, age=32, gender="female", social_contacts=753),
]

output = MapReduce(RECORDREADER, MAP_UNION, REDUCE_UNION)
output = list(output)
output

[(User(id=1, age=54, social_contacts=20, gender='male'),
  User(id=1, age=54, social_contacts=20, gender='male')),
 (User(id=2, age=25, social_contacts=240, gender='female'),
  User(id=2, age=25, social_contacts=240, gender='female')),
 (User(id=3, age=27, social_contacts=642, gender='male'),
  User(id=3, age=27, social_contacts=642, gender='male')),
 (User(id=4, age=16, social_contacts=123, gender='male'),
  User(id=4, age=16, social_contacts=123, gender='male')),
 (User(id=5, age=35, social_contacts=247, gender='female'),
  User(id=5, age=35, social_contacts=247, gender='female')),
 (User(id=6, age=31, social_contacts=521, gender='male'),
  User(id=6, age=31, social_contacts=521, gender='male')),
 (User(id=7, age=32, social_contacts=753, gender='female'),
  User(id=7, age=32, social_contacts=753, gender='female'))]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [51]:
# Группировка по пользователям
def MAP_INTERSECTION(_, record: NamedTuple):
    yield (record.id, record)

# Возвращает пользователя, только если он присутствует в обоих выборках
def REDUCE_INTERSECTION(user_id: int, records: Iterator[NamedTuple]):
    if len(records) == 2:
        yield records

def RECORDREADER():
    return [(user.id, user) for user in input_collection_a + input_collection_b]

output = MapReduce(RECORDREADER, MAP_INTERSECTION, REDUCE_INTERSECTION)
output = list(output)
output


[[User(id=5, age=35, social_contacts=247, gender='female'),
  User(id=5, age=35, social_contacts=247, gender='female')]]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [55]:
# Группировка по пользователям
def MAP_DIFFERENCE(collection_id, user):
    yield user, collection_id

# Возвращает пользователей, входящих только в первую выборку (и не входящих во вторую)
def REDUCE_DIFFERENCE(user, collection_ids):
    if collection_ids == [0]:
        yield user

# На первом месте стоит номер выборки
def RECORDREADER():
    return [(0, item) for item in input_collection_a] + [(1, item) for item in input_collection_b]

output = MapReduce(RECORDREADER, MAP_DIFFERENCE, REDUCE_DIFFERENCE)
output = list(output)
output


[User(id=1, age=54, social_contacts=20, gender='male'),
 User(id=2, age=25, social_contacts=240, gender='female'),
 User(id=3, age=27, social_contacts=642, gender='male'),
 User(id=4, age=16, social_contacts=123, gender='male')]

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [59]:
def MAP(entry, dataset):
    """Формирует пары (key, (источник, значение)) для дальнейшего соединения."""
    if dataset == "R":
        first, key = entry
        yield (key, ("R", first))
    elif dataset == "S":
        key, second = entry
        yield (key, ("S", second))

def REDUCE(key, entries):
    """Создает пары (first, key, second) для каждого подходящего соединения."""
    r_entries = [first for src, first in entries if src == "R"]
    s_entries = [second for src, second in entries if src == "S"]

    for first in r_entries:
        for second in s_entries:
            yield (first, key, second)  # Оставляем key для наглядности

def RECORDREADER():
    """Данные для соединения: R(first, key) и S(key, second)."""
    dataset_R = [("A", 1), ("B", 2), ("C", 3)]  # (first, key)
    dataset_S = [(1, "apple"), (2, "banana"), (3, "cherry")]  # (key, second)
    return [(r, "R") for r in dataset_R] + [(s, "S") for s in dataset_S]

result = list(MapReduce(RECORDREADER, MAP, REDUCE))
result


[('A', 1, 'apple'), ('B', 2, 'banana'), ('C', 3, 'cherry')]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [60]:
from collections import defaultdict
from typing import Callable, List, Tuple

def MAP(key, value, label):
    """Формируем (ключ, значение для агрегации)."""
    yield (key, value)

def REDUCE(key, values, aggregation_function: Callable[[List[int]], int]):
    """Применяем агрегирующую функцию и выдаем (ключ, агрегированное значение)."""
    yield (key, aggregation_function(values))

def RECORDREADER():
    """Пример данных (ключ, значение, метка)."""
    return [(1, 10, "x"), (2, 20, "y"), (1, 15, "z"), (2, 25, "w"), (3, 30, "v")]

def shuffle_phase(mapped_data: List[Tuple[int, int]]) -> dict:
    """Группируем значения по ключу."""
    grouped_data = defaultdict(list)
    for key, value in mapped_data:
        grouped_data[key].append(value)
    return grouped_data

def MapReduce(recordreader, map_func, reduce_func, aggregation_function):
    """Общая MapReduce логика."""
    records = recordreader()

    # MAP
    mapped_data = []
    for record in records:
        mapped_data.extend(map_func(*record))  # Распаковываем кортежи

    # SHUFFLE
    grouped_data = shuffle_phase(mapped_data)

    # REDUCE
    reduced_data = []
    for key, values in grouped_data.items():
        reduced_data.extend(reduce_func(key, values, aggregation_function))

    return reduced_data

# Используем SUM
print("SUM Aggregation:", MapReduce(RECORDREADER, MAP, REDUCE, sum))

# Используем MAX
print("MAX Aggregation:", MapReduce(RECORDREADER, MAP, REDUCE, max))


SUM Aggregation: [(1, 25), (2, 45), (3, 30)]
MAX Aggregation: [(1, 15), (2, 25), (3, 30)]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [61]:
from typing import List, Tuple, Dict, NamedTuple
from collections import defaultdict

NUM_REDUCERS = 2  # Количество редукторов
CHUNK_SIZE = 2  # Размер чанка вектора

# Функция отображения для матрицы
def map_matrix_entry(entry: NamedTuple) -> Tuple[int, Tuple[str, NamedTuple]]:
    return entry.col % NUM_REDUCERS, ('M', entry)

# Функция отображения для вектора (разбитого на чанки)
def map_vector_chunk(chunk: List[NamedTuple]) -> List[Tuple[int, Tuple[str, NamedTuple]]]:
    return [(el.index % NUM_REDUCERS, ('V', el)) for el in chunk]

# Функция редукции (умножение части матрицы на соответствующую часть вектора)
def reduce_partial(reducer_id: int, entries: List[Tuple[str, NamedTuple]]) -> List[Tuple[int, float]]:
    matrix_rows, vector_elements = [], []
    for tag, record in entries:
        (matrix_rows if tag == 'M' else vector_elements).append(record)

    return [(row.row, row.value * el.value) for row in matrix_rows for el in vector_elements if row.col == el.index]

# Функция группировки значений по редукторам
def shuffle(mapped_data: List[Tuple[int, Tuple[str, NamedTuple]]]) -> Dict[int, List[Tuple[str, NamedTuple]]]:
    grouped = defaultdict(list)
    for reducer_id, pair in mapped_data:
        grouped[reducer_id].append(pair)
    return grouped

# Функция финальной агрегации (складывает частичные произведения)
def aggregate(partial_results: List[Tuple[int, float]]) -> Dict[int, float]:
    result = defaultdict(float)
    for row, value in partial_results:
        result[row] += value
    return result

# Чтение данных из матрицы
def read_matrix_data(matrix: List[NamedTuple]) -> List[Tuple[str, NamedTuple]]:
    return [('M', row) for row in matrix]

# Разбиение вектора на чанки
def read_vector_data(vector: List[NamedTuple]) -> List[List[NamedTuple]]:
    return [vector[i:i + CHUNK_SIZE] for i in range(0, len(vector), CHUNK_SIZE)]

# Основная функция MapReduce
def map_reduce(matrix, vector):
    mapped_matrix = [map_matrix_entry(row) for _, row in read_matrix_data(matrix)]
    mapped_vector = [pair for chunk in read_vector_data(vector) for pair in map_vector_chunk(chunk)]
    grouped = shuffle(mapped_matrix + mapped_vector)
    partial_results = [res for rid, vals in grouped.items() for res in reduce_partial(rid, vals)]
    return aggregate(partial_results)

from collections import namedtuple
MatrixRow = namedtuple('MatrixRow', ['row', 'col', 'value'])
VectorElement = namedtuple('VectorElement', ['index', 'value'])

matrix = [MatrixRow(0, 0, 1.0), MatrixRow(0, 1, 2.0), MatrixRow(1, 0, 3.0), MatrixRow(1, 1, 4.0)]
vector = [VectorElement(0, 0.5), VectorElement(1, 0.7)]

result = map_reduce(matrix, vector)
result


defaultdict(float, {0: 1.9, 1: 4.3})

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [24]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [63]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)  # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J, K)

def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])

def MAP(k1, v1):
    (j, k) = k1
    w = v1
    for i in range(small_mat.shape[0]):
        yield ((i, k), (i, j, w * small_mat[i, j]))

def REDUCE(key, values):
    (i, k) = key
    result = 0
    for _, j, value in values:
        result += value
    yield (i, k), result

# Функция для выполнения MapReduce
def MapReduce(recordreader, map_func, reduce_func):
    records = list(recordreader())

    # MAP
    mapped_data = []
    for record in records:
        mapped_data.extend(map_func(*record))

    # SHUFFLE
    grouped_data = {}
    for key, value in mapped_data:
        if key not in grouped_data:
            grouped_data[key] = []
        grouped_data[key].append(value)

    # REDUCE
    reduced_data = []
    for key, values in grouped_data.items():
        reduced_data.extend(reduce_func(key, values))

    return reduced_data

# Выполнение MapReduce
result = MapReduce(RECORDREADER, MAP, REDUCE)

# Преобразование результата в матрицу
result_matrix = np.zeros((I, K))
for (i, k), value in result:
    result_matrix[i, k] = value

result_matrix


array([[0.5940656 , 0.57632383, 0.50068662, 0.3426824 , 0.75168325,
        0.59942184, 0.52126286, 0.65537871, 0.18481584, 0.47900151,
        0.63625734, 0.20758939, 0.73523918, 0.49225662, 0.85961744,
        0.32702271, 0.41265735, 0.51255185, 0.72744043, 0.77811964,
        0.05283796, 0.52122633, 0.7111616 , 0.44984309, 0.79228152,
        0.3112578 , 0.65097884, 0.76759501, 0.58286752, 0.77463723,
        0.47266747, 0.41849302, 0.27038973, 0.39651717, 0.59805269,
        0.35577872, 0.45099572, 0.43988042, 0.82004321, 0.90524963],
       [0.74171739, 0.83605232, 0.94003194, 0.42815025, 0.87091488,
        0.93863732, 0.88171698, 1.00882715, 0.51234033, 0.74928723,
        0.67774889, 0.38941465, 0.98955846, 0.75096482, 1.24327464,
        0.37521098, 0.66811013, 0.78961761, 1.23110604, 0.99082748,
        0.06539738, 0.96943347, 0.96228172, 0.74689559, 1.15969713,
        0.73740873, 0.92989213, 0.9109635 , 0.61349345, 0.96117657,
        0.70722263, 0.60201752, 0.45718379, 0.5

Проверьте своё решение

In [64]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat) 
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [65]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [73]:
K = 40

# Генерация случайных матриц
np.random.seed(42)  # Фиксируем seed для воспроизводимости результатов
matrix_A = np.random.rand(I, J)  # Матрица A размером (I x J)
matrix_B = np.random.rand(J, K)  # Матрица B размером (J x K)

# Эталонное решение с использованием стандартного перемножения матриц
reference_result = np.matmul(matrix_A, matrix_B)

# Функция, читающая данные обеих матриц
def DATA_SOURCE():
    # Чтение первой матрицы (A)
    for row in range(matrix_A.shape[0]):
        for col in range(matrix_A.shape[1]):
            yield ((0, row, col), matrix_A[row, col])  # Формат (0, row, col, A[row, col])

    # Чтение второй матрицы (B)
    for col in range(matrix_B.shape[0]):
        for idx in range(matrix_B.shape[1]):
            yield ((1, col, idx), matrix_B[col, idx])  # Формат (1, col, idx, B[col, idx])

# Функция MAP для соединения матриц по индексу col
def MAP_JOIN(key, value):
    mat_type, row_or_col, col_or_idx = key  # Извлекаем номер матрицы и индексы
    element = value

    if mat_type == 0:  # Если это элемент первой матрицы (A)
        yield (col_or_idx, (0, row_or_col, element))  # Группируем по col
    else:  # Если это элемент второй матрицы (B)
        yield (row_or_col, (1, col_or_idx, element))  # Группируем по col

# Функция REDUCE для соединения матриц по col
def REDUCE_JOIN(key, values):
    matrix_A_elements = [v for v in values if v[0] == 0]  # Элементы первой матрицы (A)
    matrix_B_elements = [v for v in values if v[0] == 1]  # Элементы второй матрицы (B)

    for a in matrix_A_elements:
        for b in matrix_B_elements:
            yield ((a[1], b[1]), a[2] * b[2])  # Умножение значений a[row, col] * b[col, idx]

# Функция MAP для финальной агрегации (суммирование произведений)
def MAP_MUL(key, value):
    yield (key, value)  # Оставляем ключ (row, idx) без изменений

# Функция REDUCE для суммирования произведений элементов
def REDUCE_MUL(key, values):
    yield (key, sum(values))  # Складываем все элементы для ключа (row, idx)

# Генерация объединённых значений через MapReduce
joined = MapReduce(DATA_SOURCE, MAP_JOIN, REDUCE_JOIN)

# Выполнение финального этапа MapReduce для получения результата
solution = MapReduce(lambda: joined, MAP_MUL, REDUCE_MUL)

# Проверка, совпадает ли результат с эталонным значением
print(np.allclose(reference_result, asmatrix(solution)))  # Должно вывести True


True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [76]:
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Dict, NamedTuple

NUM_MAPPERS = 2
NUM_REDUCERS = 2
ROWS_A = 2
COLS_A = 3
COLS_B = 40  # K = 4 * 10

# Генерация данных
A = np.random.rand(ROWS_A, COLS_A)
B = np.random.rand(COLS_A, COLS_B)

# Эталонное решение с использованием стандартного перемножения матриц
reference_result = np.matmul(A, B)

def generate_chunks():
    """Создаёт части матриц для обработки в MAP."""
    chunk_size = int(np.ceil(ROWS_A / NUM_MAPPERS))
    for i_start in range(0, ROWS_A, chunk_size):
        yield process_chunk(range(i_start, min(i_start + chunk_size, ROWS_A)))

def process_chunk(rows):
    """Генерирует пары (координаты, значение) из двух матриц."""
    for i in rows:
        for j in range(COLS_A):
            for k in range(COLS_B):
                yield ((i, j), A[i, j]), ((j, k), B[j, k])

def map_function(pair_A, pair_B):
    """Умножает элементы и группирует по (i, k)."""
    (i, j), val_A = pair_A
    (j, k), val_B = pair_B
    yield (i, k), val_A * val_B

def reduce_function(index, values):
    """Суммирует элементы с одинаковыми индексами."""
    yield index, sum(values)

def MapReduceDistributed(chunk_generator, map_func, reduce_func):
    """Имитирует распределенное выполнение MapReduce."""
    intermediate_results = []

    # MAP
    for chunk in chunk_generator():
        mapped_data = []
        for pair_A, pair_B in chunk:
            mapped_data.extend(map_func(pair_A, pair_B))

        # SHUFFLE
        grouped_data = defaultdict(list)
        for key, value in mapped_data:
            grouped_data[key].append(value)

        # REDUCE
        reduced_data = []
        for key, values in grouped_data.items():
            reduced_data.extend(reduce_func(key, values))

        intermediate_results.append(reduced_data)

    return intermediate_results

# Запуск MapReduce
intermediate_results = MapReduceDistributed(generate_chunks, map_function, reduce_function)

# Объединение результатов
final_result = {}
for partition in intermediate_results:
    for index, value in partition:
        final_result[index] = final_result.get(index, 0) + value

# Преобразование результата в формат матрицы
result_matrix = np.zeros((ROWS_A, COLS_B))
for (i, k), value in final_result.items():
    result_matrix[i, k] = value


np.allclose(reference_result, result_matrix) 


True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [84]:
import numpy as np

# Устанавливаем размеры входных матриц
rows_x = 2
cols_x = 3
cols_y = 4 * 10

# Генерируем случайные матрицы
matrix_x = np.random.rand(rows_x, cols_x)
matrix_y = np.random.rand(cols_x, cols_y)

# Эталонный результат матричного умножения
reference_result = np.matmul(matrix_x, matrix_y)

# Функция для разворачивания вложенных списков

def unfold_nested(iterable):
    for sublist in iterable:
        for item in sublist:
            yield item

# Группировка элементов по ключу

def categorize_by_key(sequence):
    categorized = {}
    for (key, value) in sequence:
        categorized[key] = categorized.get(key, []) + [value]
    return categorized.items()

# Разбиение элементов по разделам для параллельной обработки

def parallel_partitioning(mapped_parts, partition_algo):
    global reducer_count
    partitions = [dict() for _ in range(reducer_count)]
    for mapped_part in mapped_parts:
        for (key, value) in mapped_part:
            partition = partitions[partition_algo(key)]
            partition[key] = partition.get(key, []) + [value]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

# Функция разбиения ключей

def key_divider(identifier):
    global reducer_count
    return hash(identifier) % reducer_count

# Универсальная функция выполнения MapReduce

def execute_map_reduce(input_source, map_op, reduce_op, partition_algo=key_divider, combiner=None):
    mapped_parts = map(lambda reader: unfold_nested(map(lambda kv: map_op(*kv), reader)), input_source())
    
    if combiner is not None:
        mapped_parts = map(lambda mapped: unfold_nested(map(lambda kv: combiner(*kv), categorize_by_key(mapped))), mapped_parts)
    
    reduced_parts = parallel_partitioning(mapped_parts, partition_algo)
    
    reduced_output = map(lambda part: (part[0], unfold_nested(map(lambda group: reduce_op(*group), part[1]))), reduced_parts)
    
    return reduced_output

# Преобразование результата в матрицу

def assemble_matrix(reduce_result):
    reduce_result = list(reduce_result)
    row_count = max(i for ((i, k), value) in reduce_result) + 1
    col_count = max(k for ((i, k), value) in reduce_result) + 1
    output_matrix = np.empty(shape=(row_count, col_count))
    for ((i, k), value) in reduce_result:
        output_matrix[i, k] = value
    return output_matrix

# Формирование входных данных

def generate_input():
    data_x = []
    for i in range(matrix_x.shape[0]):
        for j in range(matrix_x.shape[1]):
            data_x.append(((0, i, j), matrix_x[i, j]))  # Первая матрица
    
    global mapper_count
    batch_size = int(np.ceil(len(data_x) / mapper_count))
    
    for i in range(0, len(data_x), batch_size):
        yield data_x[i:i + batch_size]
    
    data_y = []
    for j in range(matrix_y.shape[0]):
        for k in range(matrix_y.shape[1]):
            data_y.append(((1, j, k), matrix_y[j, k]))  # Вторая матрица
    
    batch_size = int(np.ceil(len(data_y) / mapper_count))
    
    for i in range(0, len(data_y), batch_size):
        yield data_y[i:i + batch_size]

# MAP-операция для соединения данных

def map_combine(key1, value1):
    (matrix_id, i, j) = key1
    element = value1
    
    if matrix_id == 0:
        yield (j, (matrix_id, i, element))
    else:
        yield (i, (matrix_id, j, element))

# REDUCE-операция для объединения данных

def reduce_combine(key, values):
    from_x = [v for v in values if v[0] == 0]
    from_y = [v for v in values if v[0] == 1]
    
    for x in from_x:
        for y in from_y:
            yield ((x[1], y[1]), x[2] * y[2])

# Генерация объединенных данных

def fetch_combined_data():
    for entry in combined_output:
        yield entry[1]

# MAP-операция для передачи данных

def map_multiply(key1, value1):
    yield (key1, value1)

# REDUCE-операция для суммирования значений

def reduce_multiply(key, values):
    yield (key, sum(values))

mapper_count = 3
reducer_count = 2

# Выполнение первой части MapReduce (объединение)
partitioned_data = execute_map_reduce(generate_input, map_combine, reduce_combine, combiner=None)
combined_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_data]

# Выполнение второй части MapReduce (умножение)
multiplication_result = execute_map_reduce(fetch_combined_data, map_multiply, reduce_multiply, combiner=None)
final_data = [(partition_id, list(partition)) for (partition_id, partition) in multiplication_result]

# Формирование окончательного результата
final_matrix_data = []

for partition in final_data:
    for value in partition[1]:
        final_matrix_data.append(value)

np.allclose(reference_result, assemble_matrix(final_matrix_data))


True