# Spark DataFrames
**Andrey Titov**  
andrey.titov@bigdatateam.org  
Big Data Instructor @ BigData Team  
https://bigdatateam.org

## На этом занятии
+ Общие сведения
+ Базовые функции
+ NA функции
+ Группировки
+ Чтение и запись данных
+ Соединения
+ Оконные функции
+ Функции pyspark.sql.functions

## Общие сведения

**Dataframe:**
+ структурированная колоночная структура данных
+ может быть создана на основе:
  - локальной коллекции
  - файла (файлов)
  - базы данных
+ в python работает значительно быстрее, чем RDD
+ под капотом использует RDD
+ позволяет выполнять произвольные SQL операции с данными
+ аналогично RDD являются ленивыми и неизменяеыми

## Базовый SQL
+ схема [pyspsark.sql.StructType](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.StructType)
+ колонки [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column)
+ данные [pyspark.sql.Row](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row)

Подготовим тестовый набор данных

In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="dataframes")

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()

In [12]:
from pyspark.sql.functions import *

test_data = [
{"name":"Moscow", "country":"Rossiya", "continent": "Europe", "population": 12380664},
{ "name":"Madrid", "country":"Spain" },
{ "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936},
{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105},
{ "name":"Barselona", "country":"Spain", "continent": "Europe" },
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 },
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 },
{ }
]

rdd = sc.parallelize(test_data)
df = spark.read.json(rdd)
df

DataFrame[continent: string, country: string, name: string, population: bigint]

Метод `show` выводит часть датафрейма в консоль

In [13]:
df.show(10, False) # смотрит на одну из партиций (случайную), если в них данных меньше чем мы указали
                   # идет в следующую партицию, затем данные передаются на драйвер
                   # false обрезает длинные строки

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Rossiya|Moscow   |12380664  |
|null     |Spain  |Madrid   |null      |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Europe   |Spain  |Barselona|null      |
|Africa   |Egypt  |Cairo    |11922948  |
|Africa   |Egypt  |Cairo    |11922948  |
|null     |null   |null     |null      |
+---------+-------+---------+----------+



Метод `printSchema` выводит схему датафрейма в консоль

In [14]:
df.printSchema() # поддерживает сложные структуры

root
 |-- continent: string (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- population: long (nullable = true)



Метод `select` позволяет выбрать существующие (а также создать новые) колонки из датафрейма

In [15]:
from pyspark.sql.functions import *

df.select(col("continent"), col("country")).show(10, False)

+---------+-------+
|continent|country|
+---------+-------+
|Europe   |Rossiya|
|null     |Spain  |
|Europe   |France |
|Europe   |Germany|
|Europe   |Spain  |
|Africa   |Egypt  |
|Africa   |Egypt  |
|null     |null   |
+---------+-------+



Метод `filter` позволяет фильтровать датасет:

In [16]:
df.filter(col("continent") == "Europe").show(10, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Rossiya|Moscow   |12380664  |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Europe   |Spain  |Barselona|null      |
+---------+-------+---------+----------+



Параллелизм обработки данных зависит от количества партиций в датасете:

In [17]:
df.rdd.getNumPartitions()

8

## Очистка данных
Удалим дубликаты. По умолчанию метод `dropDuplicates` удаляет дубликаты строк, у которых ВСЕ колонки совпадают

In [18]:
df.dropDuplicates().show(10, False) # возникает неявный shufl

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Rossiya|Moscow   |12380664  |
|null     |null   |null     |null      |
|null     |Spain  |Madrid   |null      |
|Europe   |Spain  |Barselona|null      |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+



Метод `.na.drop` удаляет СТРОКИ, в которых отсутствует часть данных. Параметр `how="all"` означает, что будут удалены строки, у которых ВСЕ колонки `null`

In [19]:
df.dropDuplicates().na.drop(how="all").show(10, False)  # shuffle не требуется

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Rossiya|Moscow   |12380664  |
|null     |Spain  |Madrid   |null      |
|Europe   |Spain  |Barselona|null      |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+



Метод `.na.fill` заполняет `null`. Для работы этого метода требуется словарь с изменениями

In [20]:
fill_dict = {'continent': 'n/a', 'population': 0 }

df.dropDuplicates().na.drop(how="all").na.fill(fill_dict).show(10, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Rossiya|Moscow   |12380664  |
|n/a      |Spain  |Madrid   |0         |
|Europe   |Spain  |Barselona|0         |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+



Метод `.na.replace` заменяет данные в колонках. Для его работы требуется словарь с заменами

In [21]:
replace_dict = {"Rossiya": "Russia"}

df.dropDuplicates().na.drop("all").na.fill(fill_dict).na.replace(replace_dict).show(10, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Russia |Moscow   |12380664  |
|n/a      |Spain  |Madrid   |0         |
|Europe   |Spain  |Barselona|0         |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+



Подготровим датафрейм с очищенными данными

In [22]:
from pyspark.sql.functions import col

clean_data = df \
                .dropDuplicates() \
                .na.drop("all") \
                .na.fill(fill_dict) \
                .na.replace(replace_dict) \
                .filter(col("population") >= 0) \
                .select(col("continent"), col("country"), col("name"), col("population"))

In [23]:
clean_data.show(10, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Russia |Moscow   |12380664  |
|n/a      |Spain  |Madrid   |0         |
|Europe   |Spain  |Barselona|0         |
|Europe   |France |Paris    |2196936   |
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+



In [24]:
clean_data.printSchema()

root
 |-- continent: string (nullable = false)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- population: long (nullable = false)



# Группировки

Подготовим базовый агрегат. По умолчанию имена колонок принимают неудобные названия

In [25]:
from pyspark.sql.functions import count, sum
### вызывет shuffle, все строки с одинаковым континетном помещаются в одну партицию
### groupBy работает в два этапа, сначала группировка внутри партиции а затем между партициями
agg = clean_data.groupBy("continent").agg(count("*"), sum(col("population")))
agg.show(10, False)

+---------+--------+---------------+
|continent|count(1)|sum(population)|
+---------+--------+---------------+
|Europe   |4       |18067705       |
|Africa   |1       |11922948       |
|n/a      |1       |0              |
+---------+--------+---------------+



Метод `alias` позволяет переименовывать колонки

In [26]:
from pyspark.sql.functions import count, sum, lower

pop_count = count("*").alias("city_count")
pop_sum = sum(col("population")).alias("population_sum")

agg = clean_data \
            .groupBy("continent") \
            .agg(pop_count, pop_sum) \
            .withColumn("continent", lower(col("continent")))

agg.show(10, False)

+---------+----------+--------------+
|continent|city_count|population_sum|
+---------+----------+--------------+
|europe   |4         |18067705      |
|africa   |1         |11922948      |
|n/a      |1         |0             |
+---------+----------+--------------+



## Чтение данных из источника
Основной метод чтения любых источников

```df = spark.read.format(datasource_type).option(datasource_options).load(object_name)```

+ ```datasource_type``` - тип источника ("parquet", "json", "cassandra") и т. д.
+ ```datasource_options``` - опции для работы с источником (логины, пароли, адреса для подключения и т. д.)
+ ```object_name``` - имя таблицы/файла/топика/индекса

[DataframeReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader):
+ по умолчанию выводит схему данных
+ является трансформацией (ленивый)
+ возвращает [Dataframe](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)

### Список (неполный) поддерживаемых источников данных
+ Файлы:
  - json
  - text
  - csv
  - orc
  - parquet
  - delta
+ Базы данных
  - elasticsearch
  - cassandra
  - jdbc
  - hive
  - redis
  - mongo
+ Брокеры сообщений
  - kafka
  

**Библиотеки для работы с источниками должны быть доступны в JAVA CLASSPATH на драйвере и воркерах!**

In [27]:
df = spark.read.format("csv").options(header=True, inferSchema=True).load("airport-codes.csv")

In [28]:
df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [29]:
df.show(n=1, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                                
 iata_code    | null                               
 local_code   | 00A                                
 coordinates  | -74.93360137939453, 40.07080078125 
only showing top 1 row



## Запись данных
Основной метод записи в любые системы

```df.write.format(datasource_type).options(datasource_options).mode(savemode).save(object_name)```

+ ```datasource_type``` - тип источника ("parquet", "json", "cassandra") и т. д.
+ ```datasource_options``` - опции для работы с источником (логины, пароли, адреса для подключения и т. д.)
+ ```savemode``` - режим записи данных (добавление, перезапись и т. д.)
+ ```object_name``` - имя таблицы/файла/топика/индекса

[DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter):
+ метод ```save``` является действием
+ позволяет работать с партиционированными данными (parquet, orc)
+ не всегда валидирует схему и формат данных


### Список (неполный) поддерживаемых источников данных
+ Файлы:
  - json
  - text
  - csv
  - orc
  - parquet
  - delta
+ Базы данных
  - elasticsearch
  - cassandra
  - jdbc
  - hive
  - redis
  - mongo
+ Брокеры сообщений
  - kafka
  

**Библиотеки для работы с источниками должны быть доступны в JAVA CLASSPATH на драйвере и воркерах!**



In [46]:
agg.show(5)

+---------+----------+--------------+
|continent|city_count|population_sum|
+---------+----------+--------------+
|   europe|         4|      18067705|
|   africa|         1|      11922948|
|      n/a|         1|             0|
+---------+----------+--------------+



In [None]:
condition = col("continent") != "n/a"

agg \
    .filter(condition) \
    .write \
    .json("agg0.json")

print("Ok! Data is written to {}".format("agg0.parquet"))

In [33]:
# P.S.
# Когда мы делаем .filter в DataFrame API, мы передаем условие типа pyspark.sql.column.Column.
print(type(condition))

# когда раньше мы использовали лямбда функции в RDD, мы передавали лямбда функцию:
condition_old = lambda x: x != "Earth"
print(type(condition_old))

<class 'pyspark.sql.column.Column'>
<class 'function'>


## Соединения

Join'ы позволяют соединять два DF в один по заданным условиям.

По типу условия join'ы делятся на:
+ equ-join - соединение по равенству одного или более ключей
+ non-equ join - соединение по условию, отличному от равенства одного или более ключей

По методу соединения join'ы бывают:
![Joins](http://kirillpavlov.com/images/join-types.png)
[Источник](http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/)

При выполнении join Spark автоматически выбирает один [из доступных алгоритмов](https://youtu.be/fp53QhSfQcI) соединения и не всегда делает это оптимально, часто применяя cross join. Поэтому, в последних версиях Spark метод ```join()``` приведет к ошибке, если под капотом он будет использовать cross join. Отключить эту проверку можно с помощью опции ```--conf spark.sql.crossJoin.enabled=true```

In [48]:
# Для демонстрации работы join используем подгтовленные данные
left = clean_data.withColumn("continent", lower(col("continent")))
left.printSchema()

right = agg.filter(condition)
right.printSchema()

root
 |-- continent: string (nullable = false)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- population: long (nullable = false)

root
 |-- continent: string (nullable = false)
 |-- city_count: long (nullable = false)
 |-- population_sum: long (nullable = true)



Самый простой join - inner join по равенству одной колонки

In [49]:
joined = left.join(right, on='continent', how='inner')

joined.printSchema()

joined.show(10, False)

root
 |-- continent: string (nullable = false)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- population: long (nullable = false)
 |-- city_count: long (nullable = false)
 |-- population_sum: long (nullable = true)

+---------+-------+---------+----------+----------+--------------+
|continent|country|name     |population|city_count|population_sum|
+---------+-------+---------+----------+----------+--------------+
|europe   |Russia |Moscow   |12380664  |4         |18067705      |
|europe   |Spain  |Barselona|0         |4         |18067705      |
|europe   |France |Paris    |2196936   |4         |18067705      |
|europe   |Germany|Berlin   |3490105   |4         |18067705      |
|africa   |Egypt  |Cairo    |11922948  |1         |11922948      |
+---------+-------+---------+----------+----------+--------------+



Inner join по равенству двух колонок. Поскольку двух одинаковых колонок у нас нет, мы создадим их, используя константу

In [50]:
from pyspark.sql.functions import lit

new_col = lit("x").alias("x")

left = left.select(col("*"), new_col)
right = right.select(col("*"), new_col)

joined = left.join(right, ['continent', 'x'], 'inner')

joined.printSchema()

joined.show()

root
 |-- continent: string (nullable = false)
 |-- x: string (nullable = false)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- population: long (nullable = false)
 |-- city_count: long (nullable = false)
 |-- population_sum: long (nullable = true)

+---------+---+-------+---------+----------+----------+--------------+
|continent|  x|country|     name|population|city_count|population_sum|
+---------+---+-------+---------+----------+----------+--------------+
|   europe|  x| Russia|   Moscow|  12380664|         4|      18067705|
|   europe|  x|  Spain|Barselona|         0|         4|      18067705|
|   europe|  x| France|    Paris|   2196936|         4|      18067705|
|   europe|  x|Germany|   Berlin|   3490105|         4|      18067705|
|   africa|  x|  Egypt|    Cairo|  11922948|         1|      11922948|
+---------+---+-------+---------+----------+----------+--------------+



## non-equ left join

In [51]:
from pyspark.sql.functions import lit

left = left \
            .withColumn("city_count_max", lit(2)) \
            .withColumnRenamed("continent", "continent_left")

right = agg.withColumnRenamed("continent", "continent_right")

join_condition = \
            (col("continent_left") == col("continent_right")) & (col("city_count") < col("city_count_max"))

joined = left.join(right, join_condition, 'left')

joined.show()

+--------------+-------+---------+----------+---+--------------+---------------+----------+--------------+
|continent_left|country|     name|population|  x|city_count_max|continent_right|city_count|population_sum|
+--------------+-------+---------+----------+---+--------------+---------------+----------+--------------+
|        europe| Russia|   Moscow|  12380664|  x|             2|           null|      null|          null|
|        europe|  Spain|Barselona|         0|  x|             2|           null|      null|          null|
|        europe| France|    Paris|   2196936|  x|             2|           null|      null|          null|
|        europe|Germany|   Berlin|   3490105|  x|             2|           null|      null|          null|
|           n/a|  Spain|   Madrid|         0|  x|             2|            n/a|         1|             0|
|        africa|  Egypt|    Cairo|  11922948|  x|             2|         africa|         1|      11922948|
+--------------+-------+---------+---

In [52]:
# non-equ right join
from pyspark.sql.functions import expr

left = left.withColumnRenamed("continent_left", "continent").alias("left")
right = right.withColumnRenamed("continent_right", "continent").alias("right")

join_condition = """ left.continent = right.continent AND right.city_count < left.city_count_max """

joined = left.join(right, expr(join_condition), 'right')

joined.show()

+---------+-------+------+----------+----+--------------+---------+----------+--------------+
|continent|country|  name|population|   x|city_count_max|continent|city_count|population_sum|
+---------+-------+------+----------+----+--------------+---------+----------+--------------+
|     null|   null|  null|      null|null|          null|   europe|         4|      18067705|
|      n/a|  Spain|Madrid|         0|   x|             2|      n/a|         1|             0|
|   africa|  Egypt| Cairo|  11922948|   x|             2|   africa|         1|      11922948|
+---------+-------+------+----------+----+--------------+---------+----------+--------------+



In [53]:
left.crossJoin(right).show(30, False)

+---------+-------+---------+----------+---+--------------+---------+----------+--------------+
|continent|country|name     |population|x  |city_count_max|continent|city_count|population_sum|
+---------+-------+---------+----------+---+--------------+---------+----------+--------------+
|europe   |Russia |Moscow   |12380664  |x  |2             |europe   |4         |18067705      |
|europe   |Russia |Moscow   |12380664  |x  |2             |africa   |1         |11922948      |
|europe   |Russia |Moscow   |12380664  |x  |2             |n/a      |1         |0             |
|n/a      |Spain  |Madrid   |0         |x  |2             |europe   |4         |18067705      |
|n/a      |Spain  |Madrid   |0         |x  |2             |africa   |1         |11922948      |
|n/a      |Spain  |Madrid   |0         |x  |2             |n/a      |1         |0             |
|europe   |Spain  |Barselona|0         |x  |2             |europe   |4         |18067705      |
|europe   |Spain  |Barselona|0         |

## Оконные функции

Оконные функции позволяют делать функции над "окнами" (кто бы мог подумать) данных

Окно создается из класса [pyspark.sql.Window](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window) с указанием полей, определяющих границы окон и полей, определяющих порядок сортировки внутри окна:

```window = Window.partitionBy("a", "b").orderBy("a")```

Применяя окна, можно использовать такие полезные функции из [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), как ```lag()``` и ```lead()```, а также эффективно работать с данными time-series, вычисляя такие параметры, как, например, среднее значение заданного поля за 3-х часовой интервал

In [54]:
# В нашем случае, используя оконные функции, мы можем построить DF из предыдущих примеров c join, 
# но без использования соединения

from pyspark.sql import Window
import pyspark.sql.functions as F

window = Window.partitionBy("continent")

agg = clean_data \
    .withColumn("city_count", F.count("*").over(window)) \
    .withColumn("population_sum", F.sum("population").over(window)) \

agg.show()

+---------+-------+---------+----------+----------+--------------+
|continent|country|     name|population|city_count|population_sum|
+---------+-------+---------+----------+----------+--------------+
|   Europe| Russia|   Moscow|  12380664|         4|      18067705|
|   Europe|  Spain|Barselona|         0|         4|      18067705|
|   Europe| France|    Paris|   2196936|         4|      18067705|
|   Europe|Germany|   Berlin|   3490105|         4|      18067705|
|   Africa|  Egypt|    Cairo|  11922948|         1|      11922948|
|      n/a|  Spain|   Madrid|         0|         1|             0|
+---------+-------+---------+----------+----------+--------------+



## Функции pyspark.sql.functions

Spark обладает достаточно большим набором встроенных функций, доступных в [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), поэтому перед тем, как писать свою UDF, стоит обязательно поискать нужную функцию в данном пакете.

К тому же, все функции Spark принимают на вход и возвращают [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column), а это значит, что вы можете совмещать функции вместе

**Также важно помнить, что функции и колонки в Spark могут быть созданы без привязки к конкретным данным и DF**

In [55]:
from pyspark.sql.functions import to_json, col, struct

avg_pop = \
    to_json(
        struct(
            (col("population_sum") / col("city_count")).alias("value")
        )
    ).alias("avg_pop")

agg.select(col("*"), avg_pop).show(truncate=False)


+---------+-------+---------+----------+----------+--------------+---------------------+
|continent|country|name     |population|city_count|population_sum|avg_pop              |
+---------+-------+---------+----------+----------+--------------+---------------------+
|Europe   |Russia |Moscow   |12380664  |4         |18067705      |{"value":4516926.25} |
|Europe   |Spain  |Barselona|0         |4         |18067705      |{"value":4516926.25} |
|Europe   |France |Paris    |2196936   |4         |18067705      |{"value":4516926.25} |
|Europe   |Germany|Berlin   |3490105   |4         |18067705      |{"value":4516926.25} |
|Africa   |Egypt  |Cairo    |11922948  |1         |11922948      |{"value":1.1922948E7}|
|n/a      |Spain  |Madrid   |0         |1         |0             |{"value":0.0}        |
+---------+-------+---------+----------+----------+--------------+---------------------+



In [56]:
# Большим преимуществом Spark по сравнению с большинством SQL ориентированных БД является наличие
# встроенных функций работы со списками, словарями и структурами данных

from pyspark.sql.functions import *

all_in_one = agg.select(struct(*agg.columns).alias("allinone"))

all_in_one.printSchema()
all_in_one.show(20, False)

root
 |-- allinone: struct (nullable = false)
 |    |-- continent: string (nullable = false)
 |    |-- country: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- population: long (nullable = false)
 |    |-- city_count: long (nullable = false)
 |    |-- population_sum: long (nullable = true)

+-----------------------------------------------+
|allinone                                       |
+-----------------------------------------------+
|{Europe, Russia, Moscow, 12380664, 4, 18067705}|
|{Europe, Spain, Barselona, 0, 4, 18067705}     |
|{Europe, France, Paris, 2196936, 4, 18067705}  |
|{Europe, Germany, Berlin, 3490105, 4, 18067705}|
|{Africa, Egypt, Cairo, 11922948, 1, 11922948}  |
|{n/a, Spain, Madrid, 0, 1, 0}                  |
+-----------------------------------------------+



In [57]:
# Например, можно создавать массивы и объединять их

from pyspark.sql.functions import *

arrays = \
    spark.range(0,1) \
    .withColumn("a", array(lit(1), lit(2), lit(3))) \
    .withColumn("b", array(lit(4),lit(5),lit(6))) \
    .select(array_union(col("a"), col("b")).alias("c"))


arrays.show(1, False)

+------------------+
|c                 |
+------------------+
|[1, 2, 3, 4, 5, 6]|
+------------------+



Также, в разделе [SQL, Built-in Functions](https://spark.apache.org/docs/latest/api/sql/index.html) присутствует еще более широкий список функций, доступных в Spark. Некоторые из них отсутствуют в [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)! 

Эти функции нельзя использовать как обычные методы над [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column), однако вы можете использовать метод ```expr()``` для этого.

In [58]:
from pyspark.sql.functions import *

spark.range(10).select(expr(""" pmod(id, 2) """).alias("foo")).show()

+---+
|foo|
+---+
|  0|
|  1|
|  0|
|  1|
|  0|
|  1|
|  0|
|  1|
|  0|
|  1|
+---+



In [59]:
# В данном примере мы используем Java функцию с помощью функции java_method
# Запомните этот пример и используйте всегда, когда вам не хватает какой-либо функции в pyspark, 
# доступной в Java, ведь, используя такой подход, вы не снижаете производительность вашей программы за счет
# передачи данных между Python и JVM приложением Spark, и при этом вам не нужно уметь писать код на Java/Scala :)

from pyspark.sql.functions import *

spark.range(0,1).withColumn("a", expr("java_method('java.util.UUID', 'randomUUID')")).show(1, False)

+---+------------------------------------+
|id |a                                   |
+---+------------------------------------+
|0  |1f4c4041-7c2e-4155-adf5-317c49baacc6|
+---+------------------------------------+



## Выводы
**Dataframe API**:
+ мощный инструмент для работы с данными
+ в отличие от RDD, Dataframe API устроен так, что все вычисления происходят в JVM
+ обладает единым API для работы с различными источниками данных
+ имеет большой набор встроенных функций работы с данными
+ имеет возможность использовать в pyspark функции, доступные в Java

# Спасибо!