# Введение в MapReduce модель на Python


In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
# MAP выполняет преобразование и фильтрацию данных, а REDUCE выполняет агрегацию и анализ этих данных.
def MAP(_, row:NamedTuple):
  #для каждого объекта row с женским полом, создается ключ (возраст) и соответствующий ему объект row.
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
#применение функции MAP к каждому элементу результата RECORDREADER() с помощью lambda-функции
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
#группировка элементов по ключу входного итерируемого объекта
def groupbykey(iterable):
  #Для каждого элемента, функция проверяет, существует ли уже ключ k2 в словаре t, используя метод get.
  #Если ключ уже существует, то к соответствующему значению добавляется новое значение v2.
  #Если ключ не существует, то создается новая запись в словаре t с ключом k2 и значением [v2] в виде списка.
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

#группировка элементов по ключу входного итерируемого объекта
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

#(номер строки, значение*вектор с номером столбца)
def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

#Суммирует значения из одной группы(походу так)
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

#(координаты, значение)
def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.9758209666654833),
 (1, 1.9758209666654833),
 (2, 1.9758209666654833),
 (3, 1.9758209666654833),
 (4, 1.9758209666654833)]

## Inverted index

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

#чтение записей из списка documents и добавления им уникального идентификатора docid с использованием генератора
#(номер, фраза)
def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

#присваеваем номер фразы каждому уникальному слову из фразы
def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)
#сортируем по айдишнику
def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]
print(documents)

# Выводим номер предложения и номер строки (0:1 it is what it is)
def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

# Разделяем по словам и ставим единичку (what 1)
def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

#Считаем количество повторов слов (it 9)
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

['\nit is what it is\nit is what it is\nit is what it is', '\nwhat is it\nwhat is it', '\nit is a banana']


[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [None]:
#возвращает каждый элемент последовательно
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

#группирует значения с одинаковыми ключами в словаре.
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

#Результат: список из пар номер раздела и отсортированные пары ключ-значение
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers #редьюсера у нас 2 штуки
  #Cписок словарей с количеством элементов, определенным в глобальной переменной reducers.
  #Каждый словарь будет представлять собой раздел, в котором будут храниться отображения ключей на соответствующие значения.
  partitions = [dict() for _ in range(reducers)]
  #Каждая строчку из мap_partitions (слово, 1)
  for map_partition in map_partitions:
    #Распаковывает каждую пару ключ-значение
    for (k2, v2) in map_partition:
      #получение соответствующего словаря из списка 0 или 1
      p = partitions[PARTITIONER(k2)]
      #добавление значения v2 в соответствующую запись словаря
      p[k2] = p.get(k2, []) + [v2]

  # Каждый элемент списка представляет собой пару (partition_id, sorted(partition.items(), key=lambda x: x[0])),
  # где partition_id - это индекс раздела, а sorted(partition.items(), key=lambda x: x[0]) - это отсортированный список пар ключ-значение для данного раздела.
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

#определение, в какой раздел (редьюсер) должен быть отправлен объект obj
def PARTITIONER(obj):
  global reducers #редьюсера у нас 2 штуки
  # hash() возвращает уникальное целое число для каждого объекта
  return hash(obj) % reducers # возвращает остаток от деления


def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  #map(что сделать, с чем сделать) - поэлементное вычисление
  #по идее на выходе имеем все слова с единичкой
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  #Если задана доп функция
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)

  #Результат: список из пар (номер раздела и отсортированные пары ключ-значение)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle

  #содержит кортежи с идентификаторами разделов и списками, состоящих из кортежей посчитанных уникальных слов
  #([(0, [('', 6), ('it', 18), ('what', 10)]), (1, [('a', 2), ('banana', 2), ('is', 18)])])
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  #считаем типо сколько всего слов было
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

#На выходе номер документа : номер строки, а значение - сама строка
def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    #сначала по каждому документу в разделе
    for (docid, document) in enumerate(split):
      #затем по каждой строке в документе
      for (lineid, line) in enumerate(document.split('\n')):
        #Для каждой строки генерируется пара ключ-значение, где ключом является составное значение
        # формата "docid:lineid", представляющее идентификатор документа и номер строки, а значением - сама строка.
        yield ("{}:{}".format(docid,lineid), line)

  #выполняется разделение всего набора данных на несколько подмножеств размером split_size
  split_size =  int(np.ceil(len(documents)/maps)) #np.ceil()-округление вверх значения аргумента до ближайшего целого числа.
  for i in range(0, len(documents), split_size):
    #передаем в функцию порции документа, в данном случае будет 3 порции: 0-1, 2-3, 4-5
    yield RECORDREADER(documents[i:i+split_size])

#Передаем номер дока и строку, получаем (слово, 1)
def MAP(docId:str, line:str):
  #дл каждого слова в строке
  for word in line.split(" "):
    yield (word, 1)

#Считаем количество слов
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('it', 18)]),
 (1, [('is', 18), ('what', 10)])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  #выполняется разделение всего набора данных на несколько подмножеств размером split_size
  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    #передаем в функцию порции документа, в данном случае будет 3 порции: 0-9 10-19 20-29
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  #value - это значение value, переданное в функцию MAP
  #None - это отсутствие информации связанное с каждым значением.
  yield (value, None)

#Распихиваем по разделам
def PARTITIONER(key):
  global reducers
  global max_value
  global min_value

  #размер каждого раздела (бакета) данных, в данном случае bucket_size = 0.5
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  #если key больше, чем верхняя граница текущего раздела данных и
  # верхняя граница следующего раздела данных меньше, чем max_value
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  #возвращает идентификатор раздела (бакета) данных, на который будет отображен ключ.
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.01488878828168283),
   (None, 0.03163748541784295),
   (None, 0.06852995380200355),
   (None, 0.09707150981956247),
   (None, 0.22408903637575583),
   (None, 0.2331183913982896),
   (None, 0.2705509431991123),
   (None, 0.38347241651585096),
   (None, 0.41863606261442565),
   (None, 0.4433491591317591),
   (None, 0.443477195154091),
   (None, 0.4827946006315249)]),
 (1,
  [(None, 0.5015110539688712),
   (None, 0.5386838262858766),
   (None, 0.5559514411717907),
   (None, 0.5791711028339818),
   (None, 0.5859181330547648),
   (None, 0.5919054475829714),
   (None, 0.5940488320629793),
   (None, 0.6672624578732929),
   (None, 0.7136071187846663),
   (None, 0.7390672897058819),
   (None, 0.755491347703195),
   (None, 0.7778583549990876),
   (None, 0.7890597618092786),
   (None, 0.8149761669048884),
   (None, 0.8972478792445862),
   (None, 0.9443627957074038),
   (None, 0.9615828989917868),
   (None, 0.9792664092527378)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [None]:
#Создаем входной список чисел
rand_list = np.random.rand(10)
print(rand_list)

#Создаем пару ключ-значение, в данном случае ключ - none
def MAP(_, value):
  yield (None, value)

#Выдаем конечный результат, т.е. максимальное число
def REDUCE(_, val):
  yield max(val)

#Чтение элементов
def RECORDREADER():
  for i in range(len(rand_list)):
    yield(None, rand_list[i])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[0.0112636  0.8140364  0.69869154 0.1337702  0.18784082 0.5221562
 0.32170672 0.54392952 0.59499065 0.45401093]


[0.814036397900506]

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [None]:
def MAP(_, value):
  yield (None, value)

def REDUCE(_, val):
  yield np.mean(val)

def RECORDREADER():
  for i in range(len(rand_list)):
    yield(None, rand_list[i])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[0.428239657226805]

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [None]:
from operator import itemgetter
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return sorted(t.items(), key=itemgetter(0)) #Функция sorted возвращает список

shuffle_output = groupbykey(map_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [None]:
input_collection = [
    User(id=0, age=22, gender="female", social_contacts=30),
    User(id=1, age=28, gender="male", social_contacts=150),
    User(id=2, age=33, gender="female", social_contacts=420),
    User(id=3, age=22, gender="male", social_contacts=320),
    User(id=4, age=19, gender="female", social_contacts=180),
    User(id=2, age=31, gender="female", social_contacts=410),
    User(id=1, age=27, gender="male", social_contacts=170),
    User(id=4, age=21, gender="female", social_contacts=200),
]

maps = 3
reducers = 2

#Выполняет разделение набора
def INPUTFORMAT():
  global maps
  #Чтение данных
  def RECORDREADER(split):
    for user in split:
      yield (None, user)

  split_size =  int(np.ceil(len(input_collection)/maps))
  for i in range(0, len(input_collection), split_size):
    yield RECORDREADER(input_collection[i:i+split_size])

#Возвращает ключ-значение
def MAP(_, user:NamedTuple):
  yield user.id, user

#Выдаем конечный результат
def REDUCE(userId:int, user:Iterator[str]):
  yield user[0]

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

8 key-value pairs were sent over a network.


[(0, [User(id=0, age=22, social_contacts=30, gender='female')]),
 (1,
  [User(id=1, age=28, social_contacts=150, gender='male'),
   User(id=2, age=33, social_contacts=420, gender='female'),
   User(id=3, age=22, social_contacts=320, gender='male'),
   User(id=4, age=19, social_contacts=180, gender='female')])]

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [None]:
#Для примера создадим класс "табло аэропорта"
class flight(NamedTuple):
  flight_id: int
  terminal: str
  destination: str
  time: str
  gate: int
  status: str
  #"check-in, boarding, delayed, departed, arrived"

FlightsBoard_1 = [
    flight(flight_id = 123, terminal = 'A',  destination = 'Dubai', time = '20/04/24 16:30', gate = 11,  status = 'check-in' ),
    flight(flight_id = 987, terminal = 'B',  destination = 'Bali', time = '20/04/24 17:30', gate = 6,  status = 'boarding' ),
    flight(flight_id = 456, terminal = 'C', destination = 'Maldives', time = '20/04/24 18:30', gate = 5,  status = 'check-in' ),
    flight(flight_id = 765, terminal = 'A',destination = 'Dubai', time = '20/04/24 18:30', gate = 10,  status = 'departed' ),
]

#Выбираем только те кортежи, где пункт назначения Дубаи
def MAP(_, row:NamedTuple):
  if (row.destination == 'Dubai'):
    yield (row, row)
#Возвращаем что получили на вход
def REDUCE(row1:NamedTuple, row2:NamedTuple):
  yield (row1, row2)

#Чтение элементов
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in'),
  [flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in')]),
 (flight(flight_id=765, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed'),
  [flight(flight_id=765, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed')])]

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [None]:
#Возвращаем в качесвте ключа и значения статус рейса
def MAP(_, row:NamedTuple):
  yield (row.status, row.status)

#Возвращаются преобразованные ключ значения без повторений
def REDUCE(row1:NamedTuple, row2:NamedTuple):
  yield (row1, row2[0])
#Чтение данных
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('boarding', 'boarding'), ('check-in', 'check-in'), ('departed', 'departed')]

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [None]:
#Допустим у нас появился второй аэропорт в городе
FlightsBoard_2 = [
    flight(flight_id = 123, terminal = 'A',  destination = 'Dubai', time = '20/04/24 16:30', gate = 11,  status = 'check-in' ),
    flight(flight_id = 456, terminal = 'B',  destination = 'Bali', time = '20/04/24 17:30', gate = 6,  status = 'boarding' ),
    flight(flight_id = 789, terminal = 'C', destination = 'Maldives', time = '20/04/24 18:30', gate = 5,  status = 'check-in' ),
    flight(flight_id = 321, terminal = 'A',destination = 'Dubai', time = '20/04/24 18:30', gate = 10,  status = 'departed' ),
]

#Возвращаем кортежи
def MAP(_, row:NamedTuple):
  yield (row, row)

def REDUCE(row1:NamedTuple, row2:NamedTuple):
  yield (row1, row2[0])

#Считываем данные с первой таблицы и второй
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1 + FlightsBoard_2]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in'),
  flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in')),
 (flight(flight_id=321, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed'),
  flight(flight_id=321, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed')),
 (flight(flight_id=456, terminal='B', destination='Bali', time='20/04/24 17:30', gate=6, status='boarding'),
  flight(flight_id=456, terminal='B', destination='Bali', time='20/04/24 17:30', gate=6, status='boarding')),
 (flight(flight_id=456, terminal='C', destination='Maldives', time='20/04/24 18:30', gate=5, status='check-in'),
  flight(flight_id=456, terminal='C', destination='Maldives', time='20/04/24 18:30', gate=5, status='check-in')),
 (flight(flight_id=765, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed'

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [None]:
#Возвращает 2 кортежа в качестве ключ-значение
def MAP(_, row:NamedTuple):
  yield (row, row)

#Возвращает 2 кортежа в качестве ключ-значение если длина = 2
def REDUCE(row1:NamedTuple, row2:NamedTuple):
  if (len(row2) == 2):
    yield (row1, row2[0])

#Чтение из двух таблиц
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1 + FlightsBoard_2]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in'),
  flight(flight_id=123, terminal='A', destination='Dubai', time='20/04/24 16:30', gate=11, status='check-in'))]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [None]:
#Возвращаем кортеж, 0 если принадлежит первой таблице, а кортеж и 1 если второй таблице
def MAP(key, value):
    if value in FlightsBoard_1:
        yield (value, 0)
    elif value in FlightsBoard_2:
        yield (value, 1)

#Возвращает 2 кортежа в качестве ключ-значение если является списком R, то есть 0
def REDUCE(key, rows):
  if (rows == [0]):
    yield (key, key)

#Чтение данных
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1 + FlightsBoard_2]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(sorted(output))
output

[(flight(flight_id=456, terminal='C', destination='Maldives', time='20/04/24 18:30', gate=5, status='check-in'),
  flight(flight_id=456, terminal='C', destination='Maldives', time='20/04/24 18:30', gate=5, status='check-in')),
 (flight(flight_id=765, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed'),
  flight(flight_id=765, terminal='A', destination='Dubai', time='20/04/24 18:30', gate=10, status='departed')),
 (flight(flight_id=987, terminal='B', destination='Bali', time='20/04/24 17:30', gate=6, status='boarding'),
  flight(flight_id=987, terminal='B', destination='Bali', time='20/04/24 17:30', gate=6, status='boarding'))]

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [None]:
#Создадим класс билет
class Ticket(NamedTuple):
  passenger: str
  flight_id: int
  price: float
  seat_number: str

tickets = [
    Ticket('Passenger 1', 123, 299.99, '14A'),
    Ticket('Passenger 2', 456, 375.50, '22B'),
    Ticket('Passenger 3', 789, 425.75, '9C'),
    Ticket('Passenger 4', 123, 319.00, '18D'),
    Ticket('Passenger 5', 567, 399.25, '27E')
]

In [None]:
#Создаем пары (b,(R,a)) и (b,(S,c))
#Пусть а = destination , b = flight_id, c = passenger
def MAP(_, row:NamedTuple):
  if ((row in FlightsBoard_1) or (row in FlightsBoard_2)):
      yield (row.flight_id, (0, (row.destination)))
  elif (row in tickets) and row.flight_id in [row1.flight_id for row1 in FlightsBoard_1 + FlightsBoard_2]:
    yield (row.flight_id, (1, (row.passenger)))

def REDUCE(key, info:NamedTuple):
  for rowS in info:
    if (rowS[0] == 1):
      for rowR in info:
        if (rowR[0] == 0):
          yield (None, (rowR[1], key, rowS[1]))

#Чтение данных с таблиц
def RECORDREADER():
  return [(f.flight_id, f) for f in FlightsBoard_1 + FlightsBoard_2+tickets]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(None, ('Dubai', 123, 'Passenger 1')),
 (None, ('Dubai', 123, 'Passenger 1')),
 (None, ('Dubai', 123, 'Passenger 4')),
 (None, ('Dubai', 123, 'Passenger 4')),
 (None, ('Maldives', 456, 'Passenger 2')),
 (None, ('Bali', 456, 'Passenger 2')),
 (None, ('Maldives', 789, 'Passenger 3'))]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [None]:
#Создаем пару номер рейса и цена билета
def MAP(key, value):
  yield (value.flight_id, value.price)

#Вычисляем среднюю цену по каждому рейсу
def REDUCE(key, rows):
  yield (key, sum(rows) / len(rows))

#Считываем данные
def RECORDREADER():
  return [(f.flight_id, f) for f in tickets]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(sorted(output))
output

[(123, 309.495), (456, 375.5), (567, 399.25), (789, 425.75)]

#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [None]:
from typing import Iterator
import numpy as np

d = 4 #Размер матрицы
mat = np.ones((d+1,d)) #Создание матрицы из единиц
vec1 = np.random.rand(d//2) #Генерация векторов
vec2 = np.random.rand(d-d//2)

#Принимает координаты и значение
#Выдает новое значение, умноженное на эл-т vec1 или vec2 в зависимости от индекса j
def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  if (j < len(vec1)):
    yield (i, value*vec1[j])
  else:
    yield (i, value*vec2[j-len(vec1)])
#Принимает индекс строки i и преобразованные значения из строки
#Суммирует значения и выдает кортеж
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

#Выдает координаты и значение mat[i,j] для каждого элемента матрицы
def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.381588273133806),
 (1, 1.381588273133806),
 (2, 1.381588273133806),
 (3, 1.381588273133806),
 (4, 1.381588273133806)]

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [None]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [None]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  for i in range(small_mat.shape[0]):
    v2 = (small_mat[i][j]*w)
    k2 = (i, k)
    yield (k2, v2)
  # solution code that yield(k2,v2) pairs

def REDUCE(key, values):
  (i, k) = key
  yield (key, sum(values))
  # solution code that yield(k3,v3) pairs

Проверьте своё решение

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [None]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [None]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for i in range(small_mat.shape[0]):
    for j in range(big_mat.shape[0]):
      for k in range(big_mat.shape[1]):
        yield (((i,j), small_mat[i,j]),((j,k), big_mat[j,k]))

def MAP(list1, list2):
  (k1, v1) = list1
  (k2, v2) = list2
  yield ((k1[0], k2[1]), v1*v2)

def REDUCE(key, values):
  (i, k) = key
  yield (key, sum(values))

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [None]:
import numpy as np

I = 2
J = 3
K = 4*10

maps = 2
reducers = 3

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def INPUTFORMAT():
  def RECORDREADER(key, split):
    mat = []
    for i in range(split.shape[0]):
      for j in range(split.shape[1]):
        mat.append(((key, i, j), split[i, j]))
    return mat

  list1 = RECORDREADER('S', small_mat)
  list2 = RECORDREADER('B', big_mat)
  yield list1
  yield list2


def MAP(k, v):
  (mat, i, j) = k
  w = v
  if mat == 'S':
    yield (j, (mat, i, w))
  else:
    yield (i, (mat, j, w))

def REDUCE(key, values):
  small = [v for v in values if v[0] == 'S']
  big = [v for v in values if v[0] == 'B']
  for s in small:
    for b in big:
      yield ((s[1], b[1]), s[2] * b[2])

def INPUT_MUL():
  for j in joined:
    yield j[1]

def MAP_MUL(k1, v1):
  yield (k1, v1)

def REDUCE_MUL(key, values):
  res_val = 0
  for v in values:
    res_val += v
  yield (key, res_val)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]

mul_output = MapReduceDistributed(INPUT_MUL, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]

solution = []
for p in pre_result:
  for v in p[1]:
    solution.append(v)

126 key-value pairs were sent over a network.
240 key-value pairs were sent over a network.


In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [None]:
import numpy as np

I = 2
J = 3
K = 4*10

maps = 2
reducers = 3

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def INPUTFORMAT():
  global maps
  def RECORDREADER(key, split):
    mat = []
    for i in range(split.shape[0]):
      for j in range(split.shape[1]):
        mat.append(((key, i, j), split[i, j]))
    return mat

  small = RECORDREADER('S', small_mat)
  split_size =  int(np.ceil(len(small)/maps))
  for i in range(0, len(small), split_size):
    yield small[i:i+split_size]

  big = RECORDREADER('B', big_mat)
  split_size =  int(np.ceil(len(big)/maps))
  for i in range(0, len(big), split_size):
    yield big[i:i+split_size]

def MAP(k, v):
  (mat, i, j) = k
  w = v
  if mat == 'S':
    yield (j, (mat, i, w))
  else:
    yield (i, (mat, j, w))

def REDUCE(key, values):
  small = [v for v in values if v[0] == 'S']
  big = [v for v in values if v[0] == 'B']
  for s in small:
    for b in big:
      yield ((s[1], b[1]), s[2] * b[2])

def INPUT_MUL():
  for j in joined:
    yield j[1]

def MAP_MUL(k1, v1):
  yield (k1, v1)

def REDUCE_MUL(key, values):
  res_val = 0
  for v in values:
    res_val += v
  yield (key, res_val)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]

mul_output = MapReduceDistributed(INPUT_MUL, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]

solution = []
for p in pre_result:
  for v in p[1]:
    solution.append(v)


126 key-value pairs were sent over a network.
240 key-value pairs were sent over a network.


In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True