<a href="https://colab.research.google.com/github/MatveySTEP/BigData/blob/main/BIgData_Stepanov_6403.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [15]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.3359661451543126),
 (1, 2.3359661451543126),
 (2, 2.3359661451543126),
 (3, 2.3359661451543126),
 (4, 2.3359661451543126)]

## Inverted index

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('banana', 2), ('is', 18), ('what', 10)]),
 (1, [('a', 2), ('it', 18)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.023495961668422383),
   (None, 0.040826074119588496),
   (None, 0.06726722076690739),
   (None, 0.09148628408734705),
   (None, 0.15007666908617123),
   (None, 0.21344611783494638),
   (None, 0.22251189264926952),
   (None, 0.2365720776819229),
   (None, 0.2666436169220978),
   (None, 0.2749118761478072),
   (None, 0.29022653757119987),
   (None, 0.3066867749766584),
   (None, 0.33136814460050557),
   (None, 0.3341579972927945),
   (None, 0.3649125828565537),
   (None, 0.37512827899430856),
   (None, 0.49362582569946734)]),
 (1,
  [(None, 0.5409087262305615),
   (None, 0.541902903477593),
   (None, 0.5958149412330571),
   (None, 0.6073670750132751),
   (None, 0.6651349194158259),
   (None, 0.6749089515987643),
   (None, 0.6759720399827103),
   (None, 0.7500436956983101),
   (None, 0.7803751044641465),
   (None, 0.8597438249466852),
   (None, 0.8795068599307714),
   (None, 0.8882841327093267),
   (None, 0.9599674510074171)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [26]:
import numpy as np

# Генерация 20 случайных чисел в диапазоне [0, 1]
data_pool = np.random.rand(20)
print("Список чисел:", data_pool)

# Источник данных в виде генератора
def DATASTREAM():
    for num in data_pool:
        yield ("group", num)  # Все числа относятся к одной группе

# Мап: преобразует входные данные в пары (ключ, значение)
def MAP(_, num):
    yield ("group", num)

# Редьюс: выбирает максимальное число из списка значений
def REDUCE(_, num_list):
  yield ("max_value", max(num_list))

max_value_output = list(MapReduce(DATASTREAM, MAP, REDUCE))

print("Максимальное значение:", max_value_output[0][1])


Список чисел: [0.76191576 0.43287458 0.04810183 0.6170251  0.22577299 0.66919637
 0.94307651 0.25333585 0.74805479 0.92835646 0.38082249 0.39322544
 0.4013015  0.9100802  0.31199921 0.02491345 0.74776948 0.85620348
 0.72363225 0.39201924]
Максимальное значение: 0.9430765056945035


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [35]:
# Создаем массив из 30 случайных чисел в диапазоне [0, 1]
number_series = np.random.rand(30)
print("Сгенерированные числа:", number_series)

# Генератор, предоставляющий данные
def DATASTREAM():
    for num in number_series:
        yield ("batch", num)  # Все значения сгруппированы под одним ключом

# Функция отображения данных
def MAP(_, num):
    """Каждое число привязывается к единому ключу."""
    yield ("batch", num)

# Функция сведения данных
def REDUCE(_, num_list):
    """Вычисляет среднее значение из переданных чисел."""
    yield ("mean_value", sum(num_list) / len(num_list))

# Запуск обработки
result = list(MapReduce(DATASTREAM, MAP, REDUCE))

# Вывод среднего значения
print("Среднее арифметическое:", result[0][1])


Сгенерированные числа: [0.9895779  0.9880709  0.40145518 0.07216722 0.69851304 0.77408977
 0.7402936  0.41531473 0.90527973 0.21028795 0.21133235 0.21038605
 0.11443481 0.1258494  0.79767692 0.87477658 0.55282726 0.86682912
 0.8346485  0.19036269 0.51006387 0.20392434 0.57691324 0.20957706
 0.39517742 0.54476776 0.45077067 0.27273783 0.77670873 0.5595453 ]
Среднее арифметическое: 0.515811997773663


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [40]:
# Функция группировки пар (ключ, значение) без использования groupby
def custom_groupby(iterable):
    sorted_data = sorted(iterable, key=lambda x: x[0])  # Сортировка по ключу
    grouped = []

    if not sorted_data:
        return grouped

    current_key, current_values = sorted_data[0][0], [] #Первый ключ
    for key, value in sorted_data:
        if key == current_key:
            current_values.append(value)
        else:
            grouped.append((current_key, current_values))
            current_key, current_values = key, [value]

    grouped.append((current_key, current_values))
    return grouped

# Реализация процесса обработки
def execute_mapreduce(reader, map_func, reduce_func):
    mapped_data = (map_func(*entry) for entry in reader())
    flattened_data = [pair for sublist in mapped_data for pair in sublist]
    grouped_data = custom_groupby(flattened_data)
    reduced_data = [result for key, values in grouped_data for result in reduce_func(key, values)]
    return reduced_data

#Возмем тестовые данные
input_data = [
    ("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5), ("c", 6)
]

# Запуск обработки
result = custom_groupby(input_data)
print(result)


[('a', [1, 4]), ('b', [2, 5]), ('c', [3, 6])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [43]:
reducers = 2  # Количество редьюсеров

def INPUTFORMAT():
    global maps
    input_values = ["car1", "car2", "car1", "car", "car2", "car3", "car5", "car10"]

    def RECORDREADER(split):
        for value in split:
            yield (value, None)  # Формат (значение, None)

    split_size = max(1, len(input_values) // maps)
    for i in range(0, len(input_values), split_size):
        yield RECORDREADER(input_values[i:i+split_size])

def MAP(value, _):
    yield (value, None)  # Просто передаем ключ, значение не важно

def PARTITIONER(key):
    global reducers
    return hash(key) % reducers  # Определяет, какой редьюсер обработает ключ

def REDUCE(value, _):
    yield (value, None)  # Возвращаем только одно значение


maps = 2  # Количество мапперов
unique_values_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER)

# Преобразуем результат в удобный формат
unique_values_output = [value for (_, partition) in unique_values_output for (value, _) in partition]
print("Unique values:", unique_values_output)  # Ожидаем ['apple', 'banana', 'orange', 'grape']


8 key-value pairs were sent over a network.
Unique values: ['car', 'car10', 'car3', 'car5', 'car1', 'car2']


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [44]:
def C(t):
    """Предикат: проверяет, делится ли сумма элементов кортежа на 2 без остатка."""
    return (t[0] + t[1]) % 2 == 0

def RECORDREADER():
    """Генерирует кортежи из набора данных."""
    dataset = [(0, 2), (6, 1), (5, 5)]
    for t in dataset:
        yield (t, None)  # Фиксированный ключ для обработки MapReduce

def MAP(t, _):
    """Применяет предикат к каждому элементу и передает в Reduce, если условие выполняется."""
    if C(t):
        yield (t, t)  # Ключ и значение равны отобранному элементу

def REDUCE(_, values):
    """Функция идентичности — возвращает полученные значения."""
    yield from values

# Запуск MapReduce
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Выбранные кортежи по условию деления суммы элементов кортежа на 2 без остатка:", result)


Выбранные кортежи по условию деления суммы элементов кортежа на 2 без остатка: [(0, 2), (5, 5)]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [45]:
S = {1, 5, 10}  # Оставляем только эти атрибуты

def MAP(t, _):
    """Оставляем только нужные атрибуты."""
    result = {k: v for k, v in t.items() if k in S}
    key = tuple(sorted(result.items()))  # Делаем ключ предсказуемым
    yield (key, key)  # Пара (t', t')

def REDUCE(_, values):
    """Удаляем дубликаты."""
    yield values[0]  # Оставляем один экземпляр

def RECORDREADER():
    """Генератор данных."""
    yield ({1: "apple", 2: "banana", 5: "grape"}, None)
    yield ({1: "orange", 3: "pear", 10: "kiwi"}, None)
    yield ({5: "melon", 7: "cherry", 10: "mango"}, None)

# Выполнение MapReduce
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Проекция по атрибутам:", result)

Проекция по атрибутам: [((1, 'apple'), (5, 'grape')), ((1, 'orange'), (10, 'kiwi')), ((5, 'melon'), (10, 'mango'))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [47]:
# Генератор данных
def RECORDREADER():
    """Формируем объединение двух множеств."""
    group_a = [(10, None), (20, None), (30, None)]
    group_b = [(20, None), (40, None), (50, None)]
    return group_a + group_b  # Объединение двух наборов данных

# Функция отображения (маппинга)
def MAP(t, _):
    """Каждое значение преобразуется в пару (t, t)."""
    yield (t, t)

# Функция редукции данных
def REDUCE(t, values):
    """Удаляет повторяющиеся элементы, оставляя единственный экземпляр."""
    yield (t, t)

result = list(MapReduce(RECORDREADER, MAP, REDUCE))


# Запуск обработки
result = execute_mapreduce(RECORDREADER, MAP, REDUCE)

# Вывод результата
print("Объединенное множество:", result)

Объединенное множество: [(10, 10), (20, 20), (30, 30), (40, 40), (50, 50)]


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [48]:
# Генератор данных
def RECORDREADER():
    """Формируем пересечение двух множеств."""
    group_a = [(10, None), (20, None), (30, None), (40, None)]
    group_b = [(20, None), (30, None), (50, None), (60, None)]
    return group_a + group_b  # Объединение данных перед MapReduce

# Функция отображения (маппинга)
def MAP(t, _):
    """Каждое значение преобразуется в пару (t, t)."""
    yield (t, t)

# Функция редукции данных
def REDUCE(t, values):
    """Если элемент встречается дважды, он принадлежит пересечению."""
    if len(values) == 2:
        yield (t, t)

# Запуск обработки
result = list(MapReduce(RECORDREADER, MAP, REDUCE))
# Вывод результата
print("Пересечение множеств:", result)


Пересечение множеств: [(20, 20), (30, 30)]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [49]:
# Генератор данных
def RECORDREADER():
    """Формируем два множества: A и B."""
    A = [(10, "A"), (20, "A"), (30, "A"), (40, "A")]  # Множество A
    B = [(20, "B"), (30, "B"), (50, "B"), (60, "B")]  # Множество B
    return A + B  # Объединяем их в один поток данных

# Функция отображения (маппинга)
def MAP(t, source):
    """Каждое значение помечается его источником (A или B)."""
    yield (t, source)

# Функция редукции данных
def REDUCE(t, values):
    """Если элемент присутствует только в A, он включается в результат."""
    if values == ["A"]:  # Элемент отсутствует в B
        yield (t, t)

# Запуск обработки
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Разность A - B:", result)


Разность A - B: [(10, 10), (40, 40)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [50]:
import numpy as np

# Генератор данных
def RECORDREADER():
    """Формируем данные для соединения: X(p, q) и Y(q, r)."""
    X = [("Alpha", 10), ("Beta", 20), ("Gamma", 30)]  # (p, q)
    Y = [(10, "Orange"), (20, "Grape"), (30, "Peach")]  # (q, r)
    return [(x, "X") for x in X] + [(y, "Y") for y in Y]

# Функция отображения (маппинга)
def MAP(record, source):
    """Формирует пары (q, (источник, значение)) для соединения."""
    if source == "X":
        p, q = record
        yield (q, ("X", p))
    elif source == "Y":
        q, r = record
        yield (q, ("Y", r))

def REDUCE(q, values):
    """Создает пары (p, q, r) при совпадении."""
    x_values = [p for src, p in values if src == "X"]
    y_values = [r for src, r in values if src == "Y"]

    for p in x_values:
        for r in y_values:
            yield (p, q, r)  # q оставляем для наглядности


# Запуск обработки
result = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Вывод результата
print("Natural Join:", result)


Natural Join: [('Alpha', 10, 'Orange'), ('Beta', 20, 'Grape'), ('Gamma', 30, 'Peach')]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [56]:
from collections import defaultdict
from typing import Callable, List, Tuple

def MAP(key, value, category):
    """Маппинг данных: создаем пару (ключ, значение для агрегации)."""
    yield (key, value)

def REDUCE(key, values_group, aggregate_func: Callable[[List[int]], int]):
    """Агрегация значений с помощью заданной функции."""
    yield (key, aggregate_func(values_group))

def RECORDREADER():
    """Пример исходных данных для обработки."""
    return [(1, 10, "cat1"), (2, 20, "cat2"), (1, 15, "cat3"), (2, 25, "cat4"), (3, 30, "cat5")]

def group_by_key(mapped_items: List[Tuple[int, int]]) -> dict:
    """Группируем все значения по ключу."""
    grouped = defaultdict(list)
    for key, value in mapped_items:
        grouped[key].append(value)
    return grouped

def MapReduce(record_reader, mapper, reducer, aggregation_func):
    """Главная функция MapReduce."""
    raw_data = record_reader()

    # Этап маппинга
    mapped_items = []
    for record in raw_data:
        mapped_items.extend(mapper(*record))  # Распаковываем и применяем маппинг

    # Этап шифрования
    grouped_data = group_by_key(mapped_items)

    # Этап редуцирования
    reduced_results = []
    for key, values in grouped_data.items():
        reduced_results.extend(reducer(key, values, aggregation_func))

    return reduced_results


# Применение для суммы
print("Aggregation with SUM:", MapReduce(RECORDREADER, MAP, REDUCE, sum))

# Применение для максимума
print("Aggregation with MAX:", MapReduce(RECORDREADER, MAP, REDUCE, max))


Aggregation with SUM: [(1, 25), (2, 45), (3, 30)]
Aggregation with MAX: [(1, 15), (2, 25), (3, 30)]


#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [57]:
from typing import List, Tuple, Dict, NamedTuple
from collections import defaultdict

NUM_PARTITIONS = 2  # Число разделов для редуцирования
VECTOR_CHUNK_SIZE = 2  # Размер каждого чанка вектора

# Функция отображения для матричных данных
def map_matrix_entry(entry: NamedTuple) -> Tuple[int, Tuple[str, NamedTuple]]:
    return entry.col % NUM_PARTITIONS, ('M', entry)

# Функция отображения для векторных данных, разбитых на чанки
def map_vector_chunk(vector_segment: List[NamedTuple]) -> List[Tuple[int, Tuple[str, NamedTuple]]]:
    return [(el.index % NUM_PARTITIONS, ('V', el)) for el in vector_segment]

# Функция редукции для вычисления частичных произведений
def reduce_partial_product(partition_id: int, items: List[Tuple[str, NamedTuple]]) -> List[Tuple[int, float]]:
    matrix_rows, vector_elements = [], []
    for identifier, record in items:
        (matrix_rows if identifier == 'M' else vector_elements).append(record)

    return [(row.row, row.value * element.value) for row in matrix_rows for element in vector_elements if row.col == element.index]

# Функция для группировки данных по разделам
def group_by_partition(mapped_items: List[Tuple[int, Tuple[str, NamedTuple]]]) -> Dict[int, List[Tuple[str, NamedTuple]]]:
    grouped = defaultdict(list)
    for partition_id, pair in mapped_items:
        grouped[partition_id].append(pair)
    return grouped

# Функция для агрегации результатов (суммирование частичных произведений)
def aggregate_results(partial_results: List[Tuple[int, float]]) -> Dict[int, float]:
    aggregated = defaultdict(float)
    for row, product in partial_results:
        aggregated[row] += product
    return aggregated

# Функция для преобразования матрицы в формат (M, запись)
def read_matrix_data(matrix: List[NamedTuple]) -> List[Tuple[str, NamedTuple]]:
    return [('M', row) for row in matrix]

# Функция для разбивания вектора на чанки
def split_vector_into_chunks(vector: List[NamedTuple]) -> List[List[NamedTuple]]:
    return [vector[i:i + VECTOR_CHUNK_SIZE] for i in range(0, len(vector), VECTOR_CHUNK_SIZE)]

# Главная функция MapReduce для матрично-векторного умножения
def map_reduce(matrix, vector):
    matrix_mapped = [map_matrix_entry(row) for _, row in read_matrix_data(matrix)]
    vector_mapped = [pair for chunk in split_vector_into_chunks(vector) for pair in map_vector_chunk(chunk)]
    grouped_data = group_by_partition(matrix_mapped + vector_mapped)
    partial_products = [result for partition_id, values in grouped_data.items() for result in reduce_partial_product(partition_id, values)]
    return aggregate_results(partial_products)

from collections import namedtuple
MatrixRow = namedtuple('MatrixRow', ['row', 'col', 'value'])
VectorElement = namedtuple('VectorElement', ['index', 'value'])

matrix = [MatrixRow(0, 0, 1.0), MatrixRow(0, 1, 2.0), MatrixRow(1, 0, 3.0), MatrixRow(1, 1, 4.0)]
vector = [VectorElement(0, 0.5), VectorElement(1, 0.7)]

result = map_reduce(matrix, vector)
print("Matrix-Vector Multiplication Result:", result)


Matrix-Vector Multiplication Result: defaultdict(<class 'float'>, {0: 1.9, 1: 4.3})


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [63]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [64]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1

  for i in range(small_mat.shape[0]):  # Пробегаем все строки маленькой матрицы
        v = small_mat[i, j]  # m_{ij}
        yield ((i, k), v * w)  # (i,k) - ключ, произведение - значение

  # solution code that yield(k2,v2) pairs

def REDUCE(key, values):
  (i, k) = key
  yield ((i, k), sum(values))
  # solution code that yield(k3,v3) pairs

Проверьте своё решение

In [65]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [66]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [67]:
import numpy as np  # Импортируем библиотеку для работы с матрицами

# Размерности матриц
ROWS = 2
COLS = 3
WIDTH = 40

# Генерация случайных матриц
np.random.seed(42)  # Устанавливаем seed для воспроизводимости
matrix_a = np.random.rand(ROWS, COLS)  # Матрица A размером (ROWS x COLS)
matrix_b = np.random.rand(COLS, WIDTH)  # Матрица B размером (COLS x WIDTH)

# Эталонное решение с использованием стандартного умножения матриц
expected_solution = np.matmul(matrix_a, matrix_b)

# Функция, которая генерирует данные для обеих матриц
def data_reader():
    # Чтение первой матрицы (A)
    for row in range(matrix_a.shape[0]):
        for col in range(matrix_a.shape[1]):
            yield ((0, row, col), matrix_a[row, col])  # Формат (0, row, col, A[row, col])

    # Чтение второй матрицы (B)
    for col in range(matrix_b.shape[0]):
        for width in range(matrix_b.shape[1]):
            yield ((1, col, width), matrix_b[col, width])  # Формат (1, col, width, B[col, width])

# Функция MAP для объединения данных матриц по индексу
def map_join_data(key, value):
    matrix_type, row_or_col, col_or_width = key  # Разбираем номер матрицы и индексы
    val = value

    if matrix_type == 0:  # Если это элемент первой матрицы A
        yield (col_or_width, (0, row_or_col, val))  # Группируем по col
    else:  # Если это элемент второй матрицы B
        yield (row_or_col, (1, col_or_width, val))  # Группируем по col

# Функция REDUCE для объединения матриц по индексу
def reduce_join_data(key, values):
    matrix_a_values = [v for v in values if v[0] == 0]  # Элементы первой матрицы A
    matrix_b_values = [v for v in values if v[0] == 1]  # Элементы второй матрицы B

    for a in matrix_a_values:
        for b in matrix_b_values:
            yield ((a[1], b[1]), a[2] * b[2])  # Умножение значений A[row, col] * B[col, width]

# Функция MAP для финальной агрегации (суммирование произведений)
def map_multiply(key, value):
    yield (key, value)  # Ключ (row, width) оставляем без изменений

# Функция REDUCE для суммирования произведений
def reduce_multiply(key, values):
    yield (key, sum(values))  # Суммируем все элементы для ключа (row, width)

# Генерация объединённых данных через MapReduce
mapped_data = MapReduce(data_reader, map_join_data, reduce_join_data)

# Финальное выполнение MapReduce для получения результата
final_solution = MapReduce(lambda: mapped_data, map_multiply, reduce_multiply)

# Проверка на совпадение с эталонным решением
print(np.allclose(expected_solution, asmatrix(final_solution)))  # Должно вывести True


True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [68]:
import numpy as np

MAPPER_COUNT = 2  # Число мапперов
REDUCER_COUNT = 2  # Число редукторов
ROWS_MATRIX_A = 2
COLS_MATRIX_A = 3
COLS_MATRIX_B = 40  # K = 4 * 10

# Генерация данных
matrix_a = np.random.rand(ROWS_MATRIX_A, COLS_MATRIX_A)
matrix_b = np.random.rand(COLS_MATRIX_A, COLS_MATRIX_B)

def chunk_data():
    """Разбиение матриц на части для обработки мапперами."""
    chunk_size = int(np.ceil(ROWS_MATRIX_A / MAPPER_COUNT))
    for start in range(0, ROWS_MATRIX_A, chunk_size):
        yield process_data_chunk(range(start, min(start + chunk_size, ROWS_MATRIX_A)))

def process_data_chunk(rows):
    """Генерирует пары (индексы, значение) из двух матриц для каждого чанка."""
    for i in rows:
        for j in range(COLS_MATRIX_A):
            for k in range(COLS_MATRIX_B):
                yield ((i, j), matrix_a[i, j]), ((j, k), matrix_b[j, k])

def map_step(pair_a, pair_b):
    """Выполняет умножение элементов и группирует по (i, k)."""
    (i, j), value_a = pair_a
    (j, k), value_b = pair_b
    yield (i, k), value_a * value_b

def reduce_step(index, values):
    """Суммирует элементы с одинаковыми индексами (i, k)."""
    yield index, sum(values)

# Запуск MapReduce
intermediate_results = MapReduceDistributed(chunk_data, map_step, reduce_step)

# Объединение результатов
final_output = {}
for partition_id, partition in intermediate_results:
    for index, value in partition:
        final_output[index] = final_output.get(index, 0) + value

print("Matrix Multiplication Result:\n", final_output)


240 key-value pairs were sent over a network.
Matrix Multiplication Result:
 {(0, 1): 0.47268518561234973, (0, 2): 1.331585175830705, (0, 4): 1.021351852495352, (0, 5): 0.9131445349756118, (0, 8): 1.0854341486196752, (0, 9): 0.3075827214969634, (0, 11): 1.003832121786526, (0, 12): 0.45686569948009204, (0, 15): 0.8918100486065025, (0, 16): 0.5041984539756577, (0, 18): 0.8286369376706371, (0, 19): 0.9717206052489281, (0, 22): 1.099447290027778, (0, 23): 0.5036027285086736, (0, 25): 1.395355376421955, (0, 26): 0.9624208429352515, (0, 29): 0.5962891831345462, (0, 32): 0.5959378597563265, (0, 33): 0.6976597508277623, (0, 36): 0.04441053975372091, (0, 39): 0.6117816957363831, (1, 0): 0.4197798780682388, (1, 1): 0.31678983211806466, (1, 3): 0.2905523671212979, (1, 4): 0.6256497395186525, (1, 7): 0.7423569849982339, (1, 10): 0.6607515214848413, (1, 11): 0.6015564366326857, (1, 14): 0.8685641215901408, (1, 17): 0.38345055566397046, (1, 18): 0.6792902693535021, (1, 21): 0.6236422180184992, (1, 2

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [69]:
import numpy as np

# Определение размеров матриц
rows_matrix_a = 2
cols_matrix_a = 3
cols_matrix_b = 4 * 10

# Генерация случайных матриц
matrix_a = np.random.rand(rows_matrix_a, cols_matrix_a)
matrix_b = np.random.rand(cols_matrix_a, cols_matrix_b)

# Эталонный результат через стандартное умножение матриц
expected_output = np.matmul(matrix_a, matrix_b)

# Функция для "разглаживания" вложенных итерируемых объектов
def flatten(nested_iterable):
    for item in nested_iterable:
        for subitem in item:
            yield subitem

# Функция для группировки элементов по ключу
def group_by_key(iterable):
    grouped = {}
    for (key, value) in iterable:
        grouped[key] = grouped.get(key, []) + [value]
    return grouped.items()

# Функция для распределенной группировки элементов по ключу
def distributed_partition(map_partitions, partition_function):
    global num_reducers
    partitions = [dict() for _ in range(num_reducers)]
    for partition in map_partitions:
        for (key, value) in partition:
            partition_list = partitions[partition_function(key)]
            partition_list[key] = partition_list.get(key, []) + [value]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for partition_id, partition in enumerate(partitions)]

# Функция для определения разделителя (по ключу)
def partition_selector(obj):
    global num_reducers
    return hash(obj) % num_reducers

# Функция для выполнения MapReduce на распределенных данных
def distributed_mapreduce(input_provider, map_function, reduce_function, partition_function=partition_selector, combiner=None):
    map_partitions = map(lambda reader: flatten(map(lambda kv: map_function(*kv), reader)), input_provider())

    if combiner:
        map_partitions = map(lambda partition: flatten(map(lambda kv: combiner(*kv), group_by_key(partition))), map_partitions)

    reduce_partitions = distributed_partition(map_partitions, partition_function)

    reduced_outputs = map(lambda partition: (partition[0], flatten(map(lambda group: reduce_function(*group), partition[1]))), reduce_partitions)

    return reduced_outputs

# Функция для преобразования REDUCE-выхода в матрицу
def to_matrix(reduced_output):
    reduced_output = list(reduced_output)
    rows = max(i for ((i, k), value) in reduced_output) + 1
    cols = max(k for ((i, k), value) in reduced_output) + 1
    matrix_result = np.empty(shape=(rows, cols))
    for ((i, k), value) in reduced_output:
        matrix_result[i, k] = value
    return matrix_result

# Генератор для ввода данных
def input_data():
    data_a = []

    for i in range(matrix_a.shape[0]):
        for j in range(matrix_a.shape[1]):
            data_a.append(((0, i, j), matrix_a[i, j]))  # Первая матрица

    global num_mappers
    chunk_size = int(np.ceil(len(data_a) / num_mappers))

    for i in range(0, len(data_a), chunk_size):
        yield data_a[i:i + chunk_size]

    data_b = []

    for j in range(matrix_b.shape[0]):
        for k in range(matrix_b.shape[1]):
            data_b.append(((1, j, k), matrix_b[j, k]))  # Вторая матрица

    chunk_size = int(np.ceil(len(data_b) / num_mappers))

    for i in range(0, len(data_b), chunk_size):
        yield data_b[i:i + chunk_size]

# MAP-функция для соединения матриц
def join_map_function(key, value):
    (matrix_id, i, j) = key
    element = value

    if matrix_id == 0:
        yield (j, (matrix_id, i, element))
    else:
        yield (i, (matrix_id, j, element))

# REDUCE-функция для соединения матриц
def join_reduce_function(key, values):
    matrix_a_elements = [v for v in values if v[0] == 0]
    matrix_b_elements = [v for v in values if v[0] == 1]

    for a_elem in matrix_a_elements:
        for b_elem in matrix_b_elements:
            yield ((a_elem[1], b_elem[1]), a_elem[2] * b_elem[2])

# Генератор для получения объединенных данных
def get_joined_data():
    for item in joined_data:
        yield item[1]

# MAP-функция для передачи значений
def multiplication_map_function(key, value):
    yield (key, value)

# REDUCE-функция для суммирования значений
def multiplication_reduce_function(key, values):
    total = 0

    for val in values:
        total += val
    yield (key, total)

num_mappers = 3
num_reducers = 2

# Выполнение MapReduce для соединения матриц
partitioned_result = distributed_mapreduce(input_data, join_map_function, join_reduce_function, combiner=None)
joined_data = [(partition_id, list(partition)) for partition_id, partition in partitioned_result]

# Выполнение MapReduce для умножения значений
multiplication_output = distributed_mapreduce(get_joined_data, multiplication_map_function, multiplication_reduce_function, combiner=None)
pre_result = [(partition_id, list(partition)) for partition_id, partition in multiplication_output]

# Формирование окончательного результата
final_result = []

for partition in pre_result:
    for value in partition[1]:
        final_result.append(value)

# Проверка на соответствие с эталонным решением
is_result_close = np.allclose(expected_output, to_matrix(final_result))

# Вывод части матрицы
result_matrix = to_matrix(final_result)
print("Результат умножения матриц (первая и последняя строка):")
print(result_matrix[0])  # Первая строка
print(result_matrix[-1])  # Последняя строка
print("Сравнение с эталонным решением:", is_result_close)



Результат умножения матриц (первая и последняя строка):
[0.76297669 0.12634115 0.6231596  1.09131914 0.31992627 0.71671965
 1.04442238 1.04414646 0.86452637 0.70794068 0.62148223 0.4417982
 0.91327461 0.85955059 0.85767505 1.10999319 0.71212365 0.75631386
 0.81355796 0.70741363 0.8064103  0.8541745  0.99737019 0.62318393
 0.60657404 0.26118678 0.95438471 0.32988915 0.73488937 0.80918288
 0.41005144 0.59314749 0.1558566  0.27505499 0.72071451 0.45912213
 0.13479107 0.5114723  1.06968427 0.41064228]
[1.23057179 0.20210192 0.73299819 1.37824387 0.70467077 1.18731333
 1.44562747 1.7546595  1.31348039 0.72501421 1.20532402 0.85263036
 1.21179633 1.12066193 0.96498958 1.52308862 1.11644083 1.37792744
 0.84578768 0.9318483  1.19173359 1.07791162 1.2798951  1.30491628
 1.14612965 0.69415626 1.87117636 0.97953737 1.44722403 1.50496154
 0.6886336  0.60916879 0.42037259 0.74293971 0.53723468 0.78460963
 0.16336247 0.55093071 1.81763528 0.79510005]
Сравнение с эталонным решением: True
