# Apache Spark

Автор ноутбука: Алексей Космачев

Сегодня будет говорить про Apache Spark - более удобный фреймворк для обработки больших данных на базе Hadoop.

С Spark можно работать из ноутбуков в Data Sphere, но так как нам еще потребуется запускать bash команды, то я буду запускать все команды ниже из ноутбука на мастер-ноде

PySpark - это не обычная библиотека, поэтому по-умолчанию ее нет в списке установленных пакетов

Чтобы решить эту проблему простым способом, добрые люди сделали небольшую библиотеку findspark

In [None]:
! pip install findspark

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark
sc = pyspark.SparkContext(appName="lsml-app")

# Работаем с RDD

RDD - это базовый строительный блок для Spark. Спарк внимательно следит за тем, где лежат части RDD и как они были созданы. RDD по сути своей представляют упорядоченный набор записей. Большое число функций считают, что это пары ключ-значение (также как было в MapReduce), но на деле это может быть и произвольные данные.

RDD сами по себе неизменяемые. Можно лишь получить новый RDD, применяя различные операции к изначальному RDD.

Существуют два вида операций над RDD - Действия (actions) и Трансформации (transformations).

Трансформации не применяются сразу - они лишь записываются в пул примененных операций. Чтобы что-то действительно началось считаться, нужно применить уже действие - тогда все указанные транформации действительно начнут считаться на кластере.

Давайте сразу смотреть на примерах, чтобы стало понятно.

In [None]:
rdd = sc.parallelize(range(10))  # Создаем rdd из обычного списка

In [None]:
rdd

In [None]:
rdd.collect()  # Получить значение всего RDD в память. Аккуратнее - если RDD большой, у вас просто лопнем питон

In [None]:
rdd.count()  # Считаем количество элементов в RDD

In [None]:
rdd.first()  # Берем только первый элемент

In [None]:
rdd.take(2)  # Берем первые N элементов

In [None]:
rdd.mean()  # Считаем среднее по всем элементам. Важно, чтобы элементы внутри RDD поддерживали суммирование и деление

In [None]:
rdd = sc.parallelize(["biba", "kuka"])  # Можем положить и строки

In [None]:
rdd.collect()

In [None]:
! hdfs dfs -ls /user

In [None]:
! hdfs dfs -mkdir -p /user/spark-example

In [None]:
! hdfs dfs -rm -r /user/spark-example/biba_and_kuka.txt || true

In [None]:
rdd.saveAsTextFile("/user/spark-example/biba_and_kuka.txt")  # Сохраняем RDD в HDFS

In [None]:
! hdfs dfs -ls /user/spark-example/biba_and_kuka.txt

In [None]:
! hdfs dfs -cat /user/spark-example/biba_and_kuka.txt/*

Добавим теперь еще трансформации

In [None]:
rdd = sc.parallelize(range(10))  # Создаем rdd из обычного списка

In [None]:
# Создаем rdd в котором каждый элемент возведен в квадрат
# map работает примерно также как и map в MapReduce. 
# Разница - мы не обрабатываем блок самостоятельно, а пишем функцию для обработки ровно одной записи
squares = rdd.map(lambda x: x**2).map(lambda x: x + 1)

# ВАЖНО - на самом деле ничего считаться в этот момент не начало
# Мы лишь записали наше желание получить новый RDD и записали это желание в squares

In [None]:
squares.first() 

# Так как мы применили Action то вот теперь все трансформации запустились
# Но так как action требует только первую строку, то Spark оптимизировал вычисления
# он прочитал только первую строку и для нее вычислил значение

In [None]:
squares.collect()

#### Начнем работать с данными

Датасет - тот же, что и на предыдущем семинаре

In [None]:
! hdfs dfs -ls /user/tweets/data 

In [None]:
data = sc.textFile("/user/tweets/data/*")

In [None]:
data.first()

In [None]:
import csv

def extract_text(raw_string):
    parsed_line = next(csv.reader([raw_string]))
    text = parsed_line[2]
    return text

In [None]:
data.map(extract_text).first()

In [None]:
import re

def extract_words(text):
    pattern = re.compile(r"[a-z]+")
    result = []
    for match in pattern.finditer(text.lower()):
        word = match.group(0)
        result.append(word)
    return result

In [None]:
data.map(extract_text).map(extract_words).take(2)

In [None]:
data.map(extract_text).flatMap(extract_words).first()

In [None]:
data.map(extract_text).flatMap(extract_words).take(10)

Все трансформации вычисляются каждый раз с самого первого RDD. Чтобы уменьшить количество лишних вычислений можно закешировать временный результат. Тогда он будет по максимуму переиспользоваться.

In [None]:
words = data.map(extract_text).flatMap(extract_words).cache()

In [None]:
%%time

words.count()

In [None]:
%%time

words.count()

На моем запуске второй запуск `words.count()` работал 2 секунды вместо 17

#### Word count

Попробуем реализовать тот же алгоритм, что и для классического MapReduce

In [None]:
words.map(lambda x: (x, 1)).first()  # Строим пары ключ значение

In [None]:
(
    words
    .map(lambda x: (x, 1))
    .groupByKey()  # Функция работает, только если RDD - это пары ключ-значение
    .take(2)
)

In [None]:
# ПЛОХОЙ ВАРИАНТ: материализуем весь x через list
(
    words
    .map(lambda x: (x, 1))
    .groupByKey()  # Функция работает, только если RDD - это пары ключ-значение
    .map(lambda x: (x[0], sum(list(x[1]))))
    .take(10)
)

In [None]:
# ИСПРАВЛЕННЫЙ ВАРИАНТ
(
    words
    .map(lambda x: (x, 1))
    .groupByKey()  # Функция работает, только если RDD - это пары ключ-значение
    .map(lambda x: (x[0], sum(x[1])))
    .take(10)
)

In [None]:
(
    words
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)  # Если уже готовая функция для reduce
    .take(10)
)

In [None]:
from operator import add

# то же самое с помощью оператора самого спарка

(
    words
    .map(lambda x: (x, 1))
    .reduceByKey(add) 
    .take(10)
)

In [None]:
(
    words
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, lambda x: -x[1])  # Сортируем по значению функции
)

In [None]:
result_50 = (
    words
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(50, lambda x: -x[1])
)

stop_words = [word for word, _ in result_50]  # Предподсчитали стоп слова

In [None]:
stop_words

In [None]:
(
    words
    .filter(lambda x: x not in stop_words)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(50, lambda x: -x[1])
)

Кроме базовых, есть еще и много продвинутых сложных функций
Например можем посчитать уникальные слова в датасете

Список всех можно смотреть в документации

https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

In [None]:
words.distinct().take(10)

In [None]:
words.distinct().count()

Однако иногда каких-то базовых примитивов может и не найтись. Например для RDD нет функции `limit` или около того.

Поэтому чтобы решить задачу top10 и сохранить это в HDFS нужно применить некоторую изобретательность

In [None]:
(
    words
    .filter(lambda x: x not in stop_words)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .zipWithIndex()
    .take(10)
)

In [None]:
(
    words
    .filter(lambda x: x not in stop_words)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .zipWithIndex()
    .filter(lambda x: x[1] < 10)
    .map(lambda x: x[0])
    .collect()
)

In [None]:
! hdfs dfs -ls /user/tweets/

In [None]:
! hdfs dfs -rm -r /user/tweets/spark/top10 || true

In [None]:
(
    words
    .filter(lambda x: x not in stop_words)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .zipWithIndex()
    .filter(lambda x: x[1] < 10)
    .map(lambda x: x[0])
    .saveAsTextFile('/user/tweets/spark/top10')
)

In [None]:
! hdfs dfs -cat /user/tweets/spark/top10/*

#### Partitions

Под капотом Spark эксплуатирует примерно те же идеи, что и классический MapReduce. Это означает, что при необходимости сортировки, он разбивает ключи на группы и передает редюсерам на обработку только их часть.

На этот процесс также можно влиять. Это может позводить улучшить производительность программ, а также решить проблемы переполнения редюсеров.

In [None]:
words.getNumPartitions()

In [None]:
numbers = sc.parallelize(range(10))

In [None]:
numbers.glom().collect()  # Получаем доступ до данных в каждей партиции

In [None]:
squares = numbers.map(lambda x: (x, x**2))

Операции изменения партиций предполагают наличие ключа, поэтому вначале преобразуем данные к виду ключ-значение

In [None]:
squares.partitionBy(2).glom().collect()

In [None]:
squares.partitionBy(15).glom().collect()

In [None]:
def custom_partitioner(value):
    return value % 3

In [None]:
squares.partitionBy(3, custom_partitioner).glom().collect()

Таким образом можно выбирать более удачные способы разбиения и например увеличивать количество редюсеров под вашу задачу.

Или наоборот, уменьшать количество количество партиций, если они избыточны. Например вы отфильтровали гигантский датасет и теперь вам больше не требуется такое гигантское количество партиций для работы.

Для этого можно использовать и `repartition` как делали выше, однако этот метод запустит пересортировку вообще всего RDD, что дорого и излишне. Чтобы так не было, можно использовать функцию `coalesce` - она просто схлопнуть вместе те партиции, котороые уже находятся на одной машине, что значительно уменьшит количество лишних телодвижений.

In [None]:
numbers = sc.parallelize(range(10), 10) # явно указали вторым аргументом количество партиций
numbers.glom().collect()

In [None]:
squares = numbers.map(lambda x: (x, x**2))

In [None]:
squares.filter(lambda x: x[0] >= 7).glom().collect()

In [None]:
squares.filter(lambda x: x[0] >= 7).coalesce(2).glom().collect()

In [None]:
squares.filter(lambda x: x[0] >= 7).partitionBy(2).glom().collect()

#### DataFrame и SQL

Уже текущий набор функций - это большой шаг вперед относительно классического MapReduce. Однако на этом плюшки Spark не заканчиваются. Разработчики пошли дальше и начали внедрять еще более высокоуровневый интерфейс для работы с данными, который может сильно упростить жизнь разработчикам.

DataFrame - это модель таблицы, построенная поверх RDD. О ней можно думать как о Pandas на стероидах.

In [None]:
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
rdd.collect()

In [None]:
from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

In [None]:
df = se.createDataFrame(rdd)
df.printSchema()
df.show()

In [None]:
df = se.createDataFrame(
    rdd.map(lambda x: Row(pipa=x[0], pupa=x[1]))
)
df.printSchema()
df.show()

Для удобства есть встроенные функции конвертации в pandas и оттуда

In [None]:
pandas_df = df.toPandas()
pandas_df

In [None]:
df = se.createDataFrame(pandas_df)
df.printSchema()
df.show()

Есть специальные функции, которые умеют работать с популярными форматами хранения таблиц, и строить их в HDFS.

Прочтем нашу таблицу через DataFrame

In [None]:
df = se.read.csv('/user/tweets/data/*', header=False, inferSchema=True)

In [None]:
df.printSchema()

In [None]:
columns = [
    'external_author_id',
    'author',
    'content',
    'region',
    'language',
    'publish_date',
    'harvested_date',
    'following',
    'followers',
    'updates',
    'post_type',
    'account_type',
    'retweet',
    'account_category',
    'new_june_2018',
    'alt_external_id',
    'tweet_id',
    'article_url',
    'tco1_step1',
    'tco2_step1',
    'tco3_step1'
]
df = df.toDF(*columns)
df.printSchema()

In [None]:
df.show()

In [None]:
df[['author', 'content']].show()

In [None]:
df.registerTempTable('tweets')  # Регистрируем как временную таблицу для SQL

In [None]:
se.sql("""
    SELECT author, content, followers
    FROM tweets
    WHERE followers > 100
    LIMIT 10
""").show()

In [None]:
se.sql("""
    SELECT language, count(*) as tw_count
    FROM tweets
    WHERE followers > 100
    GROUP BY language
""").show()

In [None]:
top5_lang = se.sql("""
    SELECT language, count(*) as tw_count
    FROM tweets
    WHERE followers > 100
    GROUP BY language
    ORDER BY tw_count DESC
    LIMIT 5
""")
top5_lang.show()

In [None]:
only_langs_df = se.sql("""
    SELECT language
    FROM (
        SELECT language, count(*) as tw_count
        FROM tweets
        WHERE followers > 100
        GROUP BY language
        ORDER BY tw_count DESC
        LIMIT 5
    )
""")
only_langs_df.show()

In [None]:
only_langs_df.registerTempTable('languages')

In [None]:
se.sql("""
    SELECT author, language
    FROM tweets
    WHERE language in (SELECT * FROM languages)
    LIMIT 10
""").show()

In [None]:
se.sql("""
    SELECT author, t.language
    FROM tweets t
        inner join languages l on l.language = t.language
    LIMIT 10
""").show()

In [None]:
# Из под датафрейма всегда можно вынуть RDD и работать напрямую уже с ним

top5_lang.rdd.map(lambda x: x.language.upper()).collect()

In [None]:
top5_lang.collect()