# Примитивы для знакомства со спарком и Spark UI

In [6]:
import os
import sys
import time

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

import pyspark.sql.functions as F

- Spark UI расположен по адресу http://localhost:4040/ В конфигурации спарк-сессии вы можете указать другой порт при необходимости с помощью параметра `spark.ui.port`
- Для локального использования, без сети, без кластера в настройке `master` можно указать `local[*]`. Если звездочка, то используются все доступные ядра. Вместо звездочки можно указать меньшее значение
- Проверим что сессия создается и интерфес Spark UI работает, в дальнейшем перед каждым тестом будет пересоздавать сессию `spark = spark_restart(spark)`, иначе спарк все подряд кэширует и оптимизирует и тесты выходят некорректные 😇

In [7]:
# Spark UI http://localhost:4040/jobs/

def spark_start():
    spark = (SparkSession.builder
        .master('local[*]')
        .appName('spark_in_examples')
        .config('spark.driver.memory', '4g')
        .config('spark.executor.memory', '4g')
        .config('spark.driver.bindAddress','localhost')
        .config('spark.ui.port', '4040')
        .getOrCreate()
    )
    return spark

def spark_restart(spark=None):
    if spark and (spark.getActiveSession()):
        print('Restart session')
        spark.stop()
    return spark_start()

spark = spark_restart()

In [3]:
spark.getActiveSession()

## Распараллеливание

Для имитации длительного процесса будем использовать функцию, которая делает задежку на 5 секунд и потом выводит число `num` полученное в качестве аргумента.

In [5]:
def process_data(num):
    time.sleep(5)
    print('Число ', num, '  ')

Выполним функцию `process_data` 10 раз.

Обратите внимание, что вывод значений идет не по порядку – процессы выполняются параллельно и финишируют наперегонки.


In [8]:
%%time
spark = spark_restart(spark)
(spark
 .sparkContext
 .parallelize(range(10))
 .foreach(process_data)
)

Restart session


Число  7                                                            (0 + 8) / 8]
Число  3   
Число  0   
Число  5   
Число  1   
Число  6   
Число  8   
Число  2   

CPU times: user 13.8 ms, sys: 8.25 ms, total: 22 ms
Wall time: 11.1 s


Число  4   
Число  9   
                                                                                

Время выполнения задания 10-15 секунд.

- Перейдите в Spark-UI на вкладку Stages http://localhost:4040/stages/
- Кликните в списке стадий по стадии foreach
- Перейдите в раздел Event Timeline – здесь вы увидите диаграмму, на которой отображены таски.

Максимально количество параллельно выполняемых тасков соответствует количеству ядер. У меня на компе 8 ядер, а итераций в цикле 10. Видно, что все ядра загружены, 8 циклов выполняются одновременно, и два ядра работают дольше других – доделывают оставшиеся два цикла.

![event timeline foreach](images/foreach.png)



Для примера выполним без использования спарка аналогичный цикл.

In [9]:
%%time
for i in range(10):
    process_data(i)

Число  0   
Число  1   
Число  2   
Число  3   
Число  4   
Число  5   
Число  6   
Число  7   
Число  8   
Число  9   
CPU times: user 20.9 ms, sys: 8.51 ms, total: 29.4 ms
Wall time: 50 s


Все шаги выполняются строго последовательно, порядок вывода значений не нарушен. Время выполнения – честные 50 секунд, то есть 10 итераций с задержкой по 5 сек.

## План вычислений

Спарк оптимизирует ленивые вычисления и может выполнять их не в том порядке как написано в коде.

Посмотрим на примере фильтра и сортировки:
- создаем датафрейм, по умолчанию 8 партиций
- в коде сначала сортировка и потом два фильтра
- в плане спарк объединяет фильтры в один и потом упорядочивает
- под капотом операции`orderBy` выполняются два действия
    - репартиционирование по указанному столбцу
    - сортировка
- на выходе у нас упорядоченный датасет из 1 партиции

In [87]:
spark = spark_restart(spark)
rdd = spark.sparkContext.parallelize(range(10))
df = spark.createDataFrame(rdd, IntegerType())
print('NumPartitions', df.rdd.getNumPartitions())

Restart session
NumPartitions 8


In [88]:
df = df.orderBy('value', ascending=False)
df = df.filter(df['value']<8)
df = df.filter(df['value']>2)

In [89]:
df.explain(mode='formatted')
print('NumPartitions', df.rdd.getNumPartitions())

== Physical Plan ==
AdaptiveSparkPlan (5)
+- Sort (4)
   +- Exchange (3)
      +- Filter (2)
         +- Scan ExistingRDD (1)


(1) Scan ExistingRDD
Output [1]: [value#310]
Arguments: [value#310], MapPartitionsRDD[4] at applySchemaToPythonRDD at <unknown>:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [1]: [value#310]
Condition : (isnotnull(value#310) AND ((value#310 < 8) AND (value#310 > 2)))

(3) Exchange
Input [1]: [value#310]
Arguments: rangepartitioning(value#310 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1227]

(4) Sort
Input [1]: [value#310]
Arguments: [value#310 DESC NULLS LAST], true, 0

(5) AdaptiveSparkPlan
Output [1]: [value#310]
Arguments: isFinalPlan=false




                                                                                

NumPartitions 1


# Партиционирование

### Партиционирование при группировке

Проверим репартиционирование при разном количестве групп

In [132]:
spark = spark_restart(spark)
rdd = spark.sparkContext.parallelize(range(2_000_000))
df = spark.createDataFrame(rdd, IntegerType())
print('NumPartitions при создании', df.rdd.getNumPartitions())

Restart session
NumPartitions при создании 8


In [134]:
def test_groupby(divideBy=1):
    print('NumPartitions', 
          df.withColumn('group', df['value']%divideBy).groupBy('group').count().rdd.getNumPartitions(),
          f'количество групп {divideBy}',
    
         )

test_groupby(200_000)
test_groupby(2_000)



NumPartitions 8 количество групп 200000
NumPartitions 1 количество групп 2000


[Stage 3:>                                                          (0 + 8) / 8]

То есть маленький датафрейм у нас схлопнулся в одну партицию 🤔

Партиционирование – важный фактор при распараллеливании процессов! Перейдем к следующему тесту 🤓

### Распараллеливание и скорость вычислений


Датафреймы в спарке изначально параллельные.
- Создадим тестовый датафрейм с колонкой чисел от 0 до 200_000_000
- Проверим сколько партиций он содержит (по умолчанию соответствует количеству ядер, в моем случае 8)



#### 8 partitions, 1 stage, 8 tasks

In [8]:
spark = spark_restart(spark)
rdd = spark.sparkContext.parallelize(range(200_000_000))
df = spark.createDataFrame(rdd, IntegerType())
print('NumPartitions при создании', df.rdd.getNumPartitions())

(df.withColumn('col2', df['value']*0.1)
 .filter(df['value']%20_000_000 == 0)
 .collect()
)

Restart session
NumPartitions при создании 8


                                                                                

[Row(value=0, col2=0.0),
 Row(value=20000000, col2=2000000.0),
 Row(value=40000000, col2=4000000.0),
 Row(value=60000000, col2=6000000.0),
 Row(value=80000000, col2=8000000.0),
 Row(value=100000000, col2=10000000.0),
 Row(value=120000000, col2=12000000.0),
 Row(value=140000000, col2=14000000.0),
 Row(value=160000000, col2=16000000.0),
 Row(value=180000000, col2=18000000.0)]

Перейдем в Spark UI
- Задание выполняется в одну стадию, 8 тасков, все ядра заняты
- Время выполнения 30 секунд

![event timeline filter 8 partitions in one stage](images/filter_8_partitions_1_stage.png)

![event timeline filter and collect 8 partitions](images/filter_8_partitions_8_tasks.png)


#### 2 partitions, 1 stage, 2 tasks


Создаем датафрейм на две партиции


In [9]:
spark = spark_restart(spark)
rdd = spark.sparkContext.parallelize(range(200_000_000), numSlices=2)
df = spark.createDataFrame(rdd, IntegerType())
print('NumPartitions при создании', df.rdd.getNumPartitions())

(df.withColumn('col2', df['value']*0.1)
 .filter(df['value']%20_000_000 == 0)
 .collect()
)

Restart session
NumPartitions при создании 2


                                                                                

[Row(value=0, col2=0.0),
 Row(value=20000000, col2=2000000.0),
 Row(value=40000000, col2=4000000.0),
 Row(value=60000000, col2=6000000.0),
 Row(value=80000000, col2=8000000.0),
 Row(value=100000000, col2=10000000.0),
 Row(value=120000000, col2=12000000.0),
 Row(value=140000000, col2=14000000.0),
 Row(value=160000000, col2=16000000.0),
 Row(value=180000000, col2=18000000.0)]

Время выполнения 58 секунд, заметна разница с предыдущим тестом на 8 ядер

- То есть для больших датасетов оптимизация количества партиций важна! (на маленьких это не заметно)
- В данном случае на двух партициях задача выполняется около минуты, а на 8 партициях за 30 сек

![event timeline filter 2 partitions in 1 stages](images/filter_2_partitions_1_stage.png)

![event timeline filter 2 partitions 2 tasks](images/filter_2_partitions_2_tasks.png)

# Заключение

- Распараллеливание зависит от партиционирования
- Партиционирование зависит от ...?
- Оптимизацию надо подбирать под конкретные задачи 🤓

Не забываем останавливать спарк-сессию. После остановки Spark UI будет недоступен.

In [71]:
spark.stop()