# Тема 30. Работа с большими данными.

# Распараллеливание операций
Количество данных, которые надо обрабатывать, растет изо дня в день. В моем детстве получить в [подарок компьютер](https://www.computer-museum.ru/articles/personalnye-evm/899/) с 40 *мегабайтами* ($40*10^6$ байт) дисковой памяти было просто потрясающе. Сегодня никого уже не удивить [хранилищами](https://www.ixbt.com/news/2017/12/08/toshiba-mg07aca-14.html) на полтора десятка *терабайт* ($15*10^{12}$ байт).   

Но объемы данных растут гораздо быстрее, чем емкость дисков и памяти для их хранения и обработки. Например, количество данных, собранных [Большим Адронным Коллайдером](https://www.lhc-closer.es/taking_a_closer_look_at_lhc/0.lhc_data_analysis) уже перевалило за 200 *петабайт* ($200*10^{15}$ байт), добавляя по 10-15 петабайт каждый год. 

При таких объемах данных привычная нам обработка - когда мы загружаем все данные в память компьютера и запускаем программу -  не работает. Такие данные, когда традиционные подходы к их обработке не работают из-за большого количества этих данных, назвали [**Большими Данными**](https://ru.wikipedia.org/wiki/Большие_данные) (Big Data).

*Большие данные* это не очень точно определенное понятие, нельзя назвать конкретную цифру, когда данные считать большими, а когда нет. Главный признак больших данных - они не помещаются в одну физическую память.



Как же работать с большими данными?

Раз не можем хранить и обрабатывать такие данные на одном устройстве - будем делать это на нескольких, **распараллелим вычисления**! 

Представьте такой пример: надо сложить восемь чисел (a, b, c, d, e, f, g, h), используя несколько вычислителей одновременно. Как это можно сделать? 

Операция сложения *коммутативна и ассоциативна*.
* Коммутативна - мы можем менять порядок слагаемых и сумма не изменится.
* Ассоциативна -  можем объединить слагаемые в группы, сложить их внутри групп, а потом сложить результаты каждой группы и опять сумма не изменится (т.е. можно расставлять скобочки по своему усмотрению).

Пусть у нас есть два вычислителя, тогда можем поступить так:
* первый вычислитель - принимает числа abcd и складывает их.
* второй вычислитель в то же самое время принимает числа efgh и складывает их.
* какой-то, например, первый, вычислитель принимает результаты от первого и второго и складывает их.

А если вычислителей четыре? 

Такое можно посчитать за несколько шагов  (см. рис.).

Шаг 1:
* первый вычислитель принимает два числа a и b и складывает их.
* одновременно второй принимает c и d, складывает их.
* одновременно с ними третий вычислитель принимает e и f, складывает.
* и, наконец, четвертый вычислитель одновременно принимает g и h, складывает.

Шаг 2:
* первый вычислитель принимает результат с шага 1 от себя и второго вычислителя, складывает эти результаты.
* второй вычислитель простаивает, ничего не делает.
* третий вычислитель принимает результат с шага 1 от себя и четвертого вычислителя, складывает эти результаты.
* четвертый простаивает, ничего не делает.

Шаг 3:
* первый вычислитель принимает результат с шага 2 от себя и третьего вычислителя, складывает эти результаты.
* второй, третий и четвертый вычислители простаивают, ничего не делают.


![img](http://www.cburch.com/books/distalg/sum.png)


После третьего шага в первом вычислителе получился искомый результат - сумма всех чисел. Ровно по тому же принципу можно работать, когда чисел триллионы и есть несколько вычислителей.

Обратите внимание, что вычислители работают одновременно друг с другом, в то же самое время и весь алгоритм занимает только три шага. Можно придумать и другие схемы работы вычислителей, например, принимать не по два, а по четыре числа. В любом случае мы имеем некоторую схему распараллеливания вычислений, в которой можем регулировать количество данных обрабатываемых каждым вычислителем. В нашем примере каждый вычислитель принимал только два числа. Давайте всегда делать так, чтобы это количество помещалось в память вычислителя.

Важное наблюдение - каждый вычислитель работает со своей порцией данных, значит можно изначально записать и хранить их так, чтобы порции попадали в память своего вычислителя - нам не нужно одно большое хранилище данных.  



Раз подход работает со сложением - будет работать и с другими ассоциативными операциями, например, поиск самого длинного слова в большом тексте. Действительно, это ассоциативная операция: если слово А длиннее слова Б, а слово В длиннее слова Г, разбив их на группы на первом шаге выделим слова А и В и сравнив их, найдем на втором шаге наиболее длинное. Результат будет таким же, если бы мы сравнивали слова постепенно.



Здесь нам надо сначала найти для слов их длину, а уже потом объединять результаты от разных вычислителей. При объединении же результатов, мы теперь используем не сумму, а выбор максимума.

И другие похожие задачи можно решать аналогичным способом, который назвали **Map-Reduce**.



# Map-Reduce
В подходе Map-Reduce вычисления разбиваются логически на три этапа:
- **Map**, отображение - предварительная обработка входных данных в виде большого списка значений. При этом главный узел (логический вычислитель) кластера (master node) получает этот список, делит его на части и передает рабочим узлам (worker node). Далее каждый рабочий узел применяет заданную функцию Map (например, подсчет количества слов в предложении) к локальным данным и записывает результат в формате «ключ-значение» во временное хранилище. Здесь же может выполняться фильтрация данных, например, отбросить не подходящие. 
- **Shuffle**, распределение -  когда рабочие узлы перераспределяют данные на основе ключей, ранее созданных функцией Map, таким образом, чтобы все данные одного ключа лежали на одном рабочем узле.
- **Reduce**, свертка – параллельная обработка каждым рабочим узлом каждой группы данных по порядку следования ключей и «склейка» результатов на master node. 

Главный узел получает промежуточные ответы от рабочих узлов и передаёт их на свободные узлы для выполнения следующего этапа. Получившийся после прохождения всех необходимых этапов результат – это и есть решение исходной задачи.

![img](https://www.bigdataschool.ru/wp-content/uploads/2019/10/MapReduce3.png)

На рисунке выше показан пример, где для каждого слова в тексте подсчитывается сколько раз оно встретилось. 

Весь текст разбивается на 4 части, каждая поступает на свой узел, где применяется отображение map - подсчет числа вхождений слов в эту часть. Слова это ключи, их число вхождений - значения.

Для каждого ключа-слова создается свой логический узел, куда поступает информация из блоков map, на каждый узел попадает свое слово. Это именно логические узлы, физически они могут располагаться на одном или нескольких вычислителях.

В каждом узле применяется агрегирующая функция reduce, здесь - подсчет количества записей, это и коммутативная и ассоциативная операция.

Затем результаты собираются на главном узле, где мы и получили окончательный ответ - число вхождений слов во всем тексте.








## 1. Простая реализация Map-Reduce. 

Сделаем свою простую реализацию подхода Map-Reduce для поиска наиболее длинного слова в тексте.

*Примечание: в наших простых примерах все помещается в память, но подход работает и для более сложных случаев* 

Сделаем функцию для поиска наиболее длинной строки в списке. Перебираем строки по очереди, вычисляем длину и сохраняем самую длинную строку, пока не закончим.


In [1]:
# Функция для поиска наиболее длинной строки в списке строк
def find_longest_string(list_of_strings): # принимаем список строк
    longest_string = None
    longest_string_len = 0 
    for s in list_of_strings: # берем каждую строку из списка
        if len(s) > longest_string_len: # находим ее длину
            longest_string_len = len(s) # если больше чем предыдущие, то запоминаем длину
            longest_string = s # и саму строку
    return longest_string

Для небольших списков это работает довольно быстро. 

3 строки:

In [2]:
list_of_strings = ['abc', 'python', 'dima']
%time max_length = print(find_longest_string(list_of_strings))

python
CPU times: user 629 µs, sys: 0 ns, total: 629 µs
Wall time: 636 µs


3000 строк:

In [3]:
large_list_of_strings = list_of_strings*1000
%time print(find_longest_string(large_list_of_strings))

python
CPU times: user 2.38 ms, sys: 95 µs, total: 2.48 ms
Wall time: 6.35 ms


300 миллионов строк, уже долго:

In [4]:
large_list_of_strings = list_of_strings*100000000
%time print(find_longest_string(large_list_of_strings))

python
CPU times: user 26.8 s, sys: 41.8 ms, total: 26.8 s
Wall time: 26.8 s


Используемая функция для этапа map здесь это подсчет длины слова, просто функция len. Чтобы применять такую функцию к данным воспользуемся встроенной функцией map().

Используемая функция для этапа reduce здесь это выбор максимального значения (ищем наиболее длинное слово). Встроенной функции reduce нет, воспользуемся такой функцией из библиотеки [`functools`](https://pythonworld.ru/moduli/modul-functools.html). В ней используемая функция должна принимать два агрегируемых аргумента ключ-значение.

Этап распределения shuffle здесь не нужен, так как результат хранится в одном массиве. 

In [5]:
# Используемые функции
mapper = len # подсчет длины слова
def reducer(p, c): # выбор максимального элемента
    if p[1] > c[1]: # каждый аргумент это пара ключ-значение, значения имеют индекс 1.
        return p
    return c

In [6]:
from functools import reduce

In [7]:
%%time
# этап 1
mapped = map(mapper, list_of_strings) # применяем функцию расчета длины к каждому слову
mapped = zip(list_of_strings, mapped) # и делаем пары ключ-значение, ключ - слово, значение - длина 
# этап 2:
reduced = reduce(reducer, mapped) # агрегируем поиском максимума
print(reduced)

('python', 6)
CPU times: user 187 µs, sys: 10 µs, total: 197 µs
Wall time: 148 µs


Выше мы работали сразу со всеми данными, но обычно данные разделяются на части. Сделаем и для такого примера.

Разделим данные на части:

In [8]:
# разделение данных на заданное число частей
def chunkify(seq, number_of_chunks=30):
  return (seq[i::number_of_chunks] for i in range(number_of_chunks))

Для каждой части выполняем функции map-reduce. Но так как теперь частей несколько, то результаты для каждой части также нужно объединить. У нас это та же самая операция reduce (выбираем общий максимум из максимумов частей).

Операции map-reduce для одной части данных определим в виде функции: 

In [9]:
# map-reduce для одной части данных
def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk)  # применяем функцию расчета длины к каждому слову
    mapped_chunk = zip(chunk, mapped_chunk) # и делаем пары ключ-значение
    return reduce(reducer, mapped_chunk) # агрегируем поиском максимум


Применим.

In [10]:
%%time
# разделяем на части
data_chunks = chunkify(large_list_of_strings, number_of_chunks=30)
# ко всем частям применяем map-reduce
mapped = map(chunks_mapper, data_chunks)
# объединяем результаты от всех частей
reduced = reduce(reducer, mapped)
print(reduced)

('python', 6)
CPU times: user 1min 15s, sys: 300 ms, total: 1min 16s
Wall time: 1min 15s


Если бы у нас было много процессоров, то с помощью библиотеки multiprocessing можно было бы распараллелить эти вычисления. Но в Colab выделен только один процессор, поэтому никакого прироста скорости не будет. Но если у вас есть компьютер с несколькими процессорами, то вы увидите прирост скорости выполнения. 

In [11]:
%%time
from multiprocessing import Pool
pool = Pool(8) # создаем "кластер" из восьми процессоров
data_chunks = chunkify(large_list_of_strings, number_of_chunks=8) # разделяем данные на 8 частей
# ко всем частям применяем map-reduce, но теперь каждая часть считается на своем процессоре
mapped = pool.map(chunks_mapper, data_chunks)
# объединяем результаты от всех частей
reduced = reduce(reducer, mapped)
print(reduced)

('python', 6)
CPU times: user 15 s, sys: 2.04 s, total: 17 s
Wall time: 1min 20s


## 2. Pyspark

Подход Map-Reduce реализован в нескольких системах: [Hadoop]( https://ru.wikipedia.org/wiki/Hadoop), [Spark]( https://ru.wikipedia.org/wiki/Apache_Spark) и др.

Мы посмотрим на работу [Pyspark](https://spark.apache.org/docs/latest/api/python/) для языка Python.

Здесь работа с данными в подходе Map-Reduce реализована прозрачно для пользователя, система сама будет создавать необходимые узлы, и выполнять необходимые этапы.



### Установка
Прежде нужно установить в Colab все необходимые средства, а именно Apache Spark подходящей версии с hadoop 2.7, Java 8. Также Findspark, которая поможет настроить Spark. Ниже нужно указать подходящую версию, сейчас это 2.4.8:

In [12]:
# Устанавливаем Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# скачиваем Spark
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
# распаковываем
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
# устанавливаем findspark
!pip install -q findspark

Для работы необходимо прописать пути к Java и Spark в системную переменную среды.

In [13]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" # путь к Java
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7" # путь к Spark (укажите вашу версию)

Инициализируем и проверим работу локальной сессии spark:

In [14]:
import findspark
findspark.init() # настраивает pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate() # создаем сессию

### Вычисление линейной регрессии

В Pyspark реализовано множество вычислительных методов, есть и методы машинного обучения. Например, линейная регрессия.

Скачаем данные о домах в Бостоне [отсюда](https://github.com/asifahmed90/pyspark-ML-in-Colab/blob/master/BostonHousing.csv).

In [15]:
!wget https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv

--2021-08-14 16:36:18--  https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv’


2021-08-14 16:36:19 (8.65 MB/s) - ‘BostonHousing.csv’ saved [35735/35735]



In [16]:
!ls # проверим папку

BostonHousing.csv  spark-2.4.8-bin-hadoop2.7
sample_data	   spark-2.4.8-bin-hadoop2.7.tgz


Для работы нам потребуется два модуля из Pyspark: VectorAssembler и LinearRegression. VectorAssembler собирает все признаки типа double из нескольких столбцов в один. Если в данных есть строковые признаки, то следует использовать  StringIndexer, но в наших данных этого нет.  

Методом spark.read.csv() читаем данные из файла, указываем, что названия столбцов надо взять из файла и в нем есть заголовок.

In [17]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)

Создается объект DataFrame spark, который по смыслу очень похож на pandas DataFrame, но реализован на spark. 

Посмотрим схему данных - информацию о столбцах.

In [18]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



Собираем все признаки из разных столбцов в один столбец 'Attributes' в переменной outputCol, кроме "medv", который будет целевым значением. 

In [19]:
# Собираем все признаки из разных столбцов в один столбец
# задаем сборщик
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')
# применяем сборщик
output = assembler.transform(dataset)

# таблица входов и целевых значений
finalized_data = output.select("Attributes","medv")
# напечатаем
finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



Разделяем данные на обучающее и тестовое множества (0.8 и 0.2).

Задаем регрессор, обучаем его, проверяем как обучилось. Синтаксис очень похож на sklearn.

In [20]:
# Разделяем данные на обучающее и тестовое множества
train_data,test_data = finalized_data.randomSplit([0.8,0.2])

# Задаем регрессор
regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')

# обучаем его
regressor = regressor.fit(train_data)

# проверяем как обучилось
pred = regressor.evaluate(test_data)

# печатаем результат
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.0136,75.0,4.0,...|18.9| 15.35710411524662|
|[0.02899,40.0,1.2...|26.6| 21.92403439567257|
|[0.03041,0.0,5.19...|18.5|19.558399887571934|
|[0.03237,0.0,2.18...|33.4| 28.62371762785608|
|[0.03466,35.0,6.0...|19.4| 23.02527470807725|
|[0.03738,0.0,5.19...|20.7|21.574017965979316|
|[0.04741,0.0,11.9...|11.9|22.638194555590267|
|[0.05059,0.0,4.49...|23.9|24.842508436836994|
|[0.05083,0.0,5.19...|22.2|22.152484706038727|
|[0.05497,0.0,5.19...|19.0|  21.3987125603013|
|[0.05644,40.0,6.4...|32.4| 37.07794830806628|
|[0.06211,40.0,1.2...|22.9|20.348580485157004|
|[0.06417,0.0,5.96...|18.9| 24.06867640324641|
|[0.06588,0.0,2.46...|39.8| 35.18985767958472|
|[0.06905,0.0,2.18...|36.2| 28.04773781775772|
|[0.06911,45.0,3.4...|30.5|29.889488988907367|
|[0.07165,0.0,25.6...|20.3|23.053793272197208|
|[0.0795,60.0,1.69...|24.1|20.255568043598714|
|[0.07978,40.

Можно напечатать полученные коэффициенты линейной регрессии и смещение:

In [21]:
# коэффициенты
coeff = regressor.coefficients

# смещение
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)


The coefficient of the model is : DenseVector([-0.1086, 0.0517, 0.0261, 3.5586, -19.1003, 3.7844, 0.0106, -1.5005, 0.3368, -0.0149, -0.9116, 0.0098, -0.5219])
The Intercept of the model is : 36.453686


### Анализ

Имея обученную регрессию можно считать различные статистики с помощью модуля RegressionEvaluator из Pyspark.

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

# создаем подсчитыватель статистики
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# считаем Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# считаем Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# считаем Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# считаем r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)



RMSE: 4.688
MSE: 21.977
MAE: 3.643
r2: 0.623


# Задания и вопросы
* Какая была функция map в примере со сложением чисел? (Никакая, числа брались как есть, без преобразований).
* Приведите примеры не коммутативных и\или не ассоциативных операций. (например, вычитание, возведение в степень).
* Измените п.1 на поиск самого короткого слова
* Изучите документацию на [MLlib pyspark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html) и примените другой тип регрессии - на основе деревьев (DecisionTreeRegressor).

# Ссылки
Использованы и адаптированы материалы:
* https://github.com/asifahmed90/pyspark-ML-in-Colab 
* https://www.bigdataschool.ru/wiki/mapreduce 
* https://colab.research.google.com/github/RPI-DATA/course-intro-ml-app/blob/master/content/notebooks/18-big-data/01-intro-mapreduce.ipynb 

Большое количество примеров машинного обучения с pyspark можно найти здесь:
* https://github.com/apache/spark/tree/v3.1.2-rc1/examples/src/main/python/mllib
