<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Dataframes (Scala)
**Сергей Гришаев**  
serg.grishaev@gmail.com  

## На этом занятии
+ Сравнение RDD API и DataFrame API 
+ Базовые функции
+ Очистка данных
+ Агрегаты
+ Кеширование 
+ Репартиционирование
+ Встроенные функции
+ Пользовательские функции
+ Соединения
+ Оконные функции

## Сравнение RDD API и DataFrame API 

### Типы данных
**RDD**: низкоуревная распределенная коллекция данных любого типа  
**DF**: таблица со схемой, состоящей из колонок разных типов, описанных в `org.apache.spark.sql.types`  

### Обработка данных
**RDD**: сериализуемые функции  
**DF**: кодогенерация SQL > Java код  

### Функции и алгоритмы
**RDD**: нет ограничений  
**DF**: ограничен SQL операторами, функциями `org.apache.spark.sql.functions` и пользовательскими функциями  

### Источники данных
**RDD**: каждый источник имеет свое API  
**DF**: единое API для всех источников 

### Производительность
**RDD**: напрямую зависит от качества кода
**DF**: встроенные механизмы оптимизации SQL запроса


### Потоковая обработка данных
**RDD**: устаревший DStreams  
**DF**: активно развивающийся Structured Streaming


### Выводы:
+ На текущий момент RDD является низкоуровневым API, которое постепенно уходит "под капот" Apache Spark
+ DF API представляет собой библиотеку для обработки данных с использованием SQL примитивов

## Базовые функции

Создать dataframe можно на основе:
+ локальной коллекции
+ файлов
+ базы данных

In [1]:
import org.apache.spark.sql.DataFrame

val cityList: Vector[String] = Vector("Moscow", "Paris", "Madrid", "London", "New York")

// метод toDF изначально отсутствует у Vector[T], он добавляется через import spark.implicits._
val df: DataFrame = cityList.toDF

cityList = Vector(Moscow, Paris, Madrid, London, New York)
df = [value: string]


[value: string]

У любого DF есть схема:

In [2]:
df.printSchema

root
 |-- value: string (nullable = true)



Посмотреть содержимое DF можно с помощью метода `show()`:

In [3]:
df.show

+--------+
|   value|
+--------+
|  Moscow|
|   Paris|
|  Madrid|
|  London|
|New York|
+--------+



Также можно вывести содержимое в вертикальной ориентации - это удобно при большое количестве столбцов:

In [7]:
df.show(numRows = 20, truncate = 500, vertical=true)

-RECORD 0---------
 value | Moscow   
-RECORD 1---------
 value | Paris    
-RECORD 2---------
 value | Madrid   
-RECORD 3---------
 value | London   
-RECORD 4---------
 value | New York 



Подсчет количества элементов в DF с помощью `count()`:

In [8]:
df.count

5

Отфильтровать данные можно с помощью метода `filter`. В отличие от RDD, он принимает SQL выражение:

In [9]:
val foo: org.apache.spark.sql.Column = 'value

foo = value


value

In [12]:
val foo: org.apache.spark.sql.Column = $"value"

foo = value


value

In [14]:
import org.apache.spark.sql.functions._

In [15]:
val foo: org.apache.spark.sql.Column = col("value")

foo = value


value

In [21]:
def ===(x: Int) = { 
    x + 1
}

$eq$eq$eq: (x: Int)Int


In [26]:
===(1)

2

In [42]:
val condition: org.apache.spark.sql.Column = 'value === "Moscow"

condition = (value = Moscow)


(value = Moscow)

In [37]:
1 == 2

false

In [16]:
// Требует наличия import spark.implicits._

df.filter( 'value.===("Moscow") ).show

+------+
| value|
+------+
|Moscow|
+------+



In [46]:
df.filter( condition ).show

+--------+
|   value|
+--------+
|  Moscow|
|   Paris|
|  Madrid|
|  London|
|New York|
+--------+



In [43]:
// Требует наличия import spark.implicits._

df.filter($"value" === "Moscow").show

+------+
| value|
+------+
|Moscow|
+------+



In [45]:
val condition = col("value") === 'value

condition = (value = value)


(value = value)

In [48]:
// sugar free & type safe
// Три знака равно здесь используются, тк на самом деле это метод,
// применяемый к колонке org.apache.spark.sql.Column

import org.apache.spark.sql.functions.col

df.filter(col("value") === "Moscow" ).show

+------+
| value|
+------+
|Moscow|
+------+



In [53]:
// легко ошибиться и получить ошибку в рантайме

df.filter("value = 'Moscow'").show

+------+
| value|
+------+
|Moscow|
+------+



lastException: Throwable = null


In [None]:
// промежуточный вариант между col и обычной строкой
// expr также может использоваться для вызова SQL builtin функций, 
// отсутствующих в org.apache.spark.sql.functions

import org.apache.spark.sql.functions.expr

df.filter(expr("value = 'Moscow'")).show

Добавить новую колонку можно с помощью метода `withColumn`. Необходимо помнить, что данный метод, как и другие, является трансформацией и не изменяет оригинальный DF, а создает новый.

In [54]:
import org.apache.spark.sql.functions.upper
df.withColumn("upperCity", upper('value)).show

+--------+---------+
|   value|upperCity|
+--------+---------+
|  Moscow|   MOSCOW|
|   Paris|    PARIS|
|  Madrid|   MADRID|
|  London|   LONDON|
|New York| NEW YORK|
+--------+---------+



Аналогичный результат получить, используя метод `select`. Данный метод может быть использован не только для выбора определенных колонок, но и для создания новых.

In [55]:
val withUpper = df.select('value, upper('value).alias("upperCity"))
withUpper.show

+--------+---------+
|   value|upperCity|
+--------+---------+
|  Moscow|   MOSCOW|
|   Paris|    PARIS|
|  Madrid|   MADRID|
|  London|   LONDON|
|New York| NEW YORK|
+--------+---------+



withUpper = [value: string, upperCity: string]


[value: string, upperCity: string]

Если передать `col("*")` в `select`, то вы получите DF со всеми колонками. Это полезно, когда вы не знаете список всех колонок (например вы получили его через API), но вам нужно их все выбрать и добавить новую колонку. Это можно сделать следующим образом:

In [56]:
// методы name, as и alias часто являются взаимозаменяемыми

import org.apache.spark.sql.functions._

withUpper.select(
    col("*"), 
    lower($"value").name("lowerCity"), 
    (length('value) + 1).as("length"),
    lit("foo").alias("bar")).show

+--------+---------+---------+------+---+
|   value|upperCity|lowerCity|length|bar|
+--------+---------+---------+------+---+
|  Moscow|   MOSCOW|   moscow|     7|foo|
|   Paris|    PARIS|    paris|     6|foo|
|  Madrid|   MADRID|   madrid|     7|foo|
|  London|   LONDON|   london|     7|foo|
|New York| NEW YORK| new york|     9|foo|
+--------+---------+---------+------+---+



При необходимости в `select` можно передать список колонок, используя обычные строки:

In [60]:
withUpper.select("value", "upperCity").show

+--------+---------+
|   value|upperCity|
+--------+---------+
|  Moscow|   MOSCOW|
|   Paris|    PARIS|
|  Madrid|   MADRID|
|  London|   LONDON|
|New York| NEW YORK|
+--------+---------+



lastException: Throwable = null


Удалить колонку из DF можно с помощью метода `drop`:

In [61]:
// drop не будет выдавать ошибку, если будет указана несуществующая колонка

withUpper.drop("upperCity", "abraKadabra").show

+--------+
|   value|
+--------+
|  Moscow|
|   Paris|
|  Madrid|
|  London|
|New York|
+--------+



### Выводы:
+ методы `filter` и `select` принимают в качестве аргументов колонки [org.apache.spark.sql.Column](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column). Это может быть либо ссылка на существующую колонку, либо функцию из [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$)
+ любые трансформации возвращают новый DF, не меняя существующий
+ тип [org.apache.spark.sql.Column](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column) играет важную роль в DF API - на его основе создаются ссылки на существующие колонки, а также функции, принимающие [org.apache.spark.sql.Column](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column) и возвращающие [org.apache.spark.sql.Column](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column). По этой причине обычное сравнение `==` не будет работать в DF API, тк `filter` принимает [org.apache.spark.sql.Column](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column), а не `Boolean`
+ Класс DataFrame в последних версиях Spark представляет собой `org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]`, поэтому его описание следует искать в [org.apache.spark.sql.Dataset](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset)

## Очистка данных

Одной из задач обработки данных является их очистка. DF API содержит класс функций "not available", описанный в пакете [org.apache.spark.sql.DataFrameNaFunctions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions). В данном пакете есть три функции:
+ `na.drop`
+ `na.fill`
+ `na.replace`

Для демонстрации работы данных функций создадим новый датасет:

In [62]:
import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset

val testData =
"""{ "name":"Moscow", "country":"Rossiya", "continent": "Europe", "population": 12380664}
{ "name":"Madrid", "country":"Spain" }
{ "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936}
{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105}
{ "name":"Barselona", "country":"Spain", "continent": "Europe" }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"New York, "country":"USA","""

// Создаем DF из одной строки и добавляем данные в виде новой колонки
val raw = spark.range(0,1).select(lit(testData).alias("value"))
raw.show(1, false)

// Создаем новую колонку, разибая наши данные по \n
val jsonStrings: Column = split(col("value"), "\n").alias("value")

raw.select(explode(jsonStrings)).show(10, 50, true)
raw.select(jsonStrings).printSchema

// Используем функцию explode для того, чтобы развернуть массив мехом наружу и используем темную магию 
// для превращения DataFrame в Dataset[String]
val splited: Dataset[String] = raw.select(explode(jsonStrings)).as[String]

// splited.show(numRows = 10, truncate = false)


// // Создаем новый датафре... датасет, в котором наши JSON строки будут распарсены
val df: Dataset[Row] = spark.read.json(splited)
df.printSchema
df.show

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                       

testData = 
raw = [value: string]


{ "name":"Moscow", "country":"Rossiya", "continent": "Europe", "population": 12380664}
{ "name":"Madrid", "country":"Spain" }
{ "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936}
{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105}
{ "name":"Barselona", "country":"Spain", "continent": "Europe" }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"New York, "country":"USA",
jsonStrings:...


[value: string]

Для очистки датасета:
+ удалим строку с навалидным JSON, сохраним ее в отдельное место
+ удалим дубликаты
+ заполним `null`ы в колонках
+ исправим `Rossiya` на `Russia`

In [63]:
val corruptData = df.select(col("_corrupt_record")).na.drop("all").collect

corruptData = Array([{ "name":"New York, "country":"USA",])


Array([{ "name":"New York, "country":"USA",])

In [64]:
val fillData: Map[String, Any] = Map("continent" -> "Undefined", "population" -> 0)
val replaceData: Map[Any, Any] = Map("Rossiya" -> "Russia", 100000 -> 0)

val cleanData = 
    df
    .drop(col("_corrupt_record"))
    .na.drop("all")
    .na.fill(fillData)
    .na.replace("country", replaceData)
    .dropDuplicates


cleanData.show

+---------+-------+---------+----------+
|continent|country|     name|population|
+---------+-------+---------+----------+
|   Europe| France|    Paris|   2196936|
|   Europe|Germany|   Berlin|   3490105|
|Undefined|  Spain|   Madrid|         0|
|   Africa|  Egypt|    Cairo|  11922948|
|   Europe|  Spain|Barselona|         0|
|   Europe| Russia|   Moscow|  12380664|
+---------+-------+---------+----------+



fillData = Map(continent -> Undefined, population -> 0)
replaceData = Map(Rossiya -> Russia)
cleanData = [continent: string, country: string ... 2 more fields]


[continent: string, country: string ... 2 more fields]

### Выводы:
+ DF API обладает удобным API для очистки данных, позволяющим разработчику сконцентрироваться разработчику на бизнес логике, а не на написании функций для обработки всех возможных исключительных ситуаций
+ метод `spark.read.json` позволяет читать не только файлы, но и `Dataset[String]`, содержащие JSON строки.

## Агрегаты
Посчитаем суммарное население и количество городов с разбивкой по континентам:

In [65]:
import org.apache.spark.sql.functions._
// from pyspark.sql.functions import *

In [66]:
val aggCount = cleanData.groupBy('continent).count
aggCount.show

+---------+-----+
|continent|count|
+---------+-----+
|   Europe|    4|
|   Africa|    1|
|Undefined|    1|
+---------+-----+



aggCount = [continent: string, count: bigint]


[continent: string, count: bigint]

In [67]:
val aggSum = cleanData.groupBy('continent).sum("population")
aggSum.show

+---------+---------------+
|continent|sum(population)|
+---------+---------------+
|   Europe|       18067705|
|   Africa|       11922948|
|Undefined|              0|
+---------+---------------+



aggSum = [continent: string, sum(population): bigint]


[continent: string, sum(population): bigint]

Для того, чтобы совместить несколько агрегатов в одном DF, мы можем использовать метод `agg()`. Данный метод позволяет использовать любые `Aggregate functions` из пакета [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$)

In [68]:
val agg = cleanData.groupBy('continent).agg(count("*").alias("count"), sum("population").alias("sumPop"))
agg.show

+---------+-----+--------+
|continent|count|  sumPop|
+---------+-----+--------+
|   Europe|    4|18067705|
|   Africa|    1|11922948|
|Undefined|    1|       0|
+---------+-----+--------+



agg = [continent: string, count: bigint ... 1 more field]


[continent: string, count: bigint ... 1 more field]

С помощью агрегатов мы можем выполнять такие действия, как, например, `collect_list` и `collect_set`. Стоит отметить, что колонки в Spark могут иметь не только скалярные типы, но и структуры, словари и массивы:

In [69]:
val aggList = cleanData.groupBy('continent).agg(collect_list("country").alias("countries"))
aggList.printSchema
aggList.show(numRows = 10, truncate = 100, vertical = true)

root
 |-- continent: string (nullable = false)
 |-- countries: array (nullable = true)
 |    |-- element: string (containsNull = true)

-RECORD 0-------------------------------------
 continent | Europe                           
 countries | [France, Germany, Spain, Russia] 
-RECORD 1-------------------------------------
 continent | Africa                           
 countries | [Egypt]                          
-RECORD 2-------------------------------------
 continent | Undefined                        
 countries | [Spain]                          



[continent: string, countries: array<string>]

aggList = [continent: string, countries: array<string>]


Используя методы `struct` и `to_json`, мы можем превратить произвольный набор колонок в JSON строку. Этот методы часто используется перед отправкой данных в Kafka

In [70]:
val withStruct = aggList.select(struct('continent, 'countries).alias("s"))
withStruct.printSchema

withStruct.show(10, false)

root
 |-- s: struct (nullable = false)
 |    |-- continent: string (nullable = false)
 |    |-- countries: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

+------------------------------------------+
|s                                         |
+------------------------------------------+
|[Europe, [France, Germany, Spain, Russia]]|
|[Africa, [Egypt]]                         |
|[Undefined, [Spain]]                      |
+------------------------------------------+



withStruct = [s: struct<continent: string, countries: array<string>>]


[s: struct<continent: string, countries: array<string>>]

In [71]:
withStruct.withColumn("s", to_json('s)).show(10, false)

+------------------------------------------------------------------------+
|s                                                                       |
+------------------------------------------------------------------------+
|{"continent":"Europe","countries":["France","Germany","Spain","Russia"]}|
|{"continent":"Africa","countries":["Egypt"]}                            |
|{"continent":"Undefined","countries":["Spain"]}                         |
+------------------------------------------------------------------------+



Если необходимо превратить все колонки DF в JSON String, можно воспользоваться функций `toJSON`:

In [72]:
val jString: Dataset[String] = aggList.toJSON
jString.show(5, false)

+------------------------------------------------------------------------+
|value                                                                   |
+------------------------------------------------------------------------+
|{"continent":"Europe","countries":["France","Germany","Spain","Russia"]}|
|{"continent":"Africa","countries":["Egypt"]}                            |
|{"continent":"Undefined","countries":["Spain"]}                         |
+------------------------------------------------------------------------+



jString = [value: string]


[value: string]

Если нам необходимо создать колонки из значений текущих колонок, мы можем воспользоваться функцией `pivot`

In [73]:
cleanData.groupBy(col("country")).pivot("continent").agg(sum("population")).show

+-------+--------+--------+---------+
|country|  Africa|  Europe|Undefined|
+-------+--------+--------+---------+
| Russia|    null|12380664|     null|
|Germany|    null| 3490105|     null|
| France|    null| 2196936|     null|
|  Spain|    null|       0|        0|
|  Egypt|11922948|    null|     null|
+-------+--------+--------+---------+



### Выводы:
+ DF API позволяет строить большое количество агрегатов. При этом необходимо помнить, что операции `groupBy`, `cube`, `rollup` возвращают [org.apache.spark.sql.RelationalGroupedDataset](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset), к которому затем необходимо применить одну из функций агрегации - `count`, `sum`, `agg` и т. п.
+ При вычислении агрегатов необходимо помнить, что эта операция требует перемешивания данных между воркерами, что, в случае перекошенных данных, может привести к OOM на воркере.

## Кеширование
По умолчанию при применении каждого действия Spark пересчитывает весь граф, что может негативно сказать на производительности приложения. Для демонстрации возьмем датасет [Airport Codes](https://datahub.io/core/airport-codes)  

In [75]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("/tmp/airport-codes.csv")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                 

csvOptions = Map(header -> true, inferSchema -> true)
airports = [ident: string, type: string ... 10 more fields]


lastException: Throwable = null


[ident: string, type: string ... 10 more fields]

Посчитаем несколько агрегатов. Несмотря на то, что `onlyRu` является общим для всех действий, он пересчитывается при вызове каждого действия.

In [76]:
val onlyRuAndHigh = airports.filter('iso_country === "RU" and 'elevation_ft > 1000)
onlyRuAndHigh.show(numRows = 1, truncate = 100, vertical = true)

onlyRuAndHigh.count
onlyRuAndHigh.collect
onlyRuAndHigh.groupBy('municipality).count.orderBy('count.desc).na.drop("any").show

-RECORD 0-----------------------------
 ident        | RU-0006               
 type         | closed                
 name         | Arabatuk Air Base     
 elevation_ft | 2280                  
 continent    | EU                    
 iso_country  | RU                    
 iso_region   | RU-CHI                
 municipality | Daurija               
 gps_code     | null                  
 iata_code    | null                  
 local_code   | ZA2N                  
 coordinates  | 50.223801, 117.098999 
only showing top 1 row

+-----------------+-----+
|     municipality|count|
+-----------------+-----+
|            Chita|    3|
|     Nizhneudinsk|    2|
|         Ulan Ude|    2|
|    Nizhneangarsk|    2|
|          Irkutsk|    2|
|           Borzya|    2|
|  Severo-Eniseysk|    1|
|          Chistyy|    1|
|     Karachayevsk|    1|
|         Barguzin|    1|
|Usolye-Sibirskoye|    1|
|           Amazar|    1|
|            Baley|    1|
|     Snezhnogorsk|    1|
|       Akkem Lake|    1|
|

onlyRuAndHigh = [ident: string, type: string ... 10 more fields]


[ident: string, type: string ... 10 more fields]

Для решения этой проблемы следует использовать методы `cache`, либо `persist`. Данные методы сохраняют состояние графа после первого действия, и следующие обращаются к нему. Разница между методами заключается в том, что `persist` позволяет выбрать, куда сохранить данные, а `cache` использует значение по умолчанию. В текущей версии Spark это [StorageLevel.MEMORY_ONLY](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). Важно помнить, что данный кеш не предназначен для обмена данными между разными Spark приложения - он является внутренним для приложения. После того, как работа с данными окончена, необходимо выполнить `unpersist` для очистки памяти

In [77]:
onlyRuAndHigh.cache
onlyRuAndHigh.count
// при вычислении count данные будут помещены в cache
onlyRuAndHigh.show(numRows = 1, truncate = 100, vertical = true)
onlyRuAndHigh.collect
onlyRuAndHigh.groupBy('municipality).count.orderBy('count.desc).na.drop("any").show

onlyRuAndHigh.unpersist

-RECORD 0-----------------------------
 ident        | RU-0006               
 type         | closed                
 name         | Arabatuk Air Base     
 elevation_ft | 2280                  
 continent    | EU                    
 iso_country  | RU                    
 iso_region   | RU-CHI                
 municipality | Daurija               
 gps_code     | null                  
 iata_code    | null                  
 local_code   | ZA2N                  
 coordinates  | 50.223801, 117.098999 
only showing top 1 row

+-----------------+-----+
|     municipality|count|
+-----------------+-----+
|            Chita|    3|
|     Nizhneudinsk|    2|
|         Ulan Ude|    2|
|    Nizhneangarsk|    2|
|          Irkutsk|    2|
|           Borzya|    2|
|  Severo-Eniseysk|    1|
|          Chistyy|    1|
|     Karachayevsk|    1|
|         Barguzin|    1|
|Usolye-Sibirskoye|    1|
|           Amazar|    1|
|            Baley|    1|
|     Snezhnogorsk|    1|
|       Akkem Lake|    1|
|

[ident: string, type: string ... 10 more fields]

### Выводы:
+ Использование `cache` и `persist` позволяет существенно сократить время обработки данных, однако следует помнить и об увеличении потребляемой памяти на воркерах

## Репартиционирование
RDD и DF являются представляют собой классы, описывающие распределенные коллекции данных. Они (коллекции) разбиты на крупные блоки, которые называются партициями. В графе вычисления, который называется в Spark DAG (Direct Acyclic Graph), есть три основных компонента - `job`, `stage`, `task`.

`job` представляет собой весь граф целиком, от момента создания DF, до применения `action` к нему. Состоит из одной или более `stage`. Когда возникает необходимость сделать `shuffle` данных, Spark создает новый `stage`. Каждый `stage` состоит из большого количества `task`. `task` это базовая операция над данными. Одновременно Spark выполняет N `task`, которые обрабатывают N партиций, где N - это суммарное число доступных потоков на всех воркерах.

Исходя из этого, важно обеспечивать:
+ достаточное количество партиций для распределения нагрузки по всем воркерам
+ равномерное распределение данных между партициями

Создадим датасет с перекосом данных:

In [78]:
import org.apache.spark.sql.functions._

val skewColumn = when(col("id") < 900, lit(0)).otherwise(lit(1))

val skewDf = spark.range(0,1000).repartition(10, skewColumn)

def printItemPerPartition[T](ds: Dataset[T]): Unit = {
    ds.mapPartitions { x => Iterator(x.length) }
    .withColumnRenamed("value", "itemPerPartition")
    .show(50, false)
}

printItemPerPartition[java.lang.Long](skewDf)

+----------------+
|itemPerPartition|
+----------------+
|0               |
|900             |
|0               |
|100             |
|0               |
|0               |
|0               |
|0               |
|0               |
|0               |
+----------------+



skewColumn = CASE WHEN (id < 900) THEN 0 ELSE 1 END
skewDf = [id: bigint]


printItemPerPartition: [T](ds: org.apache.spark.sql.Dataset[T])Unit


[id: bigint]

Любые операции с таким датасетом будут работать медленно, т.к.
+ если суммарное количество потоков на всех воркерах больше 10, то в один момент времени работать будут максимум 10, остальные будут простаивать
+ из 10 партицийи только в 2 есть данные и это означает, что только 2 потока будут обрабатывать данные, при этом из-за перекоса данных между ними (900 vs 100) первый станет bottleneck'ом

Обычно перекошенные датасеты возникают после вычисления агрегатов, оконных функций и соединений, но также могут возникать и при чтении источников.

Для устранения проблемы перекоса данных, следует использовать метод `repartition`:

In [79]:
// здесь мы передаем только новое количество партиций и Spark выполнит RoundRobinPartitioning
val repartitionedDf = skewDf.repartition(20)

printItemPerPartition[java.lang.Long](repartitionedDf)

+----------------+
|itemPerPartition|
+----------------+
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
|50              |
+----------------+



repartitionedDf = [id: bigint]


[id: bigint]

In [80]:
// здесь мы добавляем к числу партиций колонку, по которой необходимо сделать репартиционирование,
// поэтому Spark выполнит HashPartitioning
val repartitionedDf = skewDf.repartition(20, col("id"))

printItemPerPartition[java.lang.Long](repartitionedDf)

+----------------+
|itemPerPartition|
+----------------+
|37              |
|61              |
|48              |
|59              |
|47              |
|54              |
|45              |
|58              |
|55              |
|55              |
|56              |
|46              |
|45              |
|46              |
|49              |
|64              |
|44              |
|39              |
|40              |
|52              |
+----------------+



repartitionedDf = [id: bigint]


[id: bigint]

<img align="right" width="200" height="200" src="https://pngimage.net/wp-content/uploads/2018/06/соленья-png-4.png">

### Соленья
Часто при вычислении агрегатов приходится работать с перекошенными данными:

In [81]:
airports.printSchema

airports.groupBy('type).count.orderBy('count.desc)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



[type: string, count: bigint]

In [84]:
airports.groupBy('type).count.orderBy('count.desc).show

+--------------+-----+
|          type|count|
+--------------+-----+
| small_airport|33998|
|      heliport|11316|
|medium_airport| 4531|
|        closed| 3618|
| seaplane_base| 1014|
| large_airport|  613|
|   balloonport|   23|
+--------------+-----+



Поскольку при вычислении агрегата происходит неявный `HashPartitioning` по ключу (ключам) агрегата, то при выполнении определенных условий происходит нехватка памяти на воркере, которую нельзя исправить, не изменив подход к построению агрегата.

Один из вариантов устранение - соление ключей:

In [82]:
val saltModTen = pmod(round((rand() * 100), 0), lit(10)).cast("int")

val salted = airports.withColumn("salt", saltModTen)
salted.show(numRows = 1, truncate = 200, vertical = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                                
 iata_code    | null                               
 local_code   | 00A                                
 coordinates  | 40.07080078125, -74.93360137939453 
 salt         | 3                                  
only showing top 1 row



saltModTen = CAST(pmod(round((rand(6531147292355459874) * 100), 0), 10) AS INT)
salted = [ident: string, type: string ... 11 more fields]


[ident: string, type: string ... 11 more fields]

Это позволяет нам существенно снизить объем данных в каждой партиции (30к vs 3к):

In [85]:
val firstStep = salted.groupBy('type, 'salt).count()

firstStep.orderBy('count.desc).show(200, false)

+--------------+----+-----+
|type          |salt|count|
+--------------+----+-----+
|small_airport |9   |3455 |
|small_airport |3   |3452 |
|small_airport |8   |3452 |
|small_airport |4   |3426 |
|small_airport |2   |3404 |
|small_airport |5   |3398 |
|small_airport |0   |3391 |
|small_airport |1   |3369 |
|small_airport |6   |3337 |
|small_airport |7   |3314 |
|heliport      |2   |1214 |
|heliport      |8   |1185 |
|heliport      |3   |1142 |
|heliport      |0   |1135 |
|heliport      |9   |1130 |
|heliport      |4   |1109 |
|heliport      |6   |1106 |
|heliport      |7   |1104 |
|heliport      |1   |1097 |
|heliport      |5   |1094 |
|medium_airport|0   |494  |
|medium_airport|5   |486  |
|medium_airport|3   |467  |
|medium_airport|2   |466  |
|medium_airport|8   |458  |
|medium_airport|1   |450  |
|medium_airport|6   |437  |
|medium_airport|7   |430  |
|medium_airport|9   |425  |
|medium_airport|4   |418  |
|closed        |9   |399  |
|closed        |1   |392  |
|closed        |5   

firstStep = [type: string, salt: int ... 1 more field]


[type: string, salt: int ... 1 more field]

Вторым шагом мы делаем еще один агрегат, суммируя предыдущие значения `count`:

In [86]:
val secondStep = firstStep.groupBy('type).agg(sum("count").alias("count"))

secondStep.orderBy('count.desc).show(200, false)

+--------------+-----+
|type          |count|
+--------------+-----+
|small_airport |33998|
|heliport      |11316|
|medium_airport|4531 |
|closed        |3618 |
|seaplane_base |1014 |
|large_airport |613  |
|balloonport   |23   |
+--------------+-----+



secondStep = [type: string, count: bigint]


[type: string, count: bigint]

Несмотря на то, что мы сделали две группировки вместо одной, распределение данных по воркерам было более равномерным, что позволило избежать OOM на воркерах.

### Выводы:
+ Партиционирование - важный аспект распределенных вычислений, от которого напрямую зависит стабильность и скорость вычислений
+ В Spark всегда работает правило 1 TASK = 1 THREAD = 1 PARTITION
+ Репартиционирование и соление данных позволяет решить проблему перекоса данных и вычислений
+ Важно помнить, что репартиционирование использует дисковую и сетевую подсистемы - обмен данными происходит **по сети**, а результат записывается **на диск**, что может стать узким местом при выполнении репартиционирования

## Встроенные функции
Помимо базовых SQL операторов, в Spark существует большой набор встроенных функций:
+ API методы из [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$)
+ [SQL built-in functions](https://spark.apache.org/docs/latest/api/sql/index.html)

In [87]:
val df = spark.range(0,10)

// используем org.apache.spark.sql.functions
val newCol: Column = pmod(col("id"), lit(2))
df.withColumn("pmod", newCol).show

+---+----+
| id|pmod|
+---+----+
|  0|   0|
|  1|   1|
|  2|   0|
|  3|   1|
|  4|   0|
|  5|   1|
|  6|   0|
|  7|   1|
|  8|   0|
|  9|   1|
+---+----+



df = [id: bigint]
newCol = pmod(id, 2)


pmod(id, 2)

In [88]:
import org.apache.spark.sql.functions.expr

// используем SQL built-in functions
val newCol: Column = expr("""pmod(id, 2)""")
df.withColumn("pmod", newCol).show

+---+----+
| id|pmod|
+---+----+
|  0|   0|
|  1|   1|
|  2|   0|
|  3|   1|
|  4|   0|
|  5|   1|
|  6|   0|
|  7|   1|
|  8|   0|
|  9|   1|
+---+----+



newCol = pmod(id, 2)


pmod(id, 2)

### Выводы
+ Spark обладает широким набором функций для работы с колонками разных типов, включая простые типы - строки, числа, и т. д., а также словари, массивы и структуры
+ Встроенные функции принимают колонки `org.apache.spark.sql.Column` и возвращают `org.apache.spark.sql.Column` в большинстве случаев
+ Встроенные функции доступны в двух местах - org.apache.spark.sql.functions и SQL built-in functions
+ Встроенные функции можно (и нужно) использовать вместе - на вход во встроенные функции могут подаваться результаты встроенной функции, тк все они возвращают `sql.Column` 

### Пользовательские функции

В том случае, если функционала встроенных функций не хватает, можно написать пользовательскую функцию - UDF. Пользовательская функция может принимать до 16 аргументов. Соответствие Spark и Scala типов описано [здесь](https://spark.apache.org/docs/latest/sql-reference.html#data-types)

Необходимо помнить, что `null` в Spark превращается в `null` внутри UDF

In [89]:
import org.apache.spark.sql.functions.{udf, col}

val df = spark.range(0,10)

val plusOne = udf { (value: Long) => value + 1 }

df.withColumn("idPlusOne", plusOne(col("id"))).show(10, false)

+---+---------+
|id |idPlusOne|
+---+---------+
|0  |1        |
|1  |2        |
|2  |3        |
|3  |4        |
|4  |5        |
|5  |6        |
|6  |7        |
|7  |8        |
|8  |9        |
|9  |10       |
+---+---------+



df = [id: bigint]
plusOne = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))


UserDefinedFunction(<function1>,LongType,Some(List(LongType)))

In [90]:
def plusOne2(x: Int): Int = { x + 1 }

plusOne2: (x: Int)Int


In [91]:
val my_udf_2 = udf { plusOne2 _ }

my_udf_2 = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))


UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

In [92]:
spark.range(10).select(my_udf_2('id)).show()

+-------+
|UDF(id)|
+-------+
|      1|
|      2|
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
|      9|
|     10|
+-------+



Пользовательская функция может возвращать:
+ простой тип - `String`, `Long`, `Float`, `Boolean` и т.д.
+ массив - любые коллекции, наследующие `Seq[T]` - `List[T]`, `Vector[T]` и т. д.
+ словарь - `Map[A,B]`
+ инстанс `case class`'а
+ Option[T]

Реализуем функцию, которая возвращает имя хоста, на котором работает воркер:

In [None]:
Option[T]
Try[T]
List[T]
Vector[T]

In [93]:
import java.net.InetAddress

val hostname = udf { () => InetAddress.getLocalHost().getHostName() }

df.withColumn("hostname", hostname()).show(10, false)

+---+--------------+
|id |hostname      |
+---+--------------+
|0  |spark-node-3  |
|1  |spark-node-3  |
|2  |spark-node-3  |
|3  |spark-node-3  |
|4  |spark-node-3  |
|5  |spark-master-6|
|6  |spark-master-6|
|7  |spark-master-6|
|8  |spark-master-6|
|9  |spark-master-6|
+---+--------------+



hostname = UserDefinedFunction(<function0>,StringType,Some(List()))


UserDefinedFunction(<function0>,StringType,Some(List()))

Мы также можем использовать монады `Try[T]` и `Option[T]` и для написания пользовательской функции:

In [94]:
import scala.util.Try
import org.apache.spark.sql.functions.{udf, col}

val df = spark.range(0,10)

val divideTwoBy = udf { (inputValue: Long) => Try { 2L / inputValue }.toOption }

val result = df.withColumn("divideTwoBy", divideTwoBy(col("id")))
result.printSchema
result.show(10, false)

root
 |-- id: long (nullable = false)
 |-- divideTwoBy: long (nullable = true)

+---+-----------+
|id |divideTwoBy|
+---+-----------+
|0  |null       |
|1  |2          |
|2  |1          |
|3  |0          |
|4  |0          |
|5  |0          |
|6  |0          |
|7  |0          |
|8  |0          |
|9  |0          |
+---+-----------+



df = [id: bigint]
divideTwoBy = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
result = [id: bigint, divideTwoBy: bigint]


[id: bigint, divideTwoBy: bigint]

In [95]:
val df = spark.range(10)

df = [id: bigint]


[id: bigint]

In [96]:
df.schema

StructType(StructField(id,LongType,false))

In [97]:
df.schema("id")

StructField(id,LongType,false)

In [98]:
df.schema("id").dataType.getClass.getCanonicalName

org.apache.spark.sql.types.LongType$

In [99]:
LongType // java.lang.Long

Name: Unknown Error
Message: <console>:53: error: not found: value LongType
       LongType // java.lang.Long
       ^

StackTrace: 

### Выводы
+ Пользовательские функции позволяют реализовать произвольный алгоритм и использовать его в DF API
+ Пользовательские функции работают медленнее встроенных, поскольку при использовании встроенных функций Spark использует ряд оптимизаций, например векторизацию вычислений на уровне CPU

## Соединения

Join'ы позволяют соединять два DF в один по заданным условиям.

По типу условия join'ы делятся на:
+ equ-join - соединение по равенству одного или более ключей
+ non-equ join - соединение по условию, отличному от равенства одного или более ключей

По методу соединения join'ы бывают:
![Joins](http://kirillpavlov.com/images/join-types.png)
[Источник](http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/)

Добавим новую колонку к датасету `airports`, в которой будет процент заданного типа аэропорта ко всем типам аэропорта по каждой стране. Первым шагом посчитаем число аэропортов каждого типа по стране:

In [100]:
import org.apache.spark.sql.functions.{count, round, lit}

val aggTypeCountry = airports.groupBy('type, 'iso_country).agg(count("*").alias("cnt_country_type"))

aggTypeCountry.show(5, false)

+--------------+-----------+----------------+
|type          |iso_country|cnt_country_type|
+--------------+-----------+----------------+
|large_airport |GB         |27              |
|small_airport |MP         |1               |
|heliport      |CH         |19              |
|closed        |LT         |4               |
|medium_airport|SS         |3               |
+--------------+-----------+----------------+
only showing top 5 rows



aggTypeCountry = [type: string, iso_country: string ... 1 more field]


[type: string, iso_country: string ... 1 more field]

Теперь посчитаем количество аэропортов по каждой стране:

In [101]:
val aggCountry = airports.groupBy('iso_country).agg(count("*").alias("cnt_country"))
aggCountry.show(5, false)

+-----------+-----------+
|iso_country|cnt_country|
+-----------+-----------+
|DZ         |61         |
|LT         |57         |
|MM         |75         |
|CI         |26         |
|TC         |8          |
+-----------+-----------+
only showing top 5 rows



aggCountry = [iso_country: string, cnt_country: bigint]


[iso_country: string, cnt_country: bigint]

Соединим получившиеся датасеты и получим процентное распределение типов аэропорта по стране

In [102]:
val percent = 
    aggTypeCountry
        .join(aggCountry, Seq("iso_country"), "inner")
        .select('iso_country, 'type, (round(lit(100) * 'cnt_country_type / 'cnt_country, 2).alias("percent")))
percent.show(5, false)

+-----------+--------------+-------+
|iso_country|type          |percent|
+-----------+--------------+-------+
|GB         |large_airport |2.97   |
|MP         |small_airport |9.09   |
|CH         |heliport      |21.84  |
|LT         |closed        |7.02   |
|SS         |medium_airport|6.52   |
+-----------+--------------+-------+
only showing top 5 rows



percent = [iso_country: string, type: string ... 1 more field]


[iso_country: string, type: string ... 1 more field]

Соединим полученный датасет с изначальным:

In [103]:
val result = airports.join(percent, Seq("iso_country", "type"), "left")
result.select('ident, 'iso_country, 'type, 'percent).sample(0.2).show(20, false)

+-------+-----------+--------------+-------+
|ident  |iso_country|type          |percent|
+-------+-----------+--------------+-------+
|EGGW   |GB         |large_airport |2.97   |
|EGHH   |GB         |large_airport |2.97   |
|EGPH   |GB         |large_airport |2.97   |
|EGSS   |GB         |large_airport |2.97   |
|EYKR   |LT         |closed        |7.02   |
|LT-0013|LT         |closed        |7.02   |
|LSHC   |CH         |heliport      |21.84  |
|LSHG   |CH         |heliport      |21.84  |
|LSHI   |CH         |heliport      |21.84  |
|NZAA   |NZ         |large_airport |1.42   |
|LOXZ   |AT         |medium_airport|4.83   |
|EBBE   |BE         |medium_airport|5.52   |
|EBCV   |BE         |medium_airport|5.52   |
|EEKA   |EE         |medium_airport|13.51  |
|EEKE   |EE         |medium_airport|13.51  |
|HECW   |EG         |medium_airport|36.23  |
|HEPS   |EG         |medium_airport|36.23  |
|HU-0014|HU         |small_airport |80.73  |
|HU-0015|HU         |small_airport |80.73  |
|HU-0016|H

result = [iso_country: string, type: string ... 11 more fields]


[iso_country: string, type: string ... 11 more fields]

Во всех наших джойнах присутствует массив `Seq[String]`. Это синтаксических сахар, позволяющий не переименовывать колонки датасетов, а просто указать, что соединение будет делаться по колонкам с именами, входящим в массив.

В общем случае условие джойна должно быть выражено в виде колонки `sql.Column`, например:

In [104]:
import org.apache.spark.sql.Column
val joinCondition: Column = col("left_a") === col("right_a") and col("left_b") === col("right_b")

joinCondition = ((left_a = right_a) AND (left_b = right_b))


((left_a = right_a) AND (left_b = right_b))

При этом в данном выражении допускается использование встроенных функций, пользовательских функций и операторов сравнения. Однако следует помнить, что мы выполняем джойн двух распределенных датасетов и если условие соединения будет плохо составлено, то Spark выполнит `cross join`, производительность которого будет "крайне мала" &copy;

### Выводы:
+ Spark поддерживает большое число типов соединений
+ Условием соединения может быть `Seq[String]`, либо `sql.Column`
+ При использовании сложных условий соединения следует избегать тех, которые приведут к `cross join`

## Оконные функции
Оконные функции позволяют делать функции над "окнами" (кто бы мог подумать) данных

Окно создается из класса [org.apache.spark.sql.expressions.Window](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Window) с указанием полей, определяющих границы окон и полей, определяющих порядок сортировки внутри окна:

```val window = Window.partitionBy("a", "b").orderBy("a")```

Применяя окна, можно использовать такие полезные функции из [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$), как ```lag()``` и ```lead()```, а также эффективно работать с данными time-series данными.

Выполним задачу с вычисление процента отношения типов аэропортов, используя оконные функции.

In [105]:
import org.apache.spark.sql.expressions.Window

val windowCountry = Window.partitionBy("iso_country")
val windowTypeCountry = Window.partitionBy("type", "iso_country")

val result = airports
                .withColumn("cnt_country", count("*").over(windowCountry))
                .withColumn("cnt_country_type", count("*").over(windowTypeCountry))
                .withColumn("percent", round(lit(100) * 'cnt_country_type / 'cnt_country, 2))
                            
result.select('ident, 'iso_country, 'type, 'percent).sample(0.2).show(20, false)

+-------+-----------+--------------+-------+
|ident  |iso_country|type          |percent|
+-------+-----------+--------------+-------+
|LT-0013|LT         |closed        |7.02   |
|DAAG   |DZ         |large_airport |1.64   |
|DAAD   |DZ         |medium_airport|59.02  |
|DAAJ   |DZ         |medium_airport|59.02  |
|DAAP   |DZ         |medium_airport|59.02  |
|DAAV   |DZ         |medium_airport|59.02  |
|DAAZ   |DZ         |medium_airport|59.02  |
|DABB   |DZ         |medium_airport|59.02  |
|DAON   |DZ         |medium_airport|59.02  |
|DAUA   |DZ         |medium_airport|59.02  |
|DAUB   |DZ         |medium_airport|59.02  |
|DAUI   |DZ         |medium_airport|59.02  |
|DAUO   |DZ         |medium_airport|59.02  |
|DAUZ   |DZ         |medium_airport|59.02  |
|VYHH   |MM         |medium_airport|26.67  |
|VYLK   |MM         |medium_airport|26.67  |
|VYLS   |MM         |medium_airport|26.67  |
|VYMS   |MM         |medium_airport|26.67  |
|VYSW   |MM         |medium_airport|26.67  |
|DAAF   |D

windowCountry = org.apache.spark.sql.expressions.WindowSpec@5f24e18c
windowTypeCountry = org.apache.spark.sql.expressions.WindowSpec@496eefc0
result = [ident: string, type: string ... 13 more fields]


[ident: string, type: string ... 13 more fields]

### Выводы:
+ Оконные функции позволяют применять функции, применительно к окнам данных
+ Окно определяется списком колонок и сортировкой
+ Применение оконных функций приводит к `shuffle`

После завершения работы не забывайте останавливать `SparkSession`, чтобы освободить ресурсы кластера!

In [None]:
spark.stop