# Введение в MapReduce модель на Python


In [2]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [3]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row) #Генерируем кортеж (возраст, строка данных)

#Вычисляет среднее количество социальных контактов
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [4]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [5]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [6]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [7]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [8]:
#Функция разворачивает вложенную итерируемую структуру в плоский итератор
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [9]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [10]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [11]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [12]:
#Среднее число соц. контактов для каждой группы возрастов
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [13]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [14]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [15]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [16]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.526197406579079),
 (1, 2.526197406579079),
 (2, 2.526197406579079),
 (3, 2.526197406579079),
 (4, 2.526197406579079)]

## Inverted index

In [17]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [18]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [19]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [20]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('banana', 2), ('is', 18)]),
 (1, [('a', 2), ('it', 18), ('what', 10)])]

## TeraSort

In [21]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.031216556673922535),
   (None, 0.09453408781038397),
   (None, 0.12511323447876743),
   (None, 0.12579813463751333),
   (None, 0.1425213347593055),
   (None, 0.14416645629430092),
   (None, 0.2195804564956244),
   (None, 0.22174562691663158),
   (None, 0.232515523348782),
   (None, 0.24395702642817496),
   (None, 0.24934930103690633),
   (None, 0.24945705419674125),
   (None, 0.2506306975401358),
   (None, 0.2906836904285658),
   (None, 0.3148948380315676),
   (None, 0.37372643460358457),
   (None, 0.4863965625656025),
   (None, 0.49592666856002454)]),
 (1,
  [(None, 0.5081101948221368),
   (None, 0.5147338418033647),
   (None, 0.5207434627736146),
   (None, 0.5248388514199077),
   (None, 0.6691562099043395),
   (None, 0.6868137375078842),
   (None, 0.7184420602661447),
   (None, 0.7209444146699195),
   (None, 0.7303666752361283),
   (None, 0.8454457579009479),
   (None, 0.8950828505858539),
   (None, 0.9847950752311518)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [22]:
def get_user_data():
    """Генерирует данные для MapReduce."""
    return [(user.id, user) for user in input_collection]

def map_user_contacts(_, user_data: NamedTuple):
    """Преобразует данные пользователя."""
    yield ("", user_data)

def reduce_max_contacts(_,user_data: Iterator[NamedTuple]):
    """Находит пользователя с максимальным количеством контактов."""
    max_contacts = 0
    most_connected_user = None
    for user_data in user_data:
        if user_data.social_contacts > max_contacts:
            max_contacts = user_data.social_contacts
            most_connected_user = user_data
    yield most_connected_user

result = list(MapReduce(get_user_data, map_user_contacts, reduce_max_contacts))
print(result)



[User(id=3, age=33, social_contacts=800, gender='female')]


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [23]:
def get_numbers():
    """Генерирует данные для MapReduce."""
    return [(user.id, user) for user in input_collection]

def map_numbers(_, number: NamedTuple):
    """Преобразует данные пользователя."""
    yield ("", number)

def reduce_calculate_average(_, numbers: Iterator[NamedTuple]):
    """Вычисляет среднее значение чисел."""
    total = 0
    n=0
    for number in numbers:
      total+=number.social_contacts
      n+=1
    average = total / n if n > 0 else 0
    return ('AVG', average)

result = list(MapReduce(get_numbers, map_numbers, reduce_calculate_average))
print(result)


['AVG', 390.0]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [24]:
def group_by_key(items):
    """Группирует элементы по ключу."""
    items = sorted(items, key=lambda e: e[0])  # Сортировка по первому элементу (ключу)
    grouped_items = {}
    for key1, group in items:
      group[key1]=group.get(key1, [])+[group]
    return grouped_items.items()

result = list(MapReduce(get_user_data, map_user_contacts, reduce_max_contacts))
print(result)

result = list(MapReduce(get_numbers, map_numbers, reduce_calculate_average))
print(result)

[User(id=3, age=33, social_contacts=800, gender='female')]
['AVG', 390.0]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [25]:
from typing import Iterator, Tuple
import numpy as np
from collections import defaultdict
from math import log

documents = [
    """it is what it is
it is what it is
it is what it is""",
    """what is it
what is it""",
    """it is a user""",
    """it is what it is
it is what it is
it is what it is""",
    """what is it
what is it""",
    """it is a user"""
]

maps = 3
reducers = 2


def data_splitter(documents, num_splits):
    """Разделяет данные на части для Map-задач."""
    split_size = int(np.ceil(len(documents) / num_splits))
    for i in range(0, len(documents), split_size):
        yield documents[i:i + split_size]


def mapper(doc_id: int, line: str) -> Iterator[Tuple[str, Tuple[int, int]]]:
    """Map-функция: подсчитывает частоту слов в каждой строке."""
    for word in line.lower().split():
        word = word.strip('.,!?"').replace("'s", "")  # простая очистка текста
        yield (word, (doc_id, 1))


def reducer(key: str, values: Iterator[Tuple[int, int]]) -> Iterator[Tuple[str, dict]]:
    """Reduce-функция: подсчитывает TF-IDF."""
    doc_counts = defaultdict(int)
    for doc_id, count in values:
        doc_counts[doc_id] += count

    df = len(doc_counts)  # количество документов, где встречается слово
    num_docs = len(documents)
    idf = log(num_docs / df) if df > 0 else 0  # Обработка случая, когда df = 0

    for doc_id, count in doc_counts.items():
        tf = count / sum(doc_counts.values()) if sum(doc_counts.values()) > 0 else 0  # Обработка деления на ноль
        tfidf = tf * idf
        yield (key, {doc_id: tfidf})


def simple_mapreduce(mapper_func, reducer_func, data):
    """Выполняет упрощенную MapReduce операцию."""
    mapped_data = defaultdict(list)
    sent_count = 0
    for doc_index, doc_list in enumerate(data):
        for doc_id, doc in enumerate(doc_list):
            for line in doc.split('\n'):
                for k, v in mapper_func(doc_index * len(doc_list) + doc_id, line):
                    mapped_data[k].append(v)
                    sent_count += 1

    reduced_data = {}
    for key, value in mapped_data.items():
        reduced_data[key] = list(reducer_func(key, iter(value)))

    print(f"{sent_count} key-value pairs were sent over a network.")
    return reduced_data


# Пример использования:
split_data = list(data_splitter(documents, maps))
result = simple_mapreduce(mapper, reducer, split_data)
print(result)


50 key-value pairs were sent over a network.
{'it': [('it', {0: 0.0}), ('it', {1: 0.0}), ('it', {2: 0.0}), ('it', {3: 0.0}), ('it', {4: 0.0}), ('it', {5: 0.0})], 'is': [('is', {0: 0.0}), ('is', {1: 0.0}), ('is', {2: 0.0}), ('is', {3: 0.0}), ('is', {4: 0.0}), ('is', {5: 0.0})], 'what': [('what', {0: 0.12163953243244931}), ('what', {1: 0.08109302162163289}), ('what', {3: 0.12163953243244931}), ('what', {4: 0.08109302162163289})], 'a': [('a', {2: 0.5493061443340549}), ('a', {5: 0.5493061443340549})], 'user': [('user', {2: 0.5493061443340549}), ('user', {5: 0.5493061443340549})]}


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [26]:
def get_user_data():
    """Генерирует данные для MapReduce."""
    return [(user.id, user) for user in input_collection]

def map_users(_, user_data: NamedTuple):
    """Отбирает данные о женщинах."""
    if user_data.gender == "female":
        yield (user_data, user_data)

def reduce_users(user_data: str, rows: Iterator[NamedTuple]):
    """Группирует данные о женщинах."""
    yield (user_data.gender, rows)

result = list(MapReduce(get_user_data, map_users, reduce_users))
print(result)


[('female', [User(id=1, age=25, social_contacts=240, gender='female')]), ('female', [User(id=2, age=25, social_contacts=500, gender='female')]), ('female', [User(id=3, age=33, social_contacts=800, gender='female')])]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [27]:
def map_select(_, row: NamedTuple):
    if row.gender == "female":
        yield (row, row)


def reduce_select(row: str, rows: Iterator[NamedTuple]):
    yield (row, rows)


def recordreader():
    return [(user.id, user) for user in input_collection]

result = list(MapReduce(recordreader, map_select, reduce_select))
print(result)

[(User(id=1, age=25, social_contacts=240, gender='female'), [User(id=1, age=25, social_contacts=240, gender='female')]), (User(id=2, age=25, social_contacts=500, gender='female'), [User(id=2, age=25, social_contacts=500, gender='female')]), (User(id=3, age=33, social_contacts=800, gender='female'), [User(id=3, age=33, social_contacts=800, gender='female')])]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [28]:
input_collection_1 = [
    User(id=0, age=40, gender='male', social_contacts=200),
    User(id=1, age=35, gender='female', social_contacts=40),
    User(id=2, age=55, gender='female', social_contacts=600),
    User(id=2, age=32, gender='male', social_contacts=270),
    User(id=3, age=30, gender='female', social_contacts=901)
]
input_collection_2 = [
    User(id=3, age=32, gender='female', social_contacts=270),
    User(id=4, age=46, gender='female', social_contacts=145)
]

def map_select(_, row: NamedTuple):
        yield (row.id, row)


def reduce_select(row: str, rows: Iterator[NamedTuple]):
    yield (rows[0], rows[0])


def recordreader():
    return [(user.id, user) for user in input_collection]


result = list(MapReduce(recordreader, map_select, reduce_select))
print(result)


[(User(id=0, age=55, social_contacts=20, gender='male'), User(id=0, age=55, social_contacts=20, gender='male')), (User(id=1, age=25, social_contacts=240, gender='female'), User(id=1, age=25, social_contacts=240, gender='female')), (User(id=2, age=25, social_contacts=500, gender='female'), User(id=2, age=25, social_contacts=500, gender='female')), (User(id=3, age=33, social_contacts=800, gender='female'), User(id=3, age=33, social_contacts=800, gender='female'))]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [29]:
def recordreader():
    return [(user.id, user) for user in input_collection_1 + input_collection_2]

def map_intersection(_, row: NamedTuple):
    yield (row.id, row)

def reduce_intersection(row_id: int, rows: Iterator[NamedTuple]):
    if len(rows) == 2:
        yield rows


result = list(MapReduce(recordreader, map_intersection, reduce_intersection))
print(result)

[[User(id=2, age=55, social_contacts=600, gender='female'), User(id=2, age=32, social_contacts=270, gender='male')], [User(id=3, age=30, social_contacts=901, gender='female'), User(id=3, age=32, social_contacts=270, gender='female')]]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [30]:
def recordreader():
    return [(0, n) for n in input_collection_1] + [(1, k) for k in input_collection_2]

def map_diff (id, user):
  yield (user, id)

def reduce_diff (user, collections):
  if collections == [0]:
    yield (user)

result = list(MapReduce(recordreader,map_diff, reduce_diff))
print(result)


[User(id=0, age=40, social_contacts=200, gender='male'), User(id=1, age=35, social_contacts=40, gender='female'), User(id=2, age=55, social_contacts=600, gender='female'), User(id=2, age=32, social_contacts=270, gender='male'), User(id=3, age=30, social_contacts=901, gender='female')]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [32]:
class Person(NamedTuple):
  id: int
  age: str
  gender: str
  social_contacts: str
  sport_type_id: int

class Sportstype (NamedTuple):
  id: int
  type_sport: str

input_collection3 = [
    Person(id=0, age=40, gender='male', social_contacts=200, sport_type_id=3),
    Person(id=1, age=35, gender='female', social_contacts=40, sport_type_id=30),
    Person(id=2, age=55, gender='female', social_contacts=600, sport_type_id=1),
    Person(id=2, age=32, gender='male', social_contacts=270, sport_type_id=30),
    Person(id=3, age=30, gender='female', social_contacts=901, sport_type_id=0)
]

sport_collection = [
    Sportstype (id=3, type_sport="Hockey"),
    Sportstype (id=30, type_sport="Tennis"),
    Sportstype (id=1, type_sport="Golf"),
    Sportstype (id=0, type_sport="Skiing"),
]

def recordreader():
    return [(person.sport_type_id, person) for person in input_collection3] + [(sport_type_id.id, sport_type_id) for sport_type_id in sport_collection]

def map_join(sport_type_id, row):
    yield (sport_type_id, row)

def reduce_join(sport_type_id, rows):
    people = []
    sport_type_id = None

    for row in rows:
        if type(row) is Person:
            people += [row]
        else:
            sport_type_id = row

    for row in rows:
        if type(row) is Person:
            yield (row, row.sport_type_id, sport_type_id)

result = MapReduce(recordreader, map_join, reduce_join)
result= list(result)
join = result
join

[(Person(id=0, age=40, gender='male', social_contacts=200, sport_type_id=3),
  3,
  Sportstype(id=3, type_sport='Hockey')),
 (Person(id=1, age=35, gender='female', social_contacts=40, sport_type_id=30),
  30,
  Sportstype(id=30, type_sport='Tennis')),
 (Person(id=2, age=32, gender='male', social_contacts=270, sport_type_id=30),
  30,
  Sportstype(id=30, type_sport='Tennis')),
 (Person(id=2, age=55, gender='female', social_contacts=600, sport_type_id=1),
  1,
  Sportstype(id=1, type_sport='Golf')),
 (Person(id=3, age=30, gender='female', social_contacts=901, sport_type_id=0),
  0,
  Sportstype(id=0, type_sport='Skiing'))]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [33]:
def record_reader():
  return [(sport_type_id, person, sport_type) for person, sport_type_id, sport_type in join]

def map_group(sport_type_id, person, sport_type):
  yield(sport_type_id, person)

def reduce_group(sport_type_id, rows):
  yield f"sport type id={sport_type_id} for {len(rows)} people"

result = list(MapReduce(record_reader,map_group, reduce_group))
print(result)

['sport type id=3 for 1 people', 'sport type id=30 for 2 people', 'sport type id=1 for 1 people', 'sport type id=0 for 1 people']


#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [34]:
def record_reader():
    return [(None, m) for m in matrix]
matrix = [
    (3, 6, 9), (2, 4, 8), (1, 5, 7),
    (3, 6, 9), (2, 4, 8), (1, 5, 7),
    (3, 6, 9), (2, 4, 8), (1, 5, 79),
]

vector = [
    (3, 6), (2, 4), (5, 7)
]

def map_matrix_vector(_, matrix_row):
    row, col, value = matrix_row
    for vector_col, vector_value in vector:
        if vector_col == col:
            yield (row, value * vector_value)


def reduce_matrix_vector(row, values: Iterator[int]):
    yield (row, sum(values))

result = list(MapReduce(record_reader, map_matrix_vector, reduce_matrix_vector))
print(result)




[(1, 651)]


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [35]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [36]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  """
  Читает матрицу big_mat и выдает пары (ключ, значение).
  Ключ - кортеж (j, k), представляющий индекс элемента в матрице.
  Значение - само значение элемента.
  """
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k]) #Выдача пары (индекс, значение)

def MAP(k1, v1):
  """
  Выполняет операцию map.
  Вход: k1 (кортеж (j, k)) - индекс из RECORDREADER, v1 (число) - значение из RECORDREADER.
  Выход: Выдает пары (ключ, значение). Ключ - (i, k), значение - результат вычислений.
  """
  (j, k) = k1
  w = v1
  # solution code that yield(k2,v2) pairs
  for i in range(I):
    k2 = (i,k)
    v2 = small_mat[i,j]*w
    yield (k2, v2)

def REDUCE(key, values):
  """
  Выполняет операцию reduce.
  Вход: key (кортеж (i, k)) - ключ, values (список) - список значений из MAP с тем же ключом.
  Выход: Выдает одну пару (ключ, значение) для каждого уникального ключа.
  """
  (i, k) = key
  # solution code that yield(k3,v3) pairs
  k3=(i,k)

  v3=0
  for j in range(J):
    v3+=values[j]

  yield (k3,v3)

Проверьте своё решение

In [37]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [38]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [39]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  """
  Генератор, который производит тройки:
  ((i,j), значение из small_mat, ((j,k), значение из big_mat)).
  во вложенных циклах перебираются все возможные комбинации индексов.
  """
  for i in range(I):
    for j in range (J):
      for k in range (K):
        yield (((i,j), small_mat[i,j], ((j,k), big_mat[j,k])))

def MAP(el1, el2):
  """
  Функция Map принимает два элемента:
  el1: ((i,j), значение из small_mat)
  el2: ((j,k), значение из big_mat)
  Вычисляет произведение значений и выдает пару ((i,k), произведение).
  """
  (i,j), v1 = el1
  (j,k), v2 = el2
  yield ((i,k), v1*v2)

def REDUCE(key, values):
  """
  Функция Reduce принимает ключ (i,k) и список значений,
  которые соответствуют этому ключу. Вычисляет сумму значений.
  """
  (i, k) = key
  # solution code that yield(k3,v3) pairs
  k3=(i,k)

  v3=0
  for j in range(J):
    v3+=values[j]

  yield (k3,v3)




Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [40]:
import numpy as np
I = 2
J = 3
K = 4 * 10

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def INPUTFORMAT():
    """
    Генератор, разбивающий входные данные на части для параллельной обработки.
    Разбивает строки small_mat на части размером split_size для передачи в RECORDREADER.
    """
    def RECORDREADER(i_range):
        """
        Внутренний генератор, который производит пары элементов из small_mat и big_mat.
        Перебирает все комбинации индексов i, j, k в заданном диапазоне i.
        """
        for i in i_range:
            for j in range(J):
                for k in range(K):
                    yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

    maps = 2  # Число map-задач
    split_size = int(np.ceil(I / maps))  # Размер каждой части входных данных
    for i in range(0, I, split_size):
        yield RECORDREADER(range(i, min(i + split_size, I)))


def MAP(element1, element2):
    """
    Функция Map принимает пары элементов из small_mat и big_mat,
    вычисляет их произведение и выдает пару (ключ, значение).
    Ключ - (i, k), значение - произведение элементов.
    """
    (i, j), v1 = element1
    (j, k), v2 = element2
    yield ((i, k), v1 * v2)


def REDUCE(key, values):
    """
    Функция Reduce суммирует значения для каждого ключа (i, k).
    """
    (i, k) = key
    v3 = sum(values)
    yield ((i, k), v3)


def partitioner(key):
    """
    Функция распределения ключей по reducer-задачам.
    """
    return hash(key) % reducers


reference_solution = np.matmul(small_mat, big_mat)

maps = 2
reducers = 2
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for partition_id, partition in partitioned_output]


solution = []
for output_part in partitioned_output:
    for element in output_part[1]:
        solution.append(element)

# Преобразование результата в матрицу для сравнения
solution_matrix = np.zeros((I, K))
for (i, k), v in solution:
    solution_matrix[i, k] = v

print(np.allclose(reference_solution, solution_matrix))
print(solution_matrix)



240 key-value pairs were sent over a network.
True
[[0.36794092 0.65297661 0.63071417 0.61649128 0.6692687  0.72858805
  0.58504596 0.88816083 0.71347861 0.54641612 0.95235339 0.35371872
  0.32212991 0.40618131 0.53232158 0.3002591  0.60363939 0.70196235
  0.31536324 0.61000841 0.29555466 0.42057168 0.72281822 0.72165035
  0.80099845 0.46562391 0.63637346 0.73389411 0.52495449 0.61975477
  0.47065042 0.16152204 0.56292242 0.75293401 0.45647995 0.37762146
  0.44302378 0.80100509 0.30794896 0.59935822]
 [0.33547899 0.64730209 0.63371201 0.67354952 0.58918959 0.64577155
  0.66245038 0.87132764 0.72440008 0.64278878 0.91375023 0.29451422
  0.28177176 0.36297924 0.53850912 0.36190831 0.67910724 0.66795628
  0.2611174  0.43317894 0.28299674 0.47362682 0.78307441 0.66847344
  0.8107369  0.41896278 0.56692834 0.64435408 0.60225194 0.64382079
  0.37942731 0.1925814  0.61172541 0.7194738  0.42230828 0.31844425
  0.372821   0.75381787 0.30095368 0.56492381]]


Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

Для того, чтобы обобщить все предыдущие решения (когда каждая матрица генерируется нексколькими RECORDREADER-ами), необходимо расширить решение для того чтобы появилась возможность работать с несколькими генераторами данных для каждой матрицы.
Если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы необходимо проверить, что все возможные индексы матрицы будут правильно обработаны.