# Введение в MapReduce модель на Python


In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.7019695545376639),
 (1, 1.7019695545376639),
 (2, 1.7019695545376639),
 (3, 1.7019695545376639),
 (4, 1.7019695545376639)]

## Inverted index

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [65]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('what', 10)]),
 (1, [('it', 18)])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.011512808966720578),
   (None, 0.025436203922930445),
   (None, 0.03485513403483076),
   (None, 0.11403172317184629),
   (None, 0.1310800669143134),
   (None, 0.13422277834615426),
   (None, 0.16325189048069189),
   (None, 0.2674687178897228),
   (None, 0.2841955618969202),
   (None, 0.2863804769965731),
   (None, 0.29541854106102994),
   (None, 0.37361986566455496),
   (None, 0.38823208841768275),
   (None, 0.41431279018802014),
   (None, 0.4199915346102846),
   (None, 0.4844980109308441),
   (None, 0.49831900082407776)]),
 (1,
  [(None, 0.5125873124480756),
   (None, 0.5304725739569252),
   (None, 0.6060887888820583),
   (None, 0.6873411347037663),
   (None, 0.690943175085079),
   (None, 0.6927994447896815),
   (None, 0.7018938895046862),
   (None, 0.7084387616353934),
   (None, 0.7247286112815191),
   (None, 0.7406915030070437),
   (None, 0.9133322443216914),
   (None, 0.9323393419371324),
   (None, 0.9825656318758326)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [71]:
from typing import NamedTuple  # требует python 3.6+
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов.
def flatten(nested_iterable):
    # Проходим по каждому вложенному итерируемому объекту
    for iterable in nested_iterable:
        # Внутри каждого из них проходим по его элементам
        for element in iterable:
            yield element  # Возвращаем элементы по одному

# Функция для группировки элементов по ключу.
# В данном случае ключ - это значение, возвращаемое функцией MAP.
def groupbykey(iterable):
    t = {}  # Словарь для хранения групп по ключу
    for (k2, v2) in iterable:
        # Если ключ k2 уже существует в словаре, добавляем новое значение к существующему списку,
        # если нет - создаем новый список.
        t[k2] = t.get(k2, []) + [v2]
    # Возвращаем элементы словаря (ключ и список значений)
    return t.items()

# Основная функция MapReduce, которая применяет MAP, затем группирует по ключу и применяет REDUCE.
def MapReduce(RECORDREADER, MAP, REDUCE):
    # Шаг 1: Прочитать данные через RECORDREADER.
    # Шаг 2: Применить функцию MAP к данным. Это преобразует данные в пары (ключ, значение).
    # Шаг 3: Применить группировку по ключу с использованием groupbykey.
    # Шаг 4: Применить REDUCE ко всем сгруппированным данным.
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Исходный список чисел
input_collection = [1, 5, 3, 9, 2, 4, -23, 10, 7, 6]

# Переопределяем функцию RECORDREADER, чтобы она возвращала индексы и числа из input_collection
def RECORDREADER():
    return [(i, n) for i, n in enumerate(input_collection)]  # Создаем пару (индекс, число)

# Переопределяем функцию MAP, которая будет преобразовывать числа в пары (ключ, число),
# где ключ - это просто строка 'max', а число - это само число из списка.
def MAP(_, number):
    # Возвращаем пару (ключ 'max', значение - число)
    yield ('max', number)

# Переопределяем функцию REDUCE для нахождения максимального числа из переданных чисел.
def REDUCE(key: str, numbers: Iterator[int]):
    # Инициализируем переменную для хранения максимального числа,
    # начинаем с очень маленького значения, чтобы первое число точно стало максимальным.
    max_number = float('-inf')
    # Проходим по всем числам и находим максимальное.
    for number in numbers:
        if number > max_number:
            max_number = number
    # Возвращаем пару (ключ 'max', найденное максимальное число)
    yield (key, max_number)


output = MapReduce(RECORDREADER, MAP, REDUCE)

# Преобразуем результат в список и выводим его
output = list(output)
output  # Выводим максимальное число


[('max', 10)]

Вывод:

 [('max', 10)]

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [74]:
from typing import NamedTuple  # Требуется Python 3.6 и выше
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов.
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция для группировки элементов по ключу.
# Применяется к результатам MAP, чтобы собрать элементы с одинаковыми ключами в списки.
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()  # Возвращаем список всех групп по ключу в виде кортежей (ключ, значения)

# Основная функция MapReduce.
def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Список чисел, с которым будем работать.
input_collection = [1, 5, 3, 9, 2, 4, -23, 10, 7, 6]

# Переопределяем функцию RECORDREADER, чтобы она возвращала пары (индекс, число) из input_collection.
def RECORDREADER():
    return [(i, n) for i, n in enumerate(input_collection)]  # Возвращаем пару (индекс, значение)

# Функция MAP преобразует каждое число в пару (ключ 'average', число).
def MAP(_, number):
    # Для каждого числа возвращаем пару (ключ 'average', значение - само число)
    yield ('average', number)

# Функция REDUCE принимает ключ 'average' и список чисел.
# Она вычисляет среднее арифметическое всех чисел в списке.
def REDUCE(key: str, numbers: Iterator[int]):
    avrg = 0  # Инициализируем переменную для суммы чисел
    k = 0  # Переменная для подсчета количества чисел
    # Проходим по всем числам
    for number in numbers:
        avrg += number  # Добавляем число к общей сумме
        k += 1  # Увеличиваем счетчик
    avrg = avrg / k  # Вычисляем среднее арифметическое
    yield (key, avrg)  # Возвращаем пару (ключ, среднее значение)


output = MapReduce(RECORDREADER, MAP, REDUCE)

output = list(output)
output


[('average', 2.4)]

Вывод:

[('average', 2.4)]

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [75]:
def groupByKey(iterable):
    from itertools import groupby  # Импортируем функцию groupby из модуля itertools

    # Сначала сортируем данные по ключу, потому что groupby требует отсортированных данных.
    sorted_iterable = sorted(iterable, key=lambda x: x[0])  # Сортируем по первому элементу каждой пары

    grouped_data = []  # Список для хранения сгруппированных данных
    # Используем groupby для группировки по ключу
    for key, group in groupby(sorted_iterable, key=lambda x: x[0]):
        # Для каждой группы (по ключу) собираем значения в список и добавляем пару (ключ, список значений) в итоговый результат
        grouped_data.append((key, [item[1] for item in group]))  # Собираем только значения (вторые элементы каждой пары)

    return grouped_data  # Возвращаем результат в виде списка пар (ключ, список значений)

# Пример 1: Исходные данные - список кортежей (ключ, значение)
primer1 = [('a', 1), ('b', 2), ('a', 3), ('c', 4), ('b', 5)]
# Группируем данные по ключам
result1 = groupByKey(primer1)
# Выводим результат
print(f"Пример 1: {result1}")

# Пример 2: Исходные данные - список кортежей (ключ, значение), но ключи - строки
primer2 = [('1', 'one'), ('2', 'two'), ('1', 'uno')]  # Ключи - строки
# Группируем данные по строковым ключам
result2 = groupByKey(primer2)
# Выводим результат
print(f"Пример 2 (ключи - строки): {result2}")


Пример 1: [('a', [1, 3]), ('b', [2, 5]), ('c', [4])]
Пример 2 (ключи - строки): [('1', ['one', 'uno']), ('2', ['two'])]


Вывод:

Пример 1: [('a', [1, 3]), ('b', [2, 5]), ('c', [4])]

Пример 2 (ключи - строки): [('1', ['one', 'uno']), ('2', ['two'])]

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [85]:
from typing import NamedTuple, Iterator  # Импортируем необходимые типы данных
import random

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element  # Возвращаем каждый элемент по одному

# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()  # Возвращаем пары (ключ, список значений)

# Основной алгоритм MapReduce
def MapReduce(RECORDREADER, MAP, REDUCE, PARTITIONER=None, COMBINER=None):
    records = RECORDREADER()
    mapped = flatten(map(lambda x: MAP(*x), records))
    # Если определен COMBINER, применяем его для агрегации данных перед группировкой
    if COMBINER:
        mapped = flatten(map(lambda x: COMBINER(*x), groupbykey(mapped)))

    # Группируем данные по ключу
    grouped = groupbykey(mapped)

    # Если определен PARTITIONER, распределяем данные по партициям
    if PARTITIONER:
        partitioned = {}  # Словарь для хранения данных, распределенных по партициям
        for key, values in grouped:
            partition = PARTITIONER(key)  # Получаем партицию для текущего ключа
            partitioned.setdefault(partition, []).append((key, values))
        result = flatten(map(lambda part: flatten(map(lambda x: REDUCE(*x), part[1])), partitioned.items()))
    else:
        # Если PARTITIONER не определен, просто применяем REDUCE ко всем данным
        result = flatten(map(lambda x: REDUCE(*x), grouped))

    return result  # Возвращаем результат после применения всех функций

# Переопределение RECORDREADER
def RECORDREADER():
    return [(u.id, u) for u in input_collection]


class User(NamedTuple):
    id: int
    age: str
    social_contacts: int
    gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800),
    User(id=1, age=25, gender='female', social_contacts=240),  # Дубликат
    User(id=2, age=25, gender='female', social_contacts=500),   # Дубликат
    User(id=3, age=33, gender='female', social_contacts=800)  # Дубликат
]

# Функция MAP
def MAP(_, row: NamedTuple):
    yield (row.id, row)

# Функция REDUCE
def REDUCE(user_id: int, rows: Iterator):
    # Используем set для исключения дубликатов (если они есть)
    unique_elements = set(rows)  # Преобразуем данные в set для удаления дубликатов
    for element in unique_elements:  # Проходим по уникальным элементам
        yield (user_id, element)  # Возвращаем каждый уникальный элемент для каждого user_id

# Функция COMBINER для частичной агрегации
def COMBINER(user_id: int, rows: Iterator):
    unique_elements = set(rows)
    for element in unique_elements:
        yield (user_id, element)

# Функция PARTITIONER для распределения данных по партициям
def PARTITIONER(key):
    global reducers  # Количество редьюсеров
    # Рандомно имитируем распределение по партициям
    return random.randint(0, reducers - 1)  # Возвращаем случайную партицию для ключа

# Количество редьюсеров
reducers = 3

# Выполнение алгоритма MapReduce с использованием всех функций
output = MapReduce(RECORDREADER, MAP, REDUCE, PARTITIONER, COMBINER)

# Преобразуем результат в список и выводим
output = list(output)
print(output)  # Выводим результат MapReduce


[(0, User(id=0, age=55, social_contacts=20, gender='male')), (2, User(id=2, age=25, social_contacts=500, gender='female')), (1, User(id=1, age=25, social_contacts=240, gender='female')), (3, User(id=3, age=33, social_contacts=800, gender='female'))]


Вывод:

[(0, User(id=0, age=55, social_contacts=20, gender='male')), (2, User(id=2, age=25, social_contacts=500, gender='female')), (1, User(id=1, age=25, social_contacts=240, gender='female')), (3, User(id=3, age=33, social_contacts=800, gender='female'))]

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [46]:
from typing import NamedTuple
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class User(NamedTuple):
    id: int
    age: int
    gender: str
    social_contacts: int

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]


def RECORDREADER():
    return [(u.id, u) for u in input_collection]

# Переопределяем MAP для применения предиката C
def MAP(_, user: NamedTuple):
    # Условие: выбираем пользователей, у которых возраст больше 30
    if user.age > 30:
        yield (user, user)  # (ключ, значение) одинаковые - это сам кортеж

# Переопределяем REDUCE для выполнения функции идентичности
def REDUCE(key: NamedTuple, values: Iterator[NamedTuple]):
    # Функция идентичности - просто возвращаем то, что получили
    for value in values:
        yield value

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

# Выводим результат
print("\nВыбранные кортежи (age > 30):")
output


Выбранные кортежи (age > 30):


[User(id=0, age=55, gender='male', social_contacts=20),
 User(id=3, age=33, gender='female', social_contacts=800)]

Вывод:

Выбранные кортежи (age > 30):

[User(id=0, age=55, gender='male', social_contacts=20),

 User(id=3, age=33, gender='female', social_contacts=800)]

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [45]:
from typing import NamedTuple
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class User(NamedTuple):
    id: int
    age: int
    gender: str
    social_contacts: int

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

# Множество атрибутов S, которые мы хотим оставить
S = {'id', 'age'}

# Переопределяем RECORDREADER для чтения данных
def RECORDREADER():
    return [(u.id, u) for u in input_collection]

# Переопределяем MAP для проекции на множество атрибутов S
def MAP(_, user: NamedTuple):
    # Создаем проекцию на атрибуты 'id' и 'age'
    projection = {key: value for key, value in user._asdict().items() if key in S}
    yield (tuple(projection.items()), tuple(projection.items()))  # (t', t')

# Переопределяем REDUCE для объединения результатов и удаления дубликатов
def REDUCE(key: NamedTuple, values: Iterator[NamedTuple]):
    # Для каждого ключа возвращаем одну пару (t', t')
    yield key

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

# Выводим результат
output

[(('id', 0), ('age', 55)),
 (('id', 1), ('age', 25)),
 (('id', 2), ('age', 25)),
 (('id', 3), ('age', 33))]

Вывод:

[(('id', 0), ('age', 55)),

 (('id', 1), ('age', 25)),

 (('id', 2), ('age', 25)),

 (('id', 3), ('age', 33))]

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [98]:
from typing import Iterator, List, Tuple, Dict
from collections import defaultdict
# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element
# Функция группировки по ключу
def groupbykey(iterable):
  t = defaultdict(list)
  for k2, v2 in iterable:
    t[k2].append(v2)
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

relation1 = [{'name': 'Alice', 'age': 30}, {'name': 'Vasiliy', 'age': 42},{'name': 'Vasiliy', 'age': 36}]
relation2 = [{'name': 'Vasiliy', 'age': 42}, {'name': 'Charlie', 'age': 30}]

def RECORDREADER():
  # Возвращает кортежи (relation_id, row) , где relation_id - идентификатор списка (1 или 2), а row - словарь.
  for row in relation1:
    yield (1, row) # 1 - идентификатор первого списка
  for row in relation2:
    yield (2, row) # 2 - идентификатор второго списка

def MAP(_, row):
  # Преобразует каждый входной словарь row в пару (кортеж_значений, словарь).
  # Кортеж значений используем как ключ.
  key = tuple(row.values())
  yield (key, row)

def REDUCE(key, values):
  # Поскольку мы хотим уникальные записи, берем только первый словарь для каждого ключа.
  yield values[0]

output_union = MapReduce(RECORDREADER, MAP, REDUCE)
output_union = list(output_union)

print(output_union)



[{'name': 'Alice', 'age': 30}, {'name': 'Vasiliy', 'age': 42}, {'name': 'Vasiliy', 'age': 36}, {'name': 'Charlie', 'age': 30}]


Вывод:

[{'name': 'Alice', 'age': 30}, {'name': 'Vasiliy', 'age': 42}, {'name': 'Vasiliy', 'age': 36}, {'name': 'Charlie', 'age': 30}]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [43]:
from typing import NamedTuple
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class User(NamedTuple):
    id: int
    age: int
    gender: str
    social_contacts: int

# Два набора пользователей, чтобы найти их пересечение
input_collection_1 = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500)
]

input_collection_2 = [
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=3, age=33, gender='female', social_contacts=800)
]

# Множество атрибутов S, которые мы хотим оставить
S = {'id', 'age', 'gender', 'social_contacts'}

# Переопределяем RECORDREADER для чтения данных из двух коллекций
def RECORDREADER():
    return [(u.id, u) for u in input_collection_1] + [(u.id, u) for u in input_collection_2]

# Переопределяем MAP для создания пар (t, t)
def MAP(_, user: NamedTuple):
    yield (user, user)  # (t, t)

# Переопределяем REDUCE для пересечения
def REDUCE(key: NamedTuple, values: Iterator[NamedTuple]):
    values_list = list(values)
    if len(values_list) == 2:  # Если ключ встречается дважды, создаем пару (t, t)
        yield (key, key)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

output

[(User(id=1, age=25, gender='female', social_contacts=240),
  User(id=1, age=25, gender='female', social_contacts=240))]

Вывод:

[(User(id=1, age=25, gender='female', social_contacts=240),

  User(id=1, age=25, gender='female', social_contacts=240))]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [103]:
from typing import NamedTuple
from typing import Iterator
from collections import defaultdict

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class User(NamedTuple):
    id: int
    age: int
    gender: str
    social_contacts: int

# Два набора пользователей, чтобы найти их разницу
input_collection_R = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500)
]

input_collection_S = [
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500)
]

# Переопределяем RECORDREADER для чтения данных из двух коллекций
def RECORDREADER():
    return [(u, 'R') for u in input_collection_R] + [(u, 'S') for u in input_collection_S]

# Переопределяем MAP для создания пар (t, R) или (t, S)
def MAP(t: NamedTuple, source: str):
    yield (t, source)  # Для каждого кортежа t создаем пару (t, 'R') или (t, 'S')

# Переопределяем REDUCE для нахождения разницы
def REDUCE(key: NamedTuple, values: Iterator[str]):
    value_list = list(values)
    if 'R' in value_list and 'S' not in value_list:  # Проверяем, что есть в R и нет в S
        yield key  # Возвращаем только ключ (сам объект User)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

print(output)

[User(id=0, age=55, gender='male', social_contacts=20)]


Вывод:

[User(id=0, age=55, gender='male', social_contacts=20)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [41]:
from typing import NamedTuple
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class RelationR(NamedTuple):
    a: int
    b: int

class RelationS(NamedTuple):
    b: int
    c: int

# Отношения R и S
input_collection_R = [
    RelationR(a=1, b=10),
    RelationR(a=2, b=20),
    RelationR(a=3, b=30)
]

input_collection_S = [
    RelationS(b=10, c=100),
    RelationS(b=20, c=200),
    RelationS(b=40, c=400)
]

# Переопределяем RECORDREADER для чтения данных из двух коллекций
def RECORDREADER():
    return [(r.b, ('R', r.a)) for r in input_collection_R] + [(s.b, ('S', s.c)) for s in input_collection_S]

# Переопределяем MAP для создания пар (b, (R, a)) или (b, (S, c))
def MAP(b: int, value: tuple):
    yield (b, value)  # Для каждого кортежа из R или S создаем пару (b, (R, a)) или (b, (S, c))

# Переопределяем REDUCE для выполнения Join операции
def REDUCE(b: int, values: Iterator[tuple]):
    values_list = list(values)
    R_values = [v[1] for v in values_list if v[0] == 'R']
    S_values = [v[1] for v in values_list if v[0] == 'S']

    # Создаем все возможные комбинации (a, b, c)
    for a in R_values:
        for c in S_values:
            yield (a, b, c)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

output

[(1, 10, 100), (2, 20, 200)]

Вывод:

[(1, 10, 100), (2, 20, 200)]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [40]:
from typing import NamedTuple
from typing import Iterator

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Пример данных
class Relation(NamedTuple):
    a: int
    b: int
    c: int

# Пример данных
input_collection = [
    Relation(a=1, b=10, c=100),
    Relation(a=1, b=20, c=200),
    Relation(a=2, b=30, c=300),
    Relation(a=2, b=40, c=400),
    Relation(a=1, b=50, c=500),
    Relation(a=3, b=50, c=500)
]

# Переопределяем RECORDREADER для чтения данных
def RECORDREADER():
    return [(t.a, t.b) for t in input_collection]

# Переопределяем MAP для создания пар (a, b)
def MAP(a: int, b: int):
    yield (a, b)

# Переопределяем REDUCE для выполнения агрегации (например, SUM)
def REDUCE(a: int, values: Iterator[int]):
    result = sum(values)  # Выполняем операцию SUM
    yield (a, result)

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)

# Выводим результат
output

[(1, 80), (2, 70), (3, 50)]

Вывод:

[(1, 80), (2, 70), (3, 50)]

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [102]:
import numpy as np
from typing import List, Tuple, Iterator
import math
import multiprocessing as mp
from collections import defaultdict

# Параметры
MATRIX_ROWS = 4
MATRIX_COLS = 4
VECTOR_SIZE = MATRIX_COLS
CHUNK_SIZE = 2  # Размер части вектора, обрабатываемой каждым Map
NUM_REDUCERS = 2  # Количество редукторов

# Генерация случайной матрицы и вектора
matrix = np.random.rand(MATRIX_ROWS, MATRIX_COLS)
vector = np.random.rand(VECTOR_SIZE)


def RECORDREADER(matrix):
    # Читает матрицу построчно. Каждая строка - отдельная запись
    for i, row in enumerate(matrix):
        yield (i, row)  # (row_index, row_data)

def MAP(row_index: int, row: np.ndarray) -> Iterator[Tuple[int, Tuple[int, float]]]:
    # Разбивает вектор на части и сопоставляет каждой части строки матрицы.
    # Для каждой части строки матрицы и каждой части вектора выдает пару (reducer_id, (row_index, частичное произведение))
    num_chunks = math.ceil(VECTOR_SIZE / CHUNK_SIZE)
    start = 0
    end = VECTOR_SIZE
    #vector_chunk = vector[start:end]
    row_chunk = row
    reducer_id = row_index % NUM_REDUCERS #  используем индекс строки для распределения по редукторам
    partial_product = np.sum(row_chunk * vector) # умножаем строку матрицы на весь вектор
    yield (reducer_id, (row_index, partial_product))


def REDUCE(reducer_id: int, partial_results: Iterator[Tuple[int, float]]) -> Iterator[Tuple[int, float]]:
    # Суммирует частичные произведения для каждой строки матрицы
    row_sums = defaultdict(float)
    for row_index, partial_product in partial_results:
        row_sums[row_index] += partial_product
    for row_index, row_sum in row_sums.items():
        yield (row_index, row_sum)#

def PARTITIONER(key):
  return key

# Функция для выравнивания всех данных (flatten) из вложенных итерируемых объектов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element
# Функция группировки по ключу
def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    mapped = list(flatten(map(lambda x: MAP(*x), RECORDREADER(matrix))))
    grouped = groupbykey(mapped)
    reduced = list(flatten(map(lambda x: REDUCE(*x), grouped)))
    return reduced


results = MapReduce(RECORDREADER, MAP, REDUCE)
# Сортировка результатов по индексу строки
results.sort(key=lambda x: x[0])
 # Преобразование в NumPy array
map_reduce_result = np.array([result[1] for result in results])
# Проверка результата (обычное матрично-векторное умножение)
numpy_result = matrix @ vector
# Сравнение результатов
print("MapReduce result (first 10 elements):", map_reduce_result[:10])
print("NumPy result (first 10 elements):", numpy_result[:10])
# Проверка на равенство с заданной точностью
if np.allclose(map_reduce_result, numpy_result):
      print("True")
else:
      print("False")

MapReduce result (first 10 elements): [1.42804592 1.21285676 1.45066749 1.73160117]
NumPy result (first 10 elements): [1.42804592 1.21285676 1.45066749 1.73160117]
True


Вывод:

MapReduce result (first 10 elements): [1.42804592 1.21285676 1.45066749 1.73160117]

NumPy result (first 10 elements): [1.42804592 1.21285676 1.45066749 1.73160117]

True

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [27]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [114]:
import numpy as np
from typing import Tuple, Iterator, Dict, List
from collections import defaultdict

# Параметры матриц
I = 2
J = 3
K = 4*10

small_mat = np.random.rand(I, J)  # Матрица, хранящаяся в памяти
big_mat = np.random.rand(J, K)  # Матрица, генерируемая RECORDREADER

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j, k), big_mat[j, k])  # ((j, k), njk)

def MAP(key: Tuple[int, int], value: float) -> Iterator[Tuple[Tuple[int, int], float]]:
  # Для каждого элемента big_mat (j, k) выдает пары ( (i, k), mij * njk )
  # где i - номер строки в small_mat, j - номер столбца small_mat
  (j, k) = key
  w = value  # njk
  for i in range(small_mat.shape[0]):
    mij = small_mat[i, j]
    yield ((i, k), mij * w)  # ((i, k), mij * njk)

def REDUCE(key: Tuple[int, int], values: Iterator[float]) -> Iterator[Tuple[Tuple[int, int], float]]:
  # Суммирует произведения mij * njk для каждой пары (i, k)
  (i, k) = key
  sum_pik = sum(values)
  yield ((i, k), sum_pik)

# Запуск MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Форматирование результата
result_matrix = np.zeros((I, K))
for (i, k), pik in output:
    result_matrix[i, k] = pik

print("Результат MapReduce P:")
print(result_matrix)

# Проверка результата (с использованием NumPy)
numpy_result = small_mat @ big_mat
print("\nРезультат NumPy:")
print(numpy_result)

# Сравнение результатов
if np.allclose(result_matrix, numpy_result):
     print("True")
else:
      print("False")

Результат MapReduce P:
[[0.30621513 0.85712799 0.77357254 0.48122983 0.25793161 0.93077786
  0.56746459 0.27028144 0.25506057 0.35424529 0.61397699 0.29623879
  0.22956892 0.77204597 0.75258846 0.4486331  0.22853996 0.25693799
  0.67270599 0.76344107 0.36659478 0.64411993 0.45201779 0.63910323
  0.41600804 0.88145825 0.30117384 0.45593892 0.67424814 0.69964462
  0.31446172 0.34775071 0.81691345 0.56145829 0.28469632 0.46522809
  0.14359058 0.54977011 0.76457053 0.34281099]
 [0.36542292 0.3862561  0.50849906 0.33796115 0.3251338  0.4712049
  0.47696501 0.38387557 0.15287488 0.65089514 0.33620328 0.26747315
  0.32215626 0.33267416 0.33718642 0.55082911 0.28883081 0.36177462
  0.25255928 0.37175225 0.14931363 0.33896768 0.31440801 0.39179259
  0.64307099 0.53251082 0.12705245 0.22459177 0.4239396  0.32969271
  0.37974894 0.40691543 0.4866181  0.23295454 0.3745421  0.29255739
  0.13772063 0.24550737 0.29582306 0.30203622]]

Результат NumPy:
[[0.30621513 0.85712799 0.77357254 0.48122983 0.2

Вывод:

Результат MapReduce P:

[[0.65103703 0.6957463  1.43265521 1.55257496 1.32265845 1.33108916
  1.43230334 1.92093842 1.71885217 0.9810601  1.27801142 0.79084701
  0.6160354  0.79673351 0.93701701 1.57376606 0.4221972  1.46870906
  1.3817057  0.89259454 1.22667538 1.64864243 1.05078393 1.35987349
  2.07401376 1.07503128 2.16422823 1.617069   0.76628063 0.3881462
  0.90709897 1.69586165 1.24939035 1.93179565 1.49721322 1.88201609
  0.41700276 1.55159568 1.28005696 1.610856  ]
 [0.44195175 0.56544688 0.74476145 0.97947438 0.65338974 0.76550332
  0.89601738 1.18606498 0.94277816 0.70964796 0.71693588 0.51381988
  0.32255007 0.56082831 0.39374864 0.85477277 0.28263166 1.04218681
  0.78138024 0.72043434 0.64174222 1.02880713 0.67612928 0.72669664
  1.28784967 0.58106416 1.26310287 0.8615227  0.48080726 0.23957179
  0.46662805 1.03434326 0.78042046 1.12209714 0.94929398 0.97738826
  0.19809204 0.89246125 0.90897749 1.02544925]]

Результат NumPy:

[[0.65103703 0.6957463  1.43265521 1.55257496 1.32265845 1.33108916
  1.43230334 1.92093842 1.71885217 0.9810601  1.27801142 0.79084701
  0.6160354  0.79673351 0.93701701 1.57376606 0.4221972  1.46870906
  1.3817057  0.89259454 1.22667538 1.64864243 1.05078393 1.35987349
  2.07401376 1.07503128 2.16422823 1.617069   0.76628063 0.3881462
  0.90709897 1.69586165 1.24939035 1.93179565 1.49721322 1.88201609
  0.41700276 1.55159568 1.28005696 1.610856  ]
 [0.44195175 0.56544688 0.74476145 0.97947438 0.65338974 0.76550332
  0.89601738 1.18606498 0.94277816 0.70964796 0.71693588 0.51381988
  0.32255007 0.56082831 0.39374864 0.85477277 0.28263166 1.04218681
  0.78138024 0.72043434 0.64174222 1.02880713 0.67612928 0.72669664
  1.28784967 0.58106416 1.26310287 0.8615227  0.48080726 0.23957179
  0.46662805 1.03434326 0.78042046 1.12209714 0.94929398 0.97738826
  0.19809204 0.89246125 0.90897749 1.02544925]]

True

Проверьте своё решение

In [105]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [106]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [113]:
import numpy as np  # Импортируем библиотеку для работы с массивами
from typing import Tuple, Iterator, List  # Импортируем типы данных для аннотаций

def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()



def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Размерности матриц для умножения
I = 2
J = 3
K = 4 * 10

mat_M = np.random.rand(I, J)
mat_N = np.random.rand(J, K)


# Генерирует кортежи для каждой записи из матриц M и N
def RECORDREADER():
    for i in range(mat_M.shape[0]):
        for j in range(mat_M.shape[1]):
            yield "M", (i, j), mat_M[i, j]  # Возвращаем кортеж с индексами и значением
    for j in range(mat_N.shape[0]):
        for k in range(mat_N.shape[1]):
            yield "N", (j, k), mat_N[j, k]  # Возвращаем кортеж с индексами и значением


# Преобразует записи из матрицы M и N в пары ключ-значение
def MAP(matrix_name: str, coords: Tuple[int, int], value: float) -> Iterator[Tuple[Tuple[int, int], Tuple[str, int, int, float]]]:
    if matrix_name == "M":
        i, j = coords
        for k in range(K):  # Итерируем по столбцам матрицы N
            yield (i, k), ("M", i, j, value)  # Ключом является индекс строки i и столбца k, значением - строка и столбец из M
    elif matrix_name == "N":
        j, k = coords
        for i in range(I):  # Итерируем по строкам матрицы M
            yield (i, k), ("N", j, k, value)  # Ключом является индекс строки i и столбца k, значением - строка и столбец из N

# Выполняет агрегацию для каждой ячейки результата (i, k)
def REDUCE(key: Tuple[int, int], values: Iterator[Tuple[str, int, int, float]]) -> Iterator[Tuple[Tuple[int, int], float]]:
    """Вычисляет сумму произведений для каждой ячейки (i, k) в результирующей матрице."""
    i, k = key
    sum_val = 0
    m_vals = {}
    n_vals = {}

    # Проходим по всем значениям для данной ячейки (i, k)
    for matrix_name, first_index, second_index, val in values:
        if matrix_name == "M":
            m_vals[second_index] = val
        elif matrix_name == "N":
            n_vals[first_index] = val

    # Выполняем суммирование произведений соответствующих значений
    for j in range(J):
        m_val = m_vals.get(j, 0)
        n_val = n_vals.get(j, 0)
        sum_val += m_val * n_val  # Добавляем произведение к сумме

    yield (i, k), sum_val  # Возвращаем итоговое значение для ячейки (i, k)

# Выполняем алгоритм MapReduce
result = MapReduce(RECORDREADER, MAP, REDUCE)

# Форматируем результат в матрицу
output_matrix = np.zeros((I, K))  # Создаем пустую матрицу для результата умножения
for (i, k), value in result:
    output_matrix[i, k] = value  # Заполняем ячейку результатами


numpy_matrix = np.matmul(mat_M, mat_N)

print("Результат MapReduce:")
print(output_matrix)
print("\nРезультат NumPy:")
print(numpy_matrix)

# Сравниваем результаты
print("\nСовпадают ли результаты?", np.allclose(output_matrix, numpy_matrix))


Результат MapReduce:
[[0.38131499 0.38581551 0.39958747 0.52062489 0.60182781 0.27857375
  0.56539161 0.42930609 0.68895801 0.27500688 0.57539856 0.75193426
  0.08693775 0.54205606 0.37203334 0.14494357 0.19331791 0.293676
  0.48679804 0.47237209 0.22176125 0.71972412 0.43962569 0.40920454
  0.30017687 0.62832843 0.4178736  0.34607863 0.67349604 0.57767088
  0.33662095 0.7274654  0.7056129  0.4236142  0.4544439  0.24644341
  0.42649337 0.22077415 0.45636891 0.38968631]
 [0.77998613 0.41214165 0.72744979 0.58037725 0.82137405 0.61375078
  0.47455226 0.53529614 0.5387448  0.42314302 0.56417525 0.92972176
  0.15106109 0.49741181 0.37661584 0.11406549 0.23885348 0.72663128
  0.39226508 0.44474938 0.48154282 0.79089185 0.45814205 0.69923224
  0.3987395  0.59630559 0.63539491 0.62072539 0.7125347  0.573538
  0.5237198  0.89842602 0.74467647 0.79319897 0.63479627 0.43802466
  0.81518197 0.27563703 0.53722385 0.69392787]]

Результат NumPy:
[[0.38131499 0.38581551 0.39958747 0.52062489 0.601827

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [115]:
import numpy as np  # Импортируем библиотеку для работы с массивами
from typing import Tuple, Iterator, List, Dict  # Импортируем необходимые типы для аннотаций
from collections import defaultdict  # Импортируем defaultdict для удобного создания словарей с дефолтными значениями

def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

# Основная функция MapReduce, выполняющая распределенную обработку для умножения матриц
def MapReduce(RECORDREADER_M, RECORDREADER_N, MAP, REDUCE, num_reducers):
    # Каждый RECORDREADER передает свои данные в отдельные задачи MAP
    mapped_M = list(flatten(map(lambda x: MAP("M", *x), RECORDREADER_M())))
    mapped_N = list(flatten(map(lambda x: MAP("N", *x), RECORDREADER_N())))

    # Объединяем результаты обработки для обеих матриц
    mapped_all = mapped_M + mapped_N

    # Разбиение данных по партициям (моделируем с помощью простого хеширования)
    partitioned = defaultdict(list)  # Словарь для хранения данных по партициям
    for key, value in mapped_all:  # Для каждой пары ключ-значение
        reducer_id = hash(key) % num_reducers  # Простое хеширование для определения партиции
        partitioned[reducer_id].append((key, value))  # Добавляем данные в соответствующую партицию

    # Каждый редьюсер обрабатывает свою партицию данных
    reduced_results = []  # Список для хранения результатов редьюсинга
    for reducer_id in range(num_reducers):  # Для каждого редьюсера
        grouped = groupbykey(partitioned[reducer_id])  # Группируем данные по ключу
        reduced_results.extend(flatten(map(lambda x: REDUCE(*x), grouped)))  # Применяем REDUCE и добавляем результаты

    return reduced_results  # Возвращаем итоговые результаты

# Размерности матриц для умножения
I = 2
J = 3
K = 4 * 10

# Генерация случайных матриц M и N
mat_M = np.random.rand(I, J)
mat_N = np.random.rand(J, K)

# Функции для чтения данных из матриц (для распределенной обработки)
def RECORDREADER_M():
    for i in range(mat_M.shape[0]):
        for j in range(mat_M.shape[1]):
            yield (i, j), mat_M[i, j]  # Возвращаем индекс (i, j) и соответствующее значение

def RECORDREADER_N():
    for j in range(mat_N.shape[0]):
        for k in range(mat_N.shape[1]):
            yield (j, k), mat_N[j, k]  # Возвращаем индекс (j, k) и соответствующее значение


def MAP(matrix_name: str, coords: Tuple[int, int], value: float) -> Iterator[Tuple[Tuple[int, int], Tuple[str, int, int, float]]]:
    if matrix_name == "M":
        i, j = coords  # Извлекаем индексы из матрицы M
        for k in range(K):
            yield (i, k), ("M", i, j, value)  # Возвращаем ключ (i, k) и значение из матрицы M
    elif matrix_name == "N":
        j, k = coords  # Извлекаем индексы из матрицы N
        for i in range(I):
            yield (i, k), ("N", j, k, value)  # Возвращаем ключ (i, k) и значение из матрицы N

def REDUCE(key: Tuple[int, int], values: Iterator[Tuple[str, int, int, float]]) -> Iterator[Tuple[Tuple[int, int], float]] :
    i, k = key
    sum_val = 0
    m_vals = {}  # Словарь для значений из матрицы M
    n_vals = {}  # Словарь для значений из матрицы N

    # Проходим по всем значениям для данной ячейки (i, k)
    for matrix_name, first_index, second_index, val in values:
        if matrix_name == "M":
            m_vals[second_index] = val  # Запоминаем значения из матрицы M по индексу столбца
        elif matrix_name == "N":
            n_vals[first_index] = val  # Запоминаем значения из матрицы N по индексу строки

    # Суммируем произведения значений из M и N для каждого j
    for j in range(J):
        m_val = m_vals.get(j, 0)
        n_val = n_vals.get(j, 0)
        sum_val += m_val * n_val

    yield (i, k), sum_val  # Возвращаем итоговое значение для ячейки (i, k)


num_reducers = 2  # Примерное количество редьюсеров для распределенной обработки

result = MapReduce(RECORDREADER_M, RECORDREADER_N, MAP, REDUCE, num_reducers)

# Форматируем результаты в матрицу
output_matrix = np.zeros((I, K))  # Создаем пустую матрицу для результата
for (i, k), value in result:
    output_matrix[i, k] = value  # Заполняем соответствующие ячейки матрицы

numpy_matrix = np.matmul(mat_M, mat_N)

print("Результат MapReduce (распределенно):")
print(output_matrix)
print("\nРезультат NumPy:")
print(numpy_matrix)
print("\nСовпадают ли результаты?", np.allclose(output_matrix, numpy_matrix))  # Сравниваем результаты


Результат MapReduce (распределенно):
[[0.89229586 0.96180838 0.98651543 0.66494346 1.00285364 0.41136541
  1.13546726 0.67411128 0.53745969 0.46955353 0.98026392 1.29619308
  1.71694617 1.2010406  1.06874615 1.03920917 0.61917215 1.56972527
  0.96522264 0.93233962 0.80869108 1.21457716 1.47379662 1.18231854
  0.77863486 0.57298503 0.85751717 0.63054877 1.03406844 1.22596255
  0.9496268  1.10369699 1.19474934 0.89905126 0.52847279 1.1286353
  1.10635248 0.74218927 0.29230413 1.11342531]
 [1.00157314 0.77362302 1.01992121 0.55709547 1.20021261 0.27667588
  1.04952951 1.05480231 0.79333921 0.72807054 0.90484056 0.97751694
  1.32077899 1.14514116 1.1274559  0.76647005 0.72703404 1.17846751
  0.62860983 0.61056561 0.5530291  0.93905465 1.02600372 0.99064258
  0.78697053 0.67754684 1.13691617 0.95421392 0.6557252  1.05222945
  0.67114065 0.8737367  1.0228684  0.80686375 0.61016131 1.08632702
  0.89008233 0.49254561 0.35732872 1.0311972 ]]

Результат NumPy:
[[0.89229586 0.96180838 0.98651543 

Вывод:

Результат MapReduce (распределенно):

[[0.89229586 0.96180838 0.98651543 0.66494346 1.00285364 0.41136541
  1.13546726 0.67411128 0.53745969 0.46955353 0.98026392 1.29619308
  1.71694617 1.2010406  1.06874615 1.03920917 0.61917215 1.56972527
  0.96522264 0.93233962 0.80869108 1.21457716 1.47379662 1.18231854
  0.77863486 0.57298503 0.85751717 0.63054877 1.03406844 1.22596255
  0.9496268  1.10369699 1.19474934 0.89905126 0.52847279 1.1286353
  1.10635248 0.74218927 0.29230413 1.11342531]
 [1.00157314 0.77362302 1.01992121 0.55709547 1.20021261 0.27667588
  1.04952951 1.05480231 0.79333921 0.72807054 0.90484056 0.97751694
  1.32077899 1.14514116 1.1274559  0.76647005 0.72703404 1.17846751
  0.62860983 0.61056561 0.5530291  0.93905465 1.02600372 0.99064258
  0.78697053 0.67754684 1.13691617 0.95421392 0.6557252  1.05222945
  0.67114065 0.8737367  1.0228684  0.80686375 0.61016131 1.08632702
  0.89008233 0.49254561 0.35732872 1.0311972 ]]

Результат NumPy:

[[0.89229586 0.96180838 0.98651543 0.66494346 1.00285364 0.41136541
  1.13546726 0.67411128 0.53745969 0.46955353 0.98026392 1.29619308
  1.71694617 1.2010406  1.06874615 1.03920917 0.61917215 1.56972527
  0.96522264 0.93233962 0.80869108 1.21457716 1.47379662 1.18231854
  0.77863486 0.57298503 0.85751717 0.63054877 1.03406844 1.22596255
  0.9496268  1.10369699 1.19474934 0.89905126 0.52847279 1.1286353
  1.10635248 0.74218927 0.29230413 1.11342531]
 [1.00157314 0.77362302 1.01992121 0.55709547 1.20021261 0.27667588
  1.04952951 1.05480231 0.79333921 0.72807054 0.90484056 0.97751694
  1.32077899 1.14514116 1.1274559  0.76647005 0.72703404 1.17846751
  0.62860983 0.61056561 0.5530291  0.93905465 1.02600372 0.99064258
  0.78697053 0.67754684 1.13691617 0.95421392 0.6557252  1.05222945
  0.67114065 0.8737367  1.0228684  0.80686375 0.61016131 1.08632702
  0.89008233 0.49254561 0.35732872 1.0311972 ]]

Совпадают ли результаты? True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [211]:
import numpy as np
from typing import Tuple, Iterator, List, Dict
from collections import defaultdict
import random

# Функция для "выпрямления" вложенных итераторов
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция для группировки данных по ключам (для редьюсера)
def groupbykey(iterable):
    t = defaultdict(list)
    for k2, v2 in iterable:
        t[k2].append(v2)
    return t.items()

# Основная функция MapReduce
def MapReduce(RECORDREADER_M, RECORDREADER_N, MAP, REDUCE, num_reducers):
    """
    MapReduce для умножения матриц с несколькими RECORDREADER-ами.
    """
    # Этап Map: Каждый RECORDREADER генерирует данные для своей части матрицы
    mapped_M = list(flatten(map(lambda x: MAP("M", *x), RECORDREADER_M())))
    mapped_N = list(flatten(map(lambda x: MAP("N", *x), RECORDREADER_N())))

    # Объединяем все результаты после Map этапа
    mapped_all = mapped_M + mapped_N

    # Разбиение данных на партиции для редьюсеров
    partitioned = defaultdict(list)
    for key, value in mapped_all:
        reducer_id = hash(key) % num_reducers  # Хеширование для определения партиции
        partitioned[reducer_id].append((key, value))

    # Этап Reduce: Каждый редьюсер обрабатывает свою партицию данных
    reduced_results = []
    for reducer_id in range(num_reducers):
        grouped = groupbykey(partitioned[reducer_id])
        reduced_results.extend(flatten(map(lambda x: REDUCE(*x), grouped)))

    return reduced_results

# Размерности матриц для умножения
I = 3  # Увеличили для большего теста
J = 4
K = 2*10

# Генерация случайных матриц M и N
mat_M = np.random.rand(I, J)
mat_N = np.random.rand(J, K)

# --- Множество DISTRIBUTED RECORDREADER-ов ---
def RECORDREADER_M(num_readers: int, missing_data_probability: float = 0.0):
    """
    Генерирует данные для матрицы M с возможностью пропуска данных.
    """
    for reader_id in range(num_readers):  # Эмулируем несколько читателей
        for i in range(mat_M.shape[0]):
            for j in range(mat_M.shape[1]):
                if random.random() > missing_data_probability:  # Пропуск данных с заданной вероятностью
                    yield (i, j), mat_M[i, j]  # Возвращаем индексы и значение

def RECORDREADER_N(num_readers: int, missing_data_probability: float = 0.0):
    """
    Генерирует данные для матрицы N с возможностью пропуска данных.
    """
    for reader_id in range(num_readers):  # Эмулируем несколько читателей
        for j in range(mat_N.shape[0]):
            for k in range(mat_N.shape[1]):
                if random.random() > missing_data_probability:  # Пропуск данных с заданной вероятностью
                    yield (j, k), mat_N[j, k]  # Возвращаем индексы и значение

# --- Функция MAP ---
def MAP(matrix_name: str, coords: Tuple[int, int], value: float) -> Iterator[Tuple[Tuple[int, int], Tuple[str, int, int, float]]]:
    """Генерирует ключ-значение для дальнейшей обработки"""
    if matrix_name == "M":
        i, j = coords
        for k in range(K):  # Проходим по всем столбцам матрицы N
            yield (i, k), ("M", i, j, value)  # Возвращаем (i, k) и значение из M
    elif matrix_name == "N":
        j, k = coords
        for i in range(I):  # Проходим по всем строкам матрицы M
            yield (i, k), ("N", j, k, value)  # Возвращаем (i, k) и значение из N

# --- Функция REDUCE ---
def REDUCE(key: Tuple[int, int], values: Iterator[Tuple[str, int, int, float]]) -> Iterator[Tuple[Tuple[int, int], float]] :
    """Вычисление суммы произведений для каждой ячейки (i, k) результирующей матрицы"""
    i, k = key
    sum_val = 0
    m_vals = {}
    n_vals = {}

    # Группируем все данные по ключам для матриц M и N
    for matrix_name, first_index, second_index, val in values:
        if matrix_name == "M":
            m_vals[second_index] = val  # Сохраняем значения из матрицы M по индексу столбца
        elif matrix_name == "N":
            n_vals[first_index] = val  # Сохраняем значения из матрицы N по индексу строки

    # Суммируем произведения значений из M и N для каждого j
    for j in range(J):
        mij = m_vals.get(j, 0)  # Получаем значение из M или 0, если отсутствует
        njk = n_vals.get(j, 0)  # Получаем значение из N или 0, если отсутствует
        sum_val += mij * njk  # Суммируем произведение

    yield (i, k), sum_val  # Возвращаем результат для (i, k)

# --- Главная часть выполнения ---
if __name__ == "__main__":
    num_reducers = 2  # Количество редьюсеров
    num_readers_M = 2  # Количество читателей для матрицы M
    num_readers_N = 2  # Количество читателей для матрицы N
    missing_data_probability = 0.1  # Вероятность пропуска данных

    # Запуск MapReduce (с несколькими RECORDREADER-ами)
    result = MapReduce(
        lambda: RECORDREADER_M(num_readers_M, missing_data_probability),
        lambda: RECORDREADER_N(num_readers_N, missing_data_probability),
        MAP,
        REDUCE,
        num_reducers,
    )

    # Форматируем результаты в матрицу
    output_matrix = np.zeros((I, K))
    for (i, k), value in result:
        output_matrix[i, k] = value

    # Проверка результата с использованием NumPy
    numpy_matrix = np.matmul(mat_M, mat_N)

    print("Результат MapReduce (с несколькими RECORDREADER-ами):")
    print(output_matrix)

    print("\nРезультат NumPy:")
    print(numpy_matrix)

    print("\nРезультаты совпадают?", np.allclose(output_matrix, numpy_matrix))  # Проверка на совпадение


Результат MapReduce (с несколькими RECORDREADER-ами):
[[1.30921503 1.37056443 1.71825884 1.21271182 1.67449666 0.57108472
  1.19846385 1.58480875 1.21963913 0.61874692 0.96219584 1.32438219
  1.13330654 1.22979297 1.51288963 1.02461694 0.95172204 1.07769714
  1.2669526  1.746086  ]
 [1.46929067 1.4927038  2.34821126 1.71813431 2.04314442 1.11785645
  1.84556816 2.09283934 1.48642885 0.96754118 1.20277492 1.86157641
  1.69690499 1.59006903 2.37486199 1.66467767 1.42397541 1.53611259
  1.54935906 2.05976177]
 [1.23412379 1.2517264  2.09492502 1.52802984 1.77549981 1.06913741
  1.70446855 1.83465562 1.30684656 0.87255963 1.01239322 1.62630351
  1.53755548 1.43302481 2.16900765 1.52620923 1.29409037 1.29212671
  1.39916901 1.78733022]]

Результат NumPy:
[[1.30921503 1.37056443 1.71825884 1.21271182 1.67449666 0.57108472
  1.19846385 1.58480875 1.21963913 0.61874692 0.96219584 1.32438219
  1.13330654 1.22979297 1.51288963 1.02461694 0.95172204 1.07769714
  1.2669526  1.746086  ]
 [1.4692906

Вывод:

Результат MapReduce (с несколькими RECORDREADER-ами):

[[1.30921503 1.37056443 1.71825884 1.21271182 1.67449666 0.57108472
  1.19846385 1.58480875 1.21963913 0.61874692 0.96219584 1.32438219
  1.13330654 1.22979297 1.51288963 1.02461694 0.95172204 1.07769714
  1.2669526  1.746086  ]
 [1.46929067 1.4927038  2.34821126 1.71813431 2.04314442 1.11785645
  1.84556816 2.09283934 1.48642885 0.96754118 1.20277492 1.86157641
  1.69690499 1.59006903 2.37486199 1.66467767 1.42397541 1.53611259
  1.54935906 2.05976177]
 [1.23412379 1.2517264  2.09492502 1.52802984 1.77549981 1.06913741
  1.70446855 1.83465562 1.30684656 0.87255963 1.01239322 1.62630351
  1.53755548 1.43302481 2.16900765 1.52620923 1.29409037 1.29212671
  1.39916901 1.78733022]]

Результат NumPy:

[[1.30921503 1.37056443 1.71825884 1.21271182 1.67449666 0.57108472
  1.19846385 1.58480875 1.21963913 0.61874692 0.96219584 1.32438219
  1.13330654 1.22979297 1.51288963 1.02461694 0.95172204 1.07769714
  1.2669526  1.746086  ]
 [1.46929067 1.4927038  2.34821126 1.71813431 2.04314442 1.11785645
  1.84556816 2.09283934 1.48642885 0.96754118 1.20277492 1.86157641
  1.69690499 1.59006903 2.37486199 1.66467767 1.42397541 1.53611259
  1.54935906 2.05976177]
 [1.23412379 1.2517264  2.09492502 1.52802984 1.77549981 1.06913741
  1.70446855 1.83465562 1.30684656 0.87255963 1.01239322 1.62630351
  1.53755548 1.43302481 2.16900765 1.52620923 1.29409037 1.29212671
  1.39916901 1.78733022]]

Результаты совпадают? True

Да, с изменениями, описанными выше, он должен работать корректно, даже если средства чтения записей генерируют случайные подмножества элементов матрицы и если эти подмножества являются неполными. Комбинация нескольких средств чтения записей (каждое из которых создает подмножество) и использование .get(j, 0) в функции REDUCE для обработки возможных пропущенных значений делает код устойчивым к этому сценарию.