# Apache Spark

**Apache Spark**:
- Фреймворк для распределённой обработки данных
- Входит в экосистему Hadoop

#### 1. Что значит распределённая обработка данных? Когда она нужна?
- данных много
- параллельность
- масштабирование


#### 2. Что такое Hadoop? Каковы его особенности?
- экосистема = набор инструментов
- HDFS
- оркестратор=YARN


#### 3. Что такое MapReduce? Как он работает?
- алгоритм или способ обработки


Преимущества Spark по сравнению с MapReduce:
* Сам простраивает выполнение – не нужно специально писать mapper'ы и reducer'ы
* Максимально выполняет обработку в памяти, не сбрасывая на диск
* Много оптимизаций
* Легко делать итеративные алгоритмы – часто это машинное обучение

Доступные языки: Scala, Java, **Python**, R, SQL

### Работа с данными

Три уровня API к данным:
* RDD – Resilient Data Set, неизменяемая коллекция данных (массив объектов)
* **DataFrame** – абстракция над RDD, позволяет выполнять произвольный SQL (таблица в БД)
* Dataset – строго типизированная версия DataFrame, отсутствует за пределами Scala и Java

In [1]:
import findspark
findspark.init()

Чтобы пользоваться DataFrame API, нужно создать Spark-сессию

`spark.stop()` - чтобы пересоздать заново, сначала нужно остановить предыдущую. Spark-сессия – это [синглтон](https://webdevblog.ru/realizaciya-shablona-singleton-v-python/).


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Apache Spark Lecture").getOrCreate()
spark

23/01/12 11:32:32 WARN Utils: Your hostname, spark-server resolves to a loopback address: 127.0.1.1; using 10.0.0.31 instead (on interface eth0)
23/01/12 11:32:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/12 11:32:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Откуда можно взять данные?**
* из локальной коллекции
* из распределённой файловой системы (HDFS, S3, etc)
* из базы данных (PostgreSQL, Cassandra, etc)
* откуда угодно, если написать коннектор

### Создание DataFrame

Для начала создадим его из локальной коллекции структур:

In [3]:
import datetime

data = [
    ["user1",  datetime.datetime(2022, 6, 7),   1234556, 1,  567.8],
    ["user2",  datetime.datetime(2022, 6, 8),   2345633, 2,  1276.0],
    ["user3",  datetime.datetime(2022, 10, 11), 3687665, 10, 1053.0],
    ["user5",  datetime.datetime(2022, 10, 11), 3687665, 2,  210.6],
    ["user6",  datetime.datetime(2022, 10, 12), 5348776, 5,  2000.0],
    ["user7",  datetime.datetime(2022, 12, 1),  2345765, 8,  186.0],
    ["user8",  datetime.datetime(2021, 10, 11), 2369867, 3,  900.0],
    ["user9",  datetime.datetime(2021, 2, 3),   1234556, 1,  567.8],
    ["user10", datetime.datetime(2021, 5, 10),  2563574, 1,  1050.0],
    ["user11", datetime.datetime(2022, 10, 11), 2346354, 1,  400.0],
    ["user12", datetime.datetime(2022, 6, 15),  8796467, 4,  2400.0],
    ["user13", datetime.datetime(2022, 6, 15),  4573645, 10, 1050.0],
    ["user14", datetime.datetime(2022, 9, 2),   2936764, 8,  64.0],
]

In [4]:
df = spark.createDataFrame(data)
df

DataFrame[_1: string, _2: timestamp, _3: bigint, _4: bigint, _5: double]

У датафрейма есть схема: названия, типы и характеристики колонок

In [5]:
df.printSchema() # выводит схему в поток вывода

root
 |-- _1: string (nullable = true)
 |-- _2: timestamp (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)
 |-- _5: double (nullable = true)



In [6]:
# Пример большей вложенности схемы
spark.createDataFrame([
    [[1, 2], 3],
    [[4, 5], 6]
]).printSchema()

root
 |-- _1: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- _2: long (nullable = true)



Схема – это объект типа [StructType](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=structtype#pyspark.sql.types.StructType).
Отдельные поля представлены объектами типа [StructField](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=structtype#pyspark.sql.types.StructField).

In [7]:
df.schema

StructType(List(StructField(_1,StringType,true),StructField(_2,TimestampType,true),StructField(_3,LongType,true),StructField(_4,LongType,true),StructField(_5,DoubleType,true)))

In [8]:
df.columns

['_1', '_2', '_3', '_4', '_5']

#### Создание своей схемы

In [9]:
from pyspark.sql.types import *

schema = StructType([
    StructField("user", StringType(),    nullable = False),
    StructField("date", TimestampType(), nullable = False),
    StructField("product_id", LongType(), nullable = False),
    StructField("quantity", IntegerType(), nullable = False),
    StructField("payment", DoubleType(), nullable = False)
])

In [10]:
df = spark.createDataFrame(data, schema)

In [11]:
df.printSchema()

root
 |-- user: string (nullable = false)
 |-- date: timestamp (nullable = false)
 |-- product_id: long (nullable = false)
 |-- quantity: integer (nullable = false)
 |-- payment: double (nullable = false)



Spark следит за выполнением ограничения nullable:

In [13]:
spark.createDataFrame([
    ["user1",  datetime.datetime(2022, 6, 7),   None, 1,  None]
], schema)

ValueError: field product_id: This field is not nullable, but got None

*О дополнительных способах создать DataFrame можно почитать [тут](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation)*.

### Просмотр содержимого

Метод show() выводит часть датафрейма в поток вывода:

In [12]:
df.show()

                                                                                

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
| user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
| user3|2022-10-11 00:00:00|   3687665|      10| 1053.0|
| user5|2022-10-11 00:00:00|   3687665|       2|  210.6|
| user6|2022-10-12 00:00:00|   5348776|       5| 2000.0|
| user7|2022-12-01 00:00:00|   2345765|       8|  186.0|
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
|user11|2022-10-11 00:00:00|   2346354|       1|  400.0|
|user12|2022-06-15 00:00:00|   8796467|       4| 2400.0|
|user13|2022-06-15 00:00:00|   4573645|      10| 1050.0|
|user14|2022-09-02 00:00:00|   2936764|       8|   64.0|
+------+-------------------+----------+--------+-------+



In [13]:
df.show(2) # Вывести только 2 строки

+-----+-------------------+----------+--------+-------+
| user|               date|product_id|quantity|payment|
+-----+-------------------+----------+--------+-------+
|user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
|user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
+-----+-------------------+----------+--------+-------+
only showing top 2 rows



In [14]:
df.show(1, vertical=True) # Вертикальное отображение: полезно, когда колонок слишком много

-RECORD 0-------------------------
 user       | user1               
 date       | 2022-06-07 00:00:00 
 product_id | 1234556             
 quantity   | 1                   
 payment    | 567.8               
only showing top 1 row



Что если значение слишком длинное?

In [15]:
long_df = spark.createDataFrame([
    ["AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"]
])

In [16]:
long_df.show()

+--------------------+
|                  _1|
+--------------------+
|AAAAAAAAAAAAAAAAA...|
+--------------------+



In [17]:
long_df.show(truncate=False) # Выводить значения полностью. Полезно для визуального сравнения длинных значений

+----------------------------------------------------------------------------------------------------------------------------------------+
|_1                                                                                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------+
|AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|
+----------------------------------------------------------------------------------------------------------------------------------------+



### Запись и чтение

Запишем наш датасет. Для этого используем [DataFrameWriter](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=dataframewriter#pyspark.sql.DataFrameWriter), который можно получить методом write:

In [18]:
writer = df.write # отдельная переменная для наглядности
writer.parquet("/tmp/data/test_df") # записать в формате Parquet

                                                                                

In [19]:
ls /tmp/data/test_df

part-00000-80b8d9c2-ea17-4645-a69b-567f41572de7-c000.snappy.parquet  _SUCCESS
part-00001-80b8d9c2-ea17-4645-a69b-567f41572de7-c000.snappy.parquet


Более развёрнутый вариант:
- **format**: название формата
- **mode**: поведение в том случае, если данные существуют 
    - *append*: добавить в конец
    - *overwrite*: перезаписать
    - *error*: упасть с ошибкой
    - *ignore*: игнорировать запись
- **save**: запускает запись по указанному пути    

In [20]:
df.write.format("parquet").mode("overwrite").save("/tmp/data/test_df")

Теперь прочитаем с помощью [DataFrameReader](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=dataframereader#pyspark.sql.DataFrameReader):

In [21]:
reader = spark.read
df = reader.parquet("/tmp/data/test_df")

In [22]:
df

DataFrame[user: string, date: timestamp, product_id: bigint, quantity: int, payment: double]

Более развёрнутый вариант:
- **format**: название формата
- **schema**: принудительная схема датафрейма
- **load**: индексирует файлы по указанному пути для чтения

In [23]:
df = spark.read.format("parquet").schema(schema).load("/tmp/data/test_df")

In [24]:
df.show(1)

+-----+-------------------+----------+--------+-------+
| user|               date|product_id|quantity|payment|
+-----+-------------------+----------+--------+-------+
|user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
+-----+-------------------+----------+--------+-------+
only showing top 1 row



#### Список (неполный) поддерживаемых источников данных
+ Файлы:
    - json
    - text
    - csv
    - orc
    - parquet
    - delta
+ Базы данных
    - elasticsearch
    - cassandra
    - jdbc
    - hive
    - redis
    - mongo
+ Брокеры сообщений
    - kafka

Чтение и запись CSV-файлов может быть сложнее: нужно следить за разделителем и другими нюансами и указывать их в методе **options** (есть и у DataFrameReader, и у DataFrameWriter).
В документации Spark есть хороший [гайд](https://spark.apache.org/docs/latest/sql-data-sources-csv.html) того, как с такими файлами обращаться.                                                                                                                                              

### Чтение и запись с партиционированием по колонке

Датафреймы можно "раскладывать" по значению одной или нескольких колонок. Это помогает в дальнейшем уменьшить количество читаемых данных, если нужны записи в определённом диапазоне значений этих колонок (см далее блок **FILTER/WHERE**).

In [25]:
df1 = spark.createDataFrame([
    ["2022-10-30", "hjksjfgh", 345345],
    ["2022-10-31", "sdfsdf", 456456]    
])

Обычная запись без партиционирования будет выглядеть так:

In [26]:
df1.write.parquet("/tmp/data/df1")

In [27]:
df1

DataFrame[_1: string, _2: string, _3: bigint]

In [28]:
ls /tmp/data/df1

part-00000-4766be33-e3fc-4629-8b5b-c019c8bb844f-c000.snappy.parquet  _SUCCESS
part-00001-4766be33-e3fc-4629-8b5b-c019c8bb844f-c000.snappy.parquet


С помощью метода **DataFrameWriter.partitionBy** можно партиционировать по значению одной или нескольких колонок:

In [29]:
df1.write.partitionBy("_1").parquet("/tmp/data/df2")

                                                                                

In [30]:
ls /tmp/data/df2

[0m[01;34m'_1=2022-10-30'[0m/  [01;34m'_1=2022-10-31'[0m/   _SUCCESS


In [31]:
# порядок колонок важен: он определяет вложенность поддиректорий
df1.write.partitionBy("_1", "_2").parquet("/tmp/data/df3") 

In [32]:
ls /tmp/data/df3/_1=2022-10-30/_2=hjksjfgh

part-00000-2e3425bf-a033-4960-af99-822a3f3f1ecc.c000.snappy.parquet


Читать можно как родительскую директорию, так и поддиректории. Чтение поддиректории – автоматическая фильтрация по значению соответствующей колонки.

In [33]:
spark.read.parquet("/tmp/data/df2").show()

+--------+------+----------+
|      _2|    _3|        _1|
+--------+------+----------+
|hjksjfgh|345345|2022-10-30|
|  sdfsdf|456456|2022-10-31|
+--------+------+----------+



In [34]:
spark.read.parquet("/tmp/data/df2/_1=2022-10-30").show()

+--------+------+
|      _2|    _3|
+--------+------+
|hjksjfgh|345345|
+--------+------+



С помощью DataFrameReader также можно читать несколько файлов за один раз. Если схемы соответствующих датафреймов различаются, см раздел [Schema merging](https://spark.apache.org/docs/2.3.2/sql-programming-guide.html#schema-merging)

In [35]:
spark.read.parquet("/tmp/data/df2/_1=2022-10-30", "/tmp/data/df2/_1=2022-10-31").show()

+--------+------+
|      _2|    _3|
+--------+------+
|hjksjfgh|345345|
|  sdfsdf|456456|
+--------+------+



### Простые SQL-операции

**SELECT**

In [36]:
df.select("user", "product_id").show(3)

+------+----------+
|  user|product_id|
+------+----------+
| user8|   2369867|
| user9|   1234556|
|user10|   2563574|
+------+----------+
only showing top 3 rows



Для обращения к колонке и выполнения над ней операций используется класс [Column](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=column#pyspark.sql.Column).

In [37]:
df.user

Column<'user'>

In [38]:
df.select(df.user.alias("newname")).show(1)

+-------+
|newname|
+-------+
|  user8|
+-------+
only showing top 1 row



In [39]:
import pyspark.sql.functions as F
df.select(F.col("user").alias("newname")).show(1)

+-------+
|newname|
+-------+
|  user8|
+-------+
only showing top 1 row



Метод explain() показывает план выполнения запроса

In [40]:
df.select(df.user.alias("newname")).explain()

== Physical Plan ==
*(1) Project [user#119 AS newname#238]
+- *(1) ColumnarToRow
   +- FileScan parquet [user#119] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/test_df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user:string>




^ Parquet – колоночный формат. Он позволяет читать только отдельные колонки. Spark этим пользуется! <br/>
Такие оптимизации выполняются мощным компонентом [Catalyst](https://www.unraveldata.com/resources/catalyst-analyst-a-deep-dive-into-sparks-optimizer/). Они минимизируют передачу данных по сети и загрузку их в память.

In [41]:
df.select(df.user.alias("newname")).explain(extended=True)

== Parsed Logical Plan ==
Project [user#119 AS newname#240]
+- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Analyzed Logical Plan ==
newname: string
Project [user#119 AS newname#240]
+- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Optimized Logical Plan ==
Project [user#119 AS newname#240]
+- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Physical Plan ==
*(1) Project [user#119 AS newname#240]
+- *(1) ColumnarToRow
   +- FileScan parquet [user#119] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/test_df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user:string>



**ORDER BY**

In [42]:
df.orderBy("product_id").show() # по дефолту asc

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
| user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
| user7|2022-12-01 00:00:00|   2345765|       8|  186.0|
|user11|2022-10-11 00:00:00|   2346354|       1|  400.0|
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
|user14|2022-09-02 00:00:00|   2936764|       8|   64.0|
| user5|2022-10-11 00:00:00|   3687665|       2|  210.6|
| user3|2022-10-11 00:00:00|   3687665|      10| 1053.0|
|user13|2022-06-15 00:00:00|   4573645|      10| 1050.0|
| user6|2022-10-12 00:00:00|   5348776|       5| 2000.0|
|user12|2022-06-15 00:00:00|   8796467|       4| 2400.0|
+------+-------------------+----------+--------+-------+



In [43]:
# эквивалентно
df.select("*").orderBy("product_id").show()

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
| user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
| user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
| user7|2022-12-01 00:00:00|   2345765|       8|  186.0|
|user11|2022-10-11 00:00:00|   2346354|       1|  400.0|
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
|user14|2022-09-02 00:00:00|   2936764|       8|   64.0|
| user5|2022-10-11 00:00:00|   3687665|       2|  210.6|
| user3|2022-10-11 00:00:00|   3687665|      10| 1053.0|
|user13|2022-06-15 00:00:00|   4573645|      10| 1050.0|
| user6|2022-10-12 00:00:00|   5348776|       5| 2000.0|
|user12|2022-06-15 00:00:00|   8796467|       4| 2400.0|
+------+-------------------+----------+--------+-------+



In [44]:
df.orderBy(df.product_id.desc()).show() # asc/desc можно вызвать у Column

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
|user12|2022-06-15 00:00:00|   8796467|       4| 2400.0|
| user6|2022-10-12 00:00:00|   5348776|       5| 2000.0|
|user13|2022-06-15 00:00:00|   4573645|      10| 1050.0|
| user3|2022-10-11 00:00:00|   3687665|      10| 1053.0|
| user5|2022-10-11 00:00:00|   3687665|       2|  210.6|
|user14|2022-09-02 00:00:00|   2936764|       8|   64.0|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
|user11|2022-10-11 00:00:00|   2346354|       1|  400.0|
| user7|2022-12-01 00:00:00|   2345765|       8|  186.0|
| user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
| user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
+------+-------------------+----------+--------+-------+



**FILTER/WHERE**

In [45]:
df.filter("payment > 100").show(3)

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
+------+-------------------+----------+--------+-------+
only showing top 3 rows



In [46]:
df.where("payment > 100").show(3)

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
+------+-------------------+----------+--------+-------+
only showing top 3 rows



In [47]:
df.where(df.payment > 100).show(3)

+------+-------------------+----------+--------+-------+
|  user|               date|product_id|quantity|payment|
+------+-------------------+----------+--------+-------+
| user8|2021-10-11 00:00:00|   2369867|       3|  900.0|
| user9|2021-02-03 00:00:00|   1234556|       1|  567.8|
|user10|2021-05-10 00:00:00|   2563574|       1| 1050.0|
+------+-------------------+----------+--------+-------+
only showing top 3 rows



In [48]:
df.where(df.payment > 100).explain()

== Physical Plan ==
*(1) Filter (isnotnull(payment#123) AND (payment#123 > 100.0))
+- *(1) ColumnarToRow
   +- FileScan parquet [user#119,date#120,product_id#121L,quantity#122,payment#123] Batched: true, DataFilters: [isnotnull(payment#123), (payment#123 > 100.0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/test_df], PartitionFilters: [], PushedFilters: [IsNotNull(payment), GreaterThan(payment,100.0)], ReadSchema: struct<user:string,date:timestamp,product_id:bigint,quantity:int,payment:double>




^ Вместо того, чтобы фильтровать после загрузки, файл фильтруется при чтении. В память попадают только нужные значения.

In [49]:
df.select("user", "payment").filter("payment > 100").explain(True)

== Parsed Logical Plan ==
'Filter ('payment > 100)
+- Project [user#119, payment#123]
   +- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Analyzed Logical Plan ==
user: string, payment: double
Filter (payment#123 > cast(100 as double))
+- Project [user#119, payment#123]
   +- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Optimized Logical Plan ==
Project [user#119, payment#123]
+- Filter (isnotnull(payment#123) AND (payment#123 > 100.0))
   +- Relation [user#119,date#120,product_id#121L,quantity#122,payment#123] parquet

== Physical Plan ==
*(1) Filter (isnotnull(payment#123) AND (payment#123 > 100.0))
+- *(1) ColumnarToRow
   +- FileScan parquet [user#119,payment#123] Batched: true, DataFilters: [isnotnull(payment#123), (payment#123 > 100.0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/test_df], PartitionFilters: [], PushedFilters: [IsNotNull(payment), GreaterThan(payment,100.0)], ReadSch

^ Фильтр проталкивается как можно ближе к источнику данных, даже если был определён дальше

При фильтрации по колонке партиционирования будут читаться только файлы в поддиректориях с соответствующим значением колонки:

In [50]:
spark.read.parquet("/tmp/data/df2").where("_1 = '2022-10-30'").explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [_2#375,_3#376L,_1#377] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/df2], PartitionFilters: [isnotnull(_1#377), (_1#377 = 2022-10-30)], PushedFilters: [], ReadSchema: struct<_2:string,_3:bigint>




**COUNT и DISTINCT**

In [51]:
df.count()

13

In [52]:
df.select("product_id").distinct().show()

+----------+
|product_id|
+----------+
|   2346354|
|   8796467|
|   2936764|
|   2563574|
|   1234556|
|   2369867|
|   4573645|
|   3687665|
|   2345765|
|   2345633|
|   5348776|
+----------+



In [53]:
df.select("product_id").distinct().count()

11

In [54]:
df.select("product_id").distinct().count().explain() # у объекта int нет explain

AttributeError: 'int' object has no attribute 'explain'

In [55]:
df.select("product_id").distinct().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[product_id#121L], functions=[])
   +- Exchange hashpartitioning(product_id#121L, 200), ENSURE_REQUIREMENTS, [plan_id=645]
      +- HashAggregate(keys=[product_id#121L], functions=[])
         +- FileScan parquet [product_id#121L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/test_df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:bigint>




In [56]:
df.select("product_id").distinct()

DataFrame[product_id: bigint]

### Ленивое выполнение. Трансформации и действия

Вычисления в Spark ленивые – происходят только тогда, когда нужен их результат.<br/>
Все методы обработки данных над DataFrame делятся на **трансформации** и **действия**: <br/>
- *Трансформации* добавляют новый пункт в план выполнения – вычислений не происходит<br/>
- *Действия* запускают вычисление всего плана, что был до них

**Вопрос: какие методы из тех, что были выше, являются трансформациями, а какие – действиями?**<br/>
*Трансформации:* distinct, filter/where, select, load, orderBy<br/>
*Действия:* count, save, show, [explain]

### Очистка данных

Прочитаем датасет побольше:

In [57]:
ls

07-spark.ipynb         population_by_country_2020.csv
airport-codes_csv.csv  [0m[01;34mtest[0m/
demo.ipynb             [01;34mtest_2[0m/
[01;34mparq[0m/                  wikipedia-iso-country-codes.csv


In [58]:
codes1 = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("airport-codes_csv.csv")

codes1.repartition(5).write.mode("overwrite").parquet("/tmp/data/airport_codes")

codes = spark.read.parquet("/tmp/data/airport_codes")

codes.show()

                                                                                

+-------+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+
|  ident|          type|                name|elevation_ft|continent|iso_country|iso_region|        municipality|gps_code|iata_code|local_code|         coordinates|
+-------+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+
|   SIYK| small_airport|Fazenda Sorriso M...|        2605|       SA|         BR|     BR-GO|              Urutai|    SIYK|     null|      null|-48.2152786254882...|
|   MI92| small_airport|  Lilienthal Airport|        1250|       NA|         US|     US-MI|       Iron Mountain|    MI92|     null|      MI92|-88.0981979370117...|
|   MI80| small_airport|Wabasis Lake Airport|         892|       NA|         US|     US-MI|          Greenville|    MI80|     null|      MI80|-85.3992004394531...|
|   SNXK| small_

**dropDuplicates**: трансформация, удаляет строки с дублями <br/>
По умолчанию dropDuplicates() = distinct()

In [59]:
codes.count()

57421

In [60]:
codes.dropDuplicates().count()

                                                                                

57421

Параметр *subset* задаёт список колонок для дедупликации. <br/>
Возьмётся по одной *любой* записи на каждую уникальную комбинацию значений.

In [61]:
codes.dropDuplicates(subset=["type", "iso_country"]).count()

855

In [62]:
codes.dropDuplicates(subset=["type", "iso_country"])\
.where("type = 'small_airport' and iso_country = 'US'")\
.show()



+-----+-------------+------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|              name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
| MI92|small_airport|Lilienthal Airport|        1250|       NA|         US|     US-MI|Iron Mountain|    MI92|     null|      MI92|-88.0981979370117...|
+-----+-------------+------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+



                                                                                

**na**: возвращает вспомогательный объект с трансформациями для работы с пустыми значениями [DataFrameNaFunctions](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=dataframenafunctions#pyspark.sql.DataFrameNaFunctions)

In [63]:
drop_any = codes.na.drop() # убрать строки, в которых *какая-либо* из колонок пустая
drop_any.count()

2787

In [64]:
drop_all = codes.na.drop(how='all') # убрать строки, в которых *все* колонки пустые
drop_all.count()

57421

In [65]:
codes\
.na.drop()\
.dropDuplicates(subset=["type", "iso_country"])\
.where("type = 'small_airport' and iso_country = 'US'")\
.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| KFDR|small_airport|Frederick Regiona...|        1258|       NA|         US|     US-OK|   Frederick|    KFDR|      FDR|       FDR|-98.98390198, 34....|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+



А если нужно убрать пустые значения только по одной колонке?

In [66]:
codes.where("iata_code is not null").count()

9225

In [67]:
codes.where("iata_code is not null").show(3)

+-----+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+
|ident|          type|                name|elevation_ft|continent|iso_country|iso_region|        municipality|gps_code|iata_code|local_code|         coordinates|
+-----+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+
| LZZI|medium_airport|     Å½ilina Airport|        1020|       EU|         SK|     SK-ZI|             Å½ilina|    LZZI|      ILZ|      null|18.6135005951, 49...|
|  SNQ| small_airport|San QuintÃ­n Mili...|          68|       NA|         MX|    MX-BCN|Military Camp Num...|    null|      SNQ|      null|  -115.9462, 30.5288|
| KBED|medium_airport|Laurence G Hansco...|         133|       NA|         US|     US-MA|             Bedford|    KBED|      BED|       BED|-71.28900146, 42....|
+-----+--------------+------

### Встроенные и пользовательские функции

Spark обладает достаточно большим набором встроенных функций, доступных в [pyspark.sql.functions](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#module-pyspark.sql.functions)<br/>
Все функции Spark принимают на вход и возвращают Column, а это значит, что вы можете совмещать функции вместе. <br/>
**Функции и колонки в Spark могут быть созданы без привязки к конкретным данным и DF**

In [68]:
import pyspark.sql.functions as F # некоторые методы называются так же, как встроенные в Python функции

In [69]:
split_coords = F.split(F.col("coordinates"), pattern=", ").alias("coords") # возвращает массив 
split_coords

Column<'split(coordinates, , , -1) AS coords'>

In [70]:
name_coords = codes.select(F.col("name"), split_coords)

In [71]:
name_coords.show(3)

+--------------------+--------------------+
|                name|              coords|
+--------------------+--------------------+
|Fazenda Sorriso M...|[-48.215278625488...|
|  Lilienthal Airport|[-88.098197937011...|
|Wabasis Lake Airport|[-85.399200439453...|
+--------------------+--------------------+
only showing top 3 rows



In [72]:
name_coords.show(3, truncate=False)

+---------------------------------+-----------------------------------------+
|name                             |coords                                   |
+---------------------------------+-----------------------------------------+
|Fazenda Sorriso MetÃ¡lico Airport|[-48.21527862548828, -17.456666946411133]|
|Lilienthal Airport               |[-88.09819793701172, 45.932701110839844] |
|Wabasis Lake Airport             |[-85.39920043945312, 43.12839889526367]  |
+---------------------------------+-----------------------------------------+
only showing top 3 rows



**withColumn**: трансформация, которая добавляет новую колонку, вычисляемую через Column

In [73]:
name_rads = name_coords.withColumn("coords_rad", F.struct(
    F.radians(F.col("coords")[0]).alias("lat"),
    F.radians(F.col("coords")[1]).alias("lon")
))

In [74]:
name_rads.printSchema()

root
 |-- name: string (nullable = true)
 |-- coords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- coords_rad: struct (nullable = false)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)



In [75]:
name_rads.show(3, truncate=False)

+---------------------------------+-----------------------------------------+------------------------------------------+
|name                             |coords                                   |coords_rad                                |
+---------------------------------+-----------------------------------------+------------------------------------------+
|Fazenda Sorriso MetÃ¡lico Airport|[-48.21527862548828, -17.456666946411133]|{-0.8415153617812164, -0.3046763146389388}|
|Lilienthal Airport               |[-88.09819793701172, 45.932701110839844] |{-1.5376036190745306, 0.8016768687186121} |
|Wabasis Lake Airport             |[-85.39920043945312, 43.12839889526367]  |{-1.4904972262390455, 0.7527325618358361} |
+---------------------------------+-----------------------------------------+------------------------------------------+
only showing top 3 rows



**expr**: превращает SQL-выражение в Column

In [76]:
name_coords\
.withColumn("coords_rad", F.expr("struct(radians(coords[0]) as lat, radians(coords[1]) as lon)"))\
.show(3, truncate=False)

+---------------------------------+-----------------------------------------+------------------------------------------+
|name                             |coords                                   |coords_rad                                |
+---------------------------------+-----------------------------------------+------------------------------------------+
|Fazenda Sorriso MetÃ¡lico Airport|[-48.21527862548828, -17.456666946411133]|{-0.8415153617812164, -0.3046763146389388}|
|Lilienthal Airport               |[-88.09819793701172, 45.932701110839844] |{-1.5376036190745306, 0.8016768687186121} |
|Wabasis Lake Airport             |[-85.39920043945312, 43.12839889526367]  |{-1.4904972262390455, 0.7527325618358361} |
+---------------------------------+-----------------------------------------+------------------------------------------+
only showing top 3 rows



In [77]:
F.expr("struct(radians(coords[0]) as lat, radians(coords[1]) as lon)")

Column<'struct(radians(coords[0]) AS lat, radians(coords[1]) AS lon)'>

In [78]:
F.struct(
    F.radians(F.col("coords")[0]).alias("lat"),
    F.radians(F.col("coords")[1]).alias("lon")
)

Column<'struct(RADIANS(coords[0]) AS lat, RADIANS(coords[1]) AS lon)'>

Некоторые функции есть в Java/Scala, но нет в Python.<br/> 
Т.к. под капотом Spark всё равно работает на Scala, их можно вызывать, обернув в **java_method**. Это доступно только внутри *expr*.

In [79]:
name_coords\
.withColumn("uuid", F.expr("java_method('java.util.UUID', 'randomUUID')"))\
.show(3)

+--------------------+--------------------+--------------------+
|                name|              coords|                uuid|
+--------------------+--------------------+--------------------+
|Fazenda Sorriso M...|[-48.215278625488...|aee94a9b-fe74-40d...|
|  Lilienthal Airport|[-88.098197937011...|19358847-f66a-494...|
|Wabasis Lake Airport|[-85.399200439453...|e6dd3928-223b-49c...|
+--------------------+--------------------+--------------------+
only showing top 3 rows



Если нужной функции нет среди встроенных или среди Java-методов, можно написать свою **UDF** – User Defined Function. <br/>
**У этого решения есть минусы**:
- Встроенные функции оптимизируются, пользовательские непрозрачны для оптимизатора
- В Python UDF будут передавать данные между JVM и Python, снижая производительность

In [80]:
import math
from pyspark.sql.types import StructType, StructField, DoubleType

def rads(coords: list) -> dict:
    return {
        "lat": float(coords[0]) * math.pi / 180,
        "lon": float(coords[1]) * math.pi / 180
    }

output_type = StructType([
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType())
])

rads_udf = F.udf(lambda c: rads(c), output_type)

In [81]:
name_coords\
.withColumn("coords_rad", rads_udf(name_coords.coords))\
.show(3, truncate=False)

+---------------------------------+-----------------------------------------+------------------------------------------+
|name                             |coords                                   |coords_rad                                |
+---------------------------------+-----------------------------------------+------------------------------------------+
|Fazenda Sorriso MetÃ¡lico Airport|[-48.21527862548828, -17.456666946411133]|{-0.8415153617812164, -0.3046763146389388}|
|Lilienthal Airport               |[-88.09819793701172, 45.932701110839844] |{-1.5376036190745304, 0.8016768687186121} |
|Wabasis Lake Airport             |[-85.39920043945312, 43.12839889526367]  |{-1.4904972262390455, 0.7527325618358361} |
+---------------------------------+-----------------------------------------+------------------------------------------+
only showing top 3 rows



### Группировки, агрегации, оконные функции

Метод [groupBy](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) порождает промежуточный DSL-объект [GroupedData](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.GroupedData), на котором вызывается трансформация agg()

In [82]:
codes.groupBy("iso_country")

<pyspark.sql.group.GroupedData at 0x7f1e5cce4d30>

В agg можно использовать агрегирующие функции из [pyspark.sql.functions](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html?highlight=aggregate#module-pyspark.sql.functions) (помечены как Aggregate function)

In [83]:
codes.groupBy("iso_country").agg(F.count("*").alias("count")).show()

+-----------+-----+
|iso_country|count|
+-----------+-----+
|         DZ|   61|
|         LT|   59|
|         MM|   75|
|         CI|   26|
|         TC|    8|
|         FI|  112|
|         PM|    2|
|         AZ|   35|
|         SC|   16|
|         UA|  195|
|         KI|   22|
|         RO|   60|
|         ZM|  103|
|         SL|   12|
|         NL|  115|
|         SB|   38|
|         LA|   20|
|         BS|   65|
|         BW|  129|
|         MN|   30|
+-----------+-----+
only showing top 20 rows



In [84]:
codes.groupBy(F.substring("iso_country", 0, 1)).agg(F.max("elevation_ft")).show()

+----------------------------+-----------------+
|substring(iso_country, 0, 1)|max(elevation_ft)|
+----------------------------+-----------------+
|                           K|            10200|
|                           F|            11647|
|                           Q|              130|
|                           E|             9649|
|                           T|            11962|
|                           B|            14360|
|                           Y|             7216|
|                           M|            10074|
|                           L|            10400|
|                           U|            29977|
|                           V|             5269|
|                           D|             4518|
|                           O|             6500|
|                           C|            14472|
|                           J|             2940|
|                           Z|             6464|
|                           A|            13000|
|                   

Оконные функции выполняют вычисления над окнами данных без группировки. <br/>
Они используются так же, как встроенные функции. Окно определяется через класс [Window](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.Window)

In [85]:
from pyspark.sql import Window

In [86]:
w = Window.partitionBy("iso_country")

In [87]:
codes\
.withColumn("max_elevation", F.max("elevation_ft").over(w))\
.select("name", "iso_country", "iso_region", "max_elevation")\
.show()

+--------------------+-----------+----------+-------------+
|                name|iso_country|iso_region|max_elevation|
+--------------------+-----------+----------+-------------+
|Andorra la Vella ...|         AD|     AD-07|         3450|
|      CamÃ­ Heliport|         AD|     AD-04|         3450|
|Al Hamra Aux Airport|         AE|     AE-AZ|          869|
|Sharjah Internati...|         AE|     AE-SH|          869|
|Al Mushrif Palace...|         AE|     AE-AZ|          869|
|Al Maktoum Intern...|         AE|     AE-DU|          869|
| Schumacher Heliport|         AE|     AE-DU|          869|
|     Arzanah Airport|         AE|     AE-AZ|          869|
|Sir Bani Yas Airport|         AE|     AE-AZ|          869|
|       KIZAD Airport|         AE|     AE-AZ|          869|
|     Dubai Creek SPB|         AE|     AE-DU|          869|
|       Al Saqr Field|         AE|     AE-RK|          869|
|       Umm Al Quwain|         AE|     AE-UQ|          869|
|Al Ghuwaifat Cust...|         AE|     A

In [88]:
codes\
.withColumn("max_elevation", F.max("elevation_ft").over(w))\
.select("name", "iso_country", "iso_region", "max_elevation")\
.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#479, iso_country#482, iso_region#483, max_elevation#1262]
   +- Window [max(elevation_ft#480) windowspecdefinition(iso_country#482, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_elevation#1262], [iso_country#482]
      +- Sort [iso_country#482 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(iso_country#482, 200), ENSURE_REQUIREMENTS, [plan_id=1568]
            +- FileScan parquet [name#479,elevation_ft#480,iso_country#482,iso_region#483] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,elevation_ft:int,iso_country:string,iso_region:string>




### UNION

Объединение реализуется через метод [union](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union).
* Не выполняет дедупликацию – одинаковые строки будут дублироваться

In [89]:
ab = spark.createDataFrame([
    ["a", 1],
    ["b", 2]
], schema=StructType([
    StructField("str", StringType()),
    StructField("int", IntegerType())    
]))

ac = spark.createDataFrame([
    ["a", 1],
    ["c", 3]
], schema=StructType([
    StructField("str", StringType()),
    StructField("int", IntegerType())    
]))

ab.union(ac).show()

+---+---+
|str|int|
+---+---+
|  a|  1|
|  b|  2|
|  a|  1|
|  c|  3|
+---+---+



* Резолвит колонки **по порядку**, а не по имени

In [90]:
ab1 = spark.createDataFrame([
    ["a", 1],
    ["b", 2]
], schema=StructType([
    StructField("str", StringType()),
    StructField("int", IntegerType())    
]))

ac1 = spark.createDataFrame([
    [1, "a"],
    [3, "c"]
    
# порядок колонок в DataFrame недетерминирован, так что по сути ac1 эквивалентен ac...    
], schema=StructType([
    StructField("int", IntegerType()),
    StructField("str", StringType())   
]))

#... но не для union
ab1.union(ac1).show()

+---+---+
|str|int|
+---+---+
|  a|  1|
|  b|  2|
|  1|  a|
|  3|  c|
+---+---+



In [91]:
ab1.union(ac1).printSchema()

root
 |-- str: string (nullable = true)
 |-- int: string (nullable = true)



**Чтобы правильно резолвить колонки, лучше всегда использовать [unionByName](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionByName):**

In [92]:
ab1.unionByName(ac1).show()

+---+---+
|str|int|
+---+---+
|  a|  1|
|  b|  2|
|  a|  1|
|  c|  3|
+---+---+



### JOIN

Соединение реализуется через трансформацию [join](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join).
Обычные типы джойна:
- `'inner'`
- `'cross'`
- `'left'` == `'left_outer'`
- `'right'` == `'right_outer'`
- `'full'` == `'outer'` == `'full_outer'`

<img src='https://i.stack.imgur.com/vPPHT.png' width=50% height=auto align=left></img><br/><br/>
Картинка из статьи: http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/

Пусть у нас есть данные по количеству населения в разных странах:

In [94]:
spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("population_by_country_2020.csv")\
.withColumnRenamed("Country (or dependency)", "country")\
.withColumnRenamed("Population (2020)", "population")\
.select("country", "population")\
.repartition(2)\
.write.mode("overwrite").parquet("/tmp/data/population")

In [95]:
population = spark.read.parquet("/tmp/data/population")
population.show()

+--------------------+----------+
|             country|population|
+--------------------+----------+
|             Albania|   2878420|
|       New Caledonia|    284938|
|            Tanzania|  59368313|
|        South Africa|  59154802|
|              Sweden|  10086531|
|             Tokelau|      1354|
|              France|  65244628|
|            Ethiopia| 114357494|
| Trinidad and Tobago|   1398579|
|            Paraguay|   7114524|
|            Kiribati|    119069|
|           Argentina|  45111229|
|      Faeroe Islands|     48826|
|           Venezuela|  28451828|
|            Portugal|  10202571|
|              Guyana|    785788|
|          Costa Rica|   5084636|
|           Australia|  25439164|
|            Botswana|   2341649|
|Central African R...|   4812256|
+--------------------+----------+
only showing top 20 rows



Мы можем сопоставить население аэропортам через код страны:

In [97]:
spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("wikipedia-iso-country-codes.csv")\
.withColumnRenamed("English short name lower case", "name")\
.withColumnRenamed("Alpha-2 code", "code_2")\
.withColumnRenamed("Alpha-3 code", "code_3")\
.select("name", "code_2", "code_3")\
.repartition(2)\
.write.mode("overwrite").parquet("/tmp/data/country_codes")

In [98]:
country_codes = spark.read.parquet("/tmp/data/country_codes")
country_codes.show()

+----------------+------+------+
|            name|code_2|code_3|
+----------------+------+------+
|        Mongolia|    MN|   MNG|
|          Tuvalu|    TV|   TUV|
|     South Korea|    KR|   KOR|
|            Oman|    OM|   OMN|
|         Ireland|    IE|   IRL|
|          Russia|    RU|   RUS|
|      Kyrgyzstan|    KG|   KGZ|
|       Venezuela|    VE|   VEN|
|           Ghana|    GH|   GHA|
|    Burkina Faso|    BF|   BFA|
|          Rwanda|    RW|   RWA|
|          Guinea|    GN|   GIN|
|     Netherlands|    NL|   NLD|
|           Qatar|    QA|   QAT|
|      Guadeloupe|    GP|   GLP|
|         Andorra|    AD|   AND|
|        Cambodia|    KH|   KHM|
|Papua New Guinea|    PG|   PNG|
|        Suriname|    SR|   SUR|
|         Nigeria|    NG|   NGA|
+----------------+------+------+
only showing top 20 rows



In [99]:
airports = codes.join(country_codes, F.col("iso_country") == F.col("code_2")) # по умолчанию inner join

In [100]:
airports.show()

+-------+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+-------------+------+------+
|  ident|          type|                name|elevation_ft|continent|iso_country|iso_region|        municipality|gps_code|iata_code|local_code|         coordinates|         name|code_2|code_3|
+-------+--------------+--------------------+------------+---------+-----------+----------+--------------------+--------+---------+----------+--------------------+-------------+------+------+
|   SIYK| small_airport|Fazenda Sorriso M...|        2605|       SA|         BR|     BR-GO|              Urutai|    SIYK|     null|      null|-48.2152786254882...|       Brazil|    BR|   BRA|
|   MI92| small_airport|  Lilienthal Airport|        1250|       NA|         US|     US-MI|       Iron Mountain|    MI92|     null|      MI92|-88.0981979370117...|United States|    US|   USA|
|   MI80| small_airport|Wabasis Lake Air

^ Почему Spark не посчитал одинаковые имена колонок ошибкой? <br/>
Потому что он хранит всю цепочку преобразований датасетов и внутри всё ещё видит, что это колонки от разных DF

In [101]:
airports.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [iso_country#482], [code_2#1476], Inner, BuildRight, false
   :- Filter isnotnull(iso_country#482)
   :  +- FileScan parquet [ident#477,type#478,name#479,elevation_ft#480,continent#481,iso_country#482,iso_region#483,municipality#484,gps_code#485,iata_code#486,local_code#487,coordinates#488] Batched: true, DataFilters: [isnotnull(iso_country#482)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [IsNotNull(iso_country)], ReadSchema: struct<ident:string,type:string,name:string,elevation_ft:int,continent:string,iso_country:string,...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=1907]
      +- Filter isnotnull(code_2#1476)
         +- FileScan parquet [name#1475,code_2#1476,code_3#1477] Batched: true, DataFilters: [isnotnull(code_2#1476)], Format: Parquet, Location: InMemoryFileIndex(1 pa

In [102]:
airports.select("name").show(1)

AnalysisException: Reference 'name' is ambiguous, could be: name, name.

In [103]:
airports.select(codes.name).show(1)

+--------------------+
|                name|
+--------------------+
|Fazenda Sorriso M...|
+--------------------+
only showing top 1 row



Трансформация **withColumnRenamed** переименовывает колонки без необходимости делать `withColumn(newName, col(oldName))`

In [104]:
airports = codes\
.withColumnRenamed("name", "airport_name")\
.join(country_codes.withColumnRenamed("name", "country"),
     F.col("iso_country") == F.col("code_2"),
     "left")

In [105]:
airports.select("airport_name", "country", "iso_country", "code_2").show(3)

+--------------------+-------------+-----------+------+
|        airport_name|      country|iso_country|code_2|
+--------------------+-------------+-----------+------+
|Fazenda Sorriso M...|       Brazil|         BR|    BR|
|  Lilienthal Airport|United States|         US|    US|
|Wabasis Lake Airport|United States|         US|    US|
+--------------------+-------------+-----------+------+
only showing top 3 rows



In [106]:
airports_pop = airports.join(population, on=["country"], how="left")

airports_pop.show(3)

+-------------+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+------+------+----------+
|      country|ident|         type|        airport_name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|code_2|code_3|population|
+-------------+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+------+------+----------+
|       Brazil| SIYK|small_airport|Fazenda Sorriso M...|        2605|       SA|         BR|     BR-GO|       Urutai|    SIYK|     null|      null|-48.2152786254882...|    BR|   BRA| 212253150|
|United States| MI92|small_airport|  Lilienthal Airport|        1250|       NA|         US|     US-MI|Iron Mountain|    MI92|     null|      MI92|-88.0981979370117...|    US|   USA| 330610570|
|United States| MI80|small_airport|

**Q: Сколько в крупных странах маленьких аэропортов?**

In [107]:
ans = airports_pop\
.where("population > 100000000")\
.groupBy("country")\
.agg(F.count(F.when(F.col("type") == 'small_airport', F.lit(1))).alias("count"))

In [108]:
ans.show()

+-------------+-----+
|      country|count|
+-------------+-----+
|       Russia|  554|
|  Philippines|  102|
|United States|13559|
|        India|  158|
|        China|   97|
|      Nigeria|   19|
|   Bangladesh|    6|
|       Mexico|  964|
|    Indonesia|  399|
|     Ethiopia|   43|
|       Brazil| 3489|
|        Japan|   54|
|        Egypt|   41|
|     Pakistan|   80|
+-------------+-----+



In [109]:
ans.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[country#1604], functions=[count(CASE WHEN (type#478 = small_airport) THEN 1 END)])
   +- Exchange hashpartitioning(country#1604, 200), ENSURE_REQUIREMENTS, [plan_id=2557]
      +- HashAggregate(keys=[country#1604], functions=[partial_count(CASE WHEN (type#478 = small_airport) THEN 1 END)])
         +- Project [country#1604, type#478]
            +- BroadcastHashJoin [country#1604], [country#1409], Inner, BuildRight, false
               :- Project [type#478, country#1604]
               :  +- BroadcastHashJoin [iso_country#482], [code_2#1476], Inner, BuildRight, false
               :     :- FileScan parquet [type#478,iso_country#482] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<type:string,iso_country:string>
               :     +- BroadcastExchange HashedRelationBroadcastMode

^ Spark "проталкивает" предикаты через джойны ближе к чтению

**Q: Сколько есть маленьких стран, где *нет* аэропортов?**

Дополнительные, "ненастоящие" типы джойна – фильтрации одной таблицы по другой:
- `'left_semi'`: оставляет записи из левой таблицы, для которых *есть* соответствие в правой
- `'left_anti'`: оставляет записи из левой таблицы, для которых *нет* соответствия в правой

In [110]:
ans2 = population\
.where("population <= 100000000")\
.join(airports, "country", "left_anti")

In [111]:
ans2.count()

34

In [112]:
ans2.show(3)

+--------------------+----------+
|             country|population|
+--------------------+----------+
|  State of Palestine|   5076280|
|             Tokelau|      1354|
|Czech Republic (C...|  10705012|
+--------------------+----------+
only showing top 3 rows



In [113]:
airports_pop.where("country = 'Cabo Verde'").show(1)

+-------+-----+----+------------+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+------+------+----------+
|country|ident|type|airport_name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|code_2|code_3|population|
+-------+-----+----+------------+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+------+------+----------+
+-------+-----+----+------------+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+------+------+----------+



In [114]:
ans2.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [country#1409], [country#1604], LeftAnti
   :- Sort [country#1409 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(country#1409, 200), ENSURE_REQUIREMENTS, [plan_id=3291]
   :     +- Filter (isnotnull(population#1410) AND (population#1410 <= 100000000))
   :        +- FileScan parquet [country#1409,population#1410] Batched: true, DataFilters: [isnotnull(population#1410), (population#1410 <= 100000000)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/population], PartitionFilters: [], PushedFilters: [IsNotNull(population), LessThanOrEqual(population,100000000)], ReadSchema: struct<country:string,population:int>
   +- Sort [country#1604 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(country#1604, 200), ENSURE_REQUIREMENTS, [plan_id=3292]
         +- Project [country#1604]
            +- BroadcastHashJoin [iso_country#482], [code_2#1476], Inner, BuildRight, false
    

**^ Мы заново пересчитываем JOIN codes и country_codes. Почему?**<br/><br/>
Ответ: join – трансформация

### Кэширование и контрольные точки

Метод [persist(storageLevel)](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.persist) позволяет сохранить данные в результате цепочки преобразований и не пересчитывать заново.<br/>
Сохранять можно в память *экзекьюторов* и/или на диск, с репликацией или без, и даже в off-heap память: см. [StorageLevel](https://spark.apache.org/docs/2.3.2/api/python/pyspark.html?highlight=storagelevel#pyspark.StorageLevel)

In [115]:
# По умолчанию MEMORY_AND_DISK: сохраняем в память, при заполнении памяти сбрасываем на диск, без репликации

ap_persisted = airports.persist() 
ap_persisted

DataFrame[ident: string, type: string, airport_name: string, elevation_ft: int, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string, country: string, code_2: string, code_3: string]

In [116]:
airports.cache() # эквивалентно persist без параметров

23/01/12 11:39:28 WARN CacheManager: Asked to cache already cached data.


DataFrame[ident: string, type: string, airport_name: string, elevation_ft: int, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string, country: string, code_2: string, code_3: string]

In [117]:
from pyspark import StorageLevel
airports.persist(storageLevel=StorageLevel.DISK_ONLY_2) # сразу сохранять на диск с фактором репликации 2

23/01/12 11:39:31 WARN CacheManager: Asked to cache already cached data.


DataFrame[ident: string, type: string, airport_name: string, elevation_ft: int, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string, country: string, code_2: string, code_3: string]

Persist – это *трансформация*: она помечает узел в плане так, что при следующем *действии* он будет закэширован для всех потомков.

In [118]:
ap_persisted.select("airport_name", "elevation_ft").show(3)

[Stage 158:>                                                        (0 + 1) / 1]

+--------------------+------------+
|        airport_name|elevation_ft|
+--------------------+------------+
|Fazenda Sorriso M...|        2605|
|  Lilienthal Airport|        1250|
|Wabasis Lake Airport|         892|
+--------------------+------------+
only showing top 3 rows



                                                                                

In [119]:
ap_persisted.select("airport_name", "elevation_ft").explain()

== Physical Plan ==
InMemoryTableScan [airport_name#1591, elevation_ft#480]
   +- InMemoryRelation [ident#477, type#478, airport_name#1591, elevation_ft#480, continent#481, iso_country#482, iso_region#483, municipality#484, gps_code#485, iata_code#486, local_code#487, coordinates#488, country#1604, code_2#1476, code_3#1477], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) BroadcastHashJoin [iso_country#482], [code_2#1476], LeftOuter, BuildRight, false
            :- *(2) Project [ident#477, type#478, name#479 AS airport_name#1591, elevation_ft#480, continent#481, iso_country#482, iso_region#483, municipality#484, gps_code#485, iata_code#486, local_code#487, coordinates#488]
            :  +- *(2) ColumnarToRow
            :     +- FileScan parquet [ident#477,type#478,name#479,elevation_ft#480,continent#481,iso_country#482,iso_region#483,municipality#484,gps_code#485,iata_code#486,local_code#487,coordinates#488] Batched: true, DataFilters: [], Format: Parquet, Loca

^ Spark сохраняет план вычисления вместе с данными: если они будут вытеснены, их можно будет пересчитать

Если план слишком большой, он будет долго обрабатываться и может даже не поместиться в память.<br/>
Метод [checkpoint](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.checkpoint) сбрасывает посчитанные данные **на HDFS** (или в локальную файловую систему для standalone mode) и **отбрасывает план**.<br/>
Checkpoint – это *действие*.

In [120]:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints") # обязательно нужно указать в контексте директорию

In [121]:
ap_checkpoint = airports.checkpoint()

                                                                                

In [122]:
ls /tmp/checkpoints

[0m[01;34m0be31925-df12-46c4-b121-208dd1759170[0m/


In [123]:
ls /tmp/checkpoints/2de858f1-b4b8-4b7e-a89f-62120d53fd34

ls: cannot access '/tmp/checkpoints/2de858f1-b4b8-4b7e-a89f-62120d53fd34': No such file or directory


In [124]:
ap_checkpoint.select("airport_name", "elevation_ft").show(3)

+--------------------+------------+
|        airport_name|elevation_ft|
+--------------------+------------+
|Fazenda Sorriso M...|        2605|
|  Lilienthal Airport|        1250|
|Wabasis Lake Airport|         892|
+--------------------+------------+
only showing top 3 rows



In [125]:
ap_checkpoint.select("airport_name", "elevation_ft").explain()

== Physical Plan ==
*(1) Project [airport_name#1591, elevation_ft#480]
+- *(1) Scan ExistingRDD[ident#477,type#478,airport_name#1591,elevation_ft#480,continent#481,iso_country#482,iso_region#483,municipality#484,gps_code#485,iata_code#486,local_code#487,coordinates#488,country#1604,code_2#1476,code_3#1477]




In [126]:
# Убирает DF из кэша
ap_persisted.unpersist()

DataFrame[ident: string, type: string, airport_name: string, elevation_ft: int, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string, country: string, code_2: string, code_3: string]

| | Без материализации | Persist  | Checkpoint  | 
|---|---|---|---|
| **Пересчитывается**  | Каждый раз  | Если материализация потеряна  | Не пересчитывается  |
| **Промежуточные данные**  |  Не сохраняются |  Сохраняются в памяти/на диске | Сохраняются в файловой системе  |
| **Оптимизации**  | Меняют план как угодно  | Не меняют план до материализации  | Не меняют план до материализации  |

### Shuffle и Broadcast

**Вопрос: как сджойнить два файла с разных узлов? Как сопоставить нужные ключи?**<br/>
**Ответы:**
1. хэшируем ключи, прикапываем где-то (на отдельной ноде), потом сопоставляем
2. партиционировать сразу по одному ключу (bucketing)
3. залить один файл к другому
4. прочитать ключи, обменяться частями
5. перемешать между нодами ключи

**Shuffle** – операция разбиения данных на каждом экзекьюторе на N партиций по (чаще всего) хэшу от ключа (*Shuffle Write*) и обмена этими партициями между экзекьюторами (*Shuffle Read*). По умолчанию он нужен тогда, когда результат зависит от данных на других экзекьюторах (*широкие трансформации*). <br/>
Широкие/узкие трансформации: https://sauravomar01.medium.com/wide-vs-narrow-dependencies-in-apache-spark-2cd33bf7ed7d <br/>
Погружение в shuffle: https://medium.com/@philipp.brunenberg/understanding-apache-spark-shuffle-85644d90c8c6 <br/><br/>
Самое главное: **shuffle – это долго и затратно**.

In [127]:
spark.conf.get("spark.sql.shuffle.partitions") # то самое N

'200'

**Вопрос: какая ещё трансформация в этом уроке порождает шаффл?**<br/>
**Ответы:** groupBy, orderBy, оконные функции

In [128]:
airports.sort("type").select("type").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [type#478 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(type#478 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=3412]
      +- Project [type#478]
         +- BroadcastHashJoin [iso_country#482], [code_2#1476], LeftOuter, BuildRight, false
            :- FileScan parquet [type#478,iso_country#482] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<type:string,iso_country:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=3408]
               +- Filter isnotnull(code_2#1476)
                  +- FileScan parquet [code_2#1476] Batched: true, DataFilters: [isnotnull(code_2#1476)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/country_codes], PartitionFilters: [], PushedFilters: [IsNotNull(code_2)],

In [129]:
airports.orderBy("type").select("type").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [type#478 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(type#478 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=3445]
      +- Project [type#478]
         +- BroadcastHashJoin [iso_country#482], [code_2#1476], LeftOuter, BuildRight, false
            :- FileScan parquet [type#478,iso_country#482] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<type:string,iso_country:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=3441]
               +- Filter isnotnull(code_2#1476)
                  +- FileScan parquet [code_2#1476] Batched: true, DataFilters: [isnotnull(code_2#1476)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/country_codes], PartitionFilters: [], PushedFilters: [IsNotNull(code_2)],

In [130]:
# здесь count – это эквивалент agg(F.count("*"))
codes.groupBy("iso_country").count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[iso_country#482], functions=[count(1)])
   +- Exchange hashpartitioning(iso_country#482, 200), ENSURE_REQUIREMENTS, [plan_id=3458]
      +- HashAggregate(keys=[iso_country#482], functions=[partial_count(1)])
         +- FileScan parquet [iso_country#482] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<iso_country:string>




In [131]:
codes.groupBy("iso_country").count().show(3)

+-----------+-----+
|iso_country|count|
+-----------+-----+
|         DZ|   61|
|         LT|   59|
|         MM|   75|
+-----------+-----+
only showing top 3 rows



In [132]:
codes.groupBy("iso_country").count()

DataFrame[iso_country: string, count: bigint]

In [133]:
codes.groupBy("iso_country").count().count()

243

**Broadcast** – операция рассылания данных на все экзекьюторы. Эти данные будут сохранены в памяти каждого процесса для быстрого доступа. <br/>
В случае с JOIN можно бродкастить *датафрейм* – если он достаточно мал, чтобы поместиться в память экзекьютора (и ещё на обработку осталось).

In [134]:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold") # это верхняя граница размера DF в байтах

'10485760b'

In [135]:
!echo 26214400 | numfmt --to=iec

numfmt: invalid suffix in input: ‘10485760b’


In [137]:
join = codes.join(country_codes, F.col("iso_country") == F.col("code_2"))

In [138]:
join.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [iso_country#482], [code_2#1476], Inner, BuildRight, false
   :- Filter isnotnull(iso_country#482)
   :  +- FileScan parquet [ident#477,type#478,name#479,elevation_ft#480,continent#481,iso_country#482,iso_region#483,municipality#484,gps_code#485,iata_code#486,local_code#487,coordinates#488] Batched: true, DataFilters: [isnotnull(iso_country#482)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [IsNotNull(iso_country)], ReadSchema: struct<ident:string,type:string,name:string,elevation_ft:int,continent:string,iso_country:string,...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=3621]
      +- Filter isnotnull(code_2#1476)
         +- FileScan parquet [name#1475,code_2#1476,code_3#1477] Batched: true, DataFilters: [isnotnull(code_2#1476)], Format: Parquet, Location: InMemoryFileIndex(1 pa

In [139]:
join.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+-------------+------+------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|         name|code_2|code_3|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+-------------+------+------+
| SIYK|small_airport|Fazenda Sorriso M...|        2605|       SA|         BR|     BR-GO|       Urutai|    SIYK|     null|      null|-48.2152786254882...|       Brazil|    BR|   BRA|
| MI92|small_airport|  Lilienthal Airport|        1250|       NA|         US|     US-MI|Iron Mountain|    MI92|     null|      MI92|-88.0981979370117...|United States|    US|   USA|
| MI80|small_airport|Wabasis Lake Airport|         892|       NA|         US|     US-MI|  

In [140]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") # выключает автоматический Broadcast Hash Join

In [141]:
# Перезапускаем сессию для применения
spark.stop()
spark = SparkSession.builder\
.appName("Apache Spark Lecture")\
.config("spark.sql.autoBroadcastJoinThreshold", "-1")\
.getOrCreate()

spark

In [142]:
# С новой сессией все датафреймы нужно создавать заново
codes = spark.read.parquet("/tmp/data/airport_codes")
country_codes = spark.read.parquet("/tmp/data/country_codes")
join = codes.join(country_codes, F.col("iso_country") == F.col("code_2"))

In [143]:
join.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [iso_country#2980], [code_2#3000], Inner
   :- Sort [iso_country#2980 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(iso_country#2980, 200), ENSURE_REQUIREMENTS, [plan_id=3731]
   :     +- Filter isnotnull(iso_country#2980)
   :        +- FileScan parquet [ident#2975,type#2976,name#2977,elevation_ft#2978,continent#2979,iso_country#2980,iso_region#2981,municipality#2982,gps_code#2983,iata_code#2984,local_code#2985,coordinates#2986] Batched: true, DataFilters: [isnotnull(iso_country#2980)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/data/airport_codes], PartitionFilters: [], PushedFilters: [IsNotNull(iso_country)], ReadSchema: struct<ident:string,type:string,name:string,elevation_ft:int,continent:string,iso_country:string,...
   +- Sort [code_2#3000 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(code_2#3000, 200), ENSURE_REQUIREMENTS, [plan_id=3732]
         +- Filt

In [144]:
join.show(3)

+-------+--------------+--------------------+------------+---------+-----------+----------+----------------+--------+---------+----------+-------------------+--------+------+------+
|  ident|          type|                name|elevation_ft|continent|iso_country|iso_region|    municipality|gps_code|iata_code|local_code|        coordinates|    name|code_2|code_3|
+-------+--------------+--------------------+------------+---------+-----------+----------+----------------+--------+---------+----------+-------------------+--------+------+------+
| AD-ALV|      heliport|Andorra la Vella ...|        3450|       EU|         AD|     AD-07|Andorra La Vella|    null|      ALV|      null|1.533551, 42.511174| Andorra|    AD|   AND|
|AD-0001|      heliport|      CamÃ­ Heliport|        null|       EU|         AD|     AD-04|      La Massana|    null|     null|      null| 1.51916, 42.546257| Andorra|    AD|   AND|
|   TQPF|medium_airport|Clayton J Lloyd I...|         127|       NA|         AI|    AI-U-A