# Введение в MapReduce модель на Python


In [54]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [55]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [56]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [57]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [58]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [59]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [60]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [61]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [62]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [63]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [64]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [65]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [66]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [67]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [68]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.6232705057934689),
 (1, 1.6232705057934689),
 (2, 1.6232705057934689),
 (3, 1.6232705057934689),
 (4, 1.6232705057934689)]

## Inverted index

In [69]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [70]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [71]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [72]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('banana', 2), ('what', 10)]),
 (1, [('a', 2), ('is', 18), ('it', 18)])]

## TeraSort

In [73]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.06306477149158185),
   (None, 0.06793145179451954),
   (None, 0.11213391953194918),
   (None, 0.1431927294190546),
   (None, 0.1808716643431334),
   (None, 0.1972875362135058),
   (None, 0.24359164787601773),
   (None, 0.2436473722328688),
   (None, 0.27434415506367094),
   (None, 0.38961798713554685),
   (None, 0.4185248400750009)]),
 (1,
  [(None, 0.5178595317999),
   (None, 0.5319502097708432),
   (None, 0.5408443123688192),
   (None, 0.5509860471226606),
   (None, 0.5718467043474912),
   (None, 0.5933929949952094),
   (None, 0.6387640678494104),
   (None, 0.6583566597891234),
   (None, 0.7020277742387662),
   (None, 0.7296799306513202),
   (None, 0.7318401984802079),
   (None, 0.7593328402151177),
   (None, 0.8055368208523699),
   (None, 0.8672644090879833),
   (None, 0.9215647075979578),
   (None, 0.9367306494640797),
   (None, 0.9597236497584413),
   (None, 0.9852095300036809),
   (None, 0.9853248097389504)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

In [74]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [75]:
import pandas as pd
stations = pd.read_csv('/content/drive/My Drive/Colab Notebooks/Большие данные/stations.csv') # датасет с данными

# добавим дублирующийся id, чтобы потом можно было проверять устранение дубликатов
stations.loc[len(stations)] = {'id': 84, 'name': 'Duplicate id', 'lat': 37.342725, 'long': -121.89561699999999, 'dock_count': 15, 'city': 'San Jose', 'installation_date': '4/9/2014'}

In [76]:
class Station(NamedTuple): # наследующийся от именованного кортежа класс станции
  id: int
  name: str
  lat: float
  long: float
  dock_count: int
  city: str
  installation_date: str

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [77]:
# найдём все записи с максимальным значением dock_count

def RECORDREADER_MAXIMUM():
  return [(station.id, Station(*station)) for index, station in stations.iterrows()] # считываем строки датафрейма и
                                                                                     # возращаем объект типа Station с его id

def MAP_MAXIMUM(_, row:NamedTuple): # берём всю группу станций с одним ключом
  yield (0, row)

def REDUCE_MAXIMUM(_, rows:Iterator[NamedTuple]): # поиск максимального значения dock_count
  max_dock_count = 0
  target_stations = []
  for row in rows:
    if row.dock_count >= max_dock_count:
      max_dock_count = row.dock_count
      target_stations.append(row)
  yield target_stations

output = MapReduce(RECORDREADER_MAXIMUM, MAP_MAXIMUM, REDUCE_MAXIMUM)
output = list(output)
output

[[Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city='San Jose', installation_date='8/6/2013'),
  Station(id=61, name='2nd at Townsend', lat=37.780526, long=-122.39028799999998, dock_count=27, city='San Francisco', installation_date='8/22/2013'),
  Station(id=67, name='Market at 10th', lat=37.776619, long=-122.417385, dock_count=27, city='San Francisco', installation_date='8/23/2013'),
  Station(id=77, name='Market at Sansome', lat=37.789625, long=-122.400811, dock_count=27, city='San Francisco', installation_date='8/25/2013')]]

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [78]:
# найдем среднее значение dock_count по всем станциям

def RECORDREADER_AVERAGE():
  return [(station.id, Station(*station)) for index, station in stations.iterrows()]

def MAP_AVERAGE(_, row:NamedTuple):
  yield (0, row)

def REDUCE_AVERAGE(_, rows:Iterator[NamedTuple]): # вычисление среднего значения
  sum = 0
  count = 0
  for row in rows:
    sum += row.dock_count
    count += 1
  if count > 0:
    yield sum/count
  else:
    yield 0

output = MapReduce(RECORDREADER_AVERAGE, MAP_AVERAGE, REDUCE_AVERAGE)
output = list(output)
output

[17.619718309859156]

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [79]:
def groupbykey(iterable):
  iterable = sorted(iterable, key=lambda x: x[0]) # перед группировкой выполняется сортировка по первому полю элементов
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [80]:
output = MapReduce(RECORDREADER_MAXIMUM, MAP_MAXIMUM, REDUCE_MAXIMUM)
output = list(output)
output

[[Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city='San Jose', installation_date='8/6/2013'),
  Station(id=61, name='2nd at Townsend', lat=37.780526, long=-122.39028799999998, dock_count=27, city='San Francisco', installation_date='8/22/2013'),
  Station(id=67, name='Market at 10th', lat=37.776619, long=-122.417385, dock_count=27, city='San Francisco', installation_date='8/23/2013'),
  Station(id=77, name='Market at Sansome', lat=37.789625, long=-122.400811, dock_count=27, city='San Francisco', installation_date='8/25/2013')]]

In [81]:
output = MapReduce(RECORDREADER_AVERAGE, MAP_AVERAGE, REDUCE_AVERAGE)
output = list(output)
output

[17.619718309859156]

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [82]:
maps = 3
reducers = 3

def INPUTFORMAT():
  global maps

  def RECORDREADER(split): # считывание записей по части (split) множества
    for index, station in split.iterrows():
      yield (station.id, Station(*station))

  split_size = int(np.ceil(len(stations)/maps)) # размер части
  for i in range(0, len(stations), split_size):
    yield RECORDREADER(stations[i:i+split_size]) # вызов RECORDREADER для каждой части

def MAP(_, row:NamedTuple): # группировка станций по dock_count
    yield (row.dock_count, row)

def REDUCE(_, rows:Iterator[NamedTuple]): # оставляем в каждой группе с одним dock_count только первую запись
    yield rows[0]

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

71 key-value pairs were sent over a network.


[(0,
  [Station(id=3, name='San Jose Civic Center', lat=37.330698, long=-121.888979, dock_count=15, city='San Jose', installation_date='8/5/2013'),
   Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city='San Jose', installation_date='8/6/2013')]),
 (1,
  [Station(id=5, name='Adobe on Almaden', lat=37.331415, long=-121.8932, dock_count=19, city='San Jose', installation_date='8/5/2013'),
   Station(id=22, name='Redwood City Caltrain Station', lat=37.486078000000006, long=-122.232089, dock_count=25, city='Redwood City', installation_date='8/15/2013')]),
 (2,
  [Station(id=4, name='Santa Clara at Almaden', lat=37.333988, long=-121.894902, dock_count=11, city='San Jose', installation_date='8/6/2013'),
   Station(id=28, name='Mountain View Caltrain Station', lat=37.394358, long=-122.07671299999998, dock_count=23, city='Mountain View', installation_date='8/15/2013')])]

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [83]:
# выбираем все записи, у которых город Redwood City

def RECORDREADER():
  return [(station.id, Station(*station)) for index, station in stations.iterrows()]

def MAP(_, row:NamedTuple): # возвращаем записи, у которых город Redwood City
  if row.city == "Redwood City":
    yield (row.id, row)

def REDUCE(id:int, rows:Iterator[NamedTuple]):
  yield (id, rows)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(21,
  [Station(id=21, name='Franklin at Maple', lat=37.481758, long=-122.226904, dock_count=15, city='Redwood City', installation_date='8/12/2013')]),
 (22,
  [Station(id=22, name='Redwood City Caltrain Station', lat=37.486078000000006, long=-122.232089, dock_count=25, city='Redwood City', installation_date='8/15/2013')]),
 (23,
  [Station(id=23, name='San Mateo County Center', lat=37.487616, long=-122.229951, dock_count=15, city='Redwood City', installation_date='8/15/2013')]),
 (24,
  [Station(id=24, name='Redwood City Public Library', lat=37.484219, long=-122.227424, dock_count=15, city='Redwood City', installation_date='8/12/2013')]),
 (25,
  [Station(id=25, name='Stanford in Redwood City', lat=37.48537, long=-122.203288, dock_count=15, city='Redwood City', installation_date='8/12/2013')]),
 (26,
  [Station(id=26, name='Redwood City Medical Center', lat=37.487682, long=-122.223492, dock_count=15, city='Redwood City', installation_date='8/12/2013')]),
 (83,
  [Station(id=83, name=

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [84]:
S = ["name","dock_count"] # множество атрибутов S, на которое выполняется проекция

def RECORDREADER():
  return [(station.id, Station(*station)) for index, station in stations.iterrows()]

def MAP(_, row:NamedTuple):
  yield (row.id, tuple(getattr(row, field) for field in S)) # создание нового кортежа с полями из S

def REDUCE(id:int, rows:Iterator[tuple]): # возвращаются записи с уникальными id
  yield (id, rows[0])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(2, ('San Jose Diridon Caltrain Station', 27)),
 (3, ('San Jose Civic Center', 15)),
 (4, ('Santa Clara at Almaden', 11)),
 (5, ('Adobe on Almaden', 19)),
 (6, ('San Pedro Square', 15)),
 (7, ('Paseo de San Antonio', 15)),
 (8, ('San Salvador at 1st', 15)),
 (9, ('Japantown', 15)),
 (10, ('San Jose City Hall', 15)),
 (11, ('MLK Library', 19)),
 (12, ('SJSU 4th at San Carlos', 19)),
 (13, ('St James Park', 15)),
 (14, ('Arena Green / SAP Center', 19)),
 (16, ('SJSU - San Salvador at 9th', 15)),
 (21, ('Franklin at Maple', 15)),
 (22, ('Redwood City Caltrain Station', 25)),
 (23, ('San Mateo County Center', 15)),
 (24, ('Redwood City Public Library', 15)),
 (25, ('Stanford in Redwood City', 15)),
 (26, ('Redwood City Medical Center', 15)),
 (27, ('Mountain View City Hall', 15)),
 (28, ('Mountain View Caltrain Station', 23)),
 (29, ('San Antonio Caltrain Station', 23)),
 (30, ('Evelyn Park and Ride', 15)),
 (31, ('San Antonio Shopping Center', 15)),
 (32, ('Castro Street and El Camino Re

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [85]:
# разделение данных на две пересекающиеся части
stations_a = stations.iloc[0:50]
stations_b = stations.iloc[30:len(stations)]

In [86]:
def RECORDREADER(): # считывание записей из обоих множеств и их объединение
  return [(station.id, Station(*station)) for index, station in stations_a.iterrows()] + [(station.id, Station(*station)) for index, station in stations_b.iterrows()]

def MAP(_, row:NamedTuple):
  yield (row.id, row)

def REDUCE(id:int, rows:Iterator[NamedTuple]):
  yield (id, rows)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(2,
  [Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city='San Jose', installation_date='8/6/2013')]),
 (3,
  [Station(id=3, name='San Jose Civic Center', lat=37.330698, long=-121.888979, dock_count=15, city='San Jose', installation_date='8/5/2013')]),
 (4,
  [Station(id=4, name='Santa Clara at Almaden', lat=37.333988, long=-121.894902, dock_count=11, city='San Jose', installation_date='8/6/2013')]),
 (5,
  [Station(id=5, name='Adobe on Almaden', lat=37.331415, long=-121.8932, dock_count=19, city='San Jose', installation_date='8/5/2013')]),
 (6,
  [Station(id=6, name='San Pedro Square', lat=37.336721, long=-121.894074, dock_count=15, city='San Jose', installation_date='8/7/2013')]),
 (7,
  [Station(id=7, name='Paseo de San Antonio', lat=37.333798, long=-121.886943, dock_count=15, city='San Jose', installation_date='8/7/2013')]),
 (8,
  [Station(id=8, name='San Salvador at 1st', lat=37.330165, long=-121.885831, dock_count=15, ci

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [87]:
def RECORDREADER():
  return [(station.id, Station(*station)) for index, station in stations_a.iterrows()] + [(station.id, Station(*station)) for index, station in stations_b.iterrows()]

def MAP(_, row:NamedTuple):
  yield (row.id, row)

def REDUCE(id:int, rows:Iterator[NamedTuple]): # создаем пару только тогда, когда для ключа в списке два элемента и они равны
  if len(rows) == 2 and rows[0] == rows[1]:
    yield (id, rows[0])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(37,
  Station(id=37, name='Cowper at University', lat=37.448598, long=-122.159504, dock_count=11, city='Palo Alto', installation_date='8/14/2013')),
 (38,
  Station(id=38, name='Park at Olive', lat=37.4256839, long=-122.13777749999998, dock_count=15, city='Palo Alto', installation_date='8/14/2013')),
 (39,
  Station(id=39, name='Powell Street BART', lat=37.783871000000005, long=-122.408433, dock_count=19, city='San Francisco', installation_date='8/25/2013')),
 (41,
  Station(id=41, name='Clay at Battery', lat=37.795001, long=-122.39997, dock_count=15, city='San Francisco', installation_date='8/19/2013')),
 (42,
  Station(id=42, name='Davis at Jackson', lat=37.79728, long=-122.398436, dock_count=15, city='San Francisco', installation_date='8/19/2013')),
 (45,
  Station(id=45, name='Commercial at Montgomery', lat=37.794231, long=-122.402923, dock_count=15, city='San Francisco', installation_date='8/19/2013')),
 (46,
  Station(id=46, name='Washington at Kearney', lat=37.795425, long=-12

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [88]:
def RECORDREADER(): # при считывании записей ставим у них id множеств 0 или 1
  return [(0, Station(*station)) for index, station in stations_a.iterrows()] + [(1, Station(*station)) for index, station in stations_b.iterrows()]

def MAP(id:int, row:NamedTuple): # группируем одинаковые записи
  yield (row, id)

def REDUCE(rows:Iterator[NamedTuple], ids): # возвращаем только те записи, у которых единственный id множества 0
  if ids == [0]:
   yield rows

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city='San Jose', installation_date='8/6/2013'),
 Station(id=3, name='San Jose Civic Center', lat=37.330698, long=-121.888979, dock_count=15, city='San Jose', installation_date='8/5/2013'),
 Station(id=4, name='Santa Clara at Almaden', lat=37.333988, long=-121.894902, dock_count=11, city='San Jose', installation_date='8/6/2013'),
 Station(id=5, name='Adobe on Almaden', lat=37.331415, long=-121.8932, dock_count=19, city='San Jose', installation_date='8/5/2013'),
 Station(id=6, name='San Pedro Square', lat=37.336721, long=-121.894074, dock_count=15, city='San Jose', installation_date='8/7/2013'),
 Station(id=7, name='Paseo de San Antonio', lat=37.333798, long=-121.886943, dock_count=15, city='San Jose', installation_date='8/7/2013'),
 Station(id=8, name='San Salvador at 1st', lat=37.330165, long=-121.885831, dock_count=15, city='San Jose', installation_date='8/5/2013'),
 Station(id=9, 

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [89]:
# в датафрейме stations вместо названий городов ставим их id и создаем отдельный датафрейм с названиями и id городов
new_stations = stations.copy(deep = True)
cities = new_stations[['city']].drop_duplicates().reset_index(drop=True)
cities['city_id'] = range(1, len(cities) + 1)
city_mapping = dict(zip(cities['city'], cities['city_id']))
new_stations['city'] = new_stations['city'].map(city_mapping)

class City(NamedTuple):
  city: str
  city_id: int

In [90]:
new_stations

Unnamed: 0,id,name,lat,long,dock_count,city,installation_date
0,2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,1,8/6/2013
1,3,San Jose Civic Center,37.330698,-121.888979,15,1,8/5/2013
2,4,Santa Clara at Almaden,37.333988,-121.894902,11,1,8/6/2013
3,5,Adobe on Almaden,37.331415,-121.893200,19,1,8/5/2013
4,6,San Pedro Square,37.336721,-121.894074,15,1,8/7/2013
...,...,...,...,...,...,...,...
66,80,Santa Clara County Civic Center,37.352601,-121.905733,15,1,12/31/2013
67,82,Broadway St at Battery St,37.798541,-122.400862,15,5,1/22/2014
68,83,Mezes Park,37.491269,-122.236234,15,2,2/20/2014
69,84,Ryland Park,37.342725,-121.895617,15,1,4/9/2014


In [91]:
cities

Unnamed: 0,city,city_id
0,San Jose,1
1,Redwood City,2
2,Mountain View,3
3,Palo Alto,4
4,San Francisco,5


In [92]:
def RECORDREADER(): # считываем станции и города, ставя у них id города
  return [(station.city, Station(*station)) for index, station in new_stations.iterrows()] + [(city.city_id, City(*city)) for index, city in cities.iterrows()]

def MAP(id:int, row:NamedTuple): # группируем по id городов
  yield (id, row)

def REDUCE(id, rows:Iterator[NamedTuple]): # в списке записей находим город и присоединяем его к станциям
  city = None
  for row in rows:
    if type(row) is City:
      city = row

  for row in rows:
    if type(row) is Station:
      yield (row, row.city, city)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
join_result = output
output

[(Station(id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.901782, dock_count=27, city=1, installation_date='8/6/2013'),
  1,
  City(city='San Jose', city_id=1)),
 (Station(id=3, name='San Jose Civic Center', lat=37.330698, long=-121.888979, dock_count=15, city=1, installation_date='8/5/2013'),
  1,
  City(city='San Jose', city_id=1)),
 (Station(id=4, name='Santa Clara at Almaden', lat=37.333988, long=-121.894902, dock_count=11, city=1, installation_date='8/6/2013'),
  1,
  City(city='San Jose', city_id=1)),
 (Station(id=5, name='Adobe on Almaden', lat=37.331415, long=-121.8932, dock_count=19, city=1, installation_date='8/5/2013'),
  1,
  City(city='San Jose', city_id=1)),
 (Station(id=6, name='San Pedro Square', lat=37.336721, long=-121.894074, dock_count=15, city=1, installation_date='8/7/2013'),
  1,
  City(city='San Jose', city_id=1)),
 (Station(id=7, name='Paseo de San Antonio', lat=37.333798, long=-121.886943, dock_count=15, city=1, installation_date='8/7/

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [93]:
# найдем количество станций в каждом городе

def RECORDREADER(): # считываем тройки
  return [(city_id, station, city) for station, city_id, city in join_result]

def MAP(city_id:int, station:NamedTuple, city:NamedTuple): # группируем станции по id города
  yield (city_id, station)

def REDUCE(city_id:int, rows:Iterator[NamedTuple]): # для каждого id города считаем количество станций
  yield f"(city_id = {city_id}: {len(rows)} stations)"

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

['(city_id = 1: 17 stations)',
 '(city_id = 2: 7 stations)',
 '(city_id = 3: 7 stations)',
 '(city_id = 4: 5 stations)',
 '(city_id = 5: 35 stations)']

#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [94]:
# когда не помещается в памяти map задачи, матрица и вектор делятся на соответствующие части (stripes)

mat = np.array([[1, 2, 3, 4, 5, 6],
                [7, 8, 9, 10, 11, 12],
                [13, 14, 15, 16, 17, 18]])

vec = np.array([1, 2, 3, 4, 5, 6])

stripe_width = 2 # ширина
stripes_num = int(len(mat[0])/stripe_width) # количество частей

def RECORDREADER(): # разделение матрицы и вектора на части
  a = 0
  b = stripe_width
  for s in range(stripes_num):
    stripe_mat = mat[:,a:b]
    stripe_vec = vec[a:b]
    a += stripe_width
    b += stripe_width
    yield (s ,(stripe_mat, stripe_vec))

def MAP(num, stripe): # умножение соответствующих частей матрицы и вектора
  yield (num, stripe[0] @ stripe[1])

def REDUCE(num, results):
  yield (num, results)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, [array([ 5, 23, 41])]),
 (1, [array([ 25,  67, 109])]),
 (2, [array([ 61, 127, 193])])]

In [95]:
result = np.sum([v[1][0] for v in output], axis=0) # объединение результатов для получения итогового ответа
result

array([ 91, 217, 343])

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [96]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [97]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  # solution code that yield(k2,v2) pairs
  for i in range(I):
    k2 = (i, k)
    v2 = small_mat[i, j] * w
    yield (k2, v2)

def REDUCE(key, values):
  (i, k) = key
  # solution code that yield(k3,v3) pairs
  k3 = (i, k)
  v3 = 0
  for j in range(J):
    v3 += values[j]
  yield (k3, v3)

Проверьте своё решение

In [98]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [99]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [100]:
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J)
big_mat = np.random.rand(J,K)

def RECORDREADER(): # обе матрицы
  for i in range(I):
    for j in range(J):
      for k in range(K):
        yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

def MAP(kv1, kv2):
  (i, j), v1 = kv1
  (j, k), v2 = kv2
  yield ((i, k), v1*v2)

def REDUCE(key, values):
  v3 = 0
  for j in range(J):
    v3 += values[j]
  yield (key, v3)

In [101]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [102]:
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J)
big_mat = np.random.rand(J,K)

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for i in split:
      for j in range(J):
        for k in range(K):
          yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

  split_size = int(np.ceil(I/maps))
  for i in range(0, I, split_size):
    yield RECORDREADER(range(i, i+split_size))

def MAP(kv1, kv2):
  (i, j), v1 = kv1
  (j, k), v2 = kv2
  yield ((i, k), v1*v2)

def REDUCE(key, values):
  v3 = 0
  for j in range(J):
    v3 += values[j]
  yield (key, v3)

In [103]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
solution = []
for partition in partitioned_output:
  for p in partition[1]:
    solution += [p]

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

240 key-value pairs were sent over a network.


True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

Для каждого генерируемого элемента матрицы возвращается также его индекс, который его идентифицирует, поэтому решение будет работать