# Введение в MapReduce модель на Python


In [39]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [40]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [41]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [42]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [43]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [44]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [45]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [46]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [47]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [48]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [49]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [50]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [51]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [52]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [53]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(2.6387311417430066)),
 (1, np.float64(2.6387311417430066)),
 (2, np.float64(2.6387311417430066)),
 (3, np.float64(2.6387311417430066)),
 (4, np.float64(2.6387311417430066))]

## Inverted index

In [54]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [55]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [56]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [57]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('is', 18), ('it', 18), ('what', 10)]),
 (1, [('banana', 2)])]

## TeraSort

In [58]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.012925826888224523)),
   (None, np.float64(0.03946558479229467)),
   (None, np.float64(0.054139655210143744)),
   (None, np.float64(0.07880626521315148)),
   (None, np.float64(0.08572721673048433)),
   (None, np.float64(0.10293703168527346)),
   (None, np.float64(0.14184874852507667)),
   (None, np.float64(0.15511297375851563)),
   (None, np.float64(0.26089978801091374)),
   (None, np.float64(0.2837043419291563)),
   (None, np.float64(0.29588020217636346)),
   (None, np.float64(0.3578497195653362)),
   (None, np.float64(0.44433290864884145))]),
 (1,
  [(None, np.float64(0.5125585701630619)),
   (None, np.float64(0.5294439804084565)),
   (None, np.float64(0.5583547295693592)),
   (None, np.float64(0.5837069105872251)),
   (None, np.float64(0.5936815531245887)),
   (None, np.float64(0.6641402711627278)),
   (None, np.float64(0.6965190568939257)),
   (None, np.float64(0.7842447877028063)),
   (None, np.float64(0.823608563716433)),
   (None, np.float64(0.8396205

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

In [59]:
from typing import NamedTuple, Iterator, Any, Iterable, Dict, List, Tuple
import math

def flatten(nested_iterable: Iterable[Iterable[Any]]) -> Iterator[Any]:
  """Уплощает итератор итераторов"""
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, List[Any]]]:
  """Группирует пары ключ-значение по ключу"""
  t: Dict[Any, List[Any]] = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER: callable, MAP: callable, REDUCE: callable) -> Iterator[Any]:
  """Основная функция MapReduce"""
  # Чтение и применение MAP
  map_results = map(lambda x: MAP(*x), RECORDREADER())
  # Шаг 2: Уплощение результатов MAP
  flattened_map_results = flatten(map_results)
  # Группировка по ключу
  grouped_results = groupbykey(flattened_map_results)
  # Применение REDUCE к группам и уплощение результатов
  final_result = flatten(map(lambda x: REDUCE(*x), grouped_results))
  return final_result

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [60]:
print("\n--- Задание 1: Максимальное значение ---")

# Входные данные
input_numbers_max = [10, 5, 25, 15, 30, 5, 20, 42, 8]

def RECORDREADER_max():
  """Читает числа, выдавая (индекс, число)."""
  print(f"RECORDREADER_max: Чтение данных {input_numbers_max}")
  for i, number in enumerate(input_numbers_max):
    yield (i, number)

def MAP_max(_, value: int):
  """Отправляет каждое число на один редьюсер с ключом 'max_value'."""
  yield ("max_value", value)

def REDUCE_max(key: str, values: Iterator[int]):
  """Находит максимальное значение среди всех полученных чисел."""
  print(f"REDUCE_max: Обработка ключа '{key}'")
  value_list = list(values)
  if not value_list:
    print("REDUCE_max: Нет значений для обработки.")
    return

  current_max = max(value_list)
  print(f"REDUCE_max: Найден максимум {current_max} из {value_list}")
  yield (key, current_max)

# Запуск MapReduce для поиска максимума
print("Запуск MapReduce для поиска максимума...")
max_output = MapReduce(RECORDREADER_max, MAP_max, REDUCE_max)
max_result = list(max_output) # Материализуем результат

print(f"\nВходные данные: {input_numbers_max}")
print(f"Результат MapReduce: {max_result}")
print(f"Максимальное значение: {max_result[0][1]}")
print("--- Конец Задания 1 ---")


--- Задание 1: Максимальное значение ---
Запуск MapReduce для поиска максимума...
RECORDREADER_max: Чтение данных [10, 5, 25, 15, 30, 5, 20, 42, 8]
REDUCE_max: Обработка ключа 'max_value'
REDUCE_max: Найден максимум 42 из [10, 5, 25, 15, 30, 5, 20, 42, 8]

Входные данные: [10, 5, 25, 15, 30, 5, 20, 42, 8]
Результат MapReduce: [('max_value', 42)]
Максимальное значение: 42
--- Конец Задания 1 ---


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [61]:
print("--- Задание 2: Арифметическое среднее ---")

# Входные данные
input_numbers_avg = [10, 5, 25, 15, 30, 5, 20] #15.7....

def RECORDREADER_avg():
  """Читает числа, выдавая (индекс, число)."""
  print(f"RECORDREADER_avg: Чтение данных {input_numbers_avg}")
  for i, number in enumerate(input_numbers_avg):
    yield (i, number)

def MAP_avg(_, value: int):
  """Отправляет каждое число на один редьюсер с ключом 'avg_value'."""
  yield ("avg_value", value)

def REDUCE_avg(key: str, values: Iterator[int]):
  """Считает сумму и количество, затем вычисляет среднее."""
  print(f"REDUCE_avg: Обработка ключа '{key}'")
  total_sum = 0
  count = 0
  value_list_for_print = []
  for value in values:
    total_sum += value
    count += 1
    value_list_for_print.append(value)

  print(f"REDUCE_avg: Обработаны значения {value_list_for_print}")

  if count > 0:
    average = total_sum / count
    print(f"REDUCE_avg: Сумма={total_sum}, Количество={count}, Среднее={average}")
    yield (key, average)
  else:
    print("REDUCE_avg: Нет значений для обработки.")
    yield (key, 0)

# Запуск MapReduce для поиска среднего
print("Запуск MapReduce для поиска среднего...")
avg_output = MapReduce(RECORDREADER_avg, MAP_avg, REDUCE_avg)
avg_result = list(avg_output)

print(f"\nВходные данные: {input_numbers_avg}")
print(f"Результат MapReduce: {avg_result}")
print(f"Арифметическое среднее: {avg_result[0][1]}")

--- Задание 2: Арифметическое среднее ---
Запуск MapReduce для поиска среднего...
RECORDREADER_avg: Чтение данных [10, 5, 25, 15, 30, 5, 20]
REDUCE_avg: Обработка ключа 'avg_value'
REDUCE_avg: Обработаны значения [10, 5, 25, 15, 30, 5, 20]
REDUCE_avg: Сумма=110, Количество=7, Среднее=15.714285714285714

Входные данные: [10, 5, 25, 15, 30, 5, 20]
Результат MapReduce: [('avg_value', 15.714285714285714)]
Арифметическое среднее: 15.714285714285714


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [62]:
print("--- Задание 3: GroupByKey на основе сортировки ---")

def groupbykey_sorted(iterable: Iterable[Tuple[Any, Any]]) -> Iterator[Tuple[Any, List[Any]]]:
  """
  Группирует пары ключ-значение по ключу, используя предварительную сортировку.
  """
  print("groupbykey_sorted: Начало группировки на основе сортировки.")

  # Итератор в список для сортировки
  data = list(iterable)
  if not data:
    print("groupbykey_sorted: Входной итератор пуст.")
    return

  print(f"groupbykey_sorted: Данные до сортировки: {data}")

  # Сортируем список пар по ключу
  data.sort(key=lambda x: x[0])
  print(f"groupbykey_sorted: Данные после сортировки: {data}")

  # Итерируем по отсортированным данным и группируем
  # Инициализация для первой группы
  current_key, first_value = data[0]
  current_values = [first_value]
  print(f"groupbykey_sorted: Начало группы для ключа '{current_key}' со значением {first_value}")

  # Проходим по остальным элементам
  for i in range(1, len(data)):
    key, value = data[i]
    if key == current_key:# Ключ тот же - добавляем значение в текущую группу
      current_values.append(value)
      print(f"groupbykey_sorted: Добавлено значение {value} к ключу '{current_key}'")
    else:      # Ключ изменился - завершаем предыдущую группу и выдаем ее
      print(f"groupbykey_sorted: Завершение группы для ключа '{current_key}'. Выдача: ({current_key}, {current_values})")
      yield (current_key, current_values)
      # Начинаем новую группу
      current_key = key
      current_values = [value]
      print(f"groupbykey_sorted: Начало новой группы для ключа '{current_key}' со значением {value}")

  # Последняя собранная группа
  print(f"groupbykey_sorted: Завершение последней группы для ключа '{current_key}'. Выдача: ({current_key}, {current_values})")
  yield (current_key, current_values)

  print("groupbykey_sorted: Завершение работы.")



print("\nПроверка groupbykey_sorted:")
# Пример входных данных (неотсортированных)
map_output_example = [
    ('b', 1), ('a', 2), ('b', 3), ('c', 4), ('a', 5), ('b', 6)
]
print(f"Входные данные для проверки: {map_output_example}")

grouped_output_sorted = list(groupbykey_sorted(map_output_example))
print(f"\nРезультат groupbykey_sorted: {grouped_output_sorted}")

# Сравнение с оригинальным groupbykey
grouped_output_dict = list(groupbykey(map_output_example))
grouped_output_dict.sort(key=lambda x: x[0])
print(f"Результат оригинального groupbykey (отсортированный для сравнения): {grouped_output_dict}")

# Проверка идентичности результатов (с учетом сортировки ключей)
if grouped_output_sorted == grouped_output_dict:
    print("\nРезультаты groupbykey_sorted и оригинального groupbykey совпадают (после сортировки).")
else:
    print("\nВНИМАНИЕ: Результаты groupbykey_sorted и оригинального groupbykey различаются!")

--- Задание 3: GroupByKey на основе сортировки ---

Проверка groupbykey_sorted:
Входные данные для проверки: [('b', 1), ('a', 2), ('b', 3), ('c', 4), ('a', 5), ('b', 6)]
groupbykey_sorted: Начало группировки на основе сортировки.
groupbykey_sorted: Данные до сортировки: [('b', 1), ('a', 2), ('b', 3), ('c', 4), ('a', 5), ('b', 6)]
groupbykey_sorted: Данные после сортировки: [('a', 2), ('a', 5), ('b', 1), ('b', 3), ('b', 6), ('c', 4)]
groupbykey_sorted: Начало группы для ключа 'a' со значением 2
groupbykey_sorted: Добавлено значение 5 к ключу 'a'
groupbykey_sorted: Завершение группы для ключа 'a'. Выдача: (a, [2, 5])
groupbykey_sorted: Начало новой группы для ключа 'b' со значением 1
groupbykey_sorted: Добавлено значение 3 к ключу 'b'
groupbykey_sorted: Добавлено значение 6 к ключу 'b'
groupbykey_sorted: Завершение группы для ключа 'b'. Выдача: (b, [1, 3, 6])
groupbykey_sorted: Начало новой группы для ключа 'c' со значением 4
groupbykey_sorted: Завершение последней группы для ключа 'c'. 

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [63]:
print("--- Задание 4: Исключение дубликатов (распределённая версия) ---")

import numpy as np

reducers = 2 # Количество редьюсеров
maps = 3     # Количество мапперов

def groupbykey_distributed(map_partitions: Iterable[Iterable[Tuple[Any, Any]]], PARTITIONER: callable) -> List[Tuple[int, List[Tuple[Any, List[Any]]]]]:
  """
  Имитирует фазу Shuffle & Group в распределенной среде.
  Распределяет пары (k2, v2) по редьюсерам
  и группирует значения для каждого ключа внутри партиции редьюсера.
  """
  global reducers
  partitions: List[Dict[Any, List[Any]]] = [dict() for _ in range(reducers)]
  print(f"groupbykey_distributed: Распределение по {reducers} редьюсерам...")
  map_partition_index = 0
  total_pairs_received = 0
  network_traffic_pairs = [] # Список пар (k2, v2)

  for map_partition in map_partitions:
    map_partition_index += 1
    pairs_in_partition = 0
    print(f"  Обработка выхода маппера/комбайнера {map_partition_index-1}:")
    processed_in_partition = 0
    for (k2, v2) in map_partition: # Итерируем по парам от одного маппера/комбайнера
      total_pairs_received += 1
      processed_in_partition += 1
      target_reducer = PARTITIONER(k2)
      network_traffic_pairs.append((k2, v2))
      p = partitions[target_reducer]
      p[k2] = p.get(k2, []) + [v2] # Собираем значения для ключа
    print(f"    Передано {processed_in_partition} пар.")


  print(f"groupbykey_distributed: Всего получено и распределено {total_pairs_received} пар.")
  print(f"groupbykey_distributed: {len(network_traffic_pairs)} key-value pairs were sent over a network.")

  # Формируем результат: список пар (id_партиции, отсортированные_группы_в_партиции)
  result = []
  for partition_id, partition_dict in enumerate(partitions):
      # Сортируем группы внутри партиции по ключу для детерминированности
      sorted_groups = sorted(partition_dict.items(), key=lambda item: item[0])
      result.append((partition_id, sorted_groups))
      print(f"  Редьюсер {partition_id} получил группы: {sorted_groups}")
  return result, len(network_traffic_pairs) # Возвращаем и результат, и кол-во переданных пар


def PARTITIONER_default(obj: Any) -> int:
  """ PARTITIONER на основе хэша."""
  global reducers
  partition_index = hash(obj) % reducers
  return partition_index

def MapReduceDistributed(INPUTFORMAT: callable, MAP: callable, REDUCE: callable, PARTITIONER: callable = PARTITIONER_default, COMBINER: callable = None):
  """Распределенная версия MapReduce"""
  global reducers

  print("\nMapReduceDistributed: Начало работы.")
  print(f"Параметры: maps={maps}, reducers={reducers}, COMBINER={'есть' if COMBINER else 'нет'}")

  # INPUTFORMAT генерирует Record Readers для каждого сплита
  record_readers = list(INPUTFORMAT())
  print(f"INPUTFORMAT: Сгенерировано {len(record_readers)} Record Reader'ов (сплитов).")

  # Применяем MAP к каждому сплиту и уплощаем результат для каждого сплита
  # map_partitions_iter - это итератор, где каждый элемент - это итератор пар (k2, v2) от одного маппера
  map_outputs_iter = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), record_readers)

  # Материализуем выходы мапперов, чтобы можно было применить комбайнер
  map_outputs = [list(output) for output in map_outputs_iter]
  print(f"MAP: Завершено {len(map_outputs)} map-задач.")

  # Применяем COMBINER (если есть) локально на выходе каждого маппера
  if COMBINER is not None:
    print("COMBINER: Применение Combiner к выходу каждого маппера...")
    combined_outputs = []
    for i, map_output_list in enumerate(map_outputs):
        # Группируем локально выход маппера
        local_groups = groupbykey(iter(map_output_list)) # groupbykey ожидает итератор
        # Применяем COMBINER к каждой локальной группе и уплощаем
        combined = flatten(map(lambda k2v2s: COMBINER(*k2v2s), local_groups))
        combined_list = list(combined) # Материализуем результат комбайнера
        print(f"  Выход Combiner для Map {i}: {combined_list}")
        combined_outputs.append(iter(combined_list)) # Добавляем как итератор для след. шага
    map_partitions_for_shuffle = combined_outputs # Данные для шаффла - это выход комбайнеров
  else:
    print("COMBINER: Не используется.")
    # Если комбайнера нет
    # Преобразуем списки обратно в итераторы для единообразия
    map_partitions_for_shuffle = [iter(output) for output in map_outputs]

  # Shuffle & Group - Распределение и группировка данных по редьюсерам
  reduce_partitions_grouped, network_pairs = groupbykey_distributed(map_partitions_for_shuffle, PARTITIONER)


  # Применяем REDUCE к каждой группе в каждой партиции редьюсера
  print("REDUCE: Применение Reduce к группам на редьюсерах...")
  reduce_outputs = []
  for partition_id, groups_in_partition in reduce_partitions_grouped:
      print(f"  Обработка на Редьюсере {partition_id}:")
      # Применяем REDUCE к каждой группе (ключ, список_значений) и уплощаем результат
      partition_output_iter = flatten(map(lambda group: REDUCE(*group), groups_in_partition))
      reduce_outputs.append((partition_id, partition_output_iter)) # Сохраняем ID партиции и итератор результата

  print("MapReduceDistributed: Завершение работы.")
  # Возвращаем итератор пар (partition_id, итератор_выхода_редьюсера)
  return reduce_outputs



# Входные данные с дубликатами
input_data_duplicates = ["apple", "banana", "apple", "orange", "banana", "grape", "apple", "kiwi", "orange"]
print(f"\nВходные данные для Drop Duplicates: {input_data_duplicates}")


def INPUTFORMAT_duplicates():
  """Разбивает входной список на части."""
  global maps
  global input_data_duplicates
  split_size = int(np.ceil(len(input_data_duplicates) / maps))
  print(f"INPUTFORMAT_duplicates: Размер сплита = {split_size}")
  for i in range(0, len(input_data_duplicates), split_size):
    split = input_data_duplicates[i : i + split_size]
    print(f"  Генерация Record Reader для сплита: {split}")
    # RECORDREADER для этого сплита
    def RECORDREADER(current_split=split):
        # Используем элемент как ключ k1, None как значение v1
        for item in current_split:
            yield (item, None) # Ключ - сам элемент, значение не важно
    yield RECORDREADER() # Возвращаем функцию-генератор

def MAP_duplicates(item: Any, _):
  """MAP для Drop Duplicates. Выдает (элемент, 1) или (элемент, None)."""
  yield (item, 1) # Используем 1 как фиктивное значение

def REDUCE_duplicates(item: Any, values: Iterator[Any]):
  """REDUCE для Drop Duplicates. Просто выдает ключ (уникальный элемент)."""
  # Получает ключ (уникальный элемент) и итератор фиктивных значений [1, 1, ...].
  yield (item, None)

def COMBINER_duplicates(item: Any, values: Iterator[Any]):
  """COMBINER для Drop Duplicates. Аналогичен REDUCE."""
  # Выполняет ту же логику, что и REDUCE, но локально на маппере.
  yield (item, 1) # Выдаем одну пару (item, 1) для этого ключа

# --- Запуск MapReduceDistributed для Drop Duplicates ---

# Запуск БЕЗ Combiner
print("\n--- Запуск Drop Duplicates БЕЗ Combiner ---")
partitioned_output_no_combiner = MapReduceDistributed(
    INPUTFORMAT_duplicates,
    MAP_duplicates,
    REDUCE_duplicates,
    PARTITIONER_default,
    COMBINER=None
)
# Выводим результат
print("\nРезультат Drop Duplicates БЕЗ Combiner:")
final_result_no_combiner = []
for partition_id, partition_iter in partitioned_output_no_combiner:
    partition_list = list(partition_iter)
    print(f"  Партиция {partition_id}: {partition_list}")
    final_result_no_combiner.extend(partition_list)

# Собираем все уникальные элементы из всех партиций и сортируем
unique_items_no_combiner = sorted([item for item, _ in final_result_no_combiner])
print(f"Итоговый список уникальных элементов (без combiner): {unique_items_no_combiner}")


# Запуск С Combiner
print("\n--- Запуск Drop Duplicates С Combiner (используя COMBINER_duplicates) ---")
partitioned_output_with_combiner = MapReduceDistributed(
    INPUTFORMAT_duplicates,
    MAP_duplicates,
    REDUCE_duplicates,
    PARTITIONER_default,
    COMBINER=COMBINER_duplicates # Используем Combiner
)
# Выводим результат
print("\nРезультат Drop Duplicates С Combiner:")
final_result_with_combiner = []
for partition_id, partition_iter in partitioned_output_with_combiner:
    partition_list = list(partition_iter)
    print(f"  Партиция {partition_id}: {partition_list}")
    final_result_with_combiner.extend(partition_list)

# Собираем все уникальные элементы из всех партиций и сортируем
unique_items_with_combiner = sorted([item for item, _ in final_result_with_combiner])
print(f"Итоговый список уникальных элементов (с combiner): {unique_items_with_combiner}")

# Сравнение результатов
if unique_items_no_combiner == unique_items_with_combiner:
    print("\nРезультаты с Combiner и без него совпадают.")
else:
    print("\nВНИМАНИЕ: Результаты с Combiner и без него различаются!")

--- Задание 4: Исключение дубликатов (распределённая версия) ---

Входные данные для Drop Duplicates: ['apple', 'banana', 'apple', 'orange', 'banana', 'grape', 'apple', 'kiwi', 'orange']

--- Запуск Drop Duplicates БЕЗ Combiner ---

MapReduceDistributed: Начало работы.
Параметры: maps=3, reducers=2, COMBINER=нет
INPUTFORMAT_duplicates: Размер сплита = 3
  Генерация Record Reader для сплита: ['apple', 'banana', 'apple']
  Генерация Record Reader для сплита: ['orange', 'banana', 'grape']
  Генерация Record Reader для сплита: ['apple', 'kiwi', 'orange']
INPUTFORMAT: Сгенерировано 3 Record Reader'ов (сплитов).
MAP: Завершено 3 map-задач.
COMBINER: Не используется.
groupbykey_distributed: Распределение по 2 редьюсерам...
  Обработка выхода маппера/комбайнера 0:
    Передано 3 пар.
  Обработка выхода маппера/комбайнера 1:
    Передано 3 пар.
  Обработка выхода маппера/комбайнера 2:
    Передано 3 пар.
groupbykey_distributed: Всего получено и распределено 9 пар.
groupbykey_distributed: 9 key-

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [64]:
from typing import NamedTuple, Iterator, Any, Iterable, Dict, List, Tuple, Callable, Set
import math

# Пример структуры данных  для реляционных операторов
class Record(NamedTuple):
  id: int
  name: str
  age: int
  city: str
  value: float

def flatten(nested_iterable: Iterable[Iterable[Any]]) -> Iterator[Any]:
  """Уплощает итератор итераторов в один итератор."""
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, List[Any]]]:
  """Группирует пары ключ-значение по ключу."""
  t: Dict[Any, List[Any]] = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER: Callable[[], Iterable[Tuple[Any, Any]]],
              MAP: Callable[[Any, Any], Iterable[Tuple[Any, Any]]],
              REDUCE: Callable[[Any, Iterator[Any]], Iterable[Tuple[Any, Any]]]) -> Iterator[Any]:
  """Основная функция MapReduce (не распределенная версия)."""
  # Чтение и применение MAP
  map_results = map(lambda x: MAP(*x), RECORDREADER())
  # Уплощение результатов MAP
  flattened_map_results = flatten(map_results)
  # Группировка по ключу
  grouped_results = groupbykey(flattened_map_results)
  # Применение REDUCE к группам и уплощение результатов
  final_result_iter = flatten(map(lambda x: REDUCE(*x), grouped_results))
  return final_result_iter # Возвращаем итератор пар (ключ, значение) из REDUCE

In [65]:
print("--- Задание 5: Selection (Выборка) ---")

# Пример входных данных (отношение R)
input_relation_selection = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5),
    Record(id=2, name="Bob", age=25, city="London", value=250.0),
    Record(id=3, name="Charlie", age=35, city="New York", value=50.75),
    Record(id=4, name="David", age=25, city="Paris", value=300.0),
]
print(f"Входное отношение R: {input_relation_selection}")

# Условие выборки C: age > 25 ИЛИ city == "London"
def selection_condition(record: Record) -> bool:
  return record.age > 25 or record.city == "London"

print(f"Условие выборки C: age > 25 or city == 'London'")

def RECORDREADER_selection():
  """Читает записи отношения R."""
  print("RECORDREADER_selection: Чтение данных...")
  # Используем id как ключ k1, саму запись как значение v1
  for record in input_relation_selection:
    yield (record.id, record)

def MAP_selection(record_id: Any, record: Record):
  """Применяет условие C к каждой записи."""
  if selection_condition(record):
    # Выдаем пару (запись, запись)
    print(f"  MAP_selection: Условие истинно для {record}. Выдача ({record}, {record})")
    yield (record, record)

def REDUCE_selection(key_record: Record, records_iterator: Iterator[Record]):
  """Функция идентичности для значения."""
  print(f"REDUCE_selection: Обработка ключа {key_record}. Выдача ({key_record}, {key_record})")
  yield (key_record, key_record)


# Запуск MapReduce для Selection
print("\nЗапуск MapReduce для Selection...")
selection_output = MapReduce(RECORDREADER_selection, MAP_selection, REDUCE_selection)
selection_result = list(selection_output) # Результат

print("\nРезультат операции Selection:")
# Извлечем сами записи
selected_records = [res[0] for res in selection_result]
for record in selected_records:
    print(record)

if not selected_records:
    print("Нет записей, удовлетворяющих условию.")

--- Задание 5: Selection (Выборка) ---
Входное отношение R: [Record(id=1, name='Alice', age=30, city='New York', value=100.5), Record(id=2, name='Bob', age=25, city='London', value=250.0), Record(id=3, name='Charlie', age=35, city='New York', value=50.75), Record(id=4, name='David', age=25, city='Paris', value=300.0)]
Условие выборки C: age > 25 or city == 'London'

Запуск MapReduce для Selection...
RECORDREADER_selection: Чтение данных...
  MAP_selection: Условие истинно для Record(id=1, name='Alice', age=30, city='New York', value=100.5). Выдача (Record(id=1, name='Alice', age=30, city='New York', value=100.5), Record(id=1, name='Alice', age=30, city='New York', value=100.5))
  MAP_selection: Условие истинно для Record(id=2, name='Bob', age=25, city='London', value=250.0). Выдача (Record(id=2, name='Bob', age=25, city='London', value=250.0), Record(id=2, name='Bob', age=25, city='London', value=250.0))
  MAP_selection: Условие истинно для Record(id=3, name='Charlie', age=35, city='Ne

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [66]:
print("--- Задание 6: Projection (Проекция) ---")

# Используем те же входные данные
input_relation_projection = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5),
    Record(id=2, name="Bob", age=25, city="London", value=250.0),
    Record(id=3, name="Charlie", age=35, city="New York", value=50.75),
    Record(id=4, name="David", age=25, city="Paris", value=300.0),
    Record(id=5, name="Eve", age=30, city="New York", value=150.0), # Добавим запись для проверки дубликатов проекции
]
print(f"Входное отношение R: {input_relation_projection}")

# Множество атрибутов S для проекции: name и city
attributes_to_project: Set[str] = {"name", "city"}
print(f"Атрибуты для проекции S: {attributes_to_project}")

ProjectType = Tuple[Any, ...] # Тип для спроецированного кортежа

def RECORDREADER_projection():
  """Читает записи отношения R."""
  print("RECORDREADER_projection: Чтение данных...")
  for record in input_relation_projection:
    yield (record.id, record)

def MAP_projection(record_id: Any, record: Record):
  """Создает проекцию записи на атрибуты S."""
  # Создаем новый кортеж t', содержащий только значения для атрибутов из S
  projected_values = []
  # Сортируем атрибуты для стабильного порядка в кортеже-ключе
  sorted_attributes = sorted(list(attributes_to_project))
  for attr_name in sorted_attributes:
    if hasattr(record, attr_name):
      projected_values.append(getattr(record, attr_name))
    else:
      projected_values.append(None)

  projected_tuple: ProjectType = tuple(projected_values)
  # Выдаем пару (t', t')
  print(f"  MAP_projection: Спроецировано {record} -> {projected_tuple}. Выдача ({projected_tuple}, {projected_tuple})")
  yield (projected_tuple, projected_tuple)

def REDUCE_projection(projected_key: ProjectType, projected_iterator: Iterator[ProjectType]):
  """Удаляет дубликаты проекций."""
  print(f"REDUCE_projection: Обработка ключа {projected_key}. Выдача ({projected_key}, {projected_key})")
  # Выдаем пару (ключ, ключ)
  yield (projected_key, projected_key)

# Запуск MapReduce для Projection
print("\nЗапуск MapReduce для Projection...")
projection_output = MapReduce(RECORDREADER_projection, MAP_projection, REDUCE_projection)
projection_result = list(projection_output) # Результат

print("\nРезультат операции Projection (уникальные проекции):")
# Извлечем сами проекции
projected_tuples = [res[0] for res in projection_result]
for proj_tuple in projected_tuples:
    print(proj_tuple)


--- Задание 6: Projection (Проекция) ---
Входное отношение R: [Record(id=1, name='Alice', age=30, city='New York', value=100.5), Record(id=2, name='Bob', age=25, city='London', value=250.0), Record(id=3, name='Charlie', age=35, city='New York', value=50.75), Record(id=4, name='David', age=25, city='Paris', value=300.0), Record(id=5, name='Eve', age=30, city='New York', value=150.0)]
Атрибуты для проекции S: {'city', 'name'}

Запуск MapReduce для Projection...
RECORDREADER_projection: Чтение данных...
  MAP_projection: Спроецировано Record(id=1, name='Alice', age=30, city='New York', value=100.5) -> ('New York', 'Alice'). Выдача (('New York', 'Alice'), ('New York', 'Alice'))
  MAP_projection: Спроецировано Record(id=2, name='Bob', age=25, city='London', value=250.0) -> ('London', 'Bob'). Выдача (('London', 'Bob'), ('London', 'Bob'))
  MAP_projection: Спроецировано Record(id=3, name='Charlie', age=35, city='New York', value=50.75) -> ('New York', 'Charlie'). Выдача (('New York', 'Charlie

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [67]:
print("--- Задание 7: Union (Объединение) ---")

# Входные данные: два отношения R и S
relation_R = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=2, name="Bob", age=25, city="London", value=250.0),   # Только в R
    Record(id=3, name="Charlie", age=35, city="New York", value=50.75), # Только в R
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Дубликат в R
]
relation_S = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=4, name="David", age=25, city="Paris", value=300.0),   # Только в S
    Record(id=5, name="Eve", age=30, city="New York", value=150.0),   # Только в S
    Record(id=4, name="David", age=25, city="Paris", value=300.0),   # Дубликат в S
]

print("Входное отношение R:")
for r in relation_R: print(f"  {r}")
print("Входное отношение S:")
for s in relation_S: print(f"  {s}")

def get_record_reader_for_union(rel_R, rel_S):
  """Создает RECORDREADER, который читает оба отношения."""
  def reader():
    print("RECORDREADER_union: Чтение из R...")
    for record in rel_R:
      # Используем саму запись как ключ k1 и значение v1
      yield (record, record)
    print("RECORDREADER_union: Чтение из S...")
    for record in rel_S:
      yield (record, record)
  return reader

def MAP_union(key_record: Record, value_record: Record):
  """Превращает входную запись t в пару (t, t)."""
  yield (key_record, value_record)

def REDUCE_union(key_record: Record, values_iterator: Iterator[Record]):
  """Удаляет дубликаты записей."""
  print(f"REDUCE_union: Обработка ключа {key_record}. Выдача ({key_record}, {key_record})")
  yield (key_record, key_record)

# Создаем RECORDREADER для отношений
record_reader_union = get_record_reader_for_union(relation_R, relation_S)

# Запуск MapReduce для Union
print("\nЗапуск MapReduce для Union...")
union_output = MapReduce(record_reader_union, MAP_union, REDUCE_union)
union_result = list(union_output) # Результат

print("\nРезультат операции Union (уникальные записи из R и S):")
# Извлечем сами записи
union_records = sorted([res[0] for res in union_result], key=lambda x: x.id) # Сортируем по ID
for record in union_records:
    print(record)

--- Задание 7: Union (Объединение) ---
Входное отношение R:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=2, name='Bob', age=25, city='London', value=250.0)
  Record(id=3, name='Charlie', age=35, city='New York', value=50.75)
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
Входное отношение S:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=4, name='David', age=25, city='Paris', value=300.0)
  Record(id=5, name='Eve', age=30, city='New York', value=150.0)
  Record(id=4, name='David', age=25, city='Paris', value=300.0)

Запуск MapReduce для Union...
RECORDREADER_union: Чтение из R...
RECORDREADER_union: Чтение из S...
REDUCE_union: Обработка ключа Record(id=1, name='Alice', age=30, city='New York', value=100.5). Выдача (Record(id=1, name='Alice', age=30, city='New York', value=100.5), Record(id=1, name='Alice', age=30, city='New York', value=100.5))
REDUCE_union: Обработка ключа Record(id=2, name='Bob', a

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [68]:
print("--- Задание 8: Intersection (Пересечение) ---")

# Используем те же входные данные
relation_R = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=2, name="Bob", age=25, city="London", value=250.0),   # Только в R
    Record(id=3, name="Charlie", age=35, city="New York", value=50.75), # Только в R
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Дубликат в R
]
relation_S = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=4, name="David", age=25, city="Paris", value=300.0),   # Только в S
    Record(id=5, name="Eve", age=30, city="New York", value=150.0),   # Только в S
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Дубликат общей в S
]

print("Входное отношение R:")
for r in relation_R: print(f"  {r}")
print("Входное отношение S:")
for s in relation_S: print(f"  {s}")

def get_record_reader_for_set_ops(rel_R, rel_S):
  """
  Создает RECORDREADER, который читает оба отношения.
  Использует саму запись как ключ k1 и значение v1.
  """
  def reader():
    print("RECORDREADER_set_ops: Чтение из R...")
    for record in rel_R:
      yield (record, record)
    print("RECORDREADER_set_ops: Чтение из S...")
    for record in rel_S:
      yield (record, record)
  return reader

def MAP_intersection(key_record: Record, value_record: Record):
  """Превращает входную запись t в пару (t, t)."""
  yield (key_record, value_record)

def REDUCE_intersection(key_record: Record, values_iterator: Iterator[Record]):
  """
  Выдает запись, если для ключа t есть список из двух (или более) элементов.
  """
  values_list = list(values_iterator)
  count = len(values_list)
  print(f"REDUCE_intersection: Обработка ключа {key_record}, количество = {count}")

  # Проверяем условие "если для ключа t есть список из двух элементов [t, t]"
  # Интерпретируем это как наличие 2 или более копий записи.
  if count >= 2:
     print(f"  REDUCE_intersection: Условие (count >= 2) выполнено. Выдача ({key_record}, {key_record})")
     yield (key_record, key_record)
  else:
     print(f"  REDUCE_intersection: Условие (count >= 2) не выполнено.")


# Создаем RECORDREADER
record_reader_intersection = get_record_reader_for_set_ops(relation_R, relation_S)

# Запуск MapReduce для Intersection
print("\nЗапуск MapReduce для Intersection")
intersection_output = MapReduce(record_reader_intersection, MAP_intersection, REDUCE_intersection)
intersection_result = list(intersection_output) # Результат

print("\nРезультат операции Intersection:")
intersection_records = sorted([res[0] for res in intersection_result], key=lambda x: x.id)
for record in intersection_records:
    print(record)

--- Задание 8: Intersection (Пересечение) ---
Входное отношение R:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=2, name='Bob', age=25, city='London', value=250.0)
  Record(id=3, name='Charlie', age=35, city='New York', value=50.75)
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
Входное отношение S:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=4, name='David', age=25, city='Paris', value=300.0)
  Record(id=5, name='Eve', age=30, city='New York', value=150.0)
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)

Запуск MapReduce для Intersection
RECORDREADER_set_ops: Чтение из R...
RECORDREADER_set_ops: Чтение из S...
REDUCE_intersection: Обработка ключа Record(id=1, name='Alice', age=30, city='New York', value=100.5), количество = 4
  REDUCE_intersection: Условие (count >= 2) выполнено. Выдача (Record(id=1, name='Alice', age=30, city='New York', value=100.5), Record(id=1, name='Alice', 

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [69]:
print("--- Задание 9: Difference (Разница R - S) ---")

# Используем те же входные данные
relation_R = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=2, name="Bob", age=25, city="London", value=250.0),   # Только в R
    Record(id=3, name="Charlie", age=35, city="New York", value=50.75), # Только в R
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Дубликат в R
]
relation_S = [
    Record(id=1, name="Alice", age=30, city="New York", value=100.5), # Общая
    Record(id=4, name="David", age=25, city="Paris", value=300.0),   # Только в S
    Record(id=5, name="Eve", age=30, city="New York", value=150.0),   # Только в S
]

print("Входное отношение R:")
for r in relation_R: print(f"  {r}")
print("Входное отношение S:")
for s in relation_S: print(f"  {s}")

def get_record_reader_for_difference(rel_R, rel_S):
  """
  Создает RECORDREADER, который читает оба отношения и тегирует записи.
  """
  def reader():
    print("RECORDREADER_difference: Чтение из R...")
    for record in rel_R:
      # Ключ k1 не важен, значение v1 - пара (запись, тег_отношения)
      yield (record, ('R', record)) # Используем 'R' как тег
    print("RECORDREADER_difference: Чтение из S...")
    for record in rel_S:
      yield (record, ('S', record)) # Используем 'S' как тег
  return reader

def MAP_difference(_, value_pair: Tuple[str, Record]):
  """Извлекает запись и тег, выдает (запись, тег)."""
  tag, record = value_pair
  # Ключ k2 = запись, Значение v2 = тег ('R' или 'S')
  yield (record, tag)

def REDUCE_difference(key_record: Record, tags_iterator: Iterator[str]):
  """Выдает запись, если она была только в R (тег 'R' и нет тега 'S')."""
  # Собираем все теги в множество, чтобы легко проверить наличие 'R' и отсутствие 'S'.
  tags_set = set(tags_iterator)
  print(f"REDUCE_difference: Обработка ключа {key_record}, теги = {tags_set}")

  # Запись должна быть в R И не должна быть в S.
  if 'R' in tags_set and 'S' not in tags_set:
    print(f"  REDUCE_difference: Тег 'R' есть, 'S' нет. Выдача ({key_record}, {key_record})")
    # Выдаем пару (t, t)
    yield (key_record, key_record)
  else:
    print(f"  REDUCE_difference: Условие не выполнено (либо нет 'R', либо есть 'S').")


# Создаем RECORDREADER для разницы
record_reader_difference = get_record_reader_for_difference(relation_R, relation_S)

# Запуск MapReduce для Difference
print("\nЗапуск MapReduce для Difference (R - S)...")
difference_output = MapReduce(record_reader_difference, MAP_difference, REDUCE_difference)
difference_result = list(difference_output) # Результат

print("\nРезультат операции Difference (R - S):")
difference_records = sorted([res[0] for res in difference_result], key=lambda x: x.id)
for record in difference_records:
    print(record)

--- Задание 9: Difference (Разница R - S) ---
Входное отношение R:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=2, name='Bob', age=25, city='London', value=250.0)
  Record(id=3, name='Charlie', age=35, city='New York', value=50.75)
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
Входное отношение S:
  Record(id=1, name='Alice', age=30, city='New York', value=100.5)
  Record(id=4, name='David', age=25, city='Paris', value=300.0)
  Record(id=5, name='Eve', age=30, city='New York', value=150.0)

Запуск MapReduce для Difference (R - S)...
RECORDREADER_difference: Чтение из R...
RECORDREADER_difference: Чтение из S...
REDUCE_difference: Обработка ключа Record(id=1, name='Alice', age=30, city='New York', value=100.5), теги = {'S', 'R'}
  REDUCE_difference: Условие не выполнено (либо нет 'R', либо есть 'S').
REDUCE_difference: Обработка ключа Record(id=2, name='Bob', age=25, city='London', value=250.0), теги = {'R'}
  REDUCE_difference: Тег

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [70]:
print("--- Задание 10: Natural Join (Естественное Соединение) ---")

# Входные данные: два отношения R(A, B) и S(B, C)
# Используем простые кортежи
# R: (a, b)
relation_R_join = [
    ('a1', 'b1'),
    ('a2', 'b1'),
    ('a3', 'b2'),
    ('a4', 'b3'),
]
# S: (b, c)
relation_S_join = [
    ('b1', 'c1'),
    ('b2', 'c2'),
    ('b1', 'c3'),
    ('b4', 'c4'),
]

print("Входное отношение R(A, B):")
for r in relation_R_join: print(f"  {r}")
print("Входное отношение S(B, C):")
for s in relation_S_join: print(f"  {s}")

def get_record_reader_for_join(rel_R, rel_S):
  """
  Читает оба отношения и тегирует кортежи.
  """
  def reader():
    print("RECORDREADER_join: Чтение из R...")
    for i, record in enumerate(rel_R):
      # Ключ k1 не важен, значение v1 - пара (тег, кортеж)
      yield (i, ('R', record)) # Тег 'R'
    print("RECORDREADER_join: Чтение из S...")
    for i, record in enumerate(rel_S):
      yield (len(rel_R) + i, ('S', record)) # Тег 'S'
  return reader

def MAP_join(_, value_pair: Tuple[str, Tuple]):
  """
  Извлекает атрибут соединения (b) и остальные атрибуты.
  Выдает (b, (tag, other_attribute)).
  """
  tag, record = value_pair
  if tag == 'R':
    # record это (a, b)
    a, b = record
    join_key = b
    value = ('R', a) # Значение - тег и атрибут 'a'
    print(f"MAP_join (R): Обработка {record}. Ключ='{join_key}', Значение={value}")
    yield (join_key, value)
  elif tag == 'S':
    # record это (b, c)
    b, c = record
    join_key = b
    value = ('S', c) # Значение - тег и атрибут 'c'
    print(f"MAP_join (S): Обработка {record}. Ключ='{join_key}', Значение={value}")
    yield (join_key, value)

def REDUCE_join(join_key_b: Any, values_iterator: Iterator[Tuple[str, Any]]):
  """
  Соединяет записи из R и S по ключу join_key_b.
  Выдает тройки (a, b, c).
  """

  list_R_values_a = [] # Список для хранения 'a' из R
  list_S_values_c = [] # Список для хранения 'c' из S

  values_list = list(values_iterator)
  print(f"REDUCE_join: Обработка ключа соединения b = '{join_key_b}'")
  print(f"  REDUCE_join: Полученные значения (тег, атрибут): {values_list}")

  # Разделяем значения по тегам
  for tag, value in values_list:
    if tag == 'R':
      list_R_values_a.append(value) # value это 'a'
    elif tag == 'S':
      list_S_values_c.append(value) # value это 'c'

  print(f"  REDUCE_join: Значения 'a' из R: {list_R_values_a}")
  print(f"  REDUCE_join: Значения 'c' из S: {list_S_values_c}")

  # Если есть значения из ОБОИХ отношений R и S, то выполняем соединение
  if list_R_values_a and list_S_values_c:
    print(f"  REDUCE_join: Найдены совпадения для ключа '{join_key_b}'. Генерация троек (a, b, c)...")
    # Генерируем декартово произведение списков 'a' и 'c'
    for a in list_R_values_a:
      for c in list_S_values_c:
        joined_tuple = (a, join_key_b, c)
        print(f"    REDUCE_join: Выдача (None, {joined_tuple})")
        # Значение - тройка (a, b, c)
        yield (None, joined_tuple)
  else:
    print(f"  REDUCE_join: Нет совпадений для ключа '{join_key_b}' (отсутствуют записи из R или S).")


# Создаем RECORDREADER для Join
record_reader_join = get_record_reader_for_join(relation_R_join, relation_S_join)

# Запуск MapReduce для Natural Join
print("\nЗапуск MapReduce для Natural Join...")
join_output = MapReduce(record_reader_join, MAP_join, REDUCE_join)
join_result = list(join_output) # Результат

print("\nРезультат операции Natural Join (R |x| S):")
# Вывод будет списком пар [(None, (a1, b1, c1)), (None, (a2, b1, c1)), ...]
# Извлечем сами тройки
joined_tuples = sorted([res[1] for res in join_result]) # Сортируем результат
if joined_tuples:
    print("(a, b, c)")
    for tpl in joined_tuples:
        print(tpl)
else:
    print("Результат соединения пуст.")

--- Задание 10: Natural Join (Естественное Соединение) ---
Входное отношение R(A, B):
  ('a1', 'b1')
  ('a2', 'b1')
  ('a3', 'b2')
  ('a4', 'b3')
Входное отношение S(B, C):
  ('b1', 'c1')
  ('b2', 'c2')
  ('b1', 'c3')
  ('b4', 'c4')

Запуск MapReduce для Natural Join...
RECORDREADER_join: Чтение из R...
MAP_join (R): Обработка ('a1', 'b1'). Ключ='b1', Значение=('R', 'a1')
MAP_join (R): Обработка ('a2', 'b1'). Ключ='b1', Значение=('R', 'a2')
MAP_join (R): Обработка ('a3', 'b2'). Ключ='b2', Значение=('R', 'a3')
MAP_join (R): Обработка ('a4', 'b3'). Ключ='b3', Значение=('R', 'a4')
RECORDREADER_join: Чтение из S...
MAP_join (S): Обработка ('b1', 'c1'). Ключ='b1', Значение=('S', 'c1')
MAP_join (S): Обработка ('b2', 'c2'). Ключ='b2', Значение=('S', 'c2')
MAP_join (S): Обработка ('b1', 'c3'). Ключ='b1', Значение=('S', 'c3')
MAP_join (S): Обработка ('b4', 'c4'). Ключ='b4', Значение=('S', 'c4')
REDUCE_join: Обработка ключа соединения b = 'b1'
  REDUCE_join: Полученные значения (тег, атрибут): [

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

#

In [71]:
from typing import NamedTuple, Iterator, Any, Callable, List
from collections import defaultdict

print("--- Задание 11: Grouping and Aggregation ---")

class AggData(NamedTuple):
  group_key: str  # Атрибут 'a' для группировки
  value_to_agg: int # Атрибут 'b' для агрегации
  other_data: str # Атрибут 'c', который будет проигнорирован

# Входные данные
input_data_agg = [
    AggData('group1', 10, 'x'),
    AggData('group2', 5, 'y'),
    AggData('group1', 20, 'z'),
    AggData('group2', 15, 'w'),
    AggData('group1', 5, 'v'),
    AggData('group3', 100, 'u'),
    AggData('group2', 10, 't'),
]
print("Входные данные для группировки:")
for record in input_data_agg: print(f"  {record}")

def RECORDREADER_agg():
  """Читает входные данные для агрегации."""
  print("RECORDREADER_agg: Чтение данных...")
  # Используем индекс как ключ k1, запись как значение v1
  for i, record in enumerate(input_data_agg):
    yield (i, record)

def MAP_agg(_, record: AggData):
  """Извлекает ключ группировки 'a' и значение для агрегации 'b'."""
  # record соответствует (a, b, c)
  key_a = record.group_key
  value_b = record.value_to_agg
  print(f"MAP_agg: Обработка {record}. Выдача ('{key_a}', {value_b})")
  # Выдаем пару (ключ_группировки, значение_для_агрегации)
  yield (key_a, value_b)

def REDUCE_agg_sum(key_a: Any, values_iterator: Iterator[Any]):
  """
  Функция REDUCE для агрегации SUM.
  key_a: Ключ группировки ('a').
  values_iterator: Итератор значений [b1, b2, ...] для этого ключа.
  """
  values_list = list(values_iterator)  # Преобразуем итератор в список
  print(f"REDUCE_agg_sum: Обработка ключа '{key_a}' со значениями {values_list}")

  result_x = sum(values_list) if values_list else 0  # Вычисляем сумму значений, если список не пуст

  print(f"  REDUCE_agg_sum: Результат агрегации = {result_x}. Выдача ('{key_a}', {result_x})")
  yield (key_a, result_x)  # Возвращаем пару (ключ_группировки, результат_агрегации)


def MapReduce(record_reader, map_func, reduce_func):
    """MapReduce"""
    intermediate_data = defaultdict(list)  # Словарь для хранения промежуточных результатов: {ключ: [значения]}

    for k1, v1 in record_reader():
        for key, value in map_func(k1, v1):
            intermediate_data[key].append(value) # Добавляем значение в список для соответствующего ключа

    for key, values in intermediate_data.items():
        for result in reduce_func(key, iter(values)):  # Обрабатываем каждое значение для ключа, передаем итератор
            yield result



# --- Запуск MapReduce для агрегации SUM ---
print("\n--- Запуск MapReduce для агрегации SUM ---")

agg_output_sum = MapReduce(RECORDREADER_agg, MAP_agg, REDUCE_agg_sum) # Запускаем MapReduce процесс
agg_result_sum = sorted(list(agg_output_sum)) # Сортируем результат
print("\nРезультат агрегации SUM (group_key, sum_value):")
for item in agg_result_sum: print(f"  {item}")

--- Задание 11: Grouping and Aggregation ---
Входные данные для группировки:
  AggData(group_key='group1', value_to_agg=10, other_data='x')
  AggData(group_key='group2', value_to_agg=5, other_data='y')
  AggData(group_key='group1', value_to_agg=20, other_data='z')
  AggData(group_key='group2', value_to_agg=15, other_data='w')
  AggData(group_key='group1', value_to_agg=5, other_data='v')
  AggData(group_key='group3', value_to_agg=100, other_data='u')
  AggData(group_key='group2', value_to_agg=10, other_data='t')

--- Запуск MapReduce для агрегации SUM ---
RECORDREADER_agg: Чтение данных...
MAP_agg: Обработка AggData(group_key='group1', value_to_agg=10, other_data='x'). Выдача ('group1', 10)
MAP_agg: Обработка AggData(group_key='group2', value_to_agg=5, other_data='y'). Выдача ('group2', 5)
MAP_agg: Обработка AggData(group_key='group1', value_to_agg=20, other_data='z'). Выдача ('group1', 20)
MAP_agg: Обработка AggData(group_key='group2', value_to_agg=15, other_data='w'). Выдача ('group2'

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [72]:
from typing import NamedTuple, Iterator, Any, Iterable, Dict, List, Tuple, Callable
import math
import numpy as np # Используем для создания тестовых данных

MatrixElement = Tuple[int, int, float] # (i, j, M_ij)
VectorElement = Tuple[int, float]      # (j, v_j)
TaggedValue = Tuple[str, Any]          # ('M', data) or ('V', data)
IntermediateProduct = Tuple[int, float] # (i, M_ij * v_j)
ResultElement = Tuple[int, float]      # (i, x_i)

def flatten(nested_iterable: Iterable[Iterable[Any]]) -> Iterator[Any]:
  """Уплощает итератор итераторов в один итератор."""
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, List[Any]]]:
  """Группирует пары ключ-значение по ключу."""
  t: Dict[Any, List[Any]] = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER: Callable[[], Iterable[Tuple[Any, Any]]],
              MAP: Callable[[Any, Any], Iterable[Tuple[Any, Any]]],
              REDUCE: Callable[[Any, Iterator[Any]], Iterable[Tuple[Any, Any]]]) -> Iterator[Tuple[Any, Any]]:
  """Основная функция MapReduce (не распределенная версия)."""
  print(f"--- Запуск этапа MapReduce с MAP={MAP.__name__}, REDUCE={REDUCE.__name__} ---")
  # Чтение и применение MAP
  map_results = map(lambda x: MAP(*x), RECORDREADER())
  # Уплощение результатов MAP
  flattened_map_results = flatten(map_results)
  # Группировка по ключу
  grouped_results_input = list(flattened_map_results)
  print(f"  MapReduce: Вход для GroupByKey ({len(grouped_results_input)} пар): {grouped_results_input}")
  grouped_results = groupbykey(iter(grouped_results_input)) # Передаем итератор
  # Применение REDUCE к группам и уплощение результатов
  # Материализуем для отладки
  reduce_input = list(grouped_results)
  print(f"  MapReduce: Вход для REDUCE ({len(reduce_input)} групп): {reduce_input}")
  reduce_output_iter = flatten(map(lambda x: REDUCE(*x), iter(reduce_input)))
  # Материализуем результат этапа для возврата
  final_result_list = list(reduce_output_iter)
  print(f"  MapReduce: Выход этапа ({len(final_result_list)} пар): {final_result_list}")
  print(f"--- Завершение этапа MapReduce ---")
  return iter(final_result_list) # Возвращаем как итератор

In [73]:
print("--- Задание 12: Matrix-Vector Multiplication (вектор не в памяти) ---")

# --- Входные данные ---
# Матрица M (m x n) - например, 3x4
m_rows = 3
n_cols = 4
# Представление матрицы как списка кортежей (i, j, value)
matrix_M_data: List[MatrixElement] = [
    (0, 0, 1.0), (0, 1, 2.0),              (0, 3, 1.0), # Строка 0 (пропущен M_02)
                  (1, 1, 3.0), (1, 2, 1.0),             # Строка 1 (пропущен M_10, M_13)
    (2, 0, 4.0),              (2, 2, 2.0), (2, 3, 5.0), # Строка 2 (пропущен M_21)
]
# Вектор v (n x 1) - например, 4x1
# Представление вектора как списка кортежей (j, value)
vector_v_data: List[VectorElement] = [
    (0, 10.0),
    (1, 20.0),
    (2, 30.0),
    (3, 40.0),
]

print("Входная матрица M (i, j, M_ij):")
for elem in matrix_M_data: print(f"  {elem}")
print("Входной вектор v (j, v_j):")
for elem in vector_v_data: print(f"  {elem}")

# --- Этап 1: Распределение вектора и вычисление промежуточных произведений ---
print("\n--- ЭТАП 1: Вычисление промежуточных произведений M_ij * v_j ---")

def RECORDREADER_stage1():
  """Читает матрицу M и вектор v, тегируя элементы."""
  print("RECORDREADER_stage1: Чтение матрицы M...")
  for i, j, m_val in matrix_M_data:
    # Ключ k1 не важен, значение v1 - тег и данные
    yield ( (i,j), ('M', (i, j, m_val)) )
  print("RECORDREADER_stage1: Чтение вектора V...")
  for j, v_val in vector_v_data:
    yield ( j, ('V', (j, v_val)) )

def MAP_stage1(_, tagged_value: TaggedValue):
  """
  MAP для этапа 1. Ключ - индекс j. Значение - тег и нужные данные.
  """
  tag, data = tagged_value
  if tag == 'M':
    # data это (i, j, M_ij)
    i, j, m_val = data
    key_j = j
    value_for_reduce = ('M', (i, m_val)) # Передаем i и M_ij
    print(f"  MAP_stage1 (M): input={data}, output=({key_j}, {value_for_reduce})")
    yield (key_j, value_for_reduce)
  elif tag == 'V':
    # data это (j, v_j)
    j, v_val = data
    key_j = j
    value_for_reduce = ('V', v_val) # Передаем v_j
    print(f"  MAP_stage1 (V): input={data}, output=({key_j}, {value_for_reduce})")
    yield (key_j, value_for_reduce)

def REDUCE_stage1(key_j: int, values_iterator: Iterator[TaggedValue]):
  """
  REDUCE для этапа 1. Находит v_j и вычисляет M_ij * v_j для всех M_ij.
  Выдает (i, M_ij * v_j).
  """
  print(f"REDUCE_stage1: Обработка ключа j = {key_j}")
  vector_value_vj = None
  matrix_elements_for_j = [] # Сохраняем элементы матрицы для этого j

  values_list = list(values_iterator) # Материализуем для двух проходов
  print(f"  REDUCE_stage1: Полученные значения для j={key_j}: {values_list}")

  # Первый проход: найти v_j и собрать элементы M
  for tag, data in values_list:
    if tag == 'V':
      if vector_value_vj is not None:
        print(f"  ПРЕДУПРЕЖДЕНИЕ: Несколько значений v_{key_j} найдено!")
      vector_value_vj = data # data это v_j
    elif tag == 'M':
      matrix_elements_for_j.append(data) # data это (i, M_ij)

  # Второй проход: если v_j найден, вычислить произведения
  if vector_value_vj is not None:
    print(f"  REDUCE_stage1: Найдено v_{key_j} = {vector_value_vj}")
    print(f"  REDUCE_stage1: Элементы матрицы для j={key_j}: {matrix_elements_for_j}")
    for i, m_val in matrix_elements_for_j:
      product = m_val * vector_value_vj
      output_key_i = i
      output_value_product = product
      print(f"    REDUCE_stage1: Вычисление M_{i}{key_j}*v_{key_j} = {m_val}*{vector_value_vj}={product}. Выдача ({output_key_i}, {output_value_product})")
      yield (output_key_i, output_value_product) # Выдаем (i, M_ij * v_j)
  else:
    print(f"  REDUCE_stage1: Значение v_{key_j} не найдено для этого ключа.")


# Запуск Этапа 1
stage1_output_iter = MapReduce(RECORDREADER_stage1, MAP_stage1, REDUCE_stage1)
# Материализуем результат Этапа 1, так как он будет входом для Этапа 2
stage1_result: List[IntermediateProduct] = list(stage1_output_iter)
print(f"\nРезультат Этапа 1 (промежуточные произведения (i, M_ij * v_j)): {stage1_result}")


# --- Этап 2: Суммирование промежуточных произведений по строкам ---
print("\n--- ЭТАП 2: Суммирование промежуточных произведений по строке i ---")

# Используем результат Этапа 1 как вход для Этапа 2
# Нужен RECORDREADER, который читает этот результат

def RECORDREADER_stage2():
  """Читает промежуточные произведения из результата Этапа 1."""
  print("RECORDREADER_stage2: Чтение промежуточных произведений...")
  # stage1_result это список пар (i, product)
  for i, product in stage1_result:
    # Ключ k1 не важен, значение v1 - сама пара (i, product)
    yield (i, (i, product))

def MAP_stage2(_, value: IntermediateProduct):
  """
  MAP для этапа 2. Просто передает пару (i, product) дальше.
  Ключ - i, Значение - product.
  """
  i, product = value
  print(f"  MAP_stage2: input={value}, output=({i}, {product})")
  yield (i, product)

def REDUCE_stage2(key_i: int, products_iterator: Iterator[float]):
  """
  REDUCE для этапа 2. Суммирует все промежуточные произведения для строки i.
  Выдает (i, sum).
  """
  print(f"REDUCE_stage2: Обработка ключа i = {key_i}")
  products_list = list(products_iterator) # Материализуем для суммирования
  print(f"  REDUCE_stage2: Полученные произведения для i={key_i}: {products_list}")
  total_sum = sum(products_list)
  print(f"  REDUCE_stage2: Сумма = {total_sum}. Выдача ({key_i}, {total_sum})")
  yield (key_i, total_sum) # Выдаем (i, x_i)


# Запуск Этапа 2
stage2_output_iter = MapReduce(RECORDREADER_stage2, MAP_stage2, REDUCE_stage2)
final_result_vector: List[ResultElement] = sorted(list(stage2_output_iter)) # Сортируем по i

print("\nИтоговый вектор результата x (i, x_i):")
for elem in final_result_vector:
    print(f"  {elem}")

# Проверка
print("\nПроверка с использованием numpy:")
# Преобразуем наши данные в numpy массивы
m_np = np.zeros((m_rows, n_cols))
for i, j, val in matrix_M_data:
    m_np[i, j] = val
v_np = np.zeros(n_cols)
for j, val in vector_v_data:
    v_np[j] = val

print("Матрица M (numpy):\n", m_np)
print("Вектор v (numpy):\n", v_np)
expected_result = m_np.dot(v_np)
print("Ожидаемый результат (M * v) по numpy:\n", expected_result)

# Сравнение
result_mapreduce = np.zeros(m_rows)
for i, val in final_result_vector:
    result_mapreduce[i] = val
print("Результат MapReduce (numpy):\n", result_mapreduce)

if np.allclose(result_mapreduce, expected_result):
    print("\nРезультаты MapReduce и numpy совпадают!")
else:
    print("\nВНИМАНИЕ: Результаты MapReduce и numpy различаются!")

--- Задание 12: Matrix-Vector Multiplication (вектор не в памяти) ---
Входная матрица M (i, j, M_ij):
  (0, 0, 1.0)
  (0, 1, 2.0)
  (0, 3, 1.0)
  (1, 1, 3.0)
  (1, 2, 1.0)
  (2, 0, 4.0)
  (2, 2, 2.0)
  (2, 3, 5.0)
Входной вектор v (j, v_j):
  (0, 10.0)
  (1, 20.0)
  (2, 30.0)
  (3, 40.0)

--- ЭТАП 1: Вычисление промежуточных произведений M_ij * v_j ---
--- Запуск этапа MapReduce с MAP=MAP_stage1, REDUCE=REDUCE_stage1 ---
RECORDREADER_stage1: Чтение матрицы M...
  MAP_stage1 (M): input=(0, 0, 1.0), output=(0, ('M', (0, 1.0)))
  MAP_stage1 (M): input=(0, 1, 2.0), output=(1, ('M', (0, 2.0)))
  MAP_stage1 (M): input=(0, 3, 1.0), output=(3, ('M', (0, 1.0)))
  MAP_stage1 (M): input=(1, 1, 3.0), output=(1, ('M', (1, 3.0)))
  MAP_stage1 (M): input=(1, 2, 1.0), output=(2, ('M', (1, 1.0)))
  MAP_stage1 (M): input=(2, 0, 4.0), output=(0, ('M', (2, 4.0)))
  MAP_stage1 (M): input=(2, 2, 2.0), output=(2, ('M', (2, 2.0)))
  MAP_stage1 (M): input=(2, 3, 5.0), output=(3, ('M', (2, 5.0)))
RECORDREADER_s

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [74]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [75]:
import numpy as np
from typing import Iterator, Any, Iterable, Dict, List, Tuple, Callable # Для type hints

print("--- Задание 13: Matrix Multiplication (определение функций) ---")

# --- Входные данные и параметры ---
I = 2  # Строк в small_mat (M)
J = 3  # Столбцов в small_mat (M) / Строк в big_mat (N)
K = 40 # K = 40

# Маленькая матрица M (I x J) - доступна глобально
small_mat = np.random.rand(I, J)
print(f"Маленькая матрица M ({I}x{J}) определена (в памяти).")

# Большая матрица N (J x K) - будет читаться через RECORDREADER
big_mat = np.random.rand(J, K)
print(f"Большая матрица N ({J}x{K}) определена (для чтения).")

# --- Реализация RECORDREADER, MAP, REDUCE ---

def RECORDREADER():
  """Читает элементы большой матрицы N."""
  for j in range(big_mat.shape[0]): # J строк
    for k in range(big_mat.shape[1]): # K столбцов
      yield ((j,k), big_mat[j,k])


MapInputKey = Tuple[int, int]      # (j, k)
MapInputValue = float              # n_jk
MapOutputKey = Tuple[int, int]     # (i, k)
MapOutputValue = float             # m_ij * n_jk
ReduceOutputKey = Tuple[int, int]  # (i, k)
ReduceOutputValue = float          # p_ik

def MAP(k1: MapInputKey, v1: MapInputValue):
  """
  MAP функция. Получает ((j, k), n_jk).
  Для каждого i вычисляет m_ij * n_jk и выдает ((i, k), m_ij * n_jk).
  """
  (j, k) = k1
  n_jk = v1
  for i in range(small_mat.shape[0]): # I строк
    m_ij = small_mat[i, j]
    product = m_ij * n_jk
    output_key: MapOutputKey = (i, k)
    output_value: MapOutputValue = product
    yield (output_key, output_value)

def REDUCE(key: MapOutputKey, values: Iterator[MapOutputValue]):
  """
  REDUCE функция. Получает ((i, k), [список произведений m_ij * n_jk]).
  Суммирует произведения для получения p_ik.
  Выдает ((i, k), p_ik).
  """
  (i, k) = key
  # values - это итератор произведений m_ij * n_jk для фиксированных i, k и всех j
  p_ik = sum(values) # sum() может работать напрямую с итератором

  output_key: ReduceOutputKey = (i, k)
  output_value: ReduceOutputValue = p_ik
  # Возвращаем пару ((i, k), p_ik)
  yield (output_key, output_value)

--- Задание 13: Matrix Multiplication (определение функций) ---
Маленькая матрица M (2x3) определена (в памяти).
Большая матрица N (3x40) определена (для чтения).


Проверьте своё решение

In [76]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [77]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [78]:
import numpy as np
from typing import Iterator, Any, Iterable, Dict, List, Tuple, Callable

print("--- Задание 14: Matrix Multiplication (обе матрицы из RECORDREADER) ---")

# --- Входные данные и параметры ---
I = 2  # Строк в M
J = 3  # Столбцов в M / Строк в N
K = 4  # Столбцов в N

# Генерируем матрицы M и N (они будут читаться, а не использоваться напрямую)
matrix_M_source = np.random.rand(I, J)
matrix_N_source = np.random.rand(J, K)
print(f"Матрица M ({I}x{J}) (источник для чтения):\n{matrix_M_source}\n")
print(f"Матрица N ({J}x{K}) (источник для чтения):\n{matrix_N_source}\n")


# Этап 1
RecordReader1Value = Tuple[str, Tuple[int, int, float]] # ('M', (i, j, m_ij)) или ('N', (j, k, n_jk))
Map1OutputKey = int                                     # j
Map1OutputValue = Tuple[str, int, float]                # ('M', i, m_ij) или ('N', k, n_jk)
Reduce1OutputKey = Tuple[int, int]                      # (i, k)
Reduce1OutputValue = float                              # m_ij * n_jk
# Этап 2
RecordReader2Value = Tuple[Tuple[int, int], float]      # ((i, k), m_ij * n_jk)
Map2OutputKey = Tuple[int, int]                         # (i, k)
Map2OutputValue = float                                 # m_ij * n_jk
Reduce2OutputKey = Tuple[int, int]                      # (i, k)
Reduce2OutputValue = float                              # p_ik

# --- Этап 1: Генерация промежуточных произведений ---
print("\n--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---")

def RECORDREADER_stage1():
  """Читает обе матрицы M и N, добавляя теги."""
  # Читаем M
  print("  RECORDREADER_stage1: Чтение матрицы M...")
  for i in range(matrix_M_source.shape[0]):
    for j in range(matrix_M_source.shape[1]):
      val = matrix_M_source[i, j]
      if val != 0:
         # Ключ k1 не важен, значение v1 - тег и данные
         yield ( (i,j), ('M', (i, j, val)) )
  # Читаем N
  print("  RECORDREADER_stage1: Чтение матрицы N...")
  for j in range(matrix_N_source.shape[0]):
    for k in range(matrix_N_source.shape[1]):
      val = matrix_N_source[j, k]
      if val != 0: # Оптимизация
         yield ( (j,k), ('N', (j, k, val)) )
  print("  RECORDREADER_stage1: Чтение завершено.")


def MAP_stage1(_, v1: RecordReader1Value):
  """
  MAP для этапа 1. Ключ - j. Значение - ('M', i, m_ij) или ('N', k, n_jk).
  """
  tag, data = v1
  if tag == 'M':
    # data это (i, j, m_ij)
    i, j, m_val = data
    key_j: Map1OutputKey = j
    value: Map1OutputValue = ('M', i, m_val)
    yield (key_j, value)
  elif tag == 'N':
    # data это (j, k, n_jk)
    j, k, n_val = data
    key_j: Map1OutputKey = j
    value: Map1OutputValue = ('N', k, n_val)
    yield (key_j, value)

def REDUCE_stage1(key_j: Map1OutputKey, values_iterator: Iterator[Map1OutputValue]):
  """
  REDUCE для этапа 1. Собирает элементы M и N для ключа j.
  Вычисляет произведения m_ij * n_jk и выдает ((i, k), product).
  """
  m_elements = [] # Список для ('M', i, m_ij) -> сохраняем (i, m_ij)
  n_elements = [] # Список для ('N', k, n_jk) -> сохраняем (k, n_jk)

  # Разделяем значения по тегам
  values_list = list(values_iterator) # Материализуем
  for tag, idx, val in values_list:
    if tag == 'M':
      m_elements.append((idx, val)) # (i, m_ij)
    elif tag == 'N':
      n_elements.append((idx, val)) # (k, n_jk)

  # Вычисляем произведения, если есть элементы из обеих матриц
  if m_elements and n_elements:
    for i, m_val in m_elements:
      for k, n_val in n_elements:
        product = m_val * n_val
        output_key: Reduce1OutputKey = (i, k)
        output_value: Reduce1OutputValue = product
        yield (output_key, output_value)



# Запуск Этапа 1
print("\nЗапуск Этапа 1 MapReduce...")
# Предполагаем, что MapReduce определена глобально
stage1_output_iter = MapReduce(RECORDREADER_stage1, MAP_stage1, REDUCE_stage1)
# Материализуем результат Этапа 1, так как он будет входом для Этапа 2
stage1_result: List[Tuple[Reduce1OutputKey, Reduce1OutputValue]] = list(stage1_output_iter)

# --- Этап 2: Суммирование промежуточных произведений по (i, k) ---
print("\n--- ЭТАП 2: Суммирование промежуточных произведений по (i, k) ---")

def RECORDREADER_stage2():
  """Читает промежуточные произведения из результата Этапа 1."""
  print("  RECORDREADER_stage2: Чтение выхода Этапа 1...")
  # stage1_result это список пар ((i, k), product)
  # Ключ k1 не важен, значение v1 - сама пара ((i, k), product)
  for key_ik, product in stage1_result:
      yield (key_ik, (key_ik, product)) # Передаем ((i,k), product) как значение v1
  print("  RECORDREADER_stage2: Чтение завершено.")


def MAP_stage2(_, v1: RecordReader2Value):
  """
  MAP для этапа 2. Ключ - (i, k). Значение - product.
  """
  key_ik, product = v1 # v1 это ((i, k), product)
  output_key: Map2OutputKey = key_ik
  output_value: Map2OutputValue = product
  yield (output_key, output_value)

def REDUCE_stage2(key_ik: Map2OutputKey, products_iterator: Iterator[Map2OutputValue]):
  """
  REDUCE для этапа 2. Суммирует все промежуточные произведения для ключа (i, k).
  Выдает ((i, k), p_ik).
  """
  products_list = list(products_iterator) # Материализуем для суммирования
  p_ik = sum(products_list)

  output_key: Reduce2OutputKey = key_ik
  output_value: Reduce2OutputValue = p_ik
  yield (output_key, output_value) # Выдаем ((i, k), p_ik)


# Запуск Этапа 2
print("\nЗапуск Этапа 2 MapReduce...")
stage2_output_iter = MapReduce(RECORDREADER_stage2, MAP_stage2, REDUCE_stage2)
final_result_list: List[Tuple[Reduce2OutputKey, Reduce2OutputValue]] = list(stage2_output_iter)

# Преобразуем результат
result_matrix = np.zeros((I, K))
print("\nФормирование итоговой матрицы...")
for (i, k), value in final_result_list:
    if 0 <= i < I and 0 <= k < K:
        result_matrix[i, k] = value
    else:
        print(f"Предупреждение: Индекс ({i},{k}) вне ожидаемых границ ({I}x{K})")

print("\nРезультат перемножения матриц P (MapReduce, 2 этапа):")
print(result_matrix)

# --- Проверка с использованием numpy ---
print("\nПроверка с использованием numpy:")
expected_result = np.dot(matrix_M_source, matrix_N_source)
print("Ожидаемый результат P (numpy.dot):")
print(expected_result)

# Сравнение
if np.allclose(result_matrix, expected_result):
    print("\nРезультаты MapReduce и numpy совпадают!")
else:
    print("\nВНИМАНИЕ: Результаты MapReduce и numpy различаются!")

--- Задание 14: Matrix Multiplication (обе матрицы из RECORDREADER) ---
Матрица M (2x3) (источник для чтения):
[[0.90962077 0.13651888 0.99079989]
 [0.00884098 0.04059383 0.11975807]]

Матрица N (3x4) (источник для чтения):
[[0.87868705 0.8548514  0.18989086 0.68109647]
 [0.42407506 0.98023318 0.87902453 0.26350786]
 [0.34725466 0.64358569 0.01148597 0.12986126]]


--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---

Запуск Этапа 1 MapReduce...
  RECORDREADER_stage1: Чтение матрицы M...
  RECORDREADER_stage1: Чтение матрицы N...
  RECORDREADER_stage1: Чтение завершено.

--- ЭТАП 2: Суммирование промежуточных произведений по (i, k) ---

Запуск Этапа 2 MapReduce...
  RECORDREADER_stage2: Чтение выхода Этапа 1...
  RECORDREADER_stage2: Чтение завершено.

Формирование итоговой матрицы...

Результат перемножения матриц P (MapReduce, 2 этапа):
[[1.20122613 1.54907555 0.30411242 0.78417982]
 [0.06656983 0.12442372 0.03873733 0.03227029]]

Проверка с использованием numpy:
Ожидаемы

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [79]:
import numpy as np
from typing import Iterator, Any, Iterable, Dict, List, Tuple, Callable
import math

print("--- Задание 15: Distributed Matrix Multiplication (обе матрицы из RECORDREADER) ---")

# --- Входные данные и параметры ---
I = 2  # Строк в M
J = 3  # Столбцов в M / Строк в N
K = 4  # Столбцов в N

maps = 2     # Количество мапперов
reducers = 2 # Количество редьюсеров

# Генерируем матрицы M и N
matrix_M_source = np.random.rand(I, J)
matrix_N_source = np.random.rand(J, K)
print(f"Матрица M ({I}x{J}) (источник для чтения):\n{matrix_M_source}\n")
print(f"Матрица N ({J}x{K}) (источник для чтения):\n{matrix_N_source}\n")
print(f"Параметры симуляции: maps={maps}, reducers={reducers}")


# Этап 1
RecordReader1Key = Tuple[int, int]                      # (i,j) or (j,k) - не используется в MAP
RecordReader1Value = Tuple[str, Tuple[int, int, float]] # ('M', (i, j, m_ij)) или ('N', (j, k, n_jk))
Map1OutputKey = int                                     # j
Map1OutputValue = Tuple[str, int, float]                # ('M', i, m_ij) или ('N', k, n_jk)
Reduce1OutputKey = Tuple[int, int]                      # (i, k)
Reduce1OutputValue = float                              # m_ij * n_jk
# Этап 2
Stage1ResultType = List[Tuple[Reduce1OutputKey, Reduce1OutputValue]]
RecordReader2Key = Tuple[int, int]                      # (i, k) - не используется в MAP
RecordReader2Value = Tuple[Tuple[int, int], float]      # ((i, k), m_ij * n_jk)
Map2OutputKey = Tuple[int, int]                         # (i, k)
Map2OutputValue = float                                 # m_ij * n_jk
Reduce2OutputKey = Tuple[int, int]                      # (i, k)
Reduce2OutputValue = float                              # p_ik


# --- Этап 1: Генерация промежуточных произведений ---
print("\n--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---")

def INPUTFORMAT_dist_mm_s1():
  """
  Генерирует 'maps' читателей. Каждый читатель выдает все элементы M и N.
  """
  global maps
  print("INPUTFORMAT_dist_mm_s1: Генерация читателей...")
  for map_id in range(maps):
    def RECORDREADER(mid=map_id): # Захватываем map_id
        # Читаем M
        for i in range(matrix_M_source.shape[0]):
            for j in range(matrix_M_source.shape[1]):
                val = matrix_M_source[i, j]
                if val != 0:
                    yield ( (i,j), ('M', (i, j, val)) ) # k1, v1
        # Читаем N
        for j in range(matrix_N_source.shape[0]):
            for k in range(matrix_N_source.shape[1]):
                val = matrix_N_source[j, k]
                if val != 0:
                    yield ( (j,k), ('N', (j, k, val)) ) # k1, v1
    yield RECORDREADER()

def MAP_dist_mm_s1(_, v1: RecordReader1Value):
  """
  MAP для этапа 1. Ключ - j. Значение - ('M', i, m_ij) или ('N', k, n_jk).
  """
  tag, data = v1
  if tag == 'M':
    i, j, m_val = data
    yield (j, ('M', i, m_val))
  elif tag == 'N':
    j, k, n_val = data
    yield (j, ('N', k, n_val))

def REDUCE_dist_mm_s1(key_j: Map1OutputKey, values_iterator: Iterator[Map1OutputValue]):
  """
  REDUCE для этапа 1. Собирает УНИКАЛЬНЫЕ элементы M и N для ключа j.
  Вычисляет произведения m_ij * n_jk и выдает ((i, k), product).
  """
  # Используем словари для хранения уникальных элементов по индексам i и k
  m_elements_unique: Dict[int, float] = {} # {i: m_ij}
  n_elements_unique: Dict[int, float] = {} # {k: n_jk}

  values_list = list(values_iterator)
  for tag, idx, val in values_list:
    if tag == 'M':
      # Сохраняем m_ij для индекса i. Дубликаты будут перезаписаны тем же значением.
      m_elements_unique[idx] = val # idx это i
    elif tag == 'N':
      # Сохраняем n_jk для индекса k.
      n_elements_unique[idx] = val # idx это k

  # Вычисляем произведения, используя уникальные элементы
  if m_elements_unique and n_elements_unique:
    for i, m_val in m_elements_unique.items():
      for k, n_val in n_elements_unique.items():
        product = m_val * n_val
        output_key: Reduce1OutputKey = (i, k)
        output_value: Reduce1OutputValue = product

        yield (output_key, output_value)



# Запуск Этапа 1
print("\nЗапуск Этапа 1 MapReduceDistributed...")
stage1_partitioned_output = MapReduceDistributed(
    INPUTFORMAT_dist_mm_s1,
    MAP_dist_mm_s1,
    REDUCE_dist_mm_s1,
    PARTITIONER=PARTITIONER_default, # Партиционирование по j
    COMBINER=None # Комбайнер здесь не применим
)

# Собираем весь выход Этапа 1 из всех партиций редьюсеров
print("\nСбор результатов Этапа 1...")
stage1_result: Stage1ResultType = []
for partition_id, partition_iter in stage1_partitioned_output:
    partition_data = list(partition_iter)
    print(f"  Этап 1: Результат из партиции {partition_id} ({len(partition_data)} пар): {partition_data[:5]}...")
    stage1_result.extend(partition_data)
print(f"Этап 1: Всего получено {len(stage1_result)} промежуточных произведений.")


# --- Этап 2: Суммирование промежуточных произведений по (i, k) ---
print("\n--- ЭТАП 2: Суммирование промежуточных произведений по (i, k) ---")

def INPUTFORMAT_dist_mm_s2():
    """
    Генерирует 'maps' читателей для выхода Этапа 1.
    Делит stage1_result на части.
    """
    global maps
    global stage1_result # Доступ к результату Этапа 1
    print("INPUTFORMAT_dist_mm_s2: Генерация читателей для промежуточных данных...")
    split_size = int(math.ceil(len(stage1_result) / maps))
    if split_size == 0 and len(stage1_result) > 0: split_size = 1 # На случай очень маленьких данных
    if split_size == 0: print("  Предупреждение: Нет данных для Этапа 2."); return

    print(f"  Размер сплита для Этапа 2: {split_size}")
    for i in range(0, len(stage1_result), split_size):
        split = stage1_result[i : i + split_size]
        def RECORDREADER(current_split=split):
            # Входные данные - пары ((i, k), product)
            # Ключ k1 не важен, значение v1 - сама пара
            for key_ik, product in current_split:
                yield (key_ik, (key_ik, product)) # k1, v1=((i,k), product)
        yield RECORDREADER()


def MAP_dist_mm_s2(_, v1: RecordReader2Value):
  """
  MAP для этапа 2. Ключ - (i, k). Значение - product.
  """
  key_ik, product = v1 # v1 это ((i, k), product)
  yield (key_ik, product) # Выдаем ((i, k), product)

def REDUCE_dist_mm_s2(key_ik: Map2OutputKey, products_iterator: Iterator[Map2OutputValue]):
  """
  REDUCE для этапа 2. Суммирует все промежуточные произведения для ключа (i, k).
  Выдает ((i, k), p_ik).
  """
  p_ik = sum(products_iterator) # sum() работает с итератором
  yield (key_ik, p_ik) # Выдаем ((i, k), p_ik)

# COMBINER для Этапа 2 (идентичен REDUCE)
COMBINER_dist_mm_s2 = REDUCE_dist_mm_s2

# Запуск Этапа 2
print("\nЗапуск Этапа 2 MapReduceDistributed...")
# Используем Combiner
stage2_partitioned_output = MapReduceDistributed(
    INPUTFORMAT_dist_mm_s2,
    MAP_dist_mm_s2,
    REDUCE_dist_mm_s2,
    PARTITIONER=PARTITIONER_default, # Партиционирование по (i, k)
    COMBINER=COMBINER_dist_mm_s2     # Используем Combiner
)

# Собираем финальный результат
print("\nСбор финального результата...")
final_result_list: List[Tuple[Reduce2OutputKey, Reduce2OutputValue]] = []
for partition_id, partition_iter in stage2_partitioned_output:
    partition_data = list(partition_iter)
    print(f"  Этап 2: Результат из партиции {partition_id} ({len(partition_data)} пар): {partition_data[:5]}...")
    final_result_list.extend(partition_data)

# Преобразуем результат в матрицу
result_matrix = np.zeros((I, K))
print("\nФормирование итоговой матрицы...")
for (i, k), value in final_result_list:
    if 0 <= i < I and 0 <= k < K:
        result_matrix[i, k] = value
    else:
        print(f"Предупреждение: Индекс ({i},{k}) вне ожидаемых границ ({I}x{K})")

print("\nРезультат перемножения матриц P (MapReduce Distributed, 2 этапа):")
print(result_matrix)

# --- Проверка с использованием numpy ---
print("\nПроверка с использованием numpy:")
expected_result = np.dot(matrix_M_source, matrix_N_source)
print("Ожидаемый результат P (numpy.dot):")
print(expected_result)

# Сравнение
if np.allclose(result_matrix, expected_result):
    print("\nРезультаты MapReduce Distributed и numpy совпадают!")
else:
    print("\nВНИМАНИЕ: Результаты MapReduce Distributed и numpy различаются!")

--- Задание 15: Distributed Matrix Multiplication (обе матрицы из RECORDREADER) ---
Матрица M (2x3) (источник для чтения):
[[0.75307665 0.59942528 0.60386728]
 [0.59331465 0.16121372 0.86347734]]

Матрица N (3x4) (источник для чтения):
[[0.99629572 0.93008898 0.19695064 0.29270421]
 [0.92158041 0.04435036 0.4994959  0.51780366]
 [0.97171941 0.73100223 0.24799823 0.44238458]]

Параметры симуляции: maps=2, reducers=2

--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---

Запуск Этапа 1 MapReduceDistributed...

MapReduceDistributed: Начало работы.
Параметры: maps=2, reducers=2, COMBINER=нет
INPUTFORMAT_dist_mm_s1: Генерация читателей...
INPUTFORMAT: Сгенерировано 2 Record Reader'ов (сплитов).
MAP: Завершено 2 map-задач.
COMBINER: Не используется.
groupbykey_distributed: Распределение по 2 редьюсерам...
  Обработка выхода маппера/комбайнера 0:
    Передано 18 пар.
  Обработка выхода маппера/комбайнера 1:
    Передано 18 пар.
groupbykey_distributed: Всего получено и распределено

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [80]:
import numpy as np
from typing import Iterator, Any, Iterable, Dict, List, Tuple, Callable
import math

print("--- Задание 16: Distributed Matrix Multiplication (сплиты для обеих матриц) ---")

# --- Входные данные и параметры ---
I = 3  # Строк в M
J = 4  # Столбцов в M / Строк в N
K = 2  # Столбцов в N

maps = 2     # Количество мапперов
reducers = 2 # Количество редьюсеров

# Генерируем матрицы M и N
matrix_M_source = np.random.rand(I, J)
matrix_N_source = np.random.rand(J, K)
print(f"Матрица M ({I}x{J}) (источник для чтения):\n{matrix_M_source}\n")
print(f"Матрица N ({J}x{K}) (источник для чтения):\n{matrix_N_source}\n")
print(f"Параметры симуляции: maps={maps}, reducers={reducers}")

# Преобразуем матрицы в списки элементов для удобного разделения
all_m_elements = []
for i in range(matrix_M_source.shape[0]):
    for j in range(matrix_M_source.shape[1]):
        val = matrix_M_source[i, j]
        if val != 0:
            all_m_elements.append( ('M', (i, j, val)) ) # v1 для Этапа 1

all_n_elements = []
for j in range(matrix_N_source.shape[0]):
    for k in range(matrix_N_source.shape[1]):
        val = matrix_N_source[j, k]
        if val != 0:
             all_n_elements.append( ('N', (j, k, val)) ) # v1 для Этапа 1

print(f"Всего элементов M: {len(all_m_elements)}, N: {len(all_n_elements)}")

RecordReader1Value = Tuple[str, Tuple[int, int, float]]
Map1OutputKey = int
Map1OutputValue = Tuple[str, int, float]
Reduce1OutputKey = Tuple[int, int]
Reduce1OutputValue = float
Stage1ResultType = List[Tuple[Reduce1OutputKey, Reduce1OutputValue]]
RecordReader2Value = Tuple[Tuple[int, int], float]
Map2OutputKey = Tuple[int, int]
Map2OutputValue = float
Reduce2OutputKey = Tuple[int, int]
Reduce2OutputValue = float

# --- Этап 1: Генерация промежуточных произведений ---
print("\n--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---")

def INPUTFORMAT_dist_mm_s1_split():
    """
    Генерирует 'maps' читателей. Каждый читатель получает свой сплит M и N.
    """
    global maps
    global all_m_elements, all_n_elements
    print("INPUTFORMAT_dist_mm_s1_split: Генерация читателей со сплитами...")

    # Разделяем списки элементов на части
    m_split_size = int(math.ceil(len(all_m_elements) / maps))
    n_split_size = int(math.ceil(len(all_n_elements) / maps))
    if m_split_size == 0 and len(all_m_elements) > 0: m_split_size = 1
    if n_split_size == 0 and len(all_n_elements) > 0: n_split_size = 1

    print(f"  Размер сплита M: {m_split_size}, N: {n_split_size}")

    for map_id in range(maps):
        # Определяем границы сплитов для этого маппера
        m_start = map_id * m_split_size
        m_end = min((map_id + 1) * m_split_size, len(all_m_elements))
        n_start = map_id * n_split_size
        n_end = min((map_id + 1) * n_split_size, len(all_n_elements))

        m_split = all_m_elements[m_start:m_end]
        n_split = all_n_elements[n_start:n_end]

        def RECORDREADER(m_data=m_split, n_data=n_split, mid=map_id): # Захватываем данные
            for tag, data in m_data:
                 # data это (i, j, val)
                 i, j, _ = data
                 yield ( (i,j), (tag, data) ) # k1=(i,j), v1=('M', (i,j,val))
            for tag, data in n_data:
                 # data это (j, k, val)
                 j, k, _ = data
                 yield ( (j,k), (tag, data) ) # k1=(j,k), v1=('N', (j,k,val))
        yield RECORDREADER()

# MAP для Этапа 1
def MAP_dist_mm_s1(_, v1: RecordReader1Value):
  tag, data = v1
  if tag == 'M':
    i, j, m_val = data
    yield (j, ('M', i, m_val))
  elif tag == 'N':
    j, k, n_val = data
    yield (j, ('N', k, n_val))

# REDUCE для Этапа 1
def REDUCE_dist_mm_s1(key_j: Map1OutputKey, values_iterator: Iterator[Map1OutputValue]):
  m_elements_unique: Dict[int, float] = {} # {i: m_ij}
  n_elements_unique: Dict[int, float] = {} # {k: n_jk}
  values_list = list(values_iterator)
  for tag, idx, val in values_list:
    if tag == 'M':
      m_elements_unique[idx] = val # idx это i
    elif tag == 'N':
      n_elements_unique[idx] = val # idx это k
  if m_elements_unique and n_elements_unique:
    for i, m_val in m_elements_unique.items():
      for k, n_val in n_elements_unique.items():
        product = m_val * n_val
        yield ((i, k), product) # Выдаем ((i, k), m_ij * n_jk)

# Запуск Этапа 1
print("\nЗапуск Этапа 1 MapReduceDistributed (со сплитами)...")
stage1_partitioned_output = MapReduceDistributed(
    INPUTFORMAT_dist_mm_s1_split,
    MAP_dist_mm_s1,
    REDUCE_dist_mm_s1,
    PARTITIONER=PARTITIONER_default,
    COMBINER=None
)

# Собираем результат Этапа 1
print("\nСбор результатов Этапа 1...")
stage1_result: Stage1ResultType = []
for partition_id, partition_iter in stage1_partitioned_output:
    partition_data = list(partition_iter)
    print(f"  Этап 1: Результат из партиции {partition_id} ({len(partition_data)} пар): {partition_data[:5]}...")
    stage1_result.extend(partition_data)
print(f"Этап 1: Всего получено {len(stage1_result)} промежуточных произведений.")


# --- Этап 2: Суммирование промежуточных произведений по (i, k) ---
print("\n--- ЭТАП 2: Суммирование промежуточных произведений по (i, k) ---")

# INPUTFORMAT, MAP, REDUCE, COMBINER для Этапа 2
def INPUTFORMAT_dist_mm_s2():
    global maps, stage1_result
    print("INPUTFORMAT_dist_mm_s2: Генерация читателей для промежуточных данных...")
    split_size = int(math.ceil(len(stage1_result) / maps))
    if split_size == 0 and len(stage1_result) > 0: split_size = 1
    if split_size == 0: print("  Предупреждение: Нет данных для Этапа 2."); return [] # Возвращаем пустой список итераторов
    print(f"  Размер сплита для Этапа 2: {split_size}")
    readers = []
    for i in range(0, len(stage1_result), split_size):
        split = stage1_result[i : i + split_size]
        def RECORDREADER(current_split=split):
            for key_ik, product in current_split:
                yield (key_ik, (key_ik, product))
        readers.append(RECORDREADER())
    return readers # Возвращаем список созданных генераторов

def MAP_dist_mm_s2(_, v1: RecordReader2Value):
  key_ik, product = v1
  yield (key_ik, product)

def REDUCE_dist_mm_s2(key_ik: Map2OutputKey, products_iterator: Iterator[Map2OutputValue]):
  p_ik = sum(products_iterator)
  yield (key_ik, p_ik)

COMBINER_dist_mm_s2 = REDUCE_dist_mm_s2

# Запуск Этапа 2
print("\nЗапуск Этапа 2 MapReduceDistributed (со сплитами)...")
stage2_partitioned_output = MapReduceDistributed(
    INPUTFORMAT_dist_mm_s2,
    MAP_dist_mm_s2,
    REDUCE_dist_mm_s2,
    PARTITIONER=PARTITIONER_default,
    COMBINER=COMBINER_dist_mm_s2
)

# Собираем финальный результат
print("\nСбор финального результата...")
final_result_list: List[Tuple[Reduce2OutputKey, Reduce2OutputValue]] = []
for partition_id, partition_iter in stage2_partitioned_output:
    partition_data = list(partition_iter)
    print(f"  Этап 2: Результат из партиции {partition_id} ({len(partition_data)} пар): {partition_data[:5]}...")
    final_result_list.extend(partition_data)

# Преобразуем результат в матрицу
result_matrix = np.zeros((I, K))
print("\nФормирование итоговой матрицы...")
for (i, k), value in final_result_list:
    if 0 <= i < I and 0 <= k < K:
        result_matrix[i, k] = value
    else:
        print(f"Предупреждение: Индекс ({i},{k}) вне ожидаемых границ ({I}x{K})")

print("\nРезультат перемножения матриц P (MapReduce Distributed, сплиты, 2 этапа):")
print(result_matrix)

# --- Проверка с использованием numpy ---
print("\nПроверка с использованием numpy:")
expected_result = np.dot(matrix_M_source, matrix_N_source)
print("Ожидаемый результат P (numpy.dot):")
print(expected_result)

# Сравнение
if np.allclose(result_matrix, expected_result):
    print("\nРезультаты MapReduce Distributed и numpy совпадают!")
else:
    print("\nВНИМАНИЕ: Результаты MapReduce Distributed и numpy различаются!")

--- Задание 16: Distributed Matrix Multiplication (сплиты для обеих матриц) ---
Матрица M (3x4) (источник для чтения):
[[0.13611448 0.78177041 0.35898274 0.61529622]
 [0.45504244 0.46441527 0.78124798 0.02682177]
 [0.78457561 0.20486753 0.66346266 0.9556118 ]]

Матрица N (4x2) (источник для чтения):
[[0.73930585 0.31208152]
 [0.5705808  0.25240995]
 [0.11125565 0.2574859 ]
 [0.26102836 0.74739152]]

Параметры симуляции: maps=2, reducers=2
Всего элементов M: 12, N: 8

--- ЭТАП 1: Генерация промежуточных произведений m_ij * n_jk ---

Запуск Этапа 1 MapReduceDistributed (со сплитами)...

MapReduceDistributed: Начало работы.
Параметры: maps=2, reducers=2, COMBINER=нет
INPUTFORMAT_dist_mm_s1_split: Генерация читателей со сплитами...
  Размер сплита M: 6, N: 4
INPUTFORMAT: Сгенерировано 2 Record Reader'ов (сплитов).
MAP: Завершено 2 map-задач.
COMBINER: Не используется.
groupbykey_distributed: Распределение по 2 редьюсерам...
  Обработка выхода маппера/комбайнера 0:
    Передано 10 пар.
  Об

Будет ли работать решение (из Задания 16), если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

Да, будет работать корректно.

Роль Shuffle & Group: Ключевым элементом, обеспечивающим корректность MapReduce, является фаза Shuffle and Group (в нашей симуляции это groupbykey_distributed и PARTITIONER). Эта фаза гарантирует, что все промежуточные пары ключ-значение с одинаковым ключом будут собраны вместе и отправлены на один и тот же экземпляр Reducer'а, независимо от того, какой Mapper их сгенерировал.
Этап 1:
Даже если RECORDREADERы читают случайные подмножества, каждый MAP_dist_mm_s1 все равно правильно сгенерирует пары (j, ('M', i, m_ij)) и (j, ('N', k, n_jk)) для тех элементов, которые он получил.
Фаза Shuffle соберет все пары с одинаковым ключом j (неважно, из какого случайного подмножества они пришли) на одном REDUCE_dist_mm_s1.
Исправленный REDUCE_dist_mm_s1 найдет все уникальные m_ij и n_jk, пришедшие для этого j, и вычислит все необходимые произведения m_ij * n_jk. Если какой-то нужный элемент m_ij или n_jk не попал ни в одно случайное подмножество, он просто не будет учтен (что эквивалентно умножению на 0, если считать пропущенные элементы нулями).
Этап 2:
Аналогично, MAP_dist_mm_s2 генерирует пары ((i, k), product).
Фаза Shuffle соберет все произведения для одинакового ключа (i, k) на одном REDUCE_dist_mm_s2.
REDUCE_dist_mm_s2 корректно просуммирует все пришедшие для (i, k) произведения, независимо от того, какой маппер или какой редьюсер Этапа 1 их сгенерировал.
Вывод: Корректность алгоритма не зависит от того, являются ли входные данные для мапперов последовательными сплитами или случайными подмножествами. Главное, чтобы механизм MapReduce (особенно Shuffle & Group) правильно собрал все промежуточные данные по ключам перед передачей их в Reducer.