# Введение в MapReduce модель на Python


In [93]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [94]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [95]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [96]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [97]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [98]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [99]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [100]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [101]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [102]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [103]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [104]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [105]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [106]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [107]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.6270776597288044),
 (1, 1.6270776597288044),
 (2, 1.6270776597288044),
 (3, 1.6270776597288044),
 (4, 1.6270776597288044)]

## Inverted index

In [108]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [109]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [110]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [111]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('what', 10)]),
 (1, [('a', 2), ('banana', 2), ('is', 18), ('it', 18)])]

## TeraSort

In [112]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.06939610420054843),
   (None, 0.13746043525942275),
   (None, 0.16847133923551616),
   (None, 0.18755811215844165),
   (None, 0.2016177027795375),
   (None, 0.21971787375387253),
   (None, 0.22969566434000543),
   (None, 0.25087857622953635),
   (None, 0.3139145204201683),
   (None, 0.3813705683690818),
   (None, 0.38674559339556636),
   (None, 0.3977572739863977),
   (None, 0.40697470658887347),
   (None, 0.442689175658985)]),
 (1,
  [(None, 0.5087515575029077),
   (None, 0.5690772663203993),
   (None, 0.5714486558062077),
   (None, 0.5765327932787665),
   (None, 0.5866964450381529),
   (None, 0.5943612386890614),
   (None, 0.6163840505786415),
   (None, 0.6282896912544205),
   (None, 0.6703912920737205),
   (None, 0.6800641232783328),
   (None, 0.7590992505944185),
   (None, 0.7685744644295737),
   (None, 0.7754433551256569),
   (None, 0.8633999683263243),
   (None, 0.94606355881858),
   (None, 0.9635575333373616)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [113]:
# Пример входных данных
numbers = [15, 22, 8, 42, 32, 3]

# Функция RECORDREADER создает пары (index, number)
def RECORDREADER():
  for index, number in enumerate(numbers):
    yield (index, number)

# Функция MAP выводит каждое число как (key, value) пару, где key - это фиктивный ключ
def MAP(index: int, value: int):
  yield ("max", value)

# Функция REDUCE находит максимальное значение среди всех значений, связанных с данным ключом
def REDUCE(key: str, values: Iterator[int]):
  yield (key, max(values))

# Здесь мы используем базовую версию MapReduce, не распределенную
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
print(output)

[('max', 42)]


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [114]:
# Пример входных данных
numbers = [15, 22, 8, 42, 32, 3]

# Функция RECORDREADER создает пары (index, number)
def RECORDREADER():
  for index, number in enumerate(numbers):
    yield (index, number)

# Функция MAP выводит каждое число с ключом "avg", вместе с 1, чтобы считать количество элементов
def MAP(index: int, value: int):
  yield ("avg", (value, 1))

# Функция REDUCE суммирует все значения и их количество, чтобы вычислить среднее
def REDUCE(key: str, value_pairs: Iterator[tuple]):
  total_sum, count = 0, 0
  for value, cnt in value_pairs:
    total_sum += value
    count += cnt
  yield (key, total_sum / count if count != 0 else 0)

# Повторное использование предыдущих определений flatten и groupbykey
# Определения не предоставлены в этом примере, но предполагается, что они уже определены

# Функция MapReduce для выполнения алгоритма
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполнение MapReduce алгоритма
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
print(output)

[('avg', 20.333333333333332)]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [115]:
from itertools import groupby
from operator import itemgetter

def groupByKey(iterable):
  # Сортировка итерируемого объекта по ключу
  sorted_iterable = sorted(iterable, key=itemgetter(0))
  # Группировка значений по их ключам
  for key, group in groupby(sorted_iterable, key=itemgetter(0)):
    yield (key, [item[1] for item in group])

# Пример использования функции groupByKey
pairs = [("apple", 1), ("banana", 2), ("apple", 3), ("banana", 4), ("cherry", 5)]

grouped = groupByKey(pairs)
print(list(grouped))

[('apple', [1, 3]), ('banana', [2, 4]), ('cherry', [5])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [116]:
from typing import Iterator

# Пример входных данных
data = [1, 2, 2, 3, 4, 4, 4, 5, 6, 6]

# Функция RECORDREADER создает пары (index, value)
def RECORDREADER():
  for index, value in enumerate(data):
    yield (index, value)

# Функция MAP выводит каждое значение как (key, None) пару
def MAP(index: int, value: str):
  yield (value, None)

# Функция COMBINER агрегирует данные на этапе map
def COMBINER(key: str, values: Iterator[None]):
  yield (key, None)

# Функция REDUCE находит уникальные ключи
def REDUCE(key: str, values: Iterator[None]):
  yield key

# Helper functions to flatten lists and group by key
from itertools import groupby
from operator import itemgetter

def flatten(lst):
  return [item for sublist in lst for item in sublist]

def groupByKey(iterable):
  sorted_iterable = sorted(iterable, key=itemgetter(0))
  for key, group in groupby(sorted_iterable, key=itemgetter(0)):
    yield (key, [item[1] for item in group])

# Основная функция MapReduce
def MapReduce(RECORDREADER, MAP, COMBINER, REDUCE):
  # Шаг MAP
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  # Шаг COMBINE
  combine_output = flatten(map(lambda x: COMBINER(*x), groupByKey(map_output)))
  # Шаг SHUFFLE & SORT
  shuffle_output = groupByKey(combine_output)
  # Шаг REDUCE
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм
output = MapReduce(RECORDREADER, MAP, COMBINER, REDUCE)
output = list(output)
print(output)

[1, 2, 3, 4, 5, 6]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [117]:
from typing import Iterator

# Пример входных данных - список кортежей
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35), (4, "David", 40), (5, "Eve", 45)]

# Функция RECORDREADER создает пары (index, tuple)
def RECORDREADER():
  for index, tuple in enumerate(data):
    yield (index, tuple)

# Функция MAP применяет предикат к каждому кортежу и выводит кортеж, если предикат истинен
def MAP(index: int, tuple: tuple):
  if tuple[2] > 30:  # Предикат: возраст больше 30
    yield (tuple, tuple)

# Функция REDUCE просто возвращает полученный кортеж
def REDUCE(key: tuple, values: Iterator[tuple]):
  yield key

# Функция MapReduce остается такой же, как в предыдущих примерах
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм
output = MapReduce(RECORDREADER, MAP, REDUCE)
selected_tuples = list(output)
print(selected_tuples)

[(3, 'Charlie', 35), (4, 'David', 40), (5, 'Eve', 45)]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [118]:
# Выбираем атрибуты для проекции
# Допустим, мы хотим спроецировать только на "ID" и "Name"
S = [0, 1]  # Индексы атрибутов

# Функция RECORDREADER остается той же
def RECORDREADER():
  for index, tuple in enumerate(data):
    yield (index, tuple)

# Функция MAP для проекции
def MAP(index: int, input_tuple: tuple):
  projected_tuple = tuple(i for i in input_tuple)
  projected_tuple = tuple(projected_tuple)  # Преобразуем в кортеж
  yield (projected_tuple, projected_tuple)

# Функция REDUCE для проекции
def REDUCE(key: tuple, values: Iterator[tuple]):
  yield key

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм для проекции
output = MapReduce(RECORDREADER, MAP, REDUCE)
projected_tuples = list(output)
print(projected_tuples)

[(1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 35), (4, 'David', 40), (5, 'Eve', 45)]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [119]:
# Пример данных из двух наборов (R и S)
R = [
  (1, 'Alice', 'Math'),
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry')
]

S = [
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry'),
  (4, 'David', 'Biology')
]

# Функция RECORDREADER остается той же, но теперь будет объединять два набора данных
def RECORDREADER():
  for index, tuple in enumerate(R):
    yield (index, tuple)
  for index, tuple in enumerate(S, start=len(R)):
    yield (index, tuple)

# Функция MAP для объединения
def MAP(index: int, input_tuple: tuple):
  yield (input_tuple, input_tuple)

# Функция REDUCE для объединения
def REDUCE(key: tuple, values):
  yield (key, key)

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм для объединения
output = MapReduce(RECORDREADER, MAP, REDUCE)
union_tuples = list(output)
print(union_tuples)

[((1, 'Alice', 'Math'), (1, 'Alice', 'Math')), ((2, 'Bob', 'Physics'), (2, 'Bob', 'Physics')), ((3, 'Charlie', 'Chemistry'), (3, 'Charlie', 'Chemistry')), ((4, 'David', 'Biology'), (4, 'David', 'Biology'))]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [120]:
# Пример данных из двух наборов (R и S)
R = [
  (1, 'Alice', 'Math'),
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry')
]

S = [
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry'),
  (4, 'David', 'Biology')
]

# Функция RECORDREADER объединяет два набора данных с меткой, откуда они пришли
def RECORDREADER():
  for index, tuple in enumerate(R):
    yield (index, tuple, 'R')
  for index, tuple in enumerate(S, start=len(R)):
    yield (index, tuple, 'S')

# Функция MAP для пересечения
def MAP(index: int, input_tuple: tuple, source: str):
  yield (input_tuple, source)

# Функция REDUCE для пересечения
def REDUCE(key: tuple, values):
  sources = list(values)
  if len(sources) == 2 and 'R' in sources and 'S' in sources:
    yield (key, key)

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм для пересечения
output = MapReduce(RECORDREADER, MAP, REDUCE)
intersection_tuples = list(output)
print(intersection_tuples)

[((2, 'Bob', 'Physics'), (2, 'Bob', 'Physics')), ((3, 'Charlie', 'Chemistry'), (3, 'Charlie', 'Chemistry'))]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [121]:
# Пример данных из двух наборов (R и S)
R = [
  (1, 'Alice', 'Math'),
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry')
]

S = [
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry'),
  (4, 'David', 'Biology')
]

# Функция RECORDREADER объединяет два набора данных с меткой, откуда они пришли
def RECORDREADER():
  for index, tuple in enumerate(R):
    yield (index, tuple, 'R')
  for index, tuple in enumerate(S, start=len(R)):
    yield (index, tuple, 'S')

# Функция MAP для разницы
def MAP(index: int, input_tuple: tuple, source: str):
  yield (input_tuple, source)

# Функция REDUCE для разницы
def REDUCE(key: tuple, values):
  sources = list(values)
  if sources == ['R']:
    yield (key, key)

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм для разницы
output = MapReduce(RECORDREADER, MAP, REDUCE)
difference_tuples = list(output)
print(difference_tuples)

[((1, 'Alice', 'Math'), (1, 'Alice', 'Math'))]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [122]:
# Пример данных из двух наборов (R и S)
R = [
  (1, 'Alice', 'Math'),
  (2, 'Bob', 'Physics'),
  (3, 'Charlie', 'Chemistry')
]

S = [
  ('Math', 'Dr. Smith'),
  ('Physics', 'Dr. Johnson'),
  ('Biology', 'Dr. Davis')
]

# Функция RECORDREADER объединяет два набора данных с меткой, откуда они пришли
def RECORDREADER():
  for index, tuple in enumerate(R):
    yield (index, tuple, 'R')
  for index, tuple in enumerate(S, start=len(R)):
    yield (index, tuple, 'S')

# Функция MAP для естественного соединения
def MAP(index: int, input_tuple: tuple, source: str):
  if source == 'R':
    _, name, subject = input_tuple
    yield (subject, (source, name))
  elif source == 'S':
    subject, professor = input_tuple
    yield (subject, (source, professor))

# Функция REDUCE для естественного соединения
def REDUCE(key: str, values):
  r_values = [value[1] for value in values if value[0] == 'R']
  s_values = [value[1] for value in values if value[0] == 'S']
  for r_value in r_values:
    for s_value in s_values:
      yield (r_value, key, s_value)

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполняем MapReduce алгоритм для естественного соединения
output = MapReduce(RECORDREADER, MAP, REDUCE)
join_tuples = list(output)
print(join_tuples)

[('Alice', 'Math', 'Dr. Smith'), ('Bob', 'Physics', 'Dr. Johnson')]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [123]:
# Пример данных
data = [
  ('Alice', 'Math', 80),
  ('Alice', 'Physics', 90),
  ('Bob', 'Math', 85),
  ('Bob', 'Physics', 75),
  ('Charlie', 'Math', 95),
  ('Charlie', 'Physics', 85)
]

# Функция RECORDREADER для чтения данных
def RECORDREADER():
  for index, tuple in enumerate(data):
    yield (index, tuple)

# Функция MAP для группировки
def MAP(index: int, input_tuple: tuple):
  student, subject, score = input_tuple
  yield (student, score)

# Пример функции агрегации SUM
def REDUCE_SUM(key: str, values):
  yield (key, sum(values))

# Пример функции агрегации MAX
def REDUCE_MAX(key: str, values):
  yield (key, max(values))

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Пример использования с функцией SUM
output_sum = MapReduce(RECORDREADER, MAP, REDUCE_SUM)
aggregated_sum = list(output_sum)
print("Aggregated SUM:", aggregated_sum)

# Пример использования с функцией MAX
output_max = MapReduce(RECORDREADER, MAP, REDUCE_MAX)
aggregated_max = list(output_max)
print("Aggregated MAX:", aggregated_max)

Aggregated SUM: [('Alice', 170), ('Bob', 160), ('Charlie', 180)]
Aggregated MAX: [('Alice', 90), ('Bob', 85), ('Charlie', 95)]


#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [124]:
# Пример данных матрицы A и разбиения вектора V на части
A = [
  (0, 0, 1), (0, 1, 2), (0, 2, 3),
  (1, 0, 4), (1, 1, 5), (1, 2, 6),
  (2, 0, 7), (2, 1, 8), (2, 2, 9)
]

V_parts = {
  0: [(0, 1)],  # Часть вектора V[0]
  1: [(1, 2)],  # Часть вектора V[1]
  2: [(2, 3)]   # Часть вектора V[2]
}

# Функция RECORDREADER объединяет элементы матрицы и части вектора
def RECORDREADER():
  for index, (i, j, value) in enumerate(A):
    yield (index, (i, j, value))
  for part, elements in V_parts.items():
    for index, (j, value) in enumerate(elements, start=len(A)):
      yield (index, (j, value))

# Функция MAP для умножения матрицы на вектор
def MAP(index: int, input_tuple: tuple):
  if len(input_tuple) == 3:  # Это элемент матрицы
    i, j, aij = input_tuple
    yield (j, ('matrix', i, aij))
  elif len(input_tuple) == 2:  # Это элемент вектора
    j, vj = input_tuple
    yield (j, ('vector', vj))

# Функция REDUCE для сопоставления элементов матрицы и вектора
def REDUCE(key: int, values):
  matrix_elements = []
  vector_value = None
  for value in values:
    if value[0] == 'matrix':
      matrix_elements.append((value[1], value[2]))
    elif value[0] == 'vector':
      vector_value = value[1]

  if vector_value is not None:
    for i, aij in matrix_elements:
      yield (i, aij * vector_value)

# Второй этап Reduce для суммирования по строкам
def REDUCE_SUM(key: int, values):
  yield (key, sum(values))

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Первый этап MapReduce для умножения матрицы на вектор
intermediate_output = MapReduce(RECORDREADER, MAP, REDUCE)
intermediate_pairs = list(intermediate_output)

# Второй этап MapReduce для суммирования по строкам
def RECORDREADER_SUM():
  for index, pair in enumerate(intermediate_pairs):
    yield (index, pair)

# Функция MAP для второго этапа
def MAP_SUM(index: int, pair: tuple):
  yield pair[0], pair[1]

output = MapReduce(RECORDREADER_SUM, MAP_SUM, REDUCE_SUM)
final_result = list(output)
print(final_result)

[(0, 14), (1, 32), (2, 50)]


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [125]:
# Пример данных для матрицы M и N
M = [
  (0, 0, 1), (0, 1, 2), (1, 0, 3), (1, 1, 4)
]
N = [
  (0, 0, 5), (0, 1, 6), (1, 0, 7), (1, 1, 8)
]

# Функция RECORDREADER объединяет элементы обеих матриц
def RECORDREADER():
  for index, (i, j, mij) in enumerate(M):
    yield (index, ('M', i, j, mij))
  for index, (j, k, njk) in enumerate(N, start=len(M)):
    yield (index, ('N', j, k, njk))

# Функция MAP для перемножения матриц
def MAP(index: int, input_tuple: tuple):
  if input_tuple[0] == 'M':
    _, i, j, mij = input_tuple
    yield (j, ('M', i, mij))
  elif input_tuple[0] == 'N':
    _, j, k, njk = input_tuple
    yield (j, ('N', k, njk))

# Функция REDUCE для объединения пар элементов матриц M и N
def REDUCE(key: int, values):
  m_elements = []
  n_elements = []
  for value in values:
    if value[0] == 'M':
      m_elements.append((value[1], value[2]))
    elif value[0] == 'N':
      n_elements.append((value[1], value[2]))

  for (i, mij) in m_elements:
    for (k, njk) in n_elements:
      yield (i, k, mij * njk)

# Второй этап Reduce для суммирования по парам (i, k)
def REDUCE_SUM(key: tuple, values):
  yield (key[0], key[1], sum(values))

# Функция flatten для разворачивания вложенных списков
def flatten(lst):
  for sublist in lst:
    for item in sublist:
      yield item

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(pairs):
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped.items()

# Функция MapReduce остается такой же
def MapReduce(RECORDREADER, MAP, REDUCE):
  map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
  shuffle_output = groupbykey(map_output)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Первый этап MapReduce для объединения пар элементов матриц M и N
intermediate_output = MapReduce(RECORDREADER, MAP, REDUCE)
intermediate_pairs = list(intermediate_output)

# Второй этап MapReduce для суммирования по парам (i, k)
def RECORDREADER_SUM():
  for index, pair in enumerate(intermediate_pairs):
    yield (index, pair)

# Функция MAP_SUM для второго этапа
def MAP_SUM(index: int, pair: tuple):
  yield (pair[0], pair[1]), pair[2]

output = MapReduce(RECORDREADER_SUM, MAP_SUM, REDUCE_SUM)
final_result = list(output)
print(final_result)

[(0, 0, 19), (0, 1, 22), (1, 0, 43), (1, 1, 50)]


Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [126]:
import numpy as np

# Размерности матриц
I = 2
J = 3
K = 4 * 10

# Матрица, которая хранится в памяти
small_mat = np.random.rand(I, J)

# Большая матрица, которая будет обрабатываться через RECORDREADER
big_mat = np.random.rand(J, K)

# Функция RECORDREADER для чтения большой матрицы
def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j, k), big_mat[j, k])

# Функция MAP для перемножения матриц
def MAP(k1, v1):
  (j, k) = k1
  w = v1
  for i in range(small_mat.shape[0]):
    yield ((i, k), small_mat[i, j] * w)

# Функция REDUCE для суммирования произведений
def REDUCE(key, values):
  yield (key, sum(values))

# Функция flatten для разворачивания вложенных списков
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

# Функция groupbykey для группировки значений по ключу
from collections import defaultdict

def groupbykey(iterable):
  grouped = defaultdict(list)
  for (k2, v2) in iterable:
    grouped[k2].append(v2)
  return grouped.items()

# Функция MapReduce
def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Выполнение MapReduce для перемножения матриц
output = MapReduce(RECORDREADER, MAP, REDUCE)
final_result = list(output)

# Преобразуем результат в numpy массив
result = np.zeros((I, K))
for ((i, k), value) in final_result:
  result[i, k] = value

print("Resulting Matrix P:")
print(result)

Resulting Matrix P:
[[0.87628182 1.07220129 1.13798572 0.40196932 1.34223776 0.7975413
  0.47292603 0.61971085 0.55249335 1.15597973 0.41800862 1.22227524
  1.5761527  0.63456494 0.32221875 0.51035887 0.3568985  1.14534246
  1.26391603 1.27428072 0.6024595  1.19168772 0.35344    1.17495639
  0.80773392 0.59284767 0.76464422 1.03564724 0.78852058 0.56943234
  1.06254314 1.04611639 0.82741984 0.73660651 1.33841848 0.93383633
  1.28296374 0.69354685 1.27783083 0.70921614]
 [0.54434386 0.59390888 0.65373334 0.36231805 0.98110427 0.97051386
  0.64071239 0.37055782 0.35032685 0.9526138  0.54998662 1.28232777
  1.36234757 0.48304172 0.26679643 0.35402038 0.17108097 1.12450495
  0.81890508 1.16921786 0.79957576 0.97146797 0.33989061 0.59626075
  0.86555736 0.68130905 1.03228519 0.44273534 0.53111884 0.61846827
  0.88646656 0.79221512 1.04623349 0.46621995 1.09033509 0.52911184
  1.08668821 0.54217854 0.84008226 0.89352057]]


Проверьте своё решение

In [127]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [128]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [129]:
import numpy as np
from collections import defaultdict

# Размеры матриц
I = 2
J = 3
K = 4 * 10

# Генерация матриц с использованием numpy
M = np.random.rand(I, J)
N = np.random.rand(J, K)

# Функция RECORDREADER для генерации обеих матриц
def RECORDREADER():
  # Чтение первой матрицы M
  for i in range(M.shape[0]):
    for j in range(M.shape[1]):
      yield ('M', (i, j, M[i, j]))

  # Чтение второй матрицы N
  for j in range(N.shape[0]):
    for k in range(N.shape[1]):
      yield ('N', (j, k, N[j, k]))

# Функция MAP для перемножения матриц
def MAP(key, value):
  label, (x, y, v) = key, value

  if label == 'M':
    i, j, mij = x, y, v
    for k in range(K):  # Генерируем пары для всех столбцов
      yield ((i, k), ('M', j, mij))
  elif label == 'N':
    j, k, njk = x, y, v
    for i in range(I):  # Генерируем пары для всех строк
      yield ((i, k), ('N', j, njk))

# Функция REDUCE для перемножения элементов матриц
def REDUCE(key, values):
  m_elements = {}
  n_elements = {}

  for label, idx, val in values:
    if label == 'M':
      m_elements[idx] = val
    elif label == 'N':
      n_elements[idx] = val

  result = 0
  for j in m_elements:
    if j in n_elements:
      result += m_elements[j] * n_elements[j]

  yield (key, result)

# Функция flatten для разворачивания вложенных списков
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

# Функция groupbykey для группировки значений по ключу
def groupbykey(iterable):
  grouped = defaultdict(list)
  for (k2, v2) in iterable:
    grouped[k2].append(v2)
  return grouped.items()

# Функция MapReduce
def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Выполнение MapReduce для перемножения матриц
output = MapReduce(RECORDREADER, MAP, REDUCE)
final_result = list(output)

# Преобразуем результат в numpy массив
result = np.zeros((I, K))
for ((i, k), value) in final_result:
  result[i, k] = value

print("Resulting Matrix P:")
print(result)

Resulting Matrix P:
[[0.44132547 0.63837578 0.28181828 0.45810148 0.2712535  0.3078513
  0.12288188 0.14125358 0.48763706 0.57892861 0.44628637 0.38051695
  0.27650911 0.52217286 0.18717489 0.40478612 0.74502857 0.43473919
  0.41125028 0.4168957  0.47483787 0.51410015 0.35796236 0.51573554
  0.18721301 0.11330107 0.2687599  0.17132594 0.53230741 0.41033101
  0.56625092 0.58628379 0.37983701 0.21823606 0.16498484 0.32928377
  0.56998549 0.61755381 0.43710259 0.27289418]
 [0.81635225 1.11104245 0.38862306 1.0776314  0.67682796 0.35872222
  0.41422935 0.65331146 0.7257337  1.3762253  0.94782887 0.79461783
  0.36360472 1.01360117 0.64037756 0.9600877  1.43359601 1.14437433
  0.65377906 1.03787954 1.23435129 1.07946698 0.8913828  1.18946476
  0.65277557 0.52595692 0.8335858  0.36490441 1.19948234 0.49194087
  1.07739505 0.84977736 0.95493601 0.36861285 0.74967774 0.74799914
  1.31930838 1.3500017  0.84201312 0.73093398]]


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [130]:
import numpy as np
from collections import defaultdict

# Размеры матриц
I = 2
J = 3
K = 4 * 10

# Генерация матриц с использованием numpy
M = np.random.rand(I, J)
N = np.random.rand(J, K)

# Функция RECORDREADER для чтения первой матрицы M
def RECORDREADER_M():
  for i in range(M.shape[0]):
    for j in range(M.shape[1]):
      yield ('M', (i, j, M[i, j]))

# Функция RECORDREADER для чтения второй матрицы N
def RECORDREADER_N():
  for j in range(N.shape[0]):
    for k in range(N.shape[1]):
      yield ('N', (j, k, N[j, k]))

# Функция flatten для разворачивания вложенных списков
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

# Функция groupbykey для группировки значений по ключу
def groupbykey(iterable):
  grouped = defaultdict(list)
  for (k2, v2) in iterable:
    grouped[k2].append(v2)
  return grouped.items()

# Функция MAP для матрицы M
def MAP_M(key, value):
  label, (i, j, mij) = key, value
  for k in range(K):  # Проход по всем столбцам матрицы N
    yield ((i, k), ('M', j, mij))

# Функция MAP для матрицы N
def MAP_N(key, value):
  label, (j, k, njk) = key, value
  for i in range(I):  # Проход по всем строкам матрицы M
    yield ((i, k), ('N', j, njk))

# Функция REDUCE для объединения пар элементов матриц
def REDUCE(key, values):
  m_elements = {}
  n_elements = {}

  for label, idx, val in values:
    if label == 'M':
      m_elements[idx] = val
    elif label == 'N':
      n_elements[idx] = val

  result = 0
  for j in m_elements:
    if j in n_elements:
      result += m_elements[j] * n_elements[j]

  yield (key, result)

# Функция MapReduce для объединения данных из двух RECORDREADERS
def MapReduce_Dual(RECORDREADER_1, RECORDREADER_2, MAP_1, MAP_2, REDUCE):
  map_output_1 = flatten(map(lambda x: MAP_1(*x), RECORDREADER_1()))
  map_output_2 = flatten(map(lambda x: MAP_2(*x), RECORDREADER_2()))
  shuffle_output = groupbykey(flatten([map_output_1, map_output_2]))
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Выполнение MapReduce для перемножения матриц
output = MapReduce_Dual(RECORDREADER_M, RECORDREADER_N, MAP_M, MAP_N, REDUCE)
final_result = list(output)

# Преобразуем результат в numpy массив
result = np.zeros((I, K))
for ((i, k), value) in final_result:
  result[i, k] = value

print("Resulting Matrix P:")
print(result)

Resulting Matrix P:
[[0.28009127 0.46426341 0.56609324 0.26119633 0.03777006 0.83076508
  0.75200157 0.30085162 0.77109818 0.61759607 0.35171347 0.6283101
  0.69916692 0.84308084 0.81547412 0.51381594 0.79467067 0.74283153
  0.9470885  1.18290131 0.67403221 0.63304156 0.91867352 0.93415733
  0.84799752 0.56378341 0.99636798 0.29811095 0.58893762 0.63162706
  0.66032358 0.77867565 0.52054541 1.14907653 1.0803194  0.68742623
  0.79644498 0.64760762 0.70669825 0.96787373]
 [0.20906926 0.42275258 0.5687427  0.17672481 0.03662875 0.64143621
  0.67596641 0.22172287 0.75524708 0.4536242  0.29269473 0.61373173
  0.52588806 0.70817357 0.58166477 0.47188072 0.62088606 0.68812268
  0.71244002 0.94965832 0.42811147 0.47473735 0.72168512 0.75116891
  0.66993895 0.33537849 0.79948243 0.24343265 0.5058955  0.62005972
  0.49487026 0.70376963 0.52836955 0.92922395 0.86524747 0.44602764
  0.6670271  0.54748399 0.48602166 0.71479006]]


Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [131]:
import numpy as np
from collections import defaultdict
from itertools import chain
import random

# Размеры матриц
I = 2
J = 3
K = 4 * 10

# Генерация матриц с использованием numpy
M = np.random.rand(I, J)
N = np.random.rand(J, K)

# Определение случайного подмножества элементов для матрицы M
def RECORDREADER_M1():
  indices = random.sample(range(I * J), k=int(I * J * 0.5))
  for index in indices:
    i, j = divmod(index, J)
    yield ('M', (i, j, M[i, j]))

def RECORDREADER_M2():
  indices = random.sample(range(I * J), k=int(I * J * 0.5))
  for index in indices:
    i, j = divmod(index, J)
    yield ('M', (i, j, M[i, j]))

# Определение случайного подмножества элементов для матрицы N
def RECORDREADER_N1():
  indices = random.sample(range(J * K), k=int(J * K * 0.5))
  for index in indices:
    j, k = divmod(index, K)
    yield ('N', (j, k, N[j, k]))

def RECORDREADER_N2():
  indices = random.sample(range(J * K), k=int(J * K * 0.5))
  for index in indices:
    j, k = divmod(index, K)
    yield ('N', (j, k, N[j, k]))

# Функция flatten для разворачивания вложенных списков
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

# Функция groupbykey для группировки значений по ключу
def groupbykey(iterable):
  grouped = defaultdict(list)
  for (k2, v2) in iterable:
    grouped[k2].append(v2)
  return grouped.items()

# Функция MAP для матрицы M
def MAP_M(key, value):
  label, (i, j, mij) = key, value
  for k in range(K):  # Проход по всем столбцам матрицы N
    yield ((i, k), ('M', j, mij))

# Функция MAP для матрицы N
def MAP_N(key, value):
  label, (j, k, njk) = key, value
  for i in range(I):  # Проход по всем строкам матрицы M
    yield ((i, k), ('N', j, njk))

# Функция REDUCE для объединения пар элементов матриц
def REDUCE(key, values):
  m_elements = {}
  n_elements = {}

  for label, idx, val in values:
    if label == 'M':
      m_elements[idx] = val
    elif label == 'N':
      n_elements[idx] = val

  result = 0
  for j in m_elements:
    if j in n_elements:
      result += m_elements[j] * n_elements[j]

  yield (key, result)

# Функция MapReduce для объединения данных из нескольких RECORDREADER-ов
def MapReduce_Multi(RECORDREADERS, MAPS, REDUCE):
  map_outputs = flatten(map(lambda x: MAPS[0](*x), RECORDREADERS[0]()))
  for i in range(1, len(RECORDREADERS)):
    map_outputs = chain(map_outputs, flatten(map(lambda x: MAPS[i](*x), RECORDREADERS[i]())))
  shuffle_output = groupbykey(map_outputs)
  reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
  return reduce_output

# Список RECORDREADERS и MAP-функций
RECORDREADERS = [RECORDREADER_M1, RECORDREADER_M2, RECORDREADER_N1, RECORDREADER_N2]
MAPS = [MAP_M, MAP_M, MAP_N, MAP_N]

# Выполнение MapReduce для перемножения матриц
output = MapReduce_Multi(RECORDREADERS, MAPS, REDUCE)
final_result = list(output)

# Преобразуем результат в numpy массив
result = np.zeros((I, K))
for ((i, k), value) in final_result:
  result[i, k] = value

print("Resulting Matrix P:")
print(result)

Resulting Matrix P:
[[0.00000000e+00 7.51813272e-01 4.51070200e-01 8.75509397e-01
  6.21586701e-01 5.50947182e-01 4.78641538e-01 3.62152619e-04
  0.00000000e+00 0.00000000e+00 2.77256358e-01 5.14044137e-01
  8.76720275e-01 7.06463173e-01 8.83296649e-02 0.00000000e+00
  2.80758122e-01 6.62291395e-01 0.00000000e+00 0.00000000e+00
  1.11275327e-01 7.23289761e-01 2.50450818e-01 5.00240558e-01
  2.00282983e-01 7.63353348e-01 5.76768868e-01 0.00000000e+00
  2.84377180e-01 7.68841853e-01 7.90034057e-02 0.00000000e+00
  9.39042418e-01 2.37598263e-02 0.00000000e+00 3.07952350e-01
  0.00000000e+00 0.00000000e+00 8.77249507e-01 0.00000000e+00]
 [5.29877850e-01 9.77019320e-01 1.10797055e+00 8.85652841e-01
  6.53075875e-01 7.48896634e-01 7.44457335e-01 4.65476640e-01
  7.72719192e-01 2.54768544e-01 3.89619034e-01 4.48611407e-01
  1.46281057e+00 6.16569768e-01 6.68150650e-01 1.77335238e-01
  8.54505649e-01 5.77988256e-01 2.04322254e-02 0.00000000e+00
  6.65325926e-01 6.31222134e-01 3.23209634e-01 1.