# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  #проверка, что пол женский. Если девушка, значение возращается
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    # цикл по переданному rows
    sum += row.social_contacts
    count += 1
  if (count > 0):
    #возращается кортеж с возрастом и среднем суммы (если контакты были)
    yield (age, sum/count)
  else:
    # если контактов нет, ничего не возращается
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  #инициализируем класс user, в котором есть id, возраст, количество социальных контактов и пол
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    #создаем начальный набор user-ов
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  #генератор, который "читает" user-ов из input_collection и присваивает прочитанным элементам id.
  #По прочтении возвращает id и сам прочитанный элемент (user-a)
  return [(u.id, u) for u in input_collection]

In [6]:
# все прочитанное функцией сразу определяем как list
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  #данная функция ждет какой-то итерируемый объект, содержащий другие итерируемые объекты (например, список списков)
  #после чего мы перебираем в [nested, iterable] iterable-лы (то есть элементы списка iterable), после чего возвращаем эти самые элементы
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
# flatten - пример использования. мы применяем функцию MAP (отбор из списка только девушек) ко всем элементам, которые вернет RECORDREADER() 
# (это применение обеспечивается за счет использования map). Анонимная лямбда функция принимает кортеж вида (id, user). MAP(*x) - кортеж 
# распаковали и получили MAP(1, User(...)). 
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  # iterable - это итерируемый объект, содержащий кортежи, где первый элемент кортежа является ключом,
  # t - словарь, идет цикл по кортежу iterable. k2 - ключ (возраст, для примера ниже), v2 - значения (инфа об user-е)
  #  t.get(k2, []): Если ключ k2 уже существует в словаре, возвращает соответствующее значение. Если ключ k2 не существует, возвращает 
  # пустой список []. Потом прибавляется значение v2. То есть в данном случае код группирует те объекты у которых ключи одинаковы, 
  # и возвращает список с уникальными ключами
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
# берем получившийся выше список, и поочередно прогоняем через REDUCE - эта функция возвращает либо возраст и количество соц контактов, 
# либо общий возраст и среднее количество соц контактов на всех представителей данного возраста. 
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [14]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [16]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
    
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)
 
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [17]:
from typing import Iterator
import numpy as np

#начальные данные единичная матрица размером 5х4, вектор из 4 чисел в диапазоне от 0 до 1

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  #распаковка кортежа координат на i и j, возвращаем i и результат умножения элемента на элементр вектора [j] (умножение матрицы на вектор столбец поэлементно)
  i, j = coordinates
  yield (i, value*vec[j])
 
def REDUCE(i:int, products:Iterator[NamedTuple]):
#принимает индекст строки и Iterator
  sum = 0
  for p in products:
  #Начинается цикл for, который перебирает значения (результаты умножения) из итератора products.
    sum += p

  #после пробега по всем продуктам возращает сумму
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
  #Внешний цикл for, который перебирает строки матрицы. mat.shape[0] возвращает количество строк в матрице.
    for j in range(mat.shape[1]):
    #Внутренний цикл for, который перебирает столбцы матрицы. mat.shape[1] возвращает количество столбцов в матрице.
    # возращает кортеж с координатами и значение элемента, расположенного по этим координатам
      yield ((i, j), mat[i,j])
      
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.2548250790451094),
 (1, 1.2548250790451094),
 (2, 1.2548250790451094),
 (3, 1.2548250790451094),
 (4, 1.2548250790451094)]

## Inverted index 

In [18]:
from typing import Iterator
# определяем 3 строки
d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
# создаем список, в котором содержатся все строки
documents = [d1, d2, d3]

def RECORDREADER():
  #Перебирает элементы списка documents с помощью функции enumerate.  enumerate возвращает пары (индекс, элемент). 
  # docid - индекс текущей строки (начиная с 0).
  # document - строка.
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)
      
def MAP(docId:str, body:str):
  #цикл перебирает уникальные слова в строке.
  # body.split(' ') -  разбивает строку body на список слов, используя пробел как разделитель, после чего слова становятся множеством (фун-я set)
  # for word in ... - перебирает каждое уникальное слово в множестве. После чего возвращается кортеж (слово, идентификатор строки)
  for word in set(body.split(' ')):
    yield (word, docId)
 
def REDUCE(word:str, docIds:Iterator[str]):
  # принимает (слово; идентификаторы строк, где встречается это слово)
  # возвращает (слово; отсортированные по порядку идентификаторы строк)
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [19]:
from typing import Iterator

# определение строк и списка, включающего все строки

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  # Перебирает строки в списке documents вместе с их индексами.
  for (docid, document) in enumerate(documents):
    # Для каждой строки (d1, d2, d3) делает разбивку на строки (по \n) и перебирает эти строки вместе с их индексами.
    for (lineid, line) in enumerate(document.split('\n')):
      # возвращает (идентификатор строки, строку(без \n))
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  # читает строки (d1, d2, d3) и разбивает их на слова
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  # подсчет количества появления слова и возвращение (слово, количество появлений)
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [20]:
def flatten(nested_iterable):
  #возвращаем из (neated, [element, element]) element
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  # возвращает список с уникальными ключами
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()
      
def groupbykey_distributed(map_partitions, PARTITIONER):
  # Создает список из reducers пустых словарей.
  global reducers
  partitions = [dict() for _ in range(reducers)]
  # определяем к какой партиции (reduce'ру) должна быть отправлена эта пара
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      # Добавляет значение v2 к списку значений, связанных с ключом k2 в соответствующей партиции p
      p[k2] = p.get(k2, []) + [v2]
      # Преобразует список партиций в список кортежей (partition_id, sorted_partition).
      # Сортирует элементы (пары ключ-значение) внутри каждой партиции по ключу. 
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]
 
def PARTITIONER(obj):
  # Вычисляет хеш объекта (ключа) и берет остаток от деления на reducers. 
  # Это обеспечивает более или менее равномерное распределение ключей между reduce'рами.
  global reducers
  return hash(obj) % reducers
  
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  # Применяем функцию MAP к каждой паре ключ-значение (k1, v1) в каждом record_reader.
  # После чего применяем весь этот процесс ко всем record_reader'ам, полученным от INPUTFORMAT().
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  #COMBINER выполняет локальную агрегацию значений для одного и того же ключа на той же машине, где работает mapper. 
  if COMBINER != None:
    #Применяет функцию COMBINER к каждой группе (ключ, список значений).  *k2v2 распаковывает пару (ключ, список значений) в аргументы для COMBINER. 
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  # Применяет функцию REDUCE к каждой группе (ключ, список значений) в партиции.  
  # REDUCE должна агрегировать список значений для данного ключа, чтобы получить окончательный результат для этого ключа.
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
  
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [21]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  # Объявляет, что функция использует глобальную переменную maps.
  global maps
  
  def RECORDREADER(split):
    # Возвращает кортеж, содержащий идентификатор документа и строки (docid:lineid) и саму строку.
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)
  #Вычисляет размер каждого сплита, чтобы равномерно разделить документы между map-задачами. Используется округление вверх.
  split_size =  int(np.ceil(len(documents)/maps))
  # Перебирает документы с шагом split_size.
  for i in range(0, len(documents), split_size):
    # возвращает RECORDREADER, который преобразует их в поток (stream) пар (docid:lineid, line).
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  # Возвращает кортеж, содержащий слово и счетчик 1 (для каждой встреченной версии слова).
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  # Перебирает счетчики для данного слова, суммируя их. Возвращает слово и количество его повторений
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)
  
# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)# Запускает процесс MapReduce с заданными функциями и параметрами. 
# COMBINER=None означает, что combiner не используется.

partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output] # Преобразует итератор partition в список, чтобы напечатать его.
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('it', 18), ('what', 10)]),
 (1, [('a', 2), ('banana', 2), ('is', 18)])]

## TeraSort

In [22]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps #Объявляет, что функция использует глобальную переменную maps
  
  def RECORDREADER(split):
    # Перебирает значения в текущем сплите. Возвращает кортеж, содержащий значение и None (ключ-значение пара)
    for value in split:
        yield (value, None)

   # Вычисляет размер каждого сплита, чтобы равномерно разделить входные значения между map-задачами.   
  split_size =  int(np.ceil(len(input_values)/maps))
  # Перебирает входные значения с шагом split_size. Bозвращает RECORDREADER, который преобразует их в поток пар (value, None).
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])
    
def MAP(value:int, _):
  yield (value, None)
  
def PARTITIONER(key):
  #использует глобальные переменные reducers, min_value и max_value
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers #Вычисляет размер каждого бакета (диапазона значений) на основе количества reduce-задач.
  bucket_id = 0

  # Находит правильный бакет для данного ключа.
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    # Увеличивает идентификатор бакета, пока ключ не попадет в текущий бакет. Возвращает идентификатор бакета, к которому принадлежит ключ.
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)
  
# Запускает процесс MapReduce с заданными функциями и параметрами. PARTITIONER используется для определения того, 
# какая reduce-задача получит какое значение. После чего преобразует результаты в список списков для более удобной визуализации.
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.025792979286890394),
   (None, 0.043264007000797244),
   (None, 0.06675116507924173),
   (None, 0.11959444027051502),
   (None, 0.14728257743083295),
   (None, 0.2492219170110631),
   (None, 0.2719425965428681),
   (None, 0.34111615221191993),
   (None, 0.45071783158942613),
   (None, 0.4663268371659578),
   (None, 0.472587130910855)]),
 (1,
  [(None, 0.524486300667639),
   (None, 0.538394058736404),
   (None, 0.5387298967478329),
   (None, 0.5553404162444036),
   (None, 0.6213712516741151),
   (None, 0.6495375182387296),
   (None, 0.6728397324452573),
   (None, 0.674127953538846),
   (None, 0.6969053371184614),
   (None, 0.7136781529932946),
   (None, 0.7661598530670479),
   (None, 0.8065343746302002),
   (None, 0.8408441030302031),
   (None, 0.8481031386059075),
   (None, 0.8551853486838957),
   (None, 0.8823155336572647),
   (None, 0.9043361775802701),
   (None, 0.9398413568658897),
   (None, 0.9517183232237912)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [23]:
import random
from collections import defaultdict

def INPUT_FORMAT(data, num_splits):
    # Разбивает входной список на num_splits частей.
    split_size = len(data) // num_splits
    remainder = len(data) % num_splits
    start = 0
    for i in range(num_splits):
        end = start + split_size + (1 if i < remainder else 0)
        yield data[start:end]
        start = end

def MAP(split):
    # Находит максимальное число в данном сплите.
    if not split:
        return []
    max_item = max(split, key=lambda item: item[1])
    yield ("max", max_item)

def REDUCE(key, values):
    # Находит максимальное число среди всех максимумов от mapper'ов."""
    max_item = max(values, key=lambda item: item[1])
    yield (key, max_item)

def MapReduce(data, num_maps, num_reducers):
    # MapReduce для поиска максимального значения.
    map_results = [list(MAP(split)) for split in INPUT_FORMAT(data, num_maps)]

    combined_results = defaultdict(list)
    for result in map_results:
        for key, value in result:
            combined_results[key].append(value)

    reduce_results = [list(REDUCE(key, values)) for key, values in combined_results.items()]

    # Возвращение результата
    if reduce_results:
        return reduce_results[0][0][1]
    else:
        return None


def RECORDREADER(num_records):
    # Возвращает список пар (идентификатор, случайное целое число).
    return [(i, random.randint(1, 100)) for i in range(num_records)]


# Пример использования
num_records = 10
data = RECORDREADER(num_records)
print("Сгенерированный список данных:", data)

num_maps = 5
num_reducers = 1
max_id, max_value = MapReduce(data, num_maps, num_reducers)

print(f"Максимальное значение: {max_value}, ID: {max_id}")



Сгенерированный список данных: [(0, 2), (1, 2), (2, 43), (3, 33), (4, 33), (5, 22), (6, 58), (7, 69), (8, 47), (9, 5)]
Максимальное значение: 69, ID: 7


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [24]:
import random
from collections import defaultdict

def INPUT_FORMAT(data, num_splits):
    #Разбивает входной список на num_splits частей.
    split_size = len(data) // num_splits
    remainder = len(data) % num_splits
    start = 0
    for i in range(num_splits):
        end = start + split_size + (1 if i < remainder else 0)
        yield data[start:end]
        start = end

def MAP(split):
    # Вычисляет сумму и количество элементов в данном сплите.
    s = sum(item[1] for item in split)  # Суммируем только значения
    count = len(split)
    yield ("average", (s, count))

def REDUCE(key, values):
    # Вычисление общего среднего арифметического.
    total_sum = 0
    total_count = 0
    for s, count in values:
        total_sum += s
        total_count += count

    if total_count == 0:
        return 0
    else:
        return total_sum / total_count

def MapReduce(data, num_maps, num_reducers):
    # MapReduce для вычисления среднего арифметического.

    map_results = []
    for split in INPUT_FORMAT(data, num_maps):
        for key, value in MAP(split):
            map_results.append((key, value))

    combined_results = defaultdict(list)
    for key, value in map_results:
        combined_results[key].append(value)

    if "average" in combined_results:
        return REDUCE("average", combined_results["average"])
    else:
        return 0

# Пример использования
num_records = 10
data = [(i, random.randint(1, 10)) for i in range(num_records)]  # Список (идентификатор, значение)

print("Сгенерированный список данных:", data)

num_maps = 4
num_reducers = 1
average = MapReduce(data, num_maps, num_reducers)

print(f"Среднее арифметическое: {average}")


Сгенерированный список данных: [(0, 9), (1, 5), (2, 9), (3, 3), (4, 7), (5, 4), (6, 3), (7, 1), (8, 3), (9, 7)]
Среднее арифметическое: 5.1


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [25]:
def group_by_key_sorted(data):
    # Выполняет GroupByKey для списка пар (ключ, значение), предполагая, что данные отсортированы по 
    #ключу.

    grouped_data = {}
    current_key = None
    current_values = []

    for key, value in data:
        if key != current_key:
            # Новый ключ
            if current_key is not None:
                grouped_data[current_key] = current_values
            current_key = key
            current_values = [value]
        else:
            # Тот же ключ
            current_values.append(value)

    # Добавляем последнюю группу (если есть)
    if current_key is not None:
        grouped_data[current_key] = current_values

    return grouped_data


# Пример 1 
sorted_data1 = [("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)]
grouped_data1 = group_by_key_sorted(sorted_data1)
print("Пример 1:")
print("Входные данные:", sorted_data1)
print("Результат:", grouped_data1)

# Пример 2 (ключи не одного типа (число и буква))
sorted_data6 = [(1, "one"), ("a", "letter"), (1, "uno"), ("a", "another")]
grouped_data6 = group_by_key_sorted(sorted_data6)
print("\nПример 2:")
print("Входные данные:", sorted_data6)
print("Результат:", grouped_data6)


Пример 1:
Входные данные: [('a', 1), ('a', 2), ('b', 3), ('b', 4), ('c', 5)]
Результат: {'a': [1, 2], 'b': [3, 4], 'c': [5]}

Пример 2:
Входные данные: [(1, 'one'), ('a', 'letter'), (1, 'uno'), ('a', 'another')]
Результат: {1: ['uno'], 'a': ['another']}


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [26]:
import hashlib
import random
from collections import defaultdict

def INPUT_FORMAT(data, num_splits):
    # Разбивает входной список на num_splits частей.
    split_size = len(data) // num_splits
    remainder = len(data) % num_splits
    start = 0
    for i in range(num_splits):
        end = start + split_size + (1 if i < remainder else 0)
        yield data[start:end]
        start = end

def MAP(split):
    # Вычисляет хэш каждой записи и выдает пару (хэш, запись).
    for record in split:
        hash_value = hashlib.md5(str(record).encode('utf-8')).hexdigest() # Хэшируем запись
        yield (hash_value, record)

def PARTITION(key, num_reducers):
    # Определяет, какому reducer'у отправить запись на основе хэша.
    return int(key, 16) % num_reducers #  Распределение на основе хэша

def REDUCE(key, values):
    # Выдает только первую запись для данного хэша (устраняет дубликаты).
    yield values[0] # Берем первую запись, остальные игнорируем

def MapReduce(data, num_maps, num_reducers):

    map_results = []
    for split in INPUT_FORMAT(data, num_maps):
        map_results.extend(MAP(split)) # Добавляем записи

    partitioned_data = [[] for _ in range(num_reducers)]
    for key, value in map_results:
        reducer_id = PARTITION(key, num_reducers)
        partitioned_data[reducer_id].append((key, value))

    reduce_results = []
    for reducer_id in range(num_reducers):
        reducer_input = partitioned_data[reducer_id]
        # Группируем по хэшу (ключу)
        grouped_data = defaultdict(list)
        for key, value in reducer_input:
            grouped_data[key].append(value)

        # Применяем REDUCE к каждой группе
        for key, values in grouped_data.items():
            reduce_results.extend(REDUCE(key, values)) # Добавляем уникальные записи

    return reduce_results # Возвращаем список уникальных записей

# Пример использования
num_records = 20
data = [random.randint(1, 10) for _ in range(num_records)]  # Список случайных чисел с дубликатами

# Добавим несколько дубликатов
data.extend([1, 2, 3, 1, 2, 3, 4, 4])
random.shuffle(data) # Перемешаем, чтобы дубликаты не шли подряд

print("Исходный список данных:", data)

num_maps = 7
num_reducers = 5 

unique_data = MapReduce(data, num_maps, num_reducers)

print("Список данных без дубликатов:", unique_data)


Исходный список данных: [9, 3, 3, 8, 3, 3, 2, 3, 3, 9, 1, 8, 4, 7, 5, 6, 1, 4, 1, 5, 2, 3, 5, 2, 2, 1, 1, 4]
Список данных без дубликатов: [4, 7, 8, 1, 2, 6, 3, 5, 9]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.


In [27]:
def MAP_SELECTION(relation, predicate):
    
    for t in relation:
        if predicate(t):
            yield (t, t)  # Ключ и значение одинаковы и равны кортежу t


def REDUCE_SELECTION(key, values):

    # Выполняет Reduce-функцию (идентичность) для операции Selection.
    yield key # Возвращаем ключ (который равен кортежу t)


def Selection(relation, predicate, num_maps=1, num_reducers=1):
    # Выполняет операцию Selection с использованием MapReduce.

    map_results = []
    for t in relation:  # Нет необходимости разбивать данные, если num_maps мало
        for key, value in MAP_SELECTION([t], predicate): # Оборачиваем t в список
             map_results.append((key, value))

    selected_tuples = []
    for key, value in map_results:
       for t in REDUCE_SELECTION(key,[value]):
           selected_tuples.append(t)

    return selected_tuples # Возвращает список выбранных кортежей



# Пример использования:
# Отношение (список кортежей, где каждый кортеж - это словарь)
relation = [
    {"id": 1, "name": "Alice", "age": 30, "city": "New York"},
    {"id": 2, "name": "Bob", "age": 25, "city": "Los Angeles"},
    {"id": 3, "name": "Charlie", "age": 35, "city": "Chicago"},
    {"id": 4, "name": "David", "age": 28, "city": "New York"},
]


# Предикат: возраст > 28
def age_greater_than_28(tuple):
    return tuple["age"] > 28


# Выполняем Selection
selected_relation = Selection(relation, age_greater_than_28)

# Выводим результат
print("Отношение:")
for t in relation:
    print(t)

print("\nВыбранные кортежи (age > 28):")
for t in selected_relation:
    print(t)


Отношение:
{'id': 1, 'name': 'Alice', 'age': 30, 'city': 'New York'}
{'id': 2, 'name': 'Bob', 'age': 25, 'city': 'Los Angeles'}
{'id': 3, 'name': 'Charlie', 'age': 35, 'city': 'Chicago'}
{'id': 4, 'name': 'David', 'age': 28, 'city': 'New York'}

Выбранные кортежи (age > 28):
{'id': 1, 'name': 'Alice', 'age': 30, 'city': 'New York'}
{'id': 3, 'name': 'Charlie', 'age': 35, 'city': 'Chicago'}


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [28]:
from typing import List, Tuple, Set

def projection_map(record: dict, attributes: Set[str]) -> Tuple[dict, dict]:
    # Для каждого кортежа создает кортеж с только указанными атрибутами.
   
    projected_record = {k: v for k, v in record.items() if k in attributes}
    return projected_record, projected_record


def projection_reduce(key: dict, values: List[dict]) -> Tuple[dict, dict]:
    # Удаляет дубликаты спроектированных кортежей. 
    return key, key


def shuffle_phase(mapped_data: List[Tuple[dict, dict]]) -> dict:
    # Группирует данные по ключам.
 
    grouped_data = {}
    for key, value in mapped_data:
        key_tuple = tuple(sorted(key.items()))  # Преобразование словаря в кортеж для использования в качестве ключа
        if key_tuple not in grouped_data:
            grouped_data[key_tuple] = []
        grouped_data[key_tuple].append(value)
    return grouped_data


def reduce_phase(grouped_data: dict) -> List[Tuple[dict, dict]]:
    # Выполняет фазу reduce, применяя reduce функцию к каждой группе данных.

    reduced_data = []
    for key_tuple, values in grouped_data.items():
        key = dict(key_tuple) # Преобразование ключа-кортежа обратно в словарь
        reduced_data.append(projection_reduce(key, values))
    return reduced_data


if __name__ == '__main__':
    # Пример данных:
    relation = [
        {'name': 'Alice', 'age': 30, 'city': 'New York'},
        {'name': 'Bob', 'age': 25, 'city': 'London'},
        {'name': 'Alice', 'age': 30, 'city': 'New York'},
        {'name': 'Alice', 'age': 30, 'city': 'Paris'}, # Duplicate for testing reduce
    ]

    # Атрибуты, которые нужно сохранить:
    attributes_to_keep = {'name', 'city'}

    mapped_data = []
    for record in relation:
        mapped_data.append(projection_map(record, attributes_to_keep))

    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_phase(grouped_data)
    projected_relation = [record for key, record in reduced_data]

    # Вывод результатов:
    print("Исходные данные:", relation)
    print("Проекция по атрибутам 'name' и 'city':", projected_relation)



Исходные данные: [{'name': 'Alice', 'age': 30, 'city': 'New York'}, {'name': 'Bob', 'age': 25, 'city': 'London'}, {'name': 'Alice', 'age': 30, 'city': 'New York'}, {'name': 'Alice', 'age': 30, 'city': 'Paris'}]
Проекция по атрибутам 'name' и 'city': [{'city': 'New York', 'name': 'Alice'}, {'city': 'London', 'name': 'Bob'}, {'city': 'Paris', 'name': 'Alice'}]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [29]:
from typing import List, Tuple, Dict, Set

def union_map(record: dict) -> Tuple[dict, dict]:
   # Преобразует каждый входной кортеж в пару ключ-значение (record, record). 
    return record, record


def union_reduce(key: dict, values: List[dict]) -> Tuple[dict, dict]:
    # Для каждого ключа (record) возвращает пару (key, key), удаляя дубликаты. 
    return key, key

def shuffle_phase(mapped_data: List[Tuple[dict, dict]]) -> Dict[tuple, List[dict]]:
    # Группирует данные по ключам.
    grouped_data = {}
    for key, value in mapped_data:
        key_tuple = tuple(sorted(key.items()))
        if key_tuple not in grouped_data:
            grouped_data[key_tuple] = []
        grouped_data[key_tuple].append(value)
    return grouped_data


def reduce_phase(grouped_data: Dict[tuple, List[dict]]) -> List[Tuple[dict, dict]]:
    # Применяет reduce функцию к каждой группе данных.

    reduced_data = []
    for key_tuple, values in grouped_data.items():
        key = dict(key_tuple)
        reduced_data.append(union_reduce(key, values))
    return reduced_data


if __name__ == '__main__':
    # Пример данных:
    relation1 = [
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25}
    ]
    relation2 = [
        {'name': 'Bob', 'age': 25},
        {'name': 'Charlie', 'age': 35}
    ]

    # Объединяем два отношения:
    all_records = relation1 + relation2

    mapped_data = []
    for record in all_records:
        mapped_data.append(union_map(record))

    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_phase(grouped_data)
    union_relation = [record for key, record in reduced_data]

    # Вывод результатов:
    print("Relation 1:", relation1)
    print("Relation 2:", relation2)
    print("Union:", union_relation)


Relation 1: [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
Relation 2: [{'name': 'Bob', 'age': 25}, {'name': 'Charlie', 'age': 35}]
Union: [{'age': 30, 'name': 'Alice'}, {'age': 25, 'name': 'Bob'}, {'age': 35, 'name': 'Charlie'}]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [31]:
from typing import List, Tuple, Dict, Iterator, NamedTuple
from collections import defaultdict


def MAP_INTERSECTION(_, row: NamedTuple) -> Tuple[int, NamedTuple]:
    # Возвращает (row.id, row). Игнорирует ключ _.
    return (row.id, row)


def REDUCE_INTERSECTION(row_id: int, rows: Iterator[NamedTuple]) -> Iterator[NamedTuple]:
    # Возвращает итератор с пользователями только если он присутствует в обоих выборках 
    rows_list = list(rows)  
    if len(rows_list) == 2:
        yield from rows_list #
    else:
        pass


def shuffle_phase(mapped_data: List[Tuple[int, NamedTuple]]) -> Dict[int, List[NamedTuple]]:
    # Группирует данные по row.id.
    grouped_data = defaultdict(list)
    for row_id, row in mapped_data:
        grouped_data[row_id].append(row)
    return grouped_data

def reduce_phase(grouped_data: Dict[int, List[NamedTuple]]) -> List[List[NamedTuple]]:
    """ Возвращает список списков.  Каждый список содержит найденные пересечения. """
    reduced_data: List[NamedTuple] = []

    for row_id, rows in grouped_data.items():
        result_iterator = REDUCE_INTERSECTION(row_id, iter(rows))
        if result_iterator:
            reduced_data.append(result_iterator)

    return reduced_data

def RECORDREADER(input_collection_a: List[NamedTuple], input_collection_b: List[NamedTuple]) -> List[Tuple[int, NamedTuple]]:
#  Возвращает список (id пользователя, пользователь) для объединения входных коллекций.
    records = [(user.id, user) for user in input_collection_a + input_collection_b]
    return records

def MapReduce(recordreader, map_func, reduce_func):
    records = recordreader()

    mapped_data = []
    for _, record in enumerate(records):
        mapped_data.append(map_func(_, record[1]))

    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_phase(grouped_data)
    flat_reduced_data = [item for sublist in reduced_data for item in sublist]

    return flat_reduced_data

if __name__ == '__main__':
    # Пример данных:
    from collections import namedtuple
    User = namedtuple('User', ['id', 'name', 'age'])

    input_collection_a = [
        User(id=1, name='Alice', age=30),
        User(id=2, name='Bob', age=25),
        User(id=3, name='Charlie', age=35)
    ]
    input_collection_b = [
        User(id=2, name='Bob', age=25),
        User(id=4, name='David', age=40),
        User(id=1, name='Alice', age=30)
    ]

    output = MapReduce(lambda: RECORDREADER(input_collection_a, input_collection_b), MAP_INTERSECTION, REDUCE_INTERSECTION) 
    output = list(output)
    print("Пересечение:", output)


Пересечение: [User(id=1, name='Alice', age=30), User(id=1, name='Alice', age=30), User(id=2, name='Bob', age=25), User(id=2, name='Bob', age=25)]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [32]:
from typing import List, Tuple, Dict, Iterator, NamedTuple
from collections import defaultdict

def difference_map(relation_name: str, record: NamedTuple) -> Tuple[NamedTuple, str]:
    # Возвращает (record, relation_name), где relation_name - 'R' или 'S'
    return record, relation_name

def difference_reduce(key: NamedTuple, values: List[str]) -> Tuple[NamedTuple, NamedTuple] or None:
    # Возвращает (key, key), если key встречается только в отношении 'R'
    if len(values) == 1 and values[0] == 'R':
        return key, key
    else:
        return None 


def shuffle_phase(mapped_data: List[Tuple[NamedTuple, str]]) -> Dict[NamedTuple, List[str]]:
   # Группирует данные по NamedTuple (record)
    grouped_data: Dict[NamedTuple, List[str]] = defaultdict(list)
    for record, relation_name in mapped_data:
        grouped_data[record].append(relation_name)
    return grouped_data


def reduce_phase(grouped_data: Dict[NamedTuple, List[str]]) -> List[NamedTuple]:
    # Возвращает список NamedTuple, которые есть только в R
    reduced_data: List[NamedTuple] = []
    for record, relation_names in grouped_data.items():
        result = difference_reduce(record, relation_names)
        if result:
            reduced_data.append(result[0])
    return reduced_data

def RECORDREADER(input_collection_r: List[NamedTuple], input_collection_s: List[NamedTuple]) -> List[Tuple[str, NamedTuple]]:
    # Добавляет имя отношения к каждой записи
    records_r = [('R', record) for record in input_collection_r]
    records_s = [('S', record) for record in input_collection_s]
    return records_r + records_s

def MapReduce(recordreader, map_func, reduce_func):
   # Эмулирует MapReduce pipeline.

    records = recordreader()
    mapped_data = []
    for relation_name, record in records:
        mapped_data.append(map_func(relation_name, record))
    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_func(grouped_data)

    return reduced_data


if __name__ == '__main__':
    # Пример данных:
    from collections import namedtuple
    User = namedtuple('User', ['id', 'name', 'age'])

    input_collection_r = [
        User(id=1, name='Alice', age=30),
        User(id=2, name='Bob', age=25),
        User(id=3, name='Charlie', age=35)
    ]
    input_collection_s = [
        User(id=2, name='Bob', age=25),
        User(id=4, name='David', age=40),
        User(id=1, name='Alice', age=30)
    ]

    output = MapReduce(lambda: RECORDREADER(input_collection_r, input_collection_s), difference_map, reduce_phase)
    print("Difference (R - S):", output)


Difference (R - S): [User(id=3, name='Charlie', age=35)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [34]:
from typing import List, Tuple, Dict, Iterator, NamedTuple, Any
from collections import defaultdict

class User(NamedTuple):
    id: int
    age: str
    name: str
    city_id: int


users_collection = [
    User(id=1, age=54, name="Charlie", city_id=1),
    User(id=2, age=25, name="Alice", city_id=2),
    User(id=3, age=27, name="Mark", city_id=2),
    User(id=4, age=16, name="David", city_id=3),
    User(id=5, age=35, name="Bob", city_id=4),
]


class City(NamedTuple):
    id: int
    name: str


cities_collection = [
    City(id=1, name="Samara"),
    City(id=2, name="Moscow"),
    City(id=3, name="London"),
    City(id=4, name="New Yourk"), 
    City(id=5, name="Paris"),
]

def natural_join_map_user(record: User) -> Tuple[int, Tuple[str, User]]:
   # Возвращает (record.city_id, ('User', record)).
 
    return record.city_id, ('User', record)


def natural_join_map_city(record: City) -> Tuple[int, Tuple[str, City]]:
    # Возвращает (record.id, ('City', record)).

    return record.id, ('City', record)


def natural_join_reduce(city_id: int, values: List[Tuple[str, NamedTuple]]) -> List[Tuple[User, int, City]]:
    # Объединяет Users и Cities на основе общего city_id.

    users: List[User] = []
    city: City | None = None 

    for relation_name, record in values:
        if relation_name == 'User':
            users.append(record)
        elif relation_name == 'City':
            city = record

    joined_records: List[Tuple[User, int, City]] = []
    if city:
        for user in users:
            joined_records.append((user, city_id, city))

    return joined_records


def shuffle_phase(mapped_data: List[Tuple[int, Tuple[str, NamedTuple]]]) -> Dict[int, List[Tuple[str, NamedTuple]]]:
   # Группирует данные по city_id.

    grouped_data: Dict[int, List[Tuple[str, NamedTuple]]] = defaultdict(list)
    for city_id, (relation_name, record) in mapped_data:
        grouped_data[city_id].append((relation_name, record))
    return grouped_data


def reduce_phase(grouped_data: Dict[int, List[Tuple[str, NamedTuple]]]) -> List[Tuple[User, int, City]]:
  # Возвращает список троек (user, city_id, city).

    joined_data: List[Tuple[User, int, City]] = []
    for city_id, records in grouped_data.items():
        joined_data.extend(natural_join_reduce(city_id, records))
    return joined_data

def RECORDREADER(users_collection: List[User], cities_collection: List[City]) -> List[Tuple[str, NamedTuple]]:
    # Добавляет имя отношения к каждой записи. 

    records_users = [('User', record) for record in users_collection]
    records_cities = [('City', record) for record in cities_collection]
    return records_users + records_cities


def MapReduce(recordreader, map_func_user, map_func_city, reduce_func):
  
    records = recordreader(users_collection, cities_collection)


    mapped_data = []
    for relation_name, record in records:
        if relation_name == 'User':
            mapped_data.append(map_func_user(record))
        elif relation_name == 'City':
            mapped_data.append(map_func_city(record))

    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_func(grouped_data)

    return reduced_data


if __name__ == '__main__':
    output = MapReduce(RECORDREADER, natural_join_map_user, natural_join_map_city, reduce_phase)
    joined_data = output

    print("Joined Data:", joined_data)


Joined Data: [(User(id=1, age=54, name='Charlie', city_id=1), 1, City(id=1, name='Samara')), (User(id=2, age=25, name='Alice', city_id=2), 2, City(id=2, name='Moscow')), (User(id=3, age=27, name='Mark', city_id=2), 2, City(id=2, name='Moscow')), (User(id=4, age=16, name='David', city_id=3), 3, City(id=3, name='London')), (User(id=5, age=35, name='Bob', city_id=4), 4, City(id=4, name='New Yourk'))]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [35]:
from typing import List, Tuple, Dict, Iterator, NamedTuple, Callable
from collections import defaultdict


def grouping_map(record: NamedTuple) -> Tuple[any, any]:
    # Возвращает (record.a, record.b).
    return record.a, record.b


def grouping_reduce(key: any, values: List[any], aggregation_function: Callable[[List[any]], any]) -> Tuple[any, any]:
# Применяет aggregation_function к списку значений.
    aggregated_value = aggregation_function(values)
    return key, aggregated_value


def shuffle_phase(mapped_data: List[Tuple[any, any]]) -> Dict[any, List[any]]:
    # Группирует данные по ключу 'a' 
    grouped_data: Dict[any, List[any]] = defaultdict(list)
    for a, b in mapped_data:
        grouped_data[a].append(b)
    return grouped_data


def reduce_phase(grouped_data: Dict[any, List[any]], aggregation_function: Callable[[List[any]], any]) -> List[Tuple[any, any]]:
# Возвращает список пар (a, x), где x - результат агрегации.
    aggregated_data: List[Tuple[any, any]] = []
    for a, values in grouped_data.items():
        aggregated_data.append(grouping_reduce(a, values, aggregation_function))
    return aggregated_data

def RECORDREADER(input_collection: List[NamedTuple]) -> List[NamedTuple]:
    return input_collection


def MapReduce(recordreader, map_func, reduce_func, aggregation_function):
    records = recordreader()

    mapped_data = []
    for record in records:
        mapped_data.append(map_func(record))

    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_func(grouped_data, aggregation_function)

    return reduced_data

if __name__ == '__main__':
    # Пример данных:
    from collections import namedtuple
    DataRecord = namedtuple('DataRecord', ['a', 'b', 'c'])

    input_collection = [
        DataRecord(a='group1', b=10, c='data1'),
        DataRecord(a='group1', b=20, c='data2'),
        DataRecord(a='group2', b=30, c='data3'),
        DataRecord(a='group2', b=40, c='data4'),
        DataRecord(a='group3', b=15, c='data5')
    ]

    # Пример агрегирующей функции (поиск суммы элементов)
    def sum_aggregation(values: List[int]) -> int:
        return sum(values)

    output_sum = MapReduce(lambda: RECORDREADER(input_collection), grouping_map, reduce_phase, sum_aggregation)
    print("Grouping and Aggregation (SUM):", output_sum)



Grouping and Aggregation (SUM): [('group1', 30), ('group2', 70), ('group3', 15)]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [36]:
# код работает с вектором, который не помещается в памяти одной Map-задачи, разбивая его на чанки и  
# распределяя их по разным редукторам. каждый редуктор получает часть матрицы и соответствующие части 
# вектора, выполняет частичное умножение и выдает частичные результаты. 
# Затем эти результаты агрегируются для получения окончательного результата.

from typing import List, Tuple, Dict, Iterator, NamedTuple, Callable
from collections import defaultdict

NUM_REDUCERS = 2
VECTOR_CHUNK_SIZE = 2 

def matrix_vector_map(matrix_row: NamedTuple) -> Tuple[int, Tuple[str, NamedTuple]]:
    return matrix_row.col_index % NUM_REDUCERS, ('M', matrix_row)


def vector_map(vector_chunk: List[NamedTuple], chunk_id: int) -> List[Tuple[int, Tuple[str, List[NamedTuple]]]]:
    mapped_data = []
    for element in vector_chunk:
        reducer_id = element.index % NUM_REDUCERS
        mapped_data.append((reducer_id, ('V', element)))
    return mapped_data


def matrix_vector_reduce(reducer_id: int, values: List[Tuple[str, NamedTuple]]) -> List[Tuple[int, float]]:
    # Выполняет частичное умножение для данного reducer_id.
    matrix_rows: List[NamedTuple] = []
    vector_elements: List[NamedTuple] = []

    for relation_name, record in values:
        if relation_name == 'M':
            matrix_rows.append(record)
        elif relation_name == 'V':
            vector_elements.append(record)

    partial_products: List[Tuple[int, float]] = []
    for row in matrix_rows:
        for element in vector_elements:
            if row.col_index == element.index:
                partial_products.append((row.row_index, row.value * element.value))

    return partial_products

def shuffle_phase(mapped_data: List[Tuple[int, Tuple[str, NamedTuple]]]) -> Dict[int, List[Tuple[str, NamedTuple]]]:
    grouped_data: Dict[int, List[Tuple[str, NamedTuple]]] = defaultdict(list)
    for reducer_id, (relation_name, record) in mapped_data:
        grouped_data[reducer_id].append((relation_name, record))
    return grouped_data


def reduce_phase(grouped_data: Dict[int, List[Tuple[str, NamedTuple]]]) -> List[Tuple[int, float]]:
    partial_results: List[Tuple[int, float]] = []
    for reducer_id, records in grouped_data.items():
        partial_results.extend(matrix_vector_reduce(reducer_id, records))
    return partial_results


def RECORDREADER(matrix: List[NamedTuple], vector: List[NamedTuple]) -> List[Tuple[str, NamedTuple]]:
    return [('M', row) for row in matrix]

def VECTORREADER(vector: List[NamedTuple], chunk_size: int) -> List[List[NamedTuple]]:
    vector_chunks = [vector[i:i + chunk_size] for i in range(0, len(vector), chunk_size)]
    return vector_chunks


def MapReduceMatrixVector(recordreader, vectorreader, map_func_matrix, vector_map, reduce_func, num_reducers):
    matrix_records = recordreader()
    vector_chunks = vectorreader()

    mapped_matrix_data: List[Tuple[int, Tuple[str, NamedTuple]]] = []
    for relation_name, record in matrix_records:
        mapped_matrix_data.append(map_func_matrix(record))

    mapped_vector_data: List[Tuple[int, Tuple[str, NamedTuple]]] = []
    for chunk_id, vector_chunk in enumerate(vector_chunks):
        chunk_mapped_data = vector_map(vector_chunk, chunk_id)
        mapped_vector_data.extend(chunk_mapped_data)

    mapped_data = mapped_matrix_data + mapped_vector_data
    grouped_data = shuffle_phase(mapped_data)
    reduced_data = reduce_phase(grouped_data)

    final_result: Dict[int, float] = defaultdict(float)
    for row_index, partial_product in reduced_data:
        final_result[row_index] += partial_product

    return dict(final_result)


if __name__ == '__main__':
    # Пример данных:
    from collections import namedtuple
    MatrixRow = namedtuple('MatrixRow', ['row_index', 'col_index', 'value'])
    VectorElement = namedtuple('VectorElement', ['index', 'value'])

    matrix = [
        MatrixRow(row_index=0, col_index=0, value=1.0),
        MatrixRow(row_index=0, col_index=1, value=2.0),
        MatrixRow(row_index=1, col_index=0, value=3.0),
        MatrixRow(row_index=1, col_index=1, value=4.0)
    ]

    vector = [
        VectorElement(index=0, value=0.5),
        VectorElement(index=1, value=0.7)
    ]

    result = MapReduceMatrixVector(
        lambda: RECORDREADER(matrix, vector),
        lambda: VECTORREADER(vector, VECTOR_CHUNK_SIZE),
        matrix_vector_map,
        vector_map,
        reduce_phase,
        NUM_REDUCERS
    )

    print("Matrix-Vector Multiplication Result:", result)

Matrix-Vector Multiplication Result: {0: 1.9, 1: 4.3}


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [37]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [38]:
import numpy as np

I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])

def MAP(k1, v1):
    (j, k) = k1
    w = v1
    for i in range(I):
        yield ((i, k), small_mat[i, j] * w)

def REDUCE(key, values):
    (i, k) = key
    yield ((i, k), sum(values))


# Вызов функции MapReduce
result = MapReduce(RECORDREADER, MAP, REDUCE)
# Преобразование результата в массив NumPy для упрощения проверки
output_mat = np.zeros((I, K))
for (i, k), value in result:
    output_mat[i, k] = value


print("Результат (MapReduce):")
print(output_mat)


Результат (MapReduce):
[[1.06127062 0.42641858 0.71073421 0.87011492 0.84088963 0.55693987
  0.75090908 0.47824752 0.77414151 0.97118095 1.08530508 1.13482988
  1.09652394 0.6351541  0.60043162 0.27671249 1.27076398 0.5544196
  0.39607333 0.68241971 0.68702705 0.71375306 1.29015371 0.98325024
  0.763647   0.74349762 0.97314673 1.15165735 0.93828544 0.50095989
  0.89803489 0.25120314 1.18145585 0.5263002  0.99282181 0.7452801
  1.43195699 0.94672849 1.21775101 0.20733469]
 [1.01650868 0.28363733 0.98064373 1.17793803 0.51664101 0.75841389
  0.57052586 0.45356341 0.80639585 0.89683788 0.53060164 0.50647092
  0.94384142 0.71851593 0.62108481 0.55370551 1.27717183 0.60177518
  0.72358405 0.47831795 0.60978919 0.2682104  1.07871876 0.97226589
  0.68289339 0.63528846 0.62946369 0.83408056 0.84660571 0.38115743
  0.59999573 0.74402473 0.89950082 0.19476933 1.21356648 1.06391726
  0.63893097 0.45863286 0.60210542 0.36215977]]


Проверьте своё решение

In [39]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat) 
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [40]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [41]:
import numpy as np

rows = 2
cols = 3
depth = 4 * 10
matrix_a = np.random.rand(rows, cols)
matrix_b = np.random.rand(cols, depth)

# Генерирует пары элементов из обеих матриц вместе с их индексами
def generate_pairs():
    for row in range(rows):
        for col in range(cols):
            for dep in range(depth):
                yield (((row, col), matrix_a[row, col]), ((col, dep), matrix_b[col, dep]))

# Перемножает элементы двух матриц
# Группирует произведения по индексу, соответствующему клетке в результирующей матрице,
# для каждого ключа (row, dep) генерируется cols произведений
def multiply_elements(pair1, pair2):
    (row, col), value_a = pair1
    (col, dep), value_b = pair2

    yield ((row, dep), value_a * value_b)

# Суммирует произведения для вычисления элементов результирующей матрицы
def aggregate_results(key, values):
    (row, dep) = key
    dep_result = (row, dep)

    total_value = 0
    for _ in range(cols):
        total_value += values[_]

    yield (dep_result, total_value)

# Проверка решения
expected_result = np.matmul(matrix_a, matrix_b)
final_result = MapReduce(generate_pairs, multiply_elements, aggregate_results)

def reconstruct_matrix(reduce_output):
    reduce_output = list(reduce_output)
    max_rows = max(i for ((i, k), vw) in reduce_output) + 1
    max_depth = max(k for ((i, k), vw) in reduce_output) + 1
    result_matrix = np.empty(shape=(max_rows, max_depth))
    for (i, k), vw in reduce_output:
        result_matrix[i, k] = vw
    return result_matrix

np.allclose(expected_result, reconstruct_matrix(final_result))  # должно вернуть true


True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [42]:
maps = 2
reducers = 2
I = 2
J = 3
K = 4 * 10


def INPUTFORMAT():
    global maps
    # генерируются пары элементов из обоих матриц
    def RECORDREADER(i_range):
        for i in i_range:
            for j in range(J):
                for k in range(K):
                    yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

    # Разделение происходит по строкам первой матрицы
    split_size = int(np.ceil(I / maps))
    for i in range(0, I, split_size):
        yield RECORDREADER(range(i, i + split_size))


# Возвращает произведения соответствующих элементов из обоих матриц, группируя их по индексу в результирующей матрице
def MAP(element1, element2):
    (i, j), v1 = element1
    (j, k), v2 = element2

    yield ((i, k), v1 * v2)


# Суммирует полученные произведения
def REDUCE(key, values):
    (i, k) = key
    k3 = (i, k)
    v3 = 0
    for j in range(J):
        v3 += values[j]

    yield (k3, v3)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [
    (partition_id, list(partition)) for (partition_id, partition) in partitioned_output
]

# Соединение полученных произведений с разных вычислительных узлов
solution = []
for output_part in partitioned_output:
    for element in output_part[1]:
        solution += [element]

print(asmatrix(solution))
np.allclose(reference_solution, asmatrix(solution)) 


240 key-value pairs were sent over a network.
[[1.06127062 0.42641858 0.71073421 0.87011492 0.84088963 0.55693987
  0.75090908 0.47824752 0.77414151 0.97118095 1.08530508 1.13482988
  1.09652394 0.6351541  0.60043162 0.27671249 1.27076398 0.5544196
  0.39607333 0.68241971 0.68702705 0.71375306 1.29015371 0.98325024
  0.763647   0.74349762 0.97314673 1.15165735 0.93828544 0.50095989
  0.89803489 0.25120314 1.18145585 0.5263002  0.99282181 0.7452801
  1.43195699 0.94672849 1.21775101 0.20733469]
 [1.01650868 0.28363733 0.98064373 1.17793803 0.51664101 0.75841389
  0.57052586 0.45356341 0.80639585 0.89683788 0.53060164 0.50647092
  0.94384142 0.71851593 0.62108481 0.55370551 1.27717183 0.60177518
  0.72358405 0.47831795 0.60978919 0.2682104  1.07871876 0.97226589
  0.68289339 0.63528846 0.62946369 0.83408056 0.84660571 0.38115743
  0.59999573 0.74402473 0.89950082 0.19476933 1.21356648 1.06391726
  0.63893097 0.45863286 0.60210542 0.36215977]]


True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

Да, написанное решение будет работать, так как каждая пара элементов (которые генерируют RECORDREADER-ы), идентифицируются по ее индексам.

In [None]:
I = 2  
J = 3  
K = 4*10  

# Генерация случайных матриц
small_mat = np.random.rand(I, J)  
big_mat = np.random.rand(J, K) 

# Получение эталонного решения через умножение матриц
reference_solution = np.matmul(small_mat, big_mat)

# Функция для "разглаживания" вложенных итерируемых объектов
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

# Функция для группировки элементов по ключу
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

# Функция для распределенной группировки элементов по ключу
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

# Функция для определения разделителя (по ключу)
def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

# Функция для выполнения MapReduce на распределенных данных
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())

  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)

  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) 

  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

# Функция для преобразования результата REDUCE в матрицу
def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output) + 1
  K = max(k for ((i,k), vw) in reduce_output) + 1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

# Генератор для вводных данных
def INPUTFORMAT():
  first_mat = []

  for i in range(small_mat.shape[0]):
    for j in range(small_mat.shape[1]):
      first_mat.append(((0, i, j), small_mat[i,j]))  # первая матрица

  global maps
  split_size = int(np.ceil(len(first_mat)/maps))

  for i in range(0, len(first_mat), split_size):
    yield first_mat[i:i+split_size]

  second_mat = []

  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      second_mat.append(((1, j, k), big_mat[j,k]))  # вторая матрица

  split_size = int(np.ceil(len(second_mat)/maps))

  for i in range(0, len(second_mat), split_size):
    yield second_mat[i:i+split_size]

# MAP функция для соединения матриц
def MAP_JOIN(k1, v1):
  (mat_num, i, j) = k1
  w = v1

  if mat_num == 0:
    yield (j, (mat_num, i, w))

  else:
    yield (i, (mat_num, j, w))

# REDUCE функция для соединения матриц
def REDUCE_JOIN(key, values):
  from_first_mat = [v for v in values if v[0] == 0]
  from_second_mat = [v for v in values if v[0] == 1]

  for f in from_first_mat:
    for s in from_second_mat:
      yield ((f[1], s[1]), f[2] * s[2])

# Генератор для получения соединенных данных
def GET_JOINED():

  for j in joined:
    print("aa", j)
    yield j[1]

# MAP функция для умножения значений
def MAP_MUL(k1, v1):
  yield (k1, v1)

# REDUCE функция для умножения значений
def REDUCE_MUL(key, values):
  res_val = 0

  for v in values:
    res_val += v
  yield (key, res_val)

maps = 3
reducers = 2

# Выполнение MapReduce для соединения матриц
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP_JOIN, REDUCE_JOIN, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
print(joined)

# Выполнение MapReduce для умножения значений
mul_output = MapReduceDistributed(GET_JOINED, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]
print(pre_result)

solution = []

for p in pre_result:

  for v in p[1]:
    solution.append(v)

print(solution)

np.allclose(reference_solution, asmatrix(solution))

126 key-value pairs were sent over a network.
[(0, [((0, 0), 0.3016559915461936), ((0, 1), 0.18694399994286143), ((0, 2), 0.2788654782407294), ((0, 3), 0.27757216096382875), ((0, 4), 0.3441335124358848), ((0, 5), 0.02967617656575322), ((0, 6), 0.21482290499938547), ((0, 7), 0.22601455297226242), ((0, 8), 0.09295291893000537), ((0, 9), 0.12992890858139267), ((0, 10), 0.3510662596884314), ((0, 11), 0.1854584550841563), ((0, 12), 0.11614165973563575), ((0, 13), 0.06508864340356206), ((0, 14), 0.23938246114051323), ((0, 15), 0.36187746490379014), ((0, 16), 0.2542387300134281), ((0, 17), 0.1330368862724118), ((0, 18), 0.12416968886522481), ((0, 19), 0.23502727502770474), ((0, 20), 0.241929635398574), ((0, 21), 0.28719347979375925), ((0, 22), 0.1672248493883784), ((0, 23), 0.34144746804385523), ((0, 24), 0.20006310053653295), ((0, 25), 0.3999331996302832), ((0, 26), 0.025879153945023103), ((0, 27), 0.06104820347141307), ((0, 28), 0.39386463114220943), ((0, 29), 0.33665531799926435), ((0, 30)

True