# Spark SQL
**Andrey Titov**
tenke.iu8@gmail.com

## На этом занятии
+ Общие сведения
+ Область применения
+ Устройство Spark Dataframe API
+ Чтение данных из источника
+ Работа с данными
  - Базовый SQL
  - NA функции
  - Группировки
  - Запись данных
  - Соединения
  - Оконные функции
  - Функции pyspark.sql.functions

## Dataframe API

**Dataframe:**
+ структурированная колоночная структура данных
+ может быть создана на основе:
  - локальной коллекции
  - файла (файлов)
  - базы данных
+ в python работает значительно быстрее, чем RDD
+ под капотом использует RDD
+ позволяет выполнять произвольные SQL операции с данными
+ аналогично RDD являются ленивыми и неизменяеыми

## Из чего состоит Dataframe
+ схема [pyspsark.sql.StructType](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.StructType)
+ колонки [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column)
+ данные [pyspark.sql.Row](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row)

Подготовим тестовый набор данных

In [7]:
#Initializing PySpark
from pyspark import SparkContext, SparkConf

# #Spark Config
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)

Exception: Java gateway process exited before sending its port number

In [8]:
from pyspark.sql.functions import *
#from pyspark.sql.functions import sc

test_data = [
{"name":"Moscow", "country":"Rossiya", "continent": "Europe", "population": 12380664},
{ "name":"Madrid", "country":"Spain" },
{ "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936},
{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105},
{ "name":"Barselona", "country":"Spain", "continent": "Europe" },
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 },
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 },
{ }
]

rdd = sc.parallelize(test_data)
df = spark.read.json(rdd)
df

NameError: name 'sc' is not defined

Метод `show` выводит часть датафрейма в консоль

In [None]:
df.show(10, False)

Метод `printSchema` выводит схему датафрейма в консоль

In [None]:
df.printSchema()

Метод `select` позволяет выбрать существующие (а также создать новые) колонки из датафрейма

In [None]:
from pyspark.sql.functions import *

df.select(col("continent"), col("country")).show(10, False)

Метод `filter` позволяет фильтровать датасет:

In [None]:
df.filter(col("continent") == "Europe").show(10, False)

Параллелизм обработки данных зависит от количества партиций в датасете:

In [None]:
df.rdd.getNumPartitions()

## Очистка данных
Удалим дубликаты. По умолчанию метод `dropDuplicates` удаляет дубликаты строк, у которых ВСЕ колонки совпадают

In [None]:
df.dropDuplicates().show(10, False)

Метод `.na.drop` удаляет СТРОКИ, в которых отсутствует часть данных. Параметр `how="all"` означает, что будут удалены строки, у которых ВСЕ колонки `null`

In [None]:
df.dropDuplicates().na.drop(how="all").show(10, False)

Метод `.na.fill` заполняет `null`. Для работы этого метода требуется словарь с изменениями

In [None]:
fill_dict = {'continent': 'n/a', 'population': 0 }

df.dropDuplicates().na.drop(how="all").na.fill(fill_dict).show(10, False)

Метод `.na.replace` заменяет данные в колонках. Для его работы требуется словарь с заменами

In [None]:
replace_dict = {"Rossiya": "Russia"}

df.dropDuplicates().na.drop("all").na.fill(fill_dict).na.replace(replace_dict).show(10, False)

Подготровим датафрейм с очищенными данными

In [None]:
from pyspark.sql.functions import col

clean_data = df \
                .dropDuplicates() \
                .na.drop("all") \
                .na.fill(fill_dict) \
                .na.replace(replace_dict) \
                .filter(col("population") >= 0) \
                .select(col("continent"), col("country"), col("name"), col("population"))

In [None]:
clean_data.show(10, False)

In [None]:
clean_data.printSchema()

Подготовим базовый агрегат. По умолчанию имена колонок принимают неудобные названия

In [None]:
from pyspark.sql.functions import count, sum

agg = clean_data.groupBy("continent").agg(count("*"), sum(col("population")))
agg.show(10, False)

Метод `alias` позволяет переименовывать колонки

In [14]:
from pyspark.sql.functions import count, sum, lower

pop_count = count("*").alias("city_count")
pop_sum = sum(col("population")).alias("population_sum")

agg = clean_data \
            .groupBy("continent") \
            .agg(pop_count, pop_sum) \
            .withColumn("continent", lower(col("continent")))

agg.show(10, False)

AttributeError: 'NoneType' object has no attribute '_jvm'

## Чтение данных из источника
Основной метод чтения любых источников

```df = spark.read.format(datasource_type).option(datasource_options).load(object_name)```

+ ```datasource_type``` - тип источника ("parquet", "json", "cassandra") и т. д.
+ ```datasource_options``` - опции для работы с источником (логины, пароли, адреса для подключения и т. д.)
+ ```object_name``` - имя таблицы/файла/топика/индекса

[DataframeReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader):
+ по умолчанию выводит схему данных
+ является трансформацией (ленивый)
+ возвращает [Dataframe](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)

### Список (неполный) поддерживаемых источников данных
+ Файлы:
  - json
  - text
  - csv
  - orc
  - parquet
  - delta
+ Базы данных
  - elasticsearch
  - cassandra
  - jdbc
  - hive
  - redis
  - mongo
+ Брокеры сообщений
  - kafka
  

**Библиотеки для работы с источниками должны быть доступны в JAVA CLASSPATH на драйвере и воркерах!**

In [None]:
df = spark.read.format("csv").options(header=True, inferSchema=True).load("/tmp/datasets/airport-codes.csv")

In [None]:
df.printSchema()

In [None]:
df.show(n=1, truncate=False, vertical=True)

## Запись данных
Основной метод записи в любые системы

```df.write.format(datasource_type).options(datasource_options).mode(savemode).save(object_name)```

+ ```datasource_type``` - тип источника ("parquet", "json", "cassandra") и т. д.
+ ```datasource_options``` - опции для работы с источником (логины, пароли, адреса для подключения и т. д.)
+ ```savemode``` - режим записи данных (добавление, перезапись и т. д.)
+ ```object_name``` - имя таблицы/файла/топика/индекса

[DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter):
+ метод ```save``` является действием
+ позволяет работать с партиционированными данными (parquet, orc)
+ не всегда валидирует схему и формат данных


### Список (неполный) поддерживаемых источников данных
+ Файлы:
  - json
  - text
  - csv
  - orc
  - parquet
  - delta
+ Базы данных
  - elasticsearch
  - cassandra
  - jdbc
  - hive
  - redis
  - mongo
+ Брокеры сообщений
  - kafka
  

**Библиотеки для работы с источниками должны быть доступны в JAVA CLASSPATH на драйвере и воркерах!**



In [None]:
condition = col("continent") != "n/a"

agg \
    .filter(condition) \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/tmp/agg0.parquet")

print("Ok! Data is written to {}".format("/tmp/agg0.parquet"))

In [None]:
# P.S.
# Когда мы делаем .filter в DataFrame API, мы передаем условие типа pyspark.sql.column.Column.
print(type(condition))

# когда раньше мы использовали лямбда функции в RDD, мы передавали лямбда функцию:
condition_old = lambda x: x != "Earth"
print(type(condition_old))

## Соединения

Join'ы позволяют соединять два DF в один по заданным условиям.

По типу условия join'ы делятся на:
+ equ-join - соединение по равенству одного или более ключей
+ non-equ join - соединение по условию, отличному от равенства одного или более ключей

По методу соединения join'ы бывают:
![Joins](http://kirillpavlov.com/images/join-types.png)
[Источник](http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/)

При выполнении join Spark автоматически выбирает один [из доступных алгоритмов](https://youtu.be/fp53QhSfQcI) соединения и не всегда делает это оптимально, часто применяя cross join. Поэтому, в последних версиях Spark метод ```join()``` приведет к ошибке, если под капотом он будет использовать cross join. Отключить эту проверку можно с помощью опции ```--conf spark.sql.crossJoin.enabled=true```

In [None]:
# Для демонстрации работы join используем подгтовленные данные
left = clean_data.withColumn("continent", lower(col("continent")))
left.printSchema()

right = spark.read.parquet("/tmp/agg0.parquet")
right.printSchema()

Самый простой join - inner join по равенству одной колонки

In [None]:
joined = left.join(right, 'continent', 'inner')

joined.printSchema()

joined.show(10, False)

Inner join по равенству двух колонок. Поскольку двух одинаковых колонок у нас нет, мы создадим их, используя константу

In [None]:
from pyspark.sql.functions import lit

new_col = lit("x").alias("x")

left = left.select(col("*"), new_col)
right = right.select(col("*"), new_col)

joined = left.join(right, ['continent', 'x'], 'inner')

joined.printSchema()

joined.show()

non-equ left join

In [None]:
from pyspark.sql.functions import lit

left = left \
            .withColumn("city_count_max", lit(2)) \
            .withColumnRenamed("continent", "continent_left")

right = agg.withColumnRenamed("continent", "continent_right")

join_condition = \
            (col("continent_left") == col("continent_right")) & (col("city_count") < col("city_count_max"))

joined = left.join(right, join_condition, 'left')

joined.show()

In [None]:
# non-equ right join
from pyspark.sql.functions import expr

left = left.withColumnRenamed("continent_left", "continent").alias("left")
right = right.withColumnRenamed("continent_right", "continent").alias("right")

join_condition = """ left.continent = right.continent AND right.city_count < left.city_count_max """

joined = left.join(right, expr(join_condition), 'right')

joined.show()

In [None]:
left.crossJoin(right).show(30, False)

## Оконные функции

Оконные функции позволяют делать функции над "окнами" (кто бы мог подумать) данных

Окно создается из класса [pyspark.sql.Window](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window) с указанием полей, определяющих границы окон и полей, определяющих порядок сортировки внутри окна:

```window = Window.partitionBy("a", "b").orderBy("a")```

Применяя окна, можно использовать такие полезные функции из [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), как ```lag()``` и ```lead()```, а также эффективно работать с данными time-series, вычисляя такие параметры, как, например, среднее значение заданного поля за 3-х часовой интервал

In [None]:
# В нашем случае, используя оконные функции, мы можем построить DF из предыдущих примеров c join, 
# но без использования соединения

from pyspark.sql import Window
import pyspark.sql.functions as F

window = Window.partitionBy("continent")

agg = clean_data \
    .withColumn("city_count", F.count("*").over(window)) \
    .withColumn("population_sum", F.sum("population").over(window)) \

agg.show()

## Функции pyspark.sql.functions

Spark обладает достаточно большим набором встроенных функций, доступных в [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), поэтому перед тем, как писать свою UDF, стоит обязательно поискать нужную функцию в данном пакете.

К тому же, все функции Spark принимают на вход и возвращают [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column), а это значит, что вы можете совмещать функции вместе

**Также важно помнить, что функции и колонки в Spark могут быть созданы без привязки к конкретным данным и DF**

In [None]:
from pyspark.sql.functions import to_json, col, struct

avg_pop = \
    to_json(
        struct(
            (col("population_sum") / col("city_count")).alias("value")
        )
    ).alias("avg_pop")

agg.select(col("*"), avg_pop).show(truncate=False)


In [None]:
# Большим преимуществом Spark по сравнению с большинством SQL ориентированных БД является наличие
# встроенных функций работы со списками, словарями и структурами данных

from pyspark.sql.functions import *

all_in_one = agg.select(struct(*agg.columns).alias("allinone"))

all_in_one.printSchema()
all_in_one.show(20, False)

In [None]:
# Например, можно создавать массивы и объединять их

from pyspark.sql.functions import *

arrays = \
    spark.range(0,1) \
    .withColumn("a", array(lit(1), lit(2), lit(3))) \
    .withColumn("b", array(lit(4),lit(5),lit(6))) \
    .select(array_union(col("a"), col("b")).alias("c"))


arrays.show(1, False)

Также, в разделе [SQL, Built-in Functions](https://spark.apache.org/docs/latest/api/sql/index.html) присутствует еще более широкий список функций, доступных в Spark. Некоторые из них отсутствуют в [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)! 

Эти функции нельзя использовать как обычные методы над [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column), однако вы можете использовать метод ```expr()``` для этого.

In [None]:
from pyspark.sql.functions import *

spark.range(10).select(expr(""" pmod(id, 2) """).alias("foo")).show()

In [None]:
# В данном примере мы используем Java функцию с помощью функции java_method
# Запомните этот пример и используйте всегда, когда вам не хватает какой-либо функции в pyspark, 
# доступной в Java, ведь, используя такой подход, вы не снижаете производительность вашей программы за счет
# передачи данных между Python и JVM приложением Spark, и при этом вам не нужно уметь писать код на Java/Scala :)

from pyspark.sql.functions import *

spark.range(0,1).withColumn("a", expr("java_method('java.util.UUID', 'randomUUID')")).show(1, False)

## Выводы
**Dataframe API**:
+ мощный инструмент для работы с данными
+ в отличие от RDD, Dataframe API устроен так, что все вычисления происходят в JVM
+ обладает единым API для работы с различными источниками данных
+ имеет большой набор встроенных функций работы с данными
+ имеет возможность использовать в pyspark функции, доступные в Java

# Спасибо!