<a href="https://colab.research.google.com/github/L1KASA/big-data/blob/main/lab1/MapReduceExamples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

# groupbykey на основе сортировки
def groupbykey1(iterable):
    sorted_iterable = sorted(iterable, key=lambda x: x[0])
    result = []
    current_key = None
    for key, value in sorted_iterable:
        if key != current_key:
            if current_key is not None:
                result.append((current_key, current_group))
            current_key = key
            current_group = [value]
        else:
            current_group.append(value)
    result.append((current_key, current_group))
    return result

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

def MapReduce1(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey1(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

In [15]:
output1 = MapReduce1(RECORDREADER, MAP, REDUCE)
output1 = list(output1)
output1

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [16]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 1.7061981381508153),
 (1, 1.7061981381508153),
 (2, 1.7061981381508153),
 (3, 1.7061981381508153),
 (4, 1.7061981381508153)]

In [17]:
output1 = MapReduce1(RECORDREADER, MAP, REDUCE)
output1 = list(output1)
output1

[(0, 1.7061981381508153),
 (1, 1.7061981381508153),
 (2, 1.7061981381508153),
 (3, 1.7061981381508153),
 (4, 1.7061981381508153)]

## Inverted index

In [18]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

In [19]:
output1 = MapReduce1(RECORDREADER, MAP, REDUCE)
output1 = list(output1)
output1

[('a', ['2']),
 ('banana', ['2']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('what', ['0', '1'])]

## WordCount

In [20]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

In [21]:
output1 = MapReduce1(RECORDREADER, MAP, REDUCE)
output1 = list(output1)
output1

[('', 3), ('a', 1), ('banana', 1), ('is', 9), ('it', 9), ('what', 5)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [22]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

In [23]:
def MapReduceDistributed1(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey1(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [24]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18)]),
 (1, [('it', 18), ('what', 10)])]

In [25]:
partitioned_output1 = MapReduceDistributed1(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output1 = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output1

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18)]),
 (1, [('it', 18), ('what', 10)])]

## TeraSort

In [26]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.016457595372783462),
   (None, 0.06099191936309645),
   (None, 0.07412856667987566),
   (None, 0.0743006810428457),
   (None, 0.0984700959553052),
   (None, 0.15034535980464125),
   (None, 0.16822287892617982),
   (None, 0.1787413051634552),
   (None, 0.1830105827064047),
   (None, 0.18636735238302526),
   (None, 0.20305936008758219),
   (None, 0.22725236298806728),
   (None, 0.23512594785239882),
   (None, 0.29204019226273625),
   (None, 0.29496658430681943),
   (None, 0.2965845501374573),
   (None, 0.32814901090679616),
   (None, 0.43499549173747876),
   (None, 0.4368679594110365),
   (None, 0.4397379734234539)]),
 (1,
  [(None, 0.5494127535810139),
   (None, 0.5551627151507308),
   (None, 0.5693373163267791),
   (None, 0.5779062660887198),
   (None, 0.638572879273244),
   (None, 0.7342159985806295),
   (None, 0.7642760391771181),
   (None, 0.7785295496290972),
   (None, 0.8298209140186441),
   (None, 0.921909247727795)])]

In [27]:
partitioned_output1 = MapReduceDistributed1(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output1 = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output1

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.016457595372783462),
   (None, 0.06099191936309645),
   (None, 0.07412856667987566),
   (None, 0.0743006810428457),
   (None, 0.0984700959553052),
   (None, 0.15034535980464125),
   (None, 0.16822287892617982),
   (None, 0.1787413051634552),
   (None, 0.1830105827064047),
   (None, 0.18636735238302526),
   (None, 0.20305936008758219),
   (None, 0.22725236298806728),
   (None, 0.23512594785239882),
   (None, 0.29204019226273625),
   (None, 0.29496658430681943),
   (None, 0.2965845501374573),
   (None, 0.32814901090679616),
   (None, 0.43499549173747876),
   (None, 0.4368679594110365),
   (None, 0.4397379734234539)]),
 (1,
  [(None, 0.5494127535810139),
   (None, 0.5551627151507308),
   (None, 0.5693373163267791),
   (None, 0.5779062660887198),
   (None, 0.638572879273244),
   (None, 0.7342159985806295),
   (None, 0.7642760391771181),
   (None, 0.7785295496290972),
   (None, 0.8298209140186441),
   (None, 0.921909247727795)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [28]:
def RECORDREADER(numbers):
    for number in numbers:
        yield (None, number)

def MAP(_, number):
    yield (None, number)

def REDUCE(_, numbers: Iterator[int]):
    max_number = max(numbers)
    yield max_number

numbers = [10, 5, 8, 20, 15, 12]

output = MapReduce(lambda: RECORDREADER(numbers), MAP, REDUCE)
max_number = list(output) # В целом можно и next() использовать
max_number

[20]

### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [29]:
from typing import Iterator

def RECORDREADER(numbers):
    for i, num in enumerate(numbers):
        yield (i, num)

def MAP(_, number):
    yield (None, number)

def REDUCE(_, numbers: Iterator[int]):
    total_sum = 0
    count = 0
    for num in numbers:
        total_sum += num
        count += 1
    if count > 0:
        average = total_sum / count
        yield average

numbers = [10, 20, 30, 40, 15]

output = MapReduce(lambda: RECORDREADER(numbers), MAP, REDUCE)
output = list(output)
output

[23.0]

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [30]:
# функция groupbykey1 добавлена в самое начало, проверка ее работы находится после каждого примера

def groupbykey1(iterable):
    sorted_iterable = sorted(iterable, key=lambda x: x[0])
    result = []
    current_key = None
    for key, value in sorted_iterable:
        if key != current_key:
            if current_key is not None:
                result.append((current_key, current_group))
            current_key = key
            current_group = [value]
        else:
            current_group.append(value)
    result.append((current_key, current_group))
    return result


input_list = [(1, 'a'), (2, 'b'), (1, 'c'), (3, 'd'), (2, 'e'), (1, 'f')]
grouped_list1 = groupbykey(input_list)
grouped_list = groupbykey1(input_list)
for key, group in grouped_list1:
    print(f"Key: {key}, Group: {group}")
print()
for key, group in grouped_list:
    print(f"Key: {key}, Group: {group}")

Key: 1, Group: ['a', 'c', 'f']
Key: 2, Group: ['b', 'e']
Key: 3, Group: ['d']

Key: 1, Group: ['a', 'c', 'f']
Key: 2, Group: ['b', 'e']
Key: 3, Group: ['d']


# Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [31]:
data = [1, 2, 3, 4, 5, 1, 2, 3]  # Дубликаты: 1, 2, 3

# set construction
def RECORDREADER(data):
    for value in data:
        yield (None, value)

def MAP(_, value):
    yield (value, None)

def REDUCE(key, _):
    yield (key, None)

output = MapReduce(lambda: RECORDREADER(data), MAP, REDUCE)
output = set([value for value, _ in output])
print(output)

# unique elements
def MAP(_, value):
    yield (value, 1)

output = MapReduce(lambda: RECORDREADER(data), MAP, REDUCE)
unique_elements = [key for key, _ in output]
print(unique_elements)

# distinct
def REDUCE(key, counts):
    if sum(counts) == 1:
        yield (key, None)

output = MapReduce(lambda: RECORDREADER(data), MAP, REDUCE)
distinct_values = [key for key, _ in output]
print(distinct_values)

{1, 2, 3, 4, 5}
[1, 2, 3, 4, 5]
[4, 5]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [32]:
# Пример данных, аналогичных записям в базе данных
db_data = [
    {'id': 1, 'name': 'John', 'age': 25},
    {'id': 2, 'name': 'Alice', 'age': 25},
    {'id': 3, 'name': 'Bob', 'age': 35}
]

# Определение функции RECORDREADER
def RECORDREADER():
    for record in db_data:
        yield (record['id'], record)  # Используем 'id' в качестве ключа, предполагая, что это уникальный и хешируемый идентификатор

# Операция селекции
def MAP(_, record):
    if record['age'] == 25:  # Проверяем значение атрибута 'age'
        yield (record['id'], record)  # Возвращаем кортеж, удовлетворяющий условию

# Функция reduce для операции селекции
def REDUCE(key, tuples):
    for record in tuples:
        yield record  # Возвращаем только записи, удовлетворяющие условию

# Вызываем MapReduce для операции селекции
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Выводим результат операции селекции
selected_values = list(output)
print(selected_values)


[{'id': 1, 'name': 'John', 'age': 25}, {'id': 2, 'name': 'Alice', 'age': 25}]


Со вложенными кортежами

In [33]:
from typing import NamedTuple

# Определение именованного кортежа для представления записей в базе данных
class Record(NamedTuple):
    id: int
    name: str
    age: int

def RECORDREADER():
    for record in db_data:
        yield (record.id, record)  # Каждая запись представляется парой (id, record)

# Операция селекции
def MAP(_, record):
    if record.age == 25:  # Проверяем значение атрибута 'name'
        yield record, record  # Возвращаем кортеж с ключом None и значением - именованным кортежем Record

# Функция reduce для операции селекции
def REDUCE(key, tuples):
    for record in tuples:
        yield record  # Возвращаем все кортежи из списка

# Пример данных, аналогичных записям в базе данных
db_data = [
    Record(id = 1, name = 'John', age = 25),
    Record(id = 2, name = 'Alice', age = 25),
    Record(id = 3, name = 'Bob', age = 35)
]

# Вызываем MapReduce для операции селекции
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Выводим результат операции селекции (только значения)
selected_values = [tuple(record) for record in output]
selected_values

[(1, 'John', 25), (2, 'Alice', 25)]

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [34]:
# Пример данных, аналогичных записям в базе данных
db_data = [
    {'id': 1, 'name': 'John', 'age': 25},
    {'id': 2, 'name': 'Alice', 'age': 25},
    {'id': 3, 'name': 'Bob', 'age': 35}
]

# Операция проекции
def MAP(_, record):
    attributes_to_keep = ['id', 'name']  # Имена атрибутов, которые мы хотим оставить
    projected_record = tuple(record[attr] for attr in attributes_to_keep)
    yield (record['id'], projected_record)  # Используем id в качестве ключа

# Функция reduce для операции проекции
def REDUCE(key, records):
    yield records[0]  # Возвращаем первый кортеж из списка кортежей с одинаковым ключом

# Функция RecordReader для чтения данных из списка db_data
def RECORDREADER():
    for record in db_data:
        yield (record['id'], record)  # Каждая запись представляется парой (id, record)

# Вызываем MapReduce для операции проекции
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Выводим результат операции проекции
selected_values = list(output)
selected_values

[(1, 'John'), (2, 'Alice'), (3, 'Bob')]

In [35]:
from typing import NamedTuple

# Определение именованного кортежа для представления записей в базе данных
class Record(NamedTuple):
    id: int
    name: str
    age: int

# Определение функции RECORDREADER
def RECORDREADER():
    for record in db_data:
        yield (record.id, record)  # Каждая запись представляется парой (None, record)

# Операция проекции
def MAP(_, record):
    projected_record = (record.id, record.name)
    yield (projected_record, projected_record)

# Функция reduce для операции проекции
def REDUCE(key, records):
    yield records[0]  # Возвращаем первый кортеж из списка кортежей с одинаковым ключом

# Пример данных, аналогичных записям в базе данных
db_data = [
    Record(id=1, name='John', age=25),
    Record(id=2, name='Alice', age=25),
    Record(id=3, name='Bob', age=35)
]

# Вызываем MapReduce для операции проекции
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Выводим результат операции проекции
selected_values = list(output)
selected_values


[(1, 'John'), (2, 'Alice'), (3, 'Bob')]

Со вложенными кортежами

In [36]:
# Определение именованного кортежа для представления записей в базе данных
class Record(NamedTuple):
    id: int
    name: str
    age: int

def RECORDREADER():
    for record in db_data:
        yield (record.id, record)

# Операция проекции
def MAP(_, record):
    projected_record = (record.id, record.name)  # Изменяем формат проецируемого кортежа
    yield (record.id, projected_record)  # Используем id в качестве ключа

# Функция reduce для операции проекции
def REDUCE(key, records):
    for _, projected_record in records:
        yield projected_record  # Возвращаем проекции из каждой группы

# Пример данных, аналогичных записям в базе данных
db_data = [
    Record(id=1, name='John', age=25),
    Record(id=2, name='Alice', age=25),
    Record(id=3, name='Bob', age=35)
]

# Вызываем MapReduce для операции проекции
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Выводим результат операции проекции
selected_values = list(output)
selected_values

['John', 'Alice', 'Bob']

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [37]:
# Определение функции recordreader для объединения данных из таблиц R и S
def RECORDREADER():
    for record in table_R + table_S:
        yield (None, record)  # Превращаем каждую запись в пару (None, record)

# Определение функции map для операции объединения
def MAP(_, record):
    yield (record, record)  # Превращаем каждую запись в пару ключ-значение (t, t)

# Определение функции reduce для операции объединения
def REDUCE(key, values):
    for value in set(values):
        yield (value)

# Пример данных для таблиц R и S
table_R = [
    (1, 'John'),
    (2, 'Alice'),
    (5, 'Bob')
]

table_S = [
    (1, 'John'),
    (4, 'Los Angeles'),
    (6, 'Chicago')
]

# Выполнение операции MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Получение результата операции Union
result = list(output)

# Вывод результата
result


[(1, 'John'), (2, 'Alice'), (5, 'Bob'), (4, 'Los Angeles'), (6, 'Chicago')]

In [38]:
# Пример данных для таблиц R и S
table_R = [
    (1, 'John'),
    (2, 'Alice'),
    (5, 'Bob')
]

table_S = [
    (1, 'John'),
    (4, 'Los Angeles'),
    (6, 'Chicago')
]

# Определение функции recordreader
def RECORDREADER():
    for record in table_R + table_S:
        yield (None, record)  # Каждая запись представляется парой (None, record)

# Определение функции map для объединения данных из таблиц R и S
def MAP(_, record):
    yield (record, record)  # Просто возвращаем каждую запись как пару ключ-значение

# Определение функции reduce для операции Union
def REDUCE(key, values):
    unique_values = set()  # Создаем множество для хранения уникальных значений
    for value in values:
        # Если значение уже есть в множестве, пропускаем его
        if value[1] in unique_values:
            continue
        unique_values.add(value[1])  # Добавляем значение в множество
        yield value  # Возвращаем уникальное значение

# Выполнение операции MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Получение результата операции Union
result = list(output)

# Вывод результата
result


[(1, 'John'), (2, 'Alice'), (5, 'Bob'), (4, 'Los Angeles'), (6, 'Chicago')]

Со вложенными классами

In [39]:
from typing import NamedTuple

class Record(NamedTuple):
    id: int
    name: str
    age: int

def RECORDREADER():
    for record in table_R + table_S:
        yield (None, record)

def MAP(_, record):
    yield (record, record)

def REDUCE(key, values):
    for value in set(values):
        yield (value)

table_R = [
    Record(id=1, name='John', age=20),
    Record(id=2, name='Alice', age=25),
    Record(id=3, name='Bob', age=30)
]

table_S = [
    Record(id=4, name='Charlie', age=22),
    Record(id=5, name='David', age=27),
    Record(id=6, name='Emma', age=32)
]

output = MapReduce(RECORDREADER, MAP, REDUCE)
result = list(output)

# Преобразование результатов к нужному формату
result = [tuple(record) for record in result]

# Вывод результата
result


[(1, 'John', 20),
 (2, 'Alice', 25),
 (3, 'Bob', 30),
 (4, 'Charlie', 22),
 (5, 'David', 27),
 (6, 'Emma', 32)]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [40]:
# Определение функции recordreader
def RECORDREADER():
    for record in table_R + table_S:
        yield (None, record)  # Каждая запись представляется парой (None, record)

# Map Function
def MAP(_, record):
    # Emit each tuple as a key-value pair with the tuple itself as both key and value
    yield (record[1:], record)

# Reduce Function
def REDUCE(key, values):
    # Check if there are two records with matching non-ID attributes for the key
    if len(values) == 2 and values[0][1:] == values[1][1:]:
        # If such records exist, emit one of them as the result
        yield (values[0])

# Example data for tables R and S
table_R = [
    (1, 'John'),
    (2, 'Alice'),
    (3, 'Bob')
]

table_S = [
    (12, 'Alice'),
    (3, 'Bob'),
    (4, 'Charlie')
]

# Perform MapReduce operation
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Get the result of the intersection operation
result = list(output)

# Display the result
result


[(2, 'Alice'), (3, 'Bob')]

Со вложенными кортежами

In [41]:
from typing import NamedTuple

class Record(NamedTuple):
    id: int
    name: str
    age: int

# Определение функции recordreader для объединения данных из таблиц R и S
def RECORDREADER():
    for record in table_R + table_S:
        yield (None, Record(id=record[0], name=record[1], age=record[2]))  # Каждая запись представляется парой (None, Record)

# Map Function
def MAP(_, record):
    # Emit each tuple as a key-value pair with the tuple itself as both key and value
    yield ((record.name, record.age), record)

# Reduce Function
def REDUCE(key, values):
    # Check if there are two records with matching non-ID attributes for the key
    if len(values) == 2 and values[0][1:] == values[1][1:]:
        # If such records exist, emit one of them as the result
        yield (values[0][1:])

# Example data for tables R and S
table_R = [
    Record(id=1, name='John', age=25),
    Record(id=2, name='Alice', age=30),
    Record(id=3, name='Bob', age=35)
]

table_S = [
    Record(id=12, name='Alice', age=10),
    Record(id=3, name='Bob', age=35),
    Record(id=4, name='Charlie', age=40)
]

# Perform MapReduce operation
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Get the result of the intersection operation
result = list(output)

# Display the result
result


[('Bob', 35)]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [42]:
# Функция Map
def MAP(_, record):
    # Создаем пару (t, R) для кортежей из R и пару (t, S) для кортежей из S
    if record[1] == 'R':
        yield (record[0], 'R')
    elif record[1] == 'S':
        yield (record[0], 'S')

# Функция Reduce
def REDUCE(key, values):
    # Если значение соответствует только одному отношению, создаем пару (t, t)
    if len(values) == 1:
        yield (key)

# Функция RECORDREADER
def RECORDREADER():
    # Создаем пару (None, (record, 'R')) для каждой записи из таблицы R
    for record in table_R:
        yield (None, (record, 'R'))

    # Создаем пару (None, (record, 'S')) для каждой записи из таблицы S
    for record in table_S:
        yield (None, (record, 'S'))

# Пример данных для таблиц R и S
table_R = [
    (1, 'John'),
    (2, 'Alice'),
    (3, 'Bob')
]

table_S = [
    (2, 'Alice'),
    (3, 'Bob'),
    (4, 'Charlie')
]

# Выполняем операцию MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Получаем результат операции разницы
result = list(output)

# Выводим результат
result


[(1, 'John'), (4, 'Charlie')]

Со вложенными кортежами

In [43]:
from typing import NamedTuple

class Record(NamedTuple):
    id: int
    name: str

def MAP(_, record):
    # Создаем пару (name, R) для кортежей из R и пару (name, S) для кортежей из S
    if record[1] == 'R':
        yield (record[0].name, 'R')
    elif record[1] == 'S':
        yield (record[0].name, 'S')

def REDUCE(key, values):
    # Если значение соответствует только одному отношению, создаем пару (name, name)
    if len(values) == 1:
        yield (key)

def RECORDREADER():
    # Создаем пару (None, (record, 'R')) для каждой записи из таблицы R
    for record in table_R:
        yield (None, (record, 'R'))

    # Создаем пару (None, (record, 'S')) для каждой записи из таблицы S
    for record in table_S:
        yield (None, (record, 'S'))

# Пример данных для таблиц R и S
table_R = [
    Record(id=1, name='John'),
    Record(id=2, name='Alice'),
    Record(id=3, name='Bob')
]

table_S = [
    Record(id=12, name='Alice'),
    Record(id=3, name='Bob'),
    Record(id=4, name='Charlie')
]

# Выполняем операцию MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Получаем результат операции разницы
result = list(output)

# Выводим результат
print(result)


['John', 'Charlie']


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [44]:
# Define the map function for both tables R and S
def MAP(_, record):
    record_id = record['id']
    if 'name' in record:
        yield (record_id, ('R', record))  # Emit a key-value pair with table name 'R'
    elif 'city' in record:
        yield (record_id, ('S', record))  # Emit a key-value pair with table name 'S'

# Define the reduce function for the natural join operation
def REDUCE(key, values):
    records_R = [record for table, record in values if table == 'R']  # Get records from table R
    records_S = [record for table, record in values if table == 'S']  # Get records from table S
    for record_R in records_R:
        for record_S in records_S:
            yield (record_R | record_S)  # Emit the combined records as the natural join

# Sample data for tables R and S
table_R = [
    {'id': 1, 'name': 'John', 'age': 30},
    {'id': 2, 'name': 'Alice', 'age': 25},
    {'id': 3, 'name': 'Bob', 'age': 35}
]

table_S = [
    {'id': 1, 'city': 'New York'},
    {'id': 2, 'city': 'Los Angeles'},
    {'id': 4, 'city': 'Chicago'}
]

# Combine the records from both tables into a single list
combined_data = table_R + table_S

# Perform the MapReduce operation
output = MapReduce(lambda: [(None, record) for record in combined_data], MAP, REDUCE)

# Retrieve the result of the natural join operation
result = list(output)

# Display the result
result


[{'id': 1, 'name': 'John', 'age': 30, 'city': 'New York'},
 {'id': 2, 'name': 'Alice', 'age': 25, 'city': 'Los Angeles'}]

In [45]:
# Define the map function for both tables R and S
def MAP(_, record):
    record_id = record[0]  # Extracting the id from the record tuple
    if len(record) == 3:  # Checking if it's a record from table R
        yield (record_id, ('R', record))  # Emitting a key-value pair with table name 'R'
    elif len(record) == 2:  # Checking if it's a record from table S
        yield (record_id, ('S', record))  # Emitting a key-value pair with table name 'S'

# Define the reduce function for the natural join operation
def REDUCE(key, values):
    records_R = [record for table, record in values if table == 'R']  # Get records from table R
    records_S = [record for table, record in values if table == 'S']  # Get records from table S
    for record_R in records_R:
        for record_S in records_S:
            yield (record_R + record_S[1:])  # Emit the combined records as the natural join

# Function to read records from tables R and S
def RECORDREADER():
    # Yielding records from table R
    for record in table_R:
        yield (None, record)

    # Yielding records from table S
    for record in table_S:
        yield (None, record)

# Sample data for tables R and S
table_R = [
    (1, 'John', 30),
    (2, 'Alice', 25),
    (3, 'Bob', 35)
]

table_S = [
    (1, 'New York'),
    (2, 'Los Angeles'),
    (4, 'Chicago')
]

# Perform the MapReduce operation
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Retrieve the result of the natural join operation
result = list(output)

# Display the result
result


[(1, 'John', 30, 'New York'), (2, 'Alice', 25, 'Los Angeles')]

Со вложенными кортежами

In [46]:
from typing import NamedTuple

# Define NamedTuple for records in table R
class RecordR(NamedTuple):
    id: int
    name: str
    age: int

# Define NamedTuple for records in table S
class RecordS(NamedTuple):
    id: int
    city: str

# Define the map function for both tables R and S
def MAP(_, record):
    record_id = record[0]  # Extracting the id from the record tuple
    if len(record) == 3:  # Checking if it's a record from table R
        yield (record_id, ('R', record))  # Emitting a key-value pair with table name 'R'
    elif len(record) == 2:  # Checking if it's a record from table S
        yield (record_id, ('S', record))  # Emitting a key-value pair with table name 'S'

# Define the reduce function for the natural join operation
def REDUCE(key, values):
    records_R = [record for table, record in values if table == 'R']  # Get records from table R
    records_S = [record for table, record in values if table == 'S']  # Get records from table S
    for record_R in records_R:
        for record_S in records_S:
            yield (record_R + record_S[1:])  # Emit the combined records as the natural join

# Function to read records from tables R and S
def RECORDREADER():
    # Yielding records from table R
    for record in table_R:
        yield (None, record)

    # Yielding records from table S
    for record in table_S:
        yield (None, record)

# Sample data for tables R and S
table_R = [
    RecordR(id=1, name='John', age=30),
    RecordR(id=2, name='Alice', age=25),
    RecordR(id=3, name='Bob', age=35)
]

table_S = [
    RecordS(id=1, city='New York'),
    RecordS(id=2, city='Los Angeles'),
    RecordS(id=4, city='Chicago')
]

# Perform the MapReduce operation
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Retrieve the result of the natural join operation
result = list(output)

# Display the result
result


[(1, 'John', 30, 'New York'), (2, 'Alice', 25, 'Los Angeles')]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [47]:
# Функция Map
def MAP(record):
    # Создаем пару (a, b) для каждого кортежа (a, b, c)
    a, b, _ = record
    yield (a, b)

# Функция Reduce
def REDUCE(key, values):
    # Извлекаем числовые значения из кортежей
    numeric_values = [value for value in values if isinstance(value, int)]
    # Применяем операцию суммирования к числовым значениям
    result = sum(numeric_values)
    # Возвращаем пару (a, x), где x - результат агрегации
    yield (key, result)

# Функция RECORDREADER
def RECORDREADER():
    # Перебираем все записи из примера данных
    for record in data:
        yield (record,)

# Пример данных для операции
data = [
    (1, 10, 20),
    (2, 15, 25),
    (1, 5, 30),
    (2, 20, 35)
]

# Выполняем операцию MapReduce
output = MapReduce(RECORDREADER, MAP, REDUCE)

# Получаем результат операции группировки и агрегации
result = list(output)

# Выводим результат
print(result)


[(1, 15), (2, 35)]


### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [57]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [70]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  # solution code that yield(k2,v2) pairs

def REDUCE(key, values):
  (i, k) = key
  # solution code that yield(k3,v3) pairs

In [72]:
def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])

def MAP(k1, v1):
    (j, k) = k1
    w = v1
    for i in range(small_mat.shape[0]):
        mij = small_mat[i, j]
        yield ((i, k), (j, k, mij, w))

def REDUCE(key, values):
    (i, k) = key
    result = 0
    for value in values:
        (j, k, mij, w) = value
        result += mij * w
    yield (i, k), result

Проверьте своё решение

In [73]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [52]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [53]:
def RECORDREADER():
    for i in range(small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            for k in range(big_mat.shape[1]):
                yield ((i, j, k), small_mat[i, j], big_mat[j, k])

def MAP(k1, v1, v2):
    (i, j, k) = k1
    mij = v1
    w = v2
    yield ((i, k), (j, mij, w))

def REDUCE(key, values):
    (i, k) = key
    result = 0
    for value in values:
        (j, mij, w) = value
        result += mij * w
    yield (i, k), result

# Применяем MapReduce к нашим данным
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Преобразуем результат в матрицу
result_mat = np.zeros((small_mat.shape[0], big_mat.shape[1]))
for ((i, k), value) in result:
    result_mat[i, k] = value

print("Result Matrix:")
print(result_mat)


Result Matrix:
[[0.48294292 0.82728062 0.40382742 0.61483236 0.70536566 0.33545257
  0.52943821 0.63041353 0.3282974  0.54347432 0.44239858 0.31614641
  0.65371697 0.78775903 0.47217118 0.3986045  0.33340181 0.66910089
  0.51687584 0.49680108 0.40234265 0.41765462 0.62070967 0.77554775
  0.7964594  0.67820033 0.64194672 0.31430376 0.19448541 0.71057608
  0.69357548 0.72994297 0.50016372 0.33369323 0.43034187 0.27302645
  0.74889223 0.63797053 0.17650931 0.33878069]
 [0.09671392 0.19597488 0.05761083 0.12380454 0.136106   0.05123982
  0.13243665 0.0910058  0.0404381  0.08500146 0.0518767  0.0669251
  0.0726112  0.15537414 0.05542873 0.04554133 0.11677753 0.06662481
  0.07950863 0.10443068 0.06022281 0.09184921 0.08342062 0.16068572
  0.18485536 0.05943524 0.16663745 0.06612176 0.09161176 0.14707392
  0.07861791 0.13539079 0.14822796 0.10562056 0.08578975 0.10578602
  0.16848935 0.10826839 0.0878622  0.14717498]]


In [54]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [55]:
import numpy as np

# Генерируем случайные матрицы для примера
matrix_A = np.random.randint(0, 10, size=(3, 3))
matrix_B = np.random.randint(0, 10, size=(3, 3))

maps = 3  # Количество задач MAP
reducers = 2  # Количество задач REDUCE

def INPUTFORMAT():
    global maps
    global matrix_A
    global matrix_B

    def RECORDREADER_A(split):
        for i, row in enumerate(split):
            for j, value in enumerate(row):
                yield ((i, j), ('A', value))

    def RECORDREADER_B(split):
        for i, row in enumerate(split):
            for j, value in enumerate(row):
                yield ((j, i), ('B', value))

    # Разбиваем матрицы на части для параллельной обработки
    split_size = len(matrix_A) // maps
    for i in range(0, len(matrix_A), split_size):
        yield RECORDREADER_A(matrix_A[i:i+split_size])

    split_size = len(matrix_B[0]) // maps
    for i in range(0, len(matrix_B[0]), split_size):
        yield RECORDREADER_B(matrix_B[:, i:i+split_size])

def MAP(key, value):
    global matrix_A
    global matrix_B

    i, j = key
    if value[0] == 'A':
        for k in range(len(matrix_A[0])):
            yield ((i, k), (value[1], j))
    elif value[0] == 'B':
        for k in range(len(matrix_A)):
            yield ((k, j), (value[1], i))

def PARTITIONER(key):
    global reducers
    i, j = key
    return (i + j) % reducers  # Разделение ключей на редукторы

def REDUCE(key, values):
    result = 0
    for value, index in values:
        result += value * index
    yield (key, result)

# Вывод перемноженной матрицы
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER)
partitioned_output = sorted([(partition_id, list(partition)) for partition_id, partition in partitioned_output])

# Форматируем результаты в матрицу
result_matrix = np.zeros((len(matrix_A), len(matrix_B[0])))
for _, partition in partitioned_output:
    for (i, j), value in partition:
        result_matrix[i][j] = value

print("Result Matrix:")
print(result_matrix)


54 key-value pairs were sent over a network.
Result Matrix:
[[48. 48. 48.]
 [ 0.  0.  0.]
 [ 0.  0.  0.]]


In [56]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
#solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(partitioned_output)) # should return true

TypeError: cannot unpack non-iterable int object

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?