# Введение в MapReduce модель на Python


In [32]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [26]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
    
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)
 
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])
 
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])
      
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.905589809636405),
 (1, 2.905589809636405),
 (2, 2.905589809636405),
 (3, 2.905589809636405),
 (4, 2.905589809636405)]

## Inverted index 

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)
      
def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)
 
def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [49]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()
      
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]
 
def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers
  
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
  
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)
      
  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)
  
# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) 
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('it', 18), ('what', 10)]),
 (1, [])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for value in split:
        yield (value, None)
      
  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])
    
def MAP(value:int, _):
  yield (value, None)
  
def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)
  
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.0059671639991895065),
   (None, 0.07724245496172),
   (None, 0.08440804135613444),
   (None, 0.13575647907181598),
   (None, 0.14404826813474803),
   (None, 0.21204275967955666),
   (None, 0.21869633101751806),
   (None, 0.25055756276216923),
   (None, 0.28642389538931257),
   (None, 0.3834487515438496),
   (None, 0.3913614390023946),
   (None, 0.4041378102237341),
   (None, 0.41854626274930695),
   (None, 0.4704310153549396),
   (None, 0.4776995227348928),
   (None, 0.48992216726013693)]),
 (1,
  [(None, 0.5005353544023048),
   (None, 0.5135664686748047),
   (None, 0.53391984417089),
   (None, 0.5587932025401512),
   (None, 0.5673804905854288),
   (None, 0.6926646597910275),
   (None, 0.7237444251339501),
   (None, 0.7557883138083207),
   (None, 0.785709769245918),
   (None, 0.7937098630029404),
   (None, 0.7942646850708935),
   (None, 0.9160468126494941),
   (None, 0.9618068292060864),
   (None, 0.9820764489731459)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [2]:
def RECORDREADER(input_data):
    records = []
    for number in input_data:
        records.append((None, number))
    return records

def MAP(key, value):
    yield ("max", value)

def REDUCE(key, values):
    max_value = max(values)
    yield (key, max_value)

In [6]:
# Применим функцию MAP к каждой записи
mapped_values = [list(MAP(None, value)) for _, value in records]
print("\nMapped values after MAP:")
print(mapped_values)

# Соберем все значения для каждого ключа
grouped_values = {}
for mapped_value in mapped_values:
    key, value = mapped_value[0]  # Получаем кортеж из списка
    grouped_values.setdefault(key, []).append(value)

# Применим функцию REDUCE к сгруппированным значениям
reduced_values = [list(REDUCE(key, values)) for key, values in grouped_values.items()]
print("\nReduced values after REDUCE:")
print(reduced_values)



Mapped values after MAP:
[[('max', 3)], [('max', 8)], [('max', 2)], [('max', 10)], [('max', 5)]]

Reduced values after REDUCE:
[[('max', 10)]]


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [7]:
def record_reader(input_data):
    records = []
    for number in input_data:
        records.append((None, float(number)))  # преобразуем строку в число
    return records

def mapper(key, value):
    # Отправляем значение с ключом "mean" и пару (число, 1) для каждого числа
    yield "mean", (value, 1)

def reducer(key, values):
    total_sum = 0
    total_count = 0
    for num, count in values:
        total_sum += num
        total_count += count
    mean = total_sum / total_count if total_count != 0 else 0  # избегаем деления на ноль
    yield key, mean

In [8]:

# Входные данные
input_data = ["3", "8", "2", "10", "5"]

# Шаг 1: Чтение записей
records = record_reader(input_data)

# Шаг 2: Отображение (Map)
mapped_values = [list(mapper(None, value)) for _, value in records]

# Шаг 3: Редукция (Reduce)
grouped_values = {}
for values in mapped_values:
    key, value = values[0]
    grouped_values.setdefault(key, []).append(value)

reduced_values = [list(reducer(key, values)) for key, values in grouped_values.items()]


print(reduced_values)

[[('mean', 5.6)]]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [7]:
def group_by_key_sorted(input_data):
    # Сортируем данные по ключу
    sorted_data = sorted(input_data, key=lambda x: x[0])
    
    grouped_data = {}
    current_key = None
    current_values = []
    
    # Проходим по отсортированным данным и группируем их по ключу
    for key, value in sorted_data:
        if key != current_key:
            if current_key is not None:
                grouped_data[current_key] = current_values
            current_key = key
            current_values = [value]
        else:
            current_values.append(value)
    
    # Добавляем последнюю группу
    if current_key is not None:
        grouped_data[current_key] = current_values
    
    return grouped_data

input_data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5), ("a", 6)]

grouped_data = group_by_key_sorted(input_data)


for key, values in grouped_data.items():
    print(f"{key}: {values}")


a: [1, 3, 6]
b: [2, 4]
c: [5]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [10]:
def record_reader(input_data):
    records = []
    for item in input_data:
        records.append((item, None))  # каждый элемент сопоставляется с None
    return records

def mapper(key, value):
    yield key, None  # отправляем ключ с маркером None

def reducer(key, values):
    yield key, None  # просто игнорируем все значения и возвращаем только ключ

def drop_duplicates(input_data):
    # Шаг 1: Чтение записей
    records = record_reader(input_data)

    # Шаг 2: Отображение (Map)
    mapped_values = [list(mapper(key, value)) for key, value in records]

    # Шаг 3: Редукция (Reduce)
    grouped_values = {}
    for values in mapped_values:
        key, _ = values[0]  # получаем только ключ
        grouped_values.setdefault(key, None)  # сохраняем только уникальные ключи
    return list(grouped_values.keys())


input_data = ["apple", "banana", "apple", "orange", "banana"]


unique_elements = drop_duplicates(input_data)

print(unique_elements)


['apple', 'banana', 'orange']


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [11]:
# Оператор выборки (Selection) в MapReduce

def selection_mapper(relation, condition):
    selected_tuples = []
    for tuple in relation:
        if condition(tuple):  # Проверяем выполнение условия
            selected_tuples.append((tuple, tuple))  # Создаем пару ключ-значение (t, t)
    return selected_tuples

def selection_reducer(selected_tuples):
    return selected_tuples  # Функция идентичности, возвращаем полученные значения

# Пример использования:
R = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]

# Условие выборки: выбрать кортежи с первым элементом больше 2
condition = lambda tuple: tuple[0] > 2

selected_tuples = selection_mapper(R, condition)


print("Selected tuples:", selected_tuples)


Selected tuples: [((3, 'c'), (3, 'c')), ((4, 'd'), (4, 'd')), ((5, 'e'), (5, 'e'))]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [14]:
# Оператор проекции (Projection) в MapReduce

def projection_mapper(relation, attributes):
    projected_tuples = []
    for tup in relation:  # Изменяем имя переменной tuple на tup
        projected_tuple = tuple(tup[i] for i in attributes)  # Оставляем только указанные атрибуты
        projected_tuples.append((projected_tuple, projected_tuple))  # Создаем пару ключ-значение (t', t')
    return projected_tuples

def projection_reducer(projected_tuples):
    return [(key, value) for key, _ in projected_tuples]  # Возвращаем первые значения в парах ключ-значение

R = [(1, 'a', True), (2, 'b', False), (3, 'c', True), (4, 'd', False), (5, 'e', True)]

# Множество атрибутов, на которые будет проекция
attributes = [0, 2]  # В данном случае атрибуты имеют индексы 0 и 2

projected_tuples = projection_mapper(R, attributes)

print("Projected tuples:", projected_tuples)


Projected tuples: [((1, True), (1, True)), ((2, False), (2, False)), ((3, True), (3, True)), ((4, False), (4, False)), ((5, True), (5, True))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [15]:
# Оператор объединения (Union) в MapReduce

def union_mapper(relation):
    union_tuples = []
    for tuple in relation:
        union_tuples.append((tuple, tuple))  # Создаем пару ключ-значение (t, t)
    return union_tuples

def union_reducer(union_tuples):
    return [(key, value) for key, _ in union_tuples]  # Возвращаем первые значения в парах ключ-значение

R1 = [(1, 'a'), (2, 'b'), (3, 'c')]
R2 = [(4, 'd'), (5, 'e')]

union_tuples = union_mapper(R1 + R2)

print("Union tuples:", union_tuples)


Union tuples: [((1, 'a'), (1, 'a')), ((2, 'b'), (2, 'b')), ((3, 'c'), (3, 'c')), ((4, 'd'), (4, 'd')), ((5, 'e'), (5, 'e'))]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [1]:
# Оператор пересечения (Intersection) в MapReduce

def intersection_mapper(relation):
    intersection_tuples = []
    for tuple in relation:
        intersection_tuples.append((tuple, tuple))  # Создаем пару ключ-значение (t, t)
    return intersection_tuples

def intersection_reducer(intersection_tuples):
    intersection_results = []
    seen = set()  # Создаем множество для хранения уже увиденных ключей
    for key, value in intersection_tuples:
        if key == value and key not in seen:  # Проверяем, что значения кортежей совпадают и текущий ключ еще не был обработан
            intersection_results.append((key, value))  # Добавляем в результат пару (t, t)
            seen.add(key)  # Добавляем ключ во множество уже увиденных
    return intersection_results

R1 = [(1, 'a'), (2, 'b'), (3, 'c')]
R2 = [(2, 'b'), (3, 'c'), (4, 'd')]

intersection_tuples = intersection_mapper(R1 + R2)

intersection_results = intersection_reducer(intersection_tuples)

print("Intersection results:", intersection_results)


Intersection results: [((1, 'a'), (1, 'a')), ((2, 'b'), (2, 'b')), ((3, 'c'), (3, 'c')), ((4, 'd'), (4, 'd'))]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [24]:
# Оператор разницы (Difference) в MapReduce

def difference_mapper(relation, name):
    difference_tuples = []
    for tuple in relation:
        difference_tuples.append((tuple, name))  # Создаем пару (t, name)
    return difference_tuples

def difference_reducer(difference_tuples):
    difference_results = []
    for key, value in difference_tuples:
        if value == "R":  # Проверяем, принадлежит ли кортеж отношению R
            difference_results.append(key)  # Добавляем кортеж в результат
    return difference_results

R = [(1, 'a'), (2, 'b'), (3, 'c')]
S = [(2, 'b'), (3, 'c'), (4, 'd')]

difference_tuples = difference_mapper(R, "R") + difference_mapper(S, "S")

difference_results = difference_reducer(difference_tuples)

print("Difference results:", difference_results)


Difference results: [(1, 'a'), (2, 'b'), (3, 'c')]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [35]:

# Определение классов
User = namedtuple('User', ['id', 'age', 'social_contacts', 'gender', 'city_id'])
City = namedtuple('City', ['id', 'name'])

# Простые данные
users_collection = [
    User(id=1, age=22, gender="male", social_contacts=30, city_id=1),
    User(id=2, age=30, gender="female", social_contacts=150, city_id=2),
    User(id=3, age=25, gender="male", social_contacts=100, city_id=2),
    User(id=4, age=40, gender="male", social_contacts=80, city_id=3),
    User(id=5, age=35, gender="female", social_contacts=200, city_id=4),
]

cities_collection = [
    City(id=1, name="New York"),
    City(id=2, name="Los Angeles"),
    City(id=3, name="Chicago"),
    City(id=4, name="Houston"),
]

In [36]:
# Группировка пользователей по id города
def MAP_JOIN(city_id, row):
    yield (city_id, row)


def REDUCE_JOIN(city_id, rows):
    users = []
    city = None

    # Цикл для определения города (он будет единственным, так как группировка происходит по id города)
    for row in rows:
        if type(row) is User:
            users += [row]
        else:
            city = row

    for row in rows:
        if type(row) is User:
            # Возвращает тройки (a,b,c), где a - пользователь, b - id города, c - сам город
            yield (row, row.city_id, city)


# На первом месте стоит id города, на втором - сам город или пользователь
def RECORDREADER():
    return [(user.city_id, user) for user in users_collection] + [
        (city.id, city) for city in cities_collection
    ]


output = MapReduce(RECORDREADER, MAP_JOIN, REDUCE_JOIN)
output = list(output)
joined_data = output
joined_data

[(User(id=1, age=22, social_contacts=30, gender='male', city_id=1),
  1,
  City(id=1, name='New York')),
 (User(id=2, age=30, social_contacts=150, gender='female', city_id=2),
  2,
  City(id=2, name='Los Angeles')),
 (User(id=3, age=25, social_contacts=100, gender='male', city_id=2),
  2,
  City(id=2, name='Los Angeles')),
 (User(id=4, age=40, social_contacts=80, gender='male', city_id=3),
  3,
  City(id=3, name='Chicago')),
 (User(id=5, age=35, social_contacts=200, gender='female', city_id=4),
  4,
  City(id=4, name='Houston'))]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [38]:
def map_function(tuple):
    # Для каждого кортежа (a, b, c) создаем пару (a, b)
    return (tuple[0], tuple[1])

def reduce_function(key, values, operation):
    # Применяем агрегирующую операцию к списку значений
    if operation == 'SUM':
        result = sum(values)
    elif operation == 'MAX':
        result = max(values)
    elif operation == 'MIN':
        result = min(values)
    else:
        raise ValueError("Unsupported operation")

    # Возвращаем пару (key, result)
    return (key, result)

In [43]:
def map_reduce(data, map_function, reduce_function, operation):
    # Шаг маппинга
    mapped_data = []
    for tuple in data:
        mapped_data.append(map_function(tuple))

    # Группировка по ключу
    grouped_data = {}
    for key, value in mapped_data:
        if key not in grouped_data:
            grouped_data[key] = []
        grouped_data[key].append(value)

    # Шаг редукции
    reduced_data = []
    for key, values in grouped_data.items():
        reduced_data.append(reduce_function(key, values, operation))

    return reduced_data

# Пример данных
data = [
    (1, 10, 0),
    (2, 20, 0),
    (1, 30, 0),
    (1,60, 0),
    (2, 40, 0),
]

# Применяем операцию MapReduce с суммированием
result_sum = map_reduce(data, map_function, reduce_function, 'SUM')
print("Результат суммирования:", result_sum)

# Применяем операцию MapReduce с нахождением максимума
result_max = map_reduce(data, map_function, reduce_function, 'MAX')
print("Результат поиска максимума:", result_max)

Результат суммирования: [(1, 100), (2, 60)]
Результат поиска максимума: [(1, 60), (2, 40)]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [None]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [45]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])
      
def MAP(k1, v1):
  (j, k) = k1
  w = v1
  # solution code that yield(k2,v2) pairs
  
def REDUCE(key, values):
  (i, k) = key
  # solution code that yield(k3,v3) pairs

Проверьте своё решение

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat) 
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [None]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [46]:
I = 2
J = 3
K = 4 * 10
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)


# Генерирует пары элементов из обоих матриц вместе с соответствующими им индексами
# Элементы в паре предназначены для перемножения и дальнейшего суммирования (M[i,j] и N[i,k])
def RECORDREADER():
    for i in range(I):
        for j in range(J):
            for k in range(K):
                yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))


# Перемножает полученные элементы двух матриц
# Группирует произведения по индексу, соответствующему клетке в результирующей матрице
# Таким образом, для каждого ключа (i, k) генерируется J произведений
def MAP(element1, element2):
    (i, j), v1 = element1
    (j, k), v2 = element2

    yield ((i, k), v1 * v2)


# Суммирует полученные произведения для вычисления элементов результирующей матрицы
def REDUCE(key, values):
    (i, k) = key

    # solution code that yield(k3,v3) pairs
    k3 = (i, k)

    v3 = 0
    for j in range(J):
        v3 += values[j]

    yield (k3, v3)

In [47]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [50]:
maps = 2
reducers = 2


def INPUTFORMAT():
    global maps

    # Так же, как и раньше, генерируются пары элементов из обоих матриц
    def RECORDREADER(i_range):
        for i in i_range:
            for j in range(J):
                for k in range(K):
                    yield (((i, j), small_mat[i, j]), ((j, k), big_mat[j, k]))

    # Разделение между вычислительными узлами происходит по строкам первой матрицы
    split_size = int(np.ceil(I / maps))
    for i in range(0, I, split_size):
        yield RECORDREADER(range(i, i + split_size))


# Возвращает произведения соответствующих элементов из обоих матриц, группируя их по индексу в результирующей матрице
def MAP(element1, element2):
    (i, j), v1 = element1
    (j, k), v2 = element2

    yield ((i, k), v1 * v2)


# Суммирует полученные произведения
def REDUCE(key, values):
    (i, k) = key

    # solution code that yield(k3,v3) pairs
    k3 = (i, k)

    v3 = 0
    for j in range(J):
        v3 += values[j]

    yield (k3, v3)


# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [
    (partition_id, list(partition)) for (partition_id, partition) in partitioned_output
]

# Соединение полученных произведений с разных вычислительных узлов
solution = []
for output_part in partitioned_output:
    for element in output_part[1]:
        solution += [element]

print(asmatrix(solution))
np.allclose(reference_solution, asmatrix(solution))  # should return true

240 key-value pairs were sent over a network.
[[1.63544037 2.29373659 0.71274961 0.99094195 1.03467102 0.61732011
  1.62357506 2.20524261 0.8070986  1.62847154 1.72244338 0.56227706
  2.37835642 1.31809456 0.50741842 1.90133955 1.9909832  1.48765261
  1.52970479 1.49267639 1.05081468 1.27917597 0.91335231 1.21609615
  1.49238527 1.71723853 1.13120643 2.05654655 1.68757134 1.48425003
  1.00943256 1.42502282 0.73815063 1.17601678 1.56332198 1.63938092
  1.81546926 1.14297004 1.08061976 0.61805257]
 [0.42563097 0.82503127 0.20467993 0.22421322 0.23719746 0.23150122
  0.59170804 0.64280665 0.31755402 0.85953813 0.81710215 0.0658076
  0.86956362 0.39685883 0.22588835 0.64233129 0.78026322 0.24978065
  0.39947759 0.40576777 0.2857537  0.41462108 0.43335791 0.42672988
  0.605676   0.73582276 0.69707557 0.6536482  0.47612548 0.75452036
  0.25644093 0.63594799 0.40987649 0.47504106 0.46929818 0.45766752
  0.71145257 0.29045648 0.39269412 0.16606477]]


True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [51]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J)
big_mat = np.random.rand(J,K)
reference_solution = np.matmul(small_mat, big_mat)

def INPUTFORMAT():
  first_mat = []
  for i in range(small_mat.shape[0]):
    for j in range(small_mat.shape[1]):
      first_mat.append(((0, i, j), small_mat[i,j])) # первая матрица

  global maps
  split_size =  int(np.ceil(len(first_mat)/maps))
  for i in range(0, len(first_mat), split_size):
    yield first_mat[i:i+split_size]

  second_mat = []
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      second_mat.append(((1, j, k), big_mat[j,k])) # вторая матрица

  split_size =  int(np.ceil(len(second_mat)/maps))
  for i in range(0, len(second_mat), split_size):
    yield second_mat[i:i+split_size]


def MAP_JOIN(k1, v1):
  (mat_num, i, j) = k1
  w = v1
  if mat_num == 0:
    yield (j, (mat_num, i, w))
  else:
    yield (i, (mat_num, j, w))


def REDUCE_JOIN(key, values):
  from_first_mat = [v for v in values if v[0] == 0]
  from_second_mat = [v for v in values if v[0] == 1]
  for f in from_first_mat:
    for s in from_second_mat:
      yield ((f[1], s[1]), f[2] * s[2])


def GET_JOINED():
  for j in joined:
    print("aa", j)
    yield j[1]


def MAP_MUL(k1, v1):
  yield (k1, v1)


def REDUCE_MUL(key, values):
  res_val = 0
  for v in values:
    res_val += v
  yield (key, res_val)

maps = 3
reducers = 2
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP_JOIN, REDUCE_JOIN, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]

mul_output = MapReduceDistributed(GET_JOINED, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]

solution = []
for p in pre_result:
  for v in p[1]:
    solution.append(v)

print(solution)
np.allclose(reference_solution, asmatrix(solution)) # should return true

126 key-value pairs were sent over a network.
aa (0, [((0, 0), 0.03958056847057354), ((0, 1), 0.08892169628371085), ((0, 2), 0.2196615520839417), ((0, 3), 0.472626930339029), ((0, 4), 0.21466891515709058), ((0, 5), 0.11326351387602727), ((0, 6), 0.6354932395129124), ((0, 7), 0.44388138966805274), ((0, 8), 0.5147673805609104), ((0, 9), 0.16134608424129698), ((0, 10), 0.12306563286683249), ((0, 11), 0.06463731143493087), ((0, 12), 0.6965781965890289), ((0, 13), 0.2389730592941976), ((0, 14), 0.753916951470482), ((0, 15), 0.4112350335203873), ((0, 16), 0.47663093139127766), ((0, 17), 0.33879765849653193), ((0, 18), 0.3562328675771002), ((0, 19), 0.3446539496033303), ((0, 20), 0.4888828536031975), ((0, 21), 0.45880547742103395), ((0, 22), 0.3660085373752523), ((0, 23), 0.7106057198187479), ((0, 24), 0.6787010190445081), ((0, 25), 0.20599506293702763), ((0, 26), 0.750988060949888), ((0, 27), 0.5163651915832722), ((0, 28), 0.1294260638363276), ((0, 29), 0.0932140282590233), ((0, 30), 0.25890

True