# Введение в MapReduce модель на Python


In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.905589809636405),
 (1, 2.905589809636405),
 (2, 2.905589809636405),
 (3, 2.905589809636405),
 (4, 2.905589809636405)]

## Inverted index

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('it', 18), ('what', 10)]),
 (1, [])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.0059671639991895065),
   (None, 0.07724245496172),
   (None, 0.08440804135613444),
   (None, 0.13575647907181598),
   (None, 0.14404826813474803),
   (None, 0.21204275967955666),
   (None, 0.21869633101751806),
   (None, 0.25055756276216923),
   (None, 0.28642389538931257),
   (None, 0.3834487515438496),
   (None, 0.3913614390023946),
   (None, 0.4041378102237341),
   (None, 0.41854626274930695),
   (None, 0.4704310153549396),
   (None, 0.4776995227348928),
   (None, 0.48992216726013693)]),
 (1,
  [(None, 0.5005353544023048),
   (None, 0.5135664686748047),
   (None, 0.53391984417089),
   (None, 0.5587932025401512),
   (None, 0.5673804905854288),
   (None, 0.6926646597910275),
   (None, 0.7237444251339501),
   (None, 0.7557883138083207),
   (None, 0.785709769245918),
   (None, 0.7937098630029404),
   (None, 0.7942646850708935),
   (None, 0.9160468126494941),
   (None, 0.9618068292060864),
   (None, 0.9820764489731459)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [11]:
from functools import reduce

# Функция MAP для нахождения максимального значения в списке чисел
def MAP(numList):
    # Функция MAP принимает список чисел (numList) и возвращает максимальное значение в нем
    return max(numList)

# Функция REDUCE для нахождения максимального значения среди всех списков чисел
def REDUCE(max1, max2):
    return max(max1, max2)

# Данные для тестирования
sample_data = [10, 20, 30, 40, 50, 60, 20, 30, 10, 100]
print("Sample data:", sample_data)

# Выполнение Map
mapped_values = map(MAP, [sample_data])

# Выполнение Reduce
result = reduce(REDUCE, mapped_values)
print("Max value:", result)

Sample data: [10, 20, 30, 40, 50, 60, 20, 30, 10, 100]
Max value: 100


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [12]:
from typing import Iterator, Tuple

# Функция MAP: создает пару (ключ, значение), где ключ всегда равен 1, а значение - элемент из входного списка
def MAP(num):
    return (1, num)

# Функция REDUCE: для каждого ключа вычисляем среднее значение по всем числам
def REDUCE(key, nums: Iterator[Tuple[int, int]]):
    sum = 0
    count = 0
    for _, n in nums:  # Перебираем только числа, пропуская ключи
        sum += n
        count += 1
    yield sum / count  # Вычисляем и возвращаем среднее значение

# Данные для тестирования
sample_data = [10, 20, 30, 40, 50, 60, 20, 30, 10]

# Выполнение MAP
map_output = list(map(MAP, sample_data))
print("MAP:", map_output)

# Группируем данные по ключу
grouped_avg = {}
for k, v in map_output:
    if k not in grouped_avg:
        grouped_avg[k] = []
    grouped_avg[k].append(v)  # Добавляем значение в список для этого ключа

# Выполнение REDUCE
reduce_output = list(REDUCE(1, map_output))
print("Reduce output:", reduce_output)

MAP: [(1, 10), (1, 20), (1, 30), (1, 40), (1, 50), (1, 60), (1, 20), (1, 30), (1, 10)]
Reduce output: [30.0]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [13]:
from typing import Iterator, NamedTuple, List, Tuple

# Функция MAP
def MAP(num):
    return (num[0], num[1])  # Возвращаем ключ и значение как пару

# Функция REDUCE
def REDUCE(key, nums: Iterator[int]):
    total = 0
    count = 0
    for n in nums:
        total += n  # Суммируем значения
        count += 1  # Подсчитываем количество элементов
    if count > 0:
        yield total / count  # Вычисляем среднее значение
    else:
        yield 0

# Функция для группировки значений по ключу с использованием сортировки
def group_by_key(iterable: List[Tuple[int, int]]):
    grouped_data = {}
    # Сортируем по ключу перед группировкой
    for key, value in sorted(iterable, key=lambda x: x[0]):
        grouped_data.setdefault(key, []).append(value)
    return list(grouped_data.items())

# Данные для тестирования
sample_pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]

# Применение MAP для получения ключей и значений
map_output = list(map(lambda x: MAP(x), sample_pairs))
print("MAP:", map_output)

# Группировка по ключу и преобразование в список
shuffle_output = group_by_key(map_output)
print("GroupByKey output:", shuffle_output)

# Применение функции REDUCE к результату группировки
reduce_output = [next(REDUCE(key, values)) for key, values in shuffle_output]  # Применяем REDUCE и извлекаем значения
print("Reduce:", reduce_output)

MAP: [('a', 1), ('b', 2), ('a', 3), ('b', 4), ('c', 5)]
GroupByKey output: [('a', [1, 3]), ('b', [2, 4]), ('c', [5])]
Reduce: [2.0, 3.0, 5.0]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [14]:
import numpy as np
from typing import Iterator

def MAP(_, value):
    yield value, None  # Каждое значение выдаётся с None, чтобы группировка работала

def REDUCE(key: str, _):
    yield key  # Функция идентичности (оставляем только ключ)

# Функция flatten для разворачивания списков
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция для распределенной группировки
def groupbykey_distributed(map_partitions, PARTITIONER):
    global reducers
    partitions = [{} for _ in range(reducers)]
    for map_partition in map_partitions:
        for key, value in map_partition:
            partition_index = PARTITIONER(key)
            partitions[partition_index].setdefault(key, []).append(value)

    # Убираем пустые партиции
    return [(partition_id, list(partition.items())) for partition_id, partition in enumerate(partitions) if partition]

# Функция партиционирования
def PARTITIONER(obj):
    global reducers
    return hash(obj) % reducers

# Функция для выполнения MapReduce
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER):
    map_partitions = list(flatten(map(lambda record_reader: map(lambda k1v1: MAP(*k1v1), record_reader), INPUTFORMAT())))

    # Группировка данных по ключам с разбиением по партициям
    reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER)

    # приводим reduce_outputs к списку
    reduce_outputs = [(partition_id, list(flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), partition))))
                      for partition_id, partition in reduce_partitions]

    return reduce_outputs

# Данные для тестирования
sample_data = ["a", "b", "a", "c", "b", "d", "e", "d"]
maps = 3
reducers = 2
print("data:", sample_data)

def INPUTFORMAT():
    global maps
    def RECORDREADER(split):
        for value in split:
            yield (value, value)  # передаём значение дважды

    split_size = int(np.ceil(len(sample_data) / maps))
    for i in range(0, len(sample_data), split_size):
        yield RECORDREADER(sample_data[i:i + split_size])

# Выполнение распределенного MapReduce
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE)
print("Уникальные элементы:", partitioned_output)

data: ['a', 'b', 'a', 'c', 'b', 'd', 'e', 'd']
Уникальные элементы: [(0, ['a', 'c', 'd']), (1, ['b', 'e'])]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [15]:
from collections import defaultdict

def C(t):
    return t.startswith("a")  # Фильтруем слова, начинающиеся на "a"

def MAP(el_list):
    mapped_result = defaultdict(set)  # Используем set() для удаления дубликатов
    for t in el_list:
        if C(t):
            mapped_result[t].add(t)
    return mapped_result.items()

def REDUCE(mapped_items):
    reduced_result = set()
    for key, values in mapped_items:  # key - слово, values - set()
        reduced_result.add(key)  # Добавляем только ключ
    return sorted(reduced_result)  # Гарантируем сортировку

# Данные для тестирования
sample_data = ["apple", "banana", "apricot", "cherry", "avocado"]
part_count = 4
print("data:", sample_data)

# Разбиение записей на равные части
record_partitioned = [sample_data[d:d + part_count] for d in range(0, len(sample_data), part_count)]

# Применение Map и Reduce
mapped_results = list(map(lambda x: MAP(x), record_partitioned))
reduced_results = REDUCE(flatten(mapped_results))

print("Фильтрованные слова на 'a':", reduced_results)

data: ['apple', 'banana', 'apricot', 'cherry', 'avocado']
Фильтрованные слова на 'a': ['apple', 'apricot', 'avocado']


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [16]:
from typing import Iterator

# Множество для проекции
S = {0, 2}  # оставить только элементы с индексами 0 и 2

# Функция MAP для проекции
def MAP(_, value):
    # Проекция: исключаем атрибуты, которые не принадлежат множеству S
    t_prime = tuple(value[i] for i in S)
    return (t_prime, t_prime)

# Функция REDUCE для агрегации
def REDUCE(key, values: Iterator[str]):
    yield key, key

# Функция для группировки значений по ключу
def groupbykey(iterable):
    grouped_data = {}
    for key, value in iterable:
        grouped_data.setdefault(key, []).append(value)  # Группируем значения по ключу
    return grouped_data.items()  # Возвращаем список кортежей (ключ, список значений)

# Данные для тестирования
sample_data = [("a", 1, "red"), ("b", 2, "green"), ("a", 3, "blue"), ("b", 4, "yellow"), ("c", 5, "red")]
print("data:", sample_data)

# Функция MapReduce
def MapReduce(data, MAP, REDUCE):
    # Применяем MAP ко всем данным, затем группируем по ключу с помощью groupbykey
    # Применяем REDUCE к сгруппированным данным
    return flatten(map(lambda x: REDUCE(*x),groupbykey(flatten(map(lambda x: MAP(None, x), data)))))

# Функция flatten для "выравнивания" вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Выполнение MapReduce
result = MapReduce(sample_data, MAP, REDUCE)
print("MapReduce output:", list(result))  # Преобразуем результат в список для вывода

data: [('a', 1, 'red'), ('b', 2, 'green'), ('a', 3, 'blue'), ('b', 4, 'yellow'), ('c', 5, 'red')]
MapReduce output: [('a', 'a'), ('b', 'b'), ('c', 'c')]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [20]:
from typing import Iterator

# Функция MAP для объединения
def MAP(value):
    return (value, value)  # Каждому входному кортежу ставим в соответствие пару (t, t)

# Функция REDUCE для объединения
def REDUCE(key, values: Iterator[str]):
    yield key, key  # Всегда создаем (t, t) в качестве выходного значения

# Функция flatten для "выравнивания" вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Данные для тестирования
sample_data = [("a", 1, "red"), ("b", 2, "green"), ("a", 3, "blue"), ("b", 4, "yellow"), ("c", 5, "red")]

map_output = list(map(lambda x: MAP(x), sample_data))  # Применяем функцию MAP ко всем элементам sample_data
print("MAP output:", map_output)

reduce_output = list(flatten(map(lambda x: REDUCE(*x), shuffle_output)))  # Применяем функцию REDUCE с flatten
print("Reduce output:", reduce_output)


MAP output: [(('a', 1, 'red'), ('a', 1, 'red')), (('b', 2, 'green'), ('b', 2, 'green')), (('a', 3, 'blue'), ('a', 3, 'blue')), (('b', 4, 'yellow'), ('b', 4, 'yellow')), (('c', 5, 'red'), ('c', 5, 'red'))]
Reduce output: [('a', 'a'), ('b', 'b'), ('c', 'c')]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [25]:
from typing import Iterator
from collections import defaultdict

# Функция MAP для пересечения
def MAP(value):
    return [(value, 1)]  # Создаем корректную пару (ключ, значение)

# Функция REDUCE для пересечения
def REDUCE(key, values: Iterator[int]):
    if len(values) == 2:
        yield key, key  # Создаем пару (t, t)

# Функция flatten для "выравнивания" вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция shuffle
def shuffle(mapped_data):
    grouped_data = defaultdict(list)
    for key, value in mapped_data:
        grouped_data[key].append(value)
    return grouped_data.items()

# Данные для тестирования
sample_data = [("a", 1, "red"), ("b", 2, "green"), ("a", 3, "blue"), ("b", 4, "yellow"), ("c", 5, "red"), ("a", 1, "red"), ("b", 2, "green")]

# Применяем MAP
map_output = list(flatten(map(MAP, sample_data)))
print("MAP output:", map_output)

# Группируем данные (shuffle step)
shuffle_output = shuffle(map_output)
print("Shuffle output:", list(shuffle_output))

# Применяем REDUCE
reduce_output = list(flatten(map(lambda x: REDUCE(*x), shuffle_output)))
print("Reduce output:", reduce_output)

MAP output: [(('a', 1, 'red'), 1), (('b', 2, 'green'), 1), (('a', 3, 'blue'), 1), (('b', 4, 'yellow'), 1), (('c', 5, 'red'), 1), (('a', 1, 'red'), 1), (('b', 2, 'green'), 1)]
Shuffle output: [(('a', 1, 'red'), [1, 1]), (('b', 2, 'green'), [1, 1]), (('a', 3, 'blue'), [1]), (('b', 4, 'yellow'), [1]), (('c', 5, 'red'), [1])]
Reduce output: [(('a', 1, 'red'), ('a', 1, 'red')), (('b', 2, 'green'), ('b', 2, 'green'))]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [34]:
from typing import Iterator

# Функция MAP для разницы
def MAP(value, relation):
    return (value, relation)  # Создаем пару (t, R) или (t, S)

# Функция REDUCE для разницы
def REDUCE(key, values: Iterator[str]):
    if values == ["R"]:  # Если ключ принадлежит только R
        yield key  # Создаем пару (t, t)

# Функция для группировки значений по ключу
def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]  # Группируем значения по ключу k2
    return t.items()

# Функция flatten для вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Данные для тестирования
R = [("a", 1, "red"), ("b", 2, "green"), ("a", 3, "blue")]
S = [("p", 5, "pink"), ("h", 8, "black")]

# Применяем MAP ко всем элементам R и S
map_output = list(map(lambda x: MAP(x, "R"), R)) + list(map(lambda x: MAP(x, "S"), S))
print("MAP output:", map_output)

make_output = group_by_key(map_output)  # Группируем значения по ключу
make_output = list(make_output)  # Преобразуем результат в список
print("Make output:", make_output)

reduce_output = list(flatten(map(lambda x: REDUCE(*x), make_output)))  # Применяем функцию REDUCE с flatten
print("Reduce output:", reduce_output)

MAP output: [(('a', 1, 'red'), 'R'), (('b', 2, 'green'), 'R'), (('a', 3, 'blue'), 'R'), (('p', 5, 'pink'), 'S'), (('h', 8, 'black'), 'S')]
Make output: [(('a', 1, 'red'), ['R']), (('b', 2, 'green'), ['R']), (('a', 3, 'blue'), ['R']), (('p', 5, 'pink'), ['S']), (('h', 8, 'black'), ['S'])]
Reduce output: [('a', 1, 'red'), ('b', 2, 'green'), ('a', 3, 'blue')]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [36]:
from typing import Iterator

# Функция MAP для естественного соединения
def MAP(value, relation):
    if relation == "R":
        return (value[1], ("R", value[0]))  # (b, (R, a))
    elif relation == "S":
        return (value[0], ("S", value[1]))  # (b, (S, c))

# Функция REDUCE для естественного соединения
def REDUCE(key, values: Iterator[str]):
    r_values = [v[1] for v in values if v[0] == "R"]  # Извлекаем все (R, a)
    s_values = [v[1] for v in values if v[0] == "S"]  # Извлекаем все (S, c)

    for a in r_values:
        for c in s_values:
            yield (a, key, c)  # Создаем (a, b, c)

# Функция для группировки значений по ключу
def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]  # Группируем значения по ключу k2
    return t.items()

# Функция flatten для "выравнивания" вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Данные для тестирования
R = [("a", 1), ("b", 2), ("c", 3)]
S = [(1, "x"), (2, "y"), (4, "z")]

# Применяем MAP ко всем элементам R и S
map_output = list(map(lambda x: MAP(x, "R"), R)) + list(map(lambda x: MAP(x, "S"), S))
print("MAP output:", map_output)

make_output = group_by_key(map_output)  # Группируем значения по ключу
make_output = list(make_output)  # Преобразуем результат в список
print("Make output:", make_output)

reduce_output = list(flatten(map(lambda x: REDUCE(*x), make_output)))  # Применяем функцию REDUCE с flatten
print("Reduce output:", reduce_output)

MAP output: [(1, ('R', 'a')), (2, ('R', 'b')), (3, ('R', 'c')), (1, ('S', 'x')), (2, ('S', 'y')), (4, ('S', 'z'))]
Make output: [(1, [('R', 'a'), ('S', 'x')]), (2, [('R', 'b'), ('S', 'y')]), (3, [('R', 'c')]), (4, [('S', 'z')])]
Reduce output: [('a', 1, 'x'), ('b', 2, 'y')]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [37]:
from typing import Iterator, NamedTuple, List, Tuple

# Функция MAP: группирует данные по (a, b)
def MAP(_, row: Tuple[str, int, int]):
    yield (row[0], row[1])  # (a, b) - убираем c

# Функция REDUCE: выполняет агрегацию (например, SUM или MAX)
def REDUCE(key: str, values: Iterator[int], aggregation="SUM"):
    if aggregation == "SUM":
        yield (key, sum(values))  # Суммирование значений b
    elif aggregation == "MAX":
        yield (key, max(values))  # Максимум среди значений b
    else:
        raise ValueError("Unknown aggregation function. Use 'SUM' or 'MAX'.")

# Функция группировки по ключу
def groupbykey(iterable):
    grouped_data = {}
    for key, value in iterable:
        grouped_data.setdefault(key, []).append(value)  # Группируем значения по ключу a
    return grouped_data.items()

# Функция flatten
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция MapReduce
def MapReduce(data: List[Tuple[str, int, int]], MAP, REDUCE, aggregation="SUM"):
    return flatten(
        map(lambda x: REDUCE(*x, aggregation),  # Применяем REDUCE к каждой группе данных
            groupbykey(flatten(map(lambda x: MAP(None, x), data)))  # Применяем MAP и группируем
        ))

# Данные для тестирования
sample_data = [("a", 10, 100), ("b", 20, 200), ("a", 30, 300), ("b", 40, 400), ("c", 50, 500)]

# Выполнение MapReduce с SUM
result_sum = MapReduce(sample_data, MAP, REDUCE, aggregation="SUM")
print("Aggregation (SUM):", list(result_sum))

# Выполнение MapReduce с MAX
result_max = MapReduce(sample_data, MAP, REDUCE, aggregation="MAX")
print("Aggregation (MAX):", list(result_max))

Aggregation (SUM): [('a', 40), ('b', 60), ('c', 50)]
Aggregation (MAX): [('a', 30), ('b', 40), ('c', 50)]


### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [38]:
from typing import Iterator, NamedTuple
import numpy as np

# Определение структуры для представления элементов матрицы
class MatrixElement(NamedTuple):
    row: int
    col: int
    value: float

# Матрица (5x4)
mat = np.ones((5, 4))
print("Matrix: ", mat)
# Вектор (4,)
vec = np.random.rand(4)  # Вектор для умножения
print("Vector: ", vec)

# Функция MAP: передаем только индекс строки и значение
def MAP(_, row: MatrixElement):
    yield (row.row, (row.col, row.value))

# Функция REDUCE: cуммирование произведений для строки
def REDUCE(i: int, products: Iterator[Tuple[int, float]]):
    total = 0
    for col, value in products:
        total += value * vec[col]  # Умножаем значение на соответствующий элемент вектора
    yield (i, total)  # Возвращаем результат для строки i

# Функция для группировки значений по ключу
def groupbykey(iterable):
    grouped_data = {}
    for key, value in iterable:
        grouped_data.setdefault(key, []).append(value)  # Группируем значения по ключу
    return grouped_data.items()

# Функция для вложенной структуры данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Функция MapReduce для умножения матрицы на вектор
def MapReduce(matrix: np.ndarray, vector: np.ndarray, MAP, REDUCE):
    # Генерируем все элементы матрицы в виде объектов MatrixElement
    matrix_elements = [MatrixElement(i, j, matrix[i, j]) for i in range(matrix.shape[0]) for j in range(matrix.shape[1])]

    # Применяем MAP ко всем элементам матрицы
    map_results = flatten(map(lambda x: MAP(None, x), matrix_elements))  # Применяем MAP ко всем элементам матрицы

    # Группируем по строкам и применяем REDUCE
    return flatten(map(lambda x: REDUCE(*x), groupbykey(map_results)))

# Выполнение MapReduce
output = MapReduce(mat, vec, MAP, REDUCE)
output = list(output)

print("Matrix-Vector multiplication result:", output)

Matrix:  [[1. 1. 1. 1.]
 [1. 1. 1. 1.]
 [1. 1. 1. 1.]
 [1. 1. 1. 1.]
 [1. 1. 1. 1.]]
Vector:  [0.03048744 0.63120279 0.45847886 0.63021976]
Matrix-Vector multiplication result: [(0, 1.7503888535026086), (1, 1.7503888535026086), (2, 1.7503888535026086), (3, 1.7503888535026086), (4, 1.7503888535026086)]


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [None]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [None]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  # solution code that yield(k2,v2) pairs

def REDUCE(key, values):
  (i, k) = key
  # solution code that yield(k3,v3) pairs

Проверьте своё решение

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [None]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [39]:
import numpy as np

# Размерности матриц
I = 2
J = 3
K = 4 * 10

# Генерация случайных матриц
small_mat = np.random.rand(I, J)  # small_mat (I×J)
big_mat = np.random.rand(J, K)    # big_mat (J×K)

# Вывод матриц для визуализации
print("Матрица small_mat (размерность {}x{}):")
print(small_mat)
print("\nМатрица big_mat (размерность {}x{}):")
print(big_mat)

# Эталонное решение для проверки
reference_solution = np.matmul(small_mat, big_mat)

# RECORDREADER генерирует пары ключ-значение для обеих матриц
def RECORDREADER():
    # Генерация элементов из первой матрицы small_mat
    for i in range(small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            yield ((0, i, j), small_mat[i, j])  # Индекс 0 для small_mat

    # Генерация элементов из второй матрицы big_mat
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((1, j, k), big_mat[j, k])  # Индекс 1 для big_mat

# Функция MAP_JOIN создает пары, которые можно соединить по общему индексу
def MAP_JOIN(k1, v1):
    (mat_num, i, j) = k1  # Распаковка ключа
    if mat_num == 0:  # Для small_mat
        # Создаем пару для дальнейшего соединения
        yield (j, (mat_num, i, v1))
    elif mat_num == 1:  # Для big_mat
        # Создаем пару для дальнейшего соединения
        yield (i, (mat_num, j, v1))

# REDUCE_JOIN собирает элементы по ключу и умножает соответствующие элементы
def REDUCE_JOIN(key, values):
    # Разделяем значения на две группы: из первой и второй матрицы
    from_first_mat = [v for v in values if v[0] == 0]
    from_second_mat = [v for v in values if v[0] == 1]

    # Перемножаем элементы из первой и второй матриц
    for f in from_first_mat:
        for s in from_second_mat:
            yield ((f[1], s[1]), f[2] * s[2])  # Умножение соответствующих элементов

# Функция MAP_MUL распределяет произведенные элементы по индексу (i, k)
def MAP_MUL(k1, v1):
    (i, k) = k1
    yield ((i, k), v1)  # Отправляем пару (i, k) с соответствующим произведением

# REDUCE_MUL суммирует произведения для каждого индекса (i, k)
def REDUCE_MUL(key, values):
    res_el_value = sum(values)  # Суммируем все значения для данного ключа
    yield (key, res_el_value)  # Отправляем результат

# Функция для преобразования выводов MapReduce в матрицу
def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), _) in reduce_output) + 1
    K = max(k for ((i, k), _) in reduce_output) + 1
    mat = np.empty((I, K))
    for ((i, k), v) in reduce_output:
        mat[i, k] = v
    return mat

# Основная функция MapReduce
def MapReduce(RECORDREADER, MAP, REDUCE):
    # Выполняем MapReduce с использованием функций RECORDREADER, MAP и REDUCE
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Вспомогательные функции для работы с MapReduce
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

# Выполнение MapReduce для получения результата
joined = MapReduce(RECORDREADER, MAP_JOIN, REDUCE_JOIN)
solution = MapReduce(lambda: joined, MAP_MUL, REDUCE_MUL)

# Преобразуем вывод MapReduce в матрицу
mapreduce_result = asmatrix(solution)

# Вывод матрицы результата
print("\nРешение через MapReduce:")
print(mapreduce_result)

# Проверка, что решение через MapReduce равно истинному решению
print("\nПроверка на совпадение с истинным решением:", np.allclose(reference_solution, mapreduce_result))

Матрица small_mat (размерность {}x{}):
[[0.44510582 0.12370698 0.76443164]
 [0.58837589 0.78511671 0.30896753]]

Матрица big_mat (размерность {}x{}):
[[0.39115355 0.98114278 0.44036857 0.37777387 0.83373695 0.96329351
  0.15705398 0.48100761 0.33484315 0.87286675 0.91618507 0.14842461
  0.91932193 0.13364551 0.09396709 0.63013056 0.57180855 0.54315852
  0.56489191 0.76349523 0.79978919 0.62375615 0.71359342 0.30695617
  0.02533333 0.13808222 0.0164084  0.97315514 0.83691632 0.51403966
  0.39514487 0.95229579 0.62625114 0.6785523  0.18208847 0.72561575
  0.73064519 0.29335862 0.69737893 0.7360569 ]
 [0.21607802 0.65558106 0.67346101 0.13475383 0.92353899 0.95778129
  0.60934769 0.01934773 0.4057042  0.36809447 0.55434696 0.14901832
  0.92753021 0.05065735 0.44651027 0.8107101  0.19274698 0.35030077
  0.60411463 0.03685685 0.74270448 0.10706775 0.25101176 0.78669021
  0.88180957 0.58126266 0.03975852 0.72438272 0.50266599 0.36595889
  0.08014962 0.73691955 0.86230492 0.33087774 0.8483158

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [40]:
import numpy as np

# Размеры матриц
I = 2
J = 3
K = 4 * 10

# Генерация случайных матриц
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

# Вывод матриц для визуализации
print("Матрица small_mat:")
print(small_mat)
print("\nМатрица big_mat:")
print(big_mat)

# RECORDREADER для матрицы small_mat
def RECORDREADER_small():
    for i in range(small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            yield ((i, j), small_mat[i, j])  # Возвращаем (i, j) и значение

# RECORDREADER для матрицы big_mat
def RECORDREADER_big():
    for j in range(big_mat.shape[1]):
        for k in range(big_mat.shape[0]):
            yield ((j, k), big_mat[k, j])  # Возвращаем (j, k) и значение

# MAP функция для перемножения матриц
def MAP(k1, v1):
    (i, j) = k1
    w = v1
    # Для каждой строки в small_mat и каждого столбца в big_mat генерируем (i, k) и произведение
    for k in range(big_mat.shape[1]):
        yield ((i, k), w * big_mat[j, k])

# REDUCE функция для суммирования результатов перемножения
def REDUCE(key, values):
    # key — это (i, k)
    # values — список произведений для данной пары
    result = sum(values)  # Суммируем все произведения
    yield (key, result)  # Возвращаем пару (i, k) и итоговое значение

# MapReduce модель
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Проверка решения
reference_solution = np.matmul(small_mat, big_mat)  # Истинное решение
solution = MapReduce(RECORDREADER_small, MAP, REDUCE)  # Решение через MapReduce

# Преобразование вывода REDUCE в матрицу
def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), _) in reduce_output) + 1
    K = max(k for ((i, k), _) in reduce_output) + 1
    mat = np.empty(shape=(I, K))
    for ((i, k), value) in reduce_output:
        mat[i, k] = value
    return mat

# Преобразуем вывод MapReduce в матрицу
mapreduce_result = asmatrix(solution)

# Вывод матрицы результата
print("\nРешение через MapReduce:")
print(mapreduce_result)

# Проверка, что решение через MapReduce равно истинному решению
print("\nПроверка на совпадение с истинным решением:", np.allclose(reference_solution, mapreduce_result))

Матрица small_mat:
[[0.7992291  0.10844054 0.45583703]
 [0.68879738 0.07168596 0.65995047]]

Матрица big_mat:
[[0.99525184 0.47385135 0.61579719 0.30680011 0.56384647 0.19507645
  0.48857635 0.84986107 0.70211544 0.67590679 0.26827511 0.71147098
  0.04129313 0.0145894  0.93462393 0.88031409 0.05597549 0.88913744
  0.02298517 0.62625715 0.87777035 0.56006311 0.76250228 0.49484333
  0.74635284 0.84989837 0.56174507 0.48554646 0.78591128 0.44540268
  0.00173244 0.60098811 0.37744649 0.32153398 0.37366134 0.75833176
  0.21540476 0.55825419 0.56916857 0.51810924]
 [0.55505143 0.21406966 0.60946398 0.03421658 0.84343516 0.14696267
  0.2560948  0.4694255  0.87319254 0.56475537 0.22303752 0.37021511
  0.19558815 0.53203245 0.64522069 0.53055234 0.41107264 0.91304476
  0.36627167 0.63343633 0.03121635 0.55712518 0.3926322  0.44055194
  0.91704944 0.24476871 0.64365077 0.10438962 0.92748822 0.96509922
  0.78637148 0.69230166 0.4987926  0.99950686 0.66424079 0.75770325
  0.16988008 0.16148102 0.3

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

Ответ: будет работать, если случайные подмножества покрываю всю матрицу.

In [41]:
import numpy as np
import random

# Размеры матриц
I = 2
J = 3
K = 4 * 10

# Генерация случайных матриц
small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def RECORDREADER_small_1():
    #Генерирует первую половину строк small_mat
    for i in range(0, small_mat.shape[0] // 2):
        for j in range(small_mat.shape[1]):
            yield ((i, j), small_mat[i, j])

def RECORDREADER_small_2():
    #Генерирует вторую половину строк small_mat
    for i in range(small_mat.shape[0] // 2, small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            yield ((i, j), small_mat[i, j])

def RECORDREADER_big_1():
    #Генерирует первую половину столбцов big_mat
    for j in range(big_mat.shape[0]):
        for k in range(0, big_mat.shape[1] // 2):
            yield ((j, k), big_mat[j, k])

def RECORDREADER_big_2():
    #Генерирует вторую половину столбцов big_mat
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1] // 2, big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])


# Объединяем все RECORDREADER-ы
def COMBINED_RECORDREADER(*readers):
    #Объединяет потоки данных от нескольких RECORDREADER-ов
    for reader in readers:
        yield from reader()

# MAP функция для перемножения матриц
def MAP(k1, v1):
    (i, j) = k1
    w = v1
    for k in range(big_mat.shape[1]):
        yield ((i, k), w * big_mat[j, k])

# REDUCE функция для суммирования результатов перемножения
def REDUCE(key, values):
    yield (key, sum(values))

# Функции для MapReduce
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

# Проверка решения
reference_solution = np.matmul(small_mat, big_mat)  # Истинное решение

# Используем объединённые RECORDREADER-ы
solution = MapReduce(
    lambda: COMBINED_RECORDREADER(RECORDREADER_small_1, RECORDREADER_small_2),
    MAP,
    REDUCE
)

# Преобразование вывода REDUCE в матрицу
def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), _) in reduce_output) + 1
    K = max(k for ((i, k), _) in reduce_output) + 1
    mat = np.empty(shape=(I, K))
    for ((i, k), value) in reduce_output:
        mat[i, k] = value
    return mat

# Получаем результат
mapreduce_result = asmatrix(solution)

# Вывод результатов
print("\nРешение через MapReduce:")
print(mapreduce_result)
print("\nПроверка на совпадение с истинным решением:", np.allclose(reference_solution, mapreduce_result))


Решение через MapReduce:
[[0.75981158 0.31155902 0.67347239 0.35447203 0.55665501 0.38395298
  0.11959173 0.57379054 0.49235634 0.62639348 0.85309878 0.46914647
  0.15923851 0.73047096 0.605867   0.73065881 0.74863887 0.48668526
  0.31352582 0.58484103 0.16101763 0.56715534 0.77143907 0.80198867
  0.28274127 0.25204351 0.10750164 0.4708323  0.30335856 0.54563677
  0.0268274  0.57480992 0.43945311 0.20686675 0.39311949 0.26405562
  0.85555698 0.34010668 0.80581911 0.64086786]
 [0.45014309 0.66575835 0.38678058 0.23519916 0.95570852 0.55131515
  0.41210949 0.94008785 0.7691209  0.77739877 0.89607137 1.04230715
  0.40388455 0.85664777 0.94612159 0.95552626 1.00043971 0.78236174
  0.84502146 0.53688124 0.23916828 0.4155383  0.78027438 0.7863813
  0.59430804 0.72581534 0.37550738 0.46056661 0.49865587 0.99571094
  0.0434087  0.61119781 0.58361644 0.67735303 0.96303788 0.84385455
  1.05376132 0.22191735 0.66778087 0.77944861]]

Проверка на совпадение с истинным решением: True
