<a href="https://colab.research.google.com/github/Sireax/Labs-Big-Data/blob/main/Big_Data_0_6403_Ovchinnikov_EV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [15]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(2.405367162143548)),
 (1, np.float64(2.405367162143548)),
 (2, np.float64(2.405367162143548)),
 (3, np.float64(2.405367162143548)),
 (4, np.float64(2.405367162143548))]

## Inverted index

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('banana', 2), ('is', 18), ('what', 10)]),
 (1, [('a', 2), ('it', 18)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.08772826291824609)),
   (None, np.float64(0.10025743474100002)),
   (None, np.float64(0.10580549992725174)),
   (None, np.float64(0.13235851481366756)),
   (None, np.float64(0.1333978779781826)),
   (None, np.float64(0.1439777169483729)),
   (None, np.float64(0.21889562475886448)),
   (None, np.float64(0.23025065144020251)),
   (None, np.float64(0.25172574551999594)),
   (None, np.float64(0.2517957894608325)),
   (None, np.float64(0.2752921535197522)),
   (None, np.float64(0.3387875836298254)),
   (None, np.float64(0.3974233638205673)),
   (None, np.float64(0.4860653093925582)),
   (None, np.float64(0.4861030339489898))]),
 (1,
  [(None, np.float64(0.5370555828277126)),
   (None, np.float64(0.5538537509963262)),
   (None, np.float64(0.5789429436357445)),
   (None, np.float64(0.5920061281363996)),
   (None, np.float64(0.6047610755301843)),
   (None, np.float64(0.6630673894362309)),
   (None, np.float64(0.667369152572531)),
   (None, np.float64(0.7079064536187

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [21]:
import numpy as np

# Генерируем 30 случайных чисел от 0 до 1
values = np.random.random(30)
print("Исходный список чисел:", values)

# Функция, которая передаёт числа в виде (ключ, значение)
def RECORDREADER():
    for num in values:
        yield (None, num)  # Ключ остаётся фиксированным для группировки

# Функция MAP
def MAP(_, num):
    """Каждое число маппится на один ключ (None)."""
    yield (None, num)

# Функция REDUCE
def REDUCE(_, num_list):
    """Определяем максимальное значение в списке."""
    yield (None, max(num_list))

# Выполнение MapReduce
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Выводим результат
print("Максимальное значение:", result[0][1])

Исходный список чисел: [0.73811005 0.89205165 0.41080576 0.48570791 0.08330325 0.16927768
 0.46645276 0.88120594 0.82323808 0.36422204 0.92736092 0.0782692
 0.7017772  0.69402596 0.48755851 0.81331281 0.66403723 0.9157449
 0.60963691 0.49901268 0.36217759 0.01007723 0.17889559 0.96656489
 0.58745275 0.83070625 0.97843825 0.32155189 0.23920859 0.71914913]
Максимальное значение: 0.9784382514968788


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [22]:

import numpy as np

# Генерируем 100 случайных чисел от 0 до 1
input_values = np.random.rand(100)
print("Исходный список чисел:", input_values)

# Функция, которая передаёт числа в виде (ключ, значение)
def RECORDREADER():
    for value in input_values:
        yield (None, value)  # Ключ фиксированный, чтобы все значения группировались вместе

# Функция MAP
def MAP(_, value):
    """Каждое число маппится на один ключ (None)."""
    yield (None, value)

# Функция REDUCE
def REDUCE(_, values):
    """Вычисляет среднее значение из списка чисел."""
    yield (None, sum(values) / len(values))

# Выполнение MapReduce
average_value_output = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Выводим результат
print("Среднее значение:", average_value_output[0][1])

Исходный список чисел: [0.54219828 0.98746041 0.8763868  0.69613902 0.19025442 0.25345013
 0.26545207 0.87686816 0.91753134 0.96843392 0.52190511 0.00233833
 0.78924324 0.77873902 0.98966367 0.84059349 0.24268863 0.59177776
 0.61536709 0.58656824 0.55141323 0.19626105 0.6452612  0.42014308
 0.65526261 0.27956994 0.72208141 0.81839216 0.75779747 0.41067085
 0.41171752 0.41523909 0.94614438 0.57855634 0.11261583 0.39739123
 0.15447323 0.56812083 0.6909885  0.87375165 0.41319576 0.11055627
 0.25656933 0.98259073 0.04748203 0.99302533 0.84519748 0.76318072
 0.545285   0.1639286  0.6199002  0.42328495 0.26962403 0.94615613
 0.26935381 0.24845371 0.75095838 0.17072125 0.54500726 0.56700843
 0.85654105 0.59870385 0.93195343 0.0243046  0.76447796 0.28389097
 0.51344235 0.60399681 0.60300132 0.92150814 0.89349561 0.0086524
 0.1622584  0.39737841 0.5066416  0.45370317 0.72518368 0.45918403
 0.15399598 0.88095063 0.97789583 0.61916772 0.40195449 0.21495048
 0.17746732 0.67059311 0.83826995 0.3562

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [23]:
def group_by_key_sorted(data):
    """Группирует пары (ключ, значение) по ключу, используя сортировку."""
    sorted_data = sorted(data, key=lambda pair: pair[0])  # Сортируем по ключу
    grouped_result = []

    active_key, value_group = sorted_data[0][0], []  # Первый ключ
    for k, v in sorted_data:
        if k == active_key:
            value_group.append(v)  # Добавляем значение в текущую группу
        else:
            grouped_result.append((active_key, value_group))  # Сохраняем старую группу
            active_key, value_group = k, [v]  # Начинаем новую группу

    grouped_result.append((active_key, value_group))  # Добавляем последнюю группу
    return grouped_result

# Пример работы
sample_data = [
    ("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5), ("a", 6)
]

grouped_output = group_by_key_sorted(sample_data)
print(grouped_output)


[('a', [1, 3, 6]), ('b', [2, 4]), ('c', [5])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [24]:
from itertools import chain

reducers = 2  # Количество редьюсеров

def INPUTFORMAT():
    global maps
    data_entries = ["apple", "banana", "apple", "orange", "banana", "grape", "grape", "apple"]

    def RECORDREADER(chunk):
        for item in chunk:
            yield (item, None)  # Формат (значение, None)

    chunk_size = max(1, len(data_entries) // maps)
    for i in range(0, len(data_entries), chunk_size):
        yield RECORDREADER(data_entries[i:i+chunk_size])

def MAP(item, _):
    yield (item, None)  # Просто передаем ключ, значение не важно

def PARTITIONER(key):
    global reducers
    return hash(key) % reducers  # Определяет, какой редьюсер обработает ключ

def REDUCE(item, _):
    yield (item, None)  # Возвращаем только одно значение

maps = 2  # Количество мапперов
unique_values_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER)

# Преобразуем результат в удобный формат
unique_values_output = [word for (_, partition) in unique_values_output for (word, _) in partition]
print("Unique values:", unique_values_output)  # Ожидаем ['apple', 'banana', 'orange', 'grape']


8 key-value pairs were sent over a network.
Unique values: ['banana', 'apple', 'grape', 'orange']


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [25]:

def C(t):
    """Проверяет, делится ли сумма элементов кортежа на 2 без остатка."""
    return (t[0] + t[1]) % 2 == 0

def RECORDREADER():
    """Генерирует кортежи из заданного набора данных."""
    data_samples = [(0, 2), (6, 1), (5, 5)]
    for pair in data_samples:
        yield (pair, None)  # Фиксированный ключ для обработки MapReduce

def MAP(pair, _):
    """Применяет предикат к элементу и передает его в Reduce, если условие выполняется."""
    if C(pair):
        yield (pair, pair)  # Ключ и значение равны отобранному элементу

def REDUCE(_, selected_values):
    """Функция идентичности — возвращает полученные значения."""
    yield from selected_values

# Запуск MapReduce
filtered_tuples = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Кортежи, удовлетворяющие условию:", filtered_tuples)


Кортежи, удовлетворяющие условию: [(0, 2), (5, 5)]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [26]:
S = {1, 5, 10}  # Оставляем только эти атрибуты

def MAP(t, _):
    """Фильтруем атрибуты, оставляя только нужные."""
    filtered_data = {key: value for key, value in t.items() if key in S}
    sorted_key = tuple(sorted(filtered_data.items()))  # Делаем ключ детерминированным
    yield (sorted_key, sorted_key)  # Пара (t', t')

def REDUCE(_, unique_values):
    """Удаляем дубликаты, оставляя один экземпляр."""
    yield unique_values[0]  # Берём первое значение

def RECORDREADER():
    """Генерация входных данных."""
    yield ({1: "apple", 2: "banana", 5: "grape"}, None)
    yield ({1: "orange", 3: "pear", 10: "kiwi"}, None)
    yield ({5: "melon", 7: "cherry", 10: "mango"}, None)

# Запуск MapReduce
filtered_result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Проекция по выбранным атрибутам:", filtered_result)


Проекция по выбранным атрибутам: [((1, 'apple'), (5, 'grape')), ((1, 'orange'), (10, 'kiwi')), ((5, 'melon'), (10, 'mango'))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [27]:
def MAP(t, _):
    """Каждый входной элемент преобразуется в пару (element, element)."""
    yield (t, t)

def REDUCE(t, grouped_values):
    """Удаляет дубликаты и возвращает (element, element)."""
    yield (t, t)  # Значения уже сгруппированы, просто оставляем один экземпляр

def RECORDREADER():
    """Имитация объединения двух множеств."""
    first_set = [(1, None), (2, None), (3, None)]
    second_set = [(2, None), (4, None), (5, None)]
    return first_set + second_set  # Объединяем два множества

# Запуск MapReduce
merged_set = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Объединенное множество:", merged_set)



Объединенное множество: [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [28]:
def MAP(t, _):
    """Каждый входной элемент преобразуется в пару (element, element)."""
    yield (t, t)

def REDUCE(t, occurrences):
    """Если элемент встречается дважды (в обоих множествах), добавляем его в результат."""
    if len(occurrences) == 2:
        yield (t, t)

def RECORDREADER():
    """Имитация пересечения двух множеств: {1, 2, 3, 4} и {2, 3, 5, 6}."""
    first_set = [(1, None), (2, None), (3, None), (4, None)]
    second_set = [(2, None), (3, None), (5, None), (6, None)]
    return first_set + second_set  # Объединяем данные перед MapReduce

# Запуск MapReduce
intersection_result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Пересечение множеств:", intersection_result)


Пересечение множеств: [(2, 2), (3, 3)]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [29]:

def MAP(t, source):
    """Помечаем каждый элемент его источником (R или S)."""
    yield (t, source)

def REDUCE(t, sources):
    """Добавляем элемент в результат, если он присутствует только в R."""
    if sources == ["R"]:  # Если элемент встречается только в R
        yield (t, t)

def RECORDREADER():
    """Определяем два множества: R и S."""
    R = [(1, "R"), (2, "R"), (3, "R"), (4, "R")]  # Множество R
    S = [(2, "S"), (3, "S"), (5, "S"), (6, "S")]  # Множество S
    return R + S  # Объединяем для обработки

# Запуск MapReduce
difference_result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Разность R - S:", difference_result)



Разность R - S: [(1, 1), (4, 4)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [30]:
def MAP(record, source):
    """Формирует пары (b, (источник, значение)) для выполнения соединения."""
    if source == "R":
        a, b = record
        yield (b, ("R", a))
    elif source == "S":
        b, c = record
        yield (b, ("S", c))

def REDUCE(b, pairs):
    """Создает кортежи (a, b, c) для всех подходящих соединений."""
    r_group = [a for src, a in pairs if src == "R"]
    s_group = [c for src, c in pairs if src == "S"]

    for a in r_group:
        for c in s_group:
            yield (a, b, c)  # Оставляем b для наглядности

def RECORDREADER():
    """Исходные данные для соединения: R(a, b) и S(b, c)."""
    R = [("A", 1), ("B", 2), ("C", 3)]  # (a, b)
    S = [(1, "apple"), (2, "banana"), (3, "cherry")]  # (b, c)
    return [(r, "R") for r in R] + [(s, "S") for s in S]

# Запуск MapReduce
join_result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Natural Join:", join_result)


Natural Join: [('A', 1, 'apple'), ('B', 2, 'banana'), ('C', 3, 'cherry')]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [31]:
from collections import defaultdict
from typing import Callable, List, Tuple

def MAP(a, b, _):
    """Формируем (ключ, значение для агрегации)."""
    yield (a, b)

def REDUCE(a, values, aggregation_function: Callable[[List[int]], int]):
    """Применяем агрегирующую функцию и выдаем (ключ, агрегированное значение)."""
    yield (a, aggregation_function(values))

def RECORDREADER():
    """Пример входных данных (a, b, c)."""
    return [(1, 10, "x"), (2, 20, "y"), (1, 15, "z"), (2, 25, "w"), (3, 30, "v")]

def shuffle_phase(mapped_data: List[Tuple[int, int]]) -> dict:
    """Группируем значения по ключу."""
    grouped = defaultdict(list)
    for k, v in mapped_data:
        grouped[k].append(v)
    return grouped

def MapReduce(recordreader, map_func, reduce_func, aggregation_function):
    """Общая логика MapReduce."""
    records = recordreader()

    # MAP
    mapped_data = []
    for record in records:
        mapped_data.extend(map_func(*record))  # Распаковываем кортежи

    # SHUFFLE
    grouped_data = shuffle_phase(mapped_data)

    # REDUCE
    reduced_data = []
    for key, values in grouped_data.items():
        reduced_data.extend(reduce_func(key, values, aggregation_function))

    return reduced_data

# Агрегация с использованием SUM
print("SUM Aggregation:", MapReduce(RECORDREADER, MAP, REDUCE, sum))

# Агрегация с использованием MAX
print("MAX Aggregation:", MapReduce(RECORDREADER, MAP, REDUCE, max))


SUM Aggregation: [(1, 25), (2, 45), (3, 30)]
MAX Aggregation: [(1, 15), (2, 25), (3, 30)]


#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [42]:
from collections import namedtuple, defaultdict

# Описание структуры строки матрицы и элемента вектора
Row = namedtuple('Row', ['row', 'col', 'value'])
VectorElement = namedtuple('VectorElement', ['index', 'value'])

# Входные данные: матрица и вектор
matrix = [Row(0, 0, 1.0), Row(0, 1, 2.0), Row(1, 0, 3.0), Row(1, 1, 4.0)]
vector = [VectorElement(0, 0.5), VectorElement(1, 0.7)]

# Функция INPUTFORMAT для считывания данных и подачи их в MapReduce
def INPUTFORMAT():
    def reader():
        # Помечаем записи как "matrix" или "vector"
        for row in matrix:
            yield ("matrix", row)
        for vec in vector:
            yield ("vector", vec)
    return [list(reader())]  # Возвращаем как один список записей

# Функция MAP сопоставления: готовим пары ключ-значение
def MAP(source, data):
    if source == "matrix":
        # Для матрицы: ключ = номер столбца
        yield (data.col, ("M", data.row, data.value))
    elif source == "vector":
        # Для вектора: ключ = индекс элемента
        yield (data.index, ("V", data.value))

# Функция REDUCE обработки группированных значений
def REDUCE(key, values):
    matrix_elements = []  # Список строк матрицы для данного столбца
    vector_value = None   # Значение вектора для данного индекса
    for v in values:
        if v[0] == "M":
            matrix_elements.append((v[1], v[2]))
        elif v[0] == "V":
            vector_value = v[1]

    if vector_value is not None:
        # Умножаем элементы матрицы на значение вектора
        for row, m_val in matrix_elements:
            yield (row, m_val * vector_value)

# Функция PARTITIONER: определяет, к какому редуктору отправить ключ
def PARTITIONER(key):
    return key  # Просто по номеру столбца (без распределения)

# Вспомогательная функция: разворачивает списки списков в один список
def flatten(list_of_lists):
    return [item for sublist in list_of_lists for item in sublist]

# Функция финальной агрегации: складывает частичные произведения
def aggregate(results):
    result = defaultdict(float)
    for row, value in results:
        result[row] += value
    return dict(result)

intermediate_result = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER)

# Финальная сборка результата
final_result = aggregate(flatten([values for pid, values in intermediate_result]))

print("Matrix-Vector Multiplication Result:", final_result)

6 key-value pairs were sent over a network.
Matrix-Vector Multiplication Result: {0: 1.9, 1: 4.3}


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [33]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [34]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)  # Локальная маленькая матрица
big_mat = np.random.rand(J, K)  # Большая матрица загружается через RECORDREADER

def RECORDREADER():
    """Генератор, читающий элементы большой матрицы."""
    for row in range(big_mat.shape[0]):
        for col in range(big_mat.shape[1]):
            yield ((row, col), big_mat[row, col])  # (j, k) - ключ, значение - n_{jk}

def MAP(matrix_key, matrix_value):
    """Функция отображения: умножает элементы большой и малой матриц."""
    j, k = matrix_key  # Извлекаем индексы (j, k)
    w = matrix_value  # Значение n_{jk}

    for i in range(small_mat.shape[0]):  # Перебираем строки маленькой матрицы
        v = small_mat[i, j]  # m_{ij}
        yield ((i, k), v * w)  # (i, k) - ключ, произведение - значение

def REDUCE(key, values):
    """Функция редукции: суммирует произведения."""
    i, k = key
    yield ((i, k), sum(values))  # Суммируем все произведения

# Запуск MapReduce
result = list(MapReduce(RECORDREADER, MAP, REDUCE))
print(result)


[((0, 0), np.float64(0.4057777070336668)), ((1, 0), np.float64(1.200211459219013)), ((0, 1), np.float64(0.20211281296288955)), ((1, 1), np.float64(0.8559190125067031)), ((0, 2), np.float64(0.36481513780643193)), ((1, 2), np.float64(0.7880641268543922)), ((0, 3), np.float64(0.4706182040223124)), ((1, 3), np.float64(0.9463535417187143)), ((0, 4), np.float64(0.46754170166821274)), ((1, 4), np.float64(1.5469276908869785)), ((0, 5), np.float64(0.3080198957606918)), ((1, 5), np.float64(1.336937709567397)), ((0, 6), np.float64(0.24184606574871909)), ((1, 6), np.float64(1.1099679327050767)), ((0, 7), np.float64(0.2712241979298812)), ((1, 7), np.float64(1.2670992773041487)), ((0, 8), np.float64(0.4883217616669323)), ((1, 8), np.float64(1.7976932851346024)), ((0, 9), np.float64(0.3373531433023043)), ((1, 9), np.float64(0.683427336840267)), ((0, 10), np.float64(0.19924233894356844)), ((1, 10), np.float64(1.2015613012294666)), ((0, 11), np.float64(0.33188103286785453)), ((1, 11), np.float64(0.9429

Проверьте своё решение

In [35]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [36]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [37]:
import numpy as np  # Импорт библиотеки NumPy для работы с матрицами

# Размерности матриц
I = 2
J = 3
K = 40

# Генерация случайных матриц
np.random.seed(42)  # Фиксируем seed для воспроизводимости результатов
small_mat = np.random.rand(I, J)  # Матрица M размером (I x J)
big_mat = np.random.rand(J, K)  # Матрица N размером (J x K)

# Эталонное решение с использованием стандартного перемножения матриц
reference_solution = np.matmul(small_mat, big_mat)

# Функция, читающая данные обеих матриц
def RECORDREADER():
    """Чтение данных из матриц M и N."""
    for row in range(small_mat.shape[0]):
        for col in range(small_mat.shape[1]):
            yield ((0, row, col), small_mat[row, col])  # (0, i, j, M[i, j])

    for row in range(big_mat.shape[0]):
        for col in range(big_mat.shape[1]):
            yield ((1, row, col), big_mat[row, col])  # (1, j, k, N[j, k])

# Функция MAP для соединения матриц по j
def MAP_JOIN(key, value):
    """Разбивает элементы по индексу j для соединения."""
    mat_type, first_idx, second_idx = key

    if mat_type == 0:  # Элемент первой матрицы (M)
        yield (second_idx, (0, first_idx, value))  # Группируем по j
    else:  # Элемент второй матрицы (N)
        yield (first_idx, (1, second_idx, value))  # Группируем по j

# Функция REDUCE для соединения матриц по j
def REDUCE_JOIN(key, values):
    """Соединяет элементы M и N по общему индексу j."""
    matrix_m = [v for v in values if v[0] == 0]
    matrix_n = [v for v in values if v[0] == 1]

    for m in matrix_m:
        for n in matrix_n:
            yield ((m[1], n[1]), m[2] * n[2])  # Умножение mij * njk

# Функция MAP для финальной агрегации
def MAP_MUL(key, value):
    """Передаёт (i, k) без изменений для суммирования."""
    yield (key, value)

# Функция REDUCE для суммирования произведений
def REDUCE_MUL(key, values):
    """Суммирует все элементы с одинаковым (i, k)."""
    yield (key, sum(values))

# Генерация объединённых значений через MapReduce
joined_data = MapReduce(RECORDREADER, MAP_JOIN, REDUCE_JOIN)

# Финальная агрегация через MapReduce
final_solution = MapReduce(lambda: joined_data, MAP_MUL, REDUCE_MUL)

# Проверка результата
print(np.allclose(reference_solution, asmatrix(final_solution)))  # Должно вывести True


True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [38]:
import numpy as np

NUM_MAPPERS = 2
reducers = 2
ROWS_A = 2
COLS_A = 3
COLS_B = 40  # K = 4 * 10

# Генерация данных
A = np.random.rand(ROWS_A, COLS_A)
B = np.random.rand(COLS_A, COLS_B)

def generate_chunks():
    """Создаёт части матриц для обработки в MAP."""
    chunk_size = int(np.ceil(ROWS_A / NUM_MAPPERS))
    for start_row in range(0, ROWS_A, chunk_size):
        yield process_chunk(range(start_row, min(start_row + chunk_size, ROWS_A)))

def process_chunk(rows):
    """Генерирует пары (координаты, значение) из двух матриц."""
    for row in rows:
        for j in range(COLS_A):
            for k in range(COLS_B):
                yield ((row, j), A[row, j]), ((j, k), B[j, k])

def map_function(pair_A, pair_B):
    """Умножает элементы и группирует по (i, k)."""
    (i, j), val_A = pair_A
    (j, k), val_B = pair_B
    yield (i, k), val_A * val_B

def reduce_function(index, values):
    """Суммирует элементы с одинаковыми индексами."""
    yield index, sum(values)

# Запуск MapReduce
intermediate_results = MapReduceDistributed(generate_chunks, map_function, reduce_function)

# Объединение результатов
final_result = {}
for partition_id, partition in intermediate_results:
    for index, value in partition:
        final_result[index] = final_result.get(index, 0) + value

# Вывод результата
print("Matrix Multiplication Result:\n", final_result)

240 key-value pairs were sent over a network.
Matrix Multiplication Result:
 {(0, 1): np.float64(0.47268518561234973), (0, 2): np.float64(1.331585175830705), (0, 4): np.float64(1.021351852495352), (0, 5): np.float64(0.9131445349756118), (0, 8): np.float64(1.0854341486196752), (0, 9): np.float64(0.3075827214969634), (0, 11): np.float64(1.003832121786526), (0, 12): np.float64(0.45686569948009204), (0, 15): np.float64(0.8918100486065025), (0, 16): np.float64(0.5041984539756577), (0, 18): np.float64(0.8286369376706371), (0, 19): np.float64(0.9717206052489281), (0, 22): np.float64(1.099447290027778), (0, 23): np.float64(0.5036027285086736), (0, 25): np.float64(1.395355376421955), (0, 26): np.float64(0.9624208429352515), (0, 29): np.float64(0.5962891831345462), (0, 32): np.float64(0.5959378597563265), (0, 33): np.float64(0.6976597508277623), (0, 36): np.float64(0.04441053975372091), (0, 39): np.float64(0.6117816957363831), (1, 0): np.float64(0.4197798780682388), (1, 1): np.float64(0.31678983

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [39]:
import numpy as np

# Определение размеров матриц
ROWS_A = 2
COLS_A = 3
COLS_B = 40  # K = 4 * 10

# Генерация случайных матриц
matrix_a = np.random.rand(ROWS_A, COLS_A)
matrix_b = np.random.rand(COLS_A, COLS_B)

# Получение эталонного результата через умножение матриц
expected_result = np.matmul(matrix_a, matrix_b)

# Функция для "разглаживания" вложенных итерируемых объектов
def flatten_iterables(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция для группировки элементов по ключу
def group_elements_by_key(iterable):
    grouped = {}
    for (key, value) in iterable:
        grouped[key] = grouped.get(key, []) + [value]
    return grouped.items()

# Функция для распределенной группировки элементов по ключу
def distributed_grouping(map_partitions, partition_func):
    partitions = [dict() for _ in range(num_reducers)]
    for map_partition in map_partitions:
        for (key, value) in map_partition:
            partition = partitions[partition_func(key)]
            partition[key] = partition.get(key, []) + [value]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

# Функция для выполнения MapReduce на распределенных данных
def distributed_map_reduce(input_format, map_func, reduce_func, partition_func=partition_function, combiner=None):
    map_partitions = map(lambda record_reader: flatten_iterables(map(lambda kv: map_func(*kv), record_reader)), input_format())

    if combiner is not None:
        map_partitions = map(lambda map_partition: flatten_iterables(map(lambda kv: combiner(*kv), group_elements_by_key(map_partition))), map_partitions)

    reduce_partitions = distributed_grouping(map_partitions, partition_func)

    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten_iterables(map(lambda reduce_input_group: reduce_func(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

    return reduce_outputs

# Функция для преобразования результата REDUCE в матрицу
def convert_to_matrix(reduce_output):
    reduce_output = list(reduce_output)
    rows = max(i for ((i, k), value) in reduce_output) + 1
    cols = max(k for ((i, k), value) in reduce_output) + 1
    result_matrix = np.empty(shape=(rows, cols))
    for ((i, k), value) in reduce_output:
        result_matrix[i, k] = value
    return result_matrix

# Генератор для вводных данных
def input_data_format():
    data_a = []

    for i in range(matrix_a.shape[0]):
        for j in range(matrix_a.shape[1]):
            data_a.append(((0, i, j), matrix_a[i, j]))  # первая матрица

    split_size = int(np.ceil(len(data_a) / num_maps))

    for i in range(0, len(data_a), split_size):
        yield data_a[i:i + split_size]

    data_b = []

    for j in range(matrix_b.shape[0]):
        for k in range(matrix_b.shape[1]):
            data_b.append(((1, j, k), matrix_b[j, k]))  # вторая матрица

    split_size = int(np.ceil(len(data_b) / num_maps))

    for i in range(0, len(data_b), split_size):
        yield data_b[i:i + split_size]

# MAP функция для соединения матриц
def map_join_func(key1, value1):
    (matrix_id, i, j) = key1
    weight = value1

    if matrix_id == 0:
        yield (j, (matrix_id, i, weight))
    else:
        yield (i, (matrix_id, j, weight))

# REDUCE функция для соединения матриц
def reduce_join_func(key, values):
    from_first_matrix = [v for v in values if v[0] == 0]
    from_second_matrix = [v for v in values if v[0] == 1]

    for f in from_first_matrix:
        for s in from_second_matrix:
            yield ((f[1], s[1]), f[2] * s[2])

# Генератор для получения соединенных данных
def generate_joined_data():
    for j in joined_data:
        yield j[1]

# MAP функция для передачи значений
def map_multiplication(key1, value1):
    yield (key1, value1)

# REDUCE функция для суммирования значений
def reduce_multiplication(key, values):
    total_value = 0

    for v in values:
        total_value += v
    yield (key, total_value)

num_maps = 3
num_reducers = 2

# Выполнение MapReduce для соединения матриц
partitioned_result = distributed_map_reduce(input_data_format, map_join_func, reduce_join_func, combiner=None)
joined_data = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_result]

# Выполнение MapReduce для умножения значений
multiplication_output = distributed_map_reduce(generate_joined_data, map_multiplication, reduce_multiplication, combiner=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in multiplication_output]

# Формирование окончательного результата
final_solution = []

for p in pre_result:
    for v in p[1]:
        final_solution.append(v)

# Проверка на соответствие с эталонным решением
is_close = np.allclose(expected_result, convert_to_matrix(final_solution))

# Вывод части матрицы
result_matrix = convert_to_matrix(final_solution)
print("Результат умножения матриц (части первой и последней строки):")
print(result_matrix[0])  # Первая строка
print(result_matrix[-1])  # Последняя строка
print("Сравнение с эталонным решением:", is_close)

NameError: name 'partition_function' is not defined