<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Dataframes III
**Андрей Титов**  
tenke.iu8@gmail.com  

## На этом занятии
+ Обзор источников данных
+ Текстовые форматы txt, csv, json
+ Parquet и ORC
+ Elastic
+ Cassandra
+ PostgreSQL

## Обзор источников данных
Spark - это платформа для **обработки** распределенных данных. Она не отвечает за хранение данных и не завязана на какую-либо БД или формат хранения, что позволяет разработать коннектор для работы с любым источником. Часть распространенных источников доступна "из коробки", часть - в виде сторонних библиотек. 

На текущий момент Spark DF API позволяет работать (читать и писать) с большим набором источников:
+ Текстовые файлы:
  - [json](https://spark.apache.org/docs/latest/sql-data-sources-json.html)
  - text
  - [csv](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)
+ Бинарные файлы:
  - [orc](https://spark.apache.org/docs/latest/sql-data-sources-orc.html)
  - [parquet](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)
  - [delta](https://docs.delta.io/latest/quick-start.html)
+ Базы данных
  - [elastic](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql)
  - [cassandra](https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md)
  - [jdbc](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)
  - [redis](https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md)
  - [mongo](https://docs.mongodb.com/spark-connector/master/scala-api/)
+ Стриминг системы
  - [kafka](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)

Для текстовых файлов поддерживаются различные кодеки сжатия (например `lzo`, `snappy`, `gzip`)

### Добавление поддержки
Чтобы добавить поддержку источника в проект, необходимо:
+ найти нужный пакет на https://mvnrepository.com
 - выбрать актуальную версию для `Scala 2.11`
 - скачать jar или скопировать команду для нужной системы сборки 
+ добавить зависимость в `libraryDependencies` в файле `build.sbt`  
```libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "7.7.0"```
+ добавить зависимость в приложение одним из способов:
  - добавить зависимость в **spark-submit**:  
  ```spark-submit --packages org.elasticsearch:elasticsearch-spark-20_2.11:7.7.0```
  - добавить jar файл в **spark-submit**:  
  ```spark-submit --jars /path/to/elasticsearch-spark-20_2.11-7.7.0.jar```
  - добавить зависимость в **spark-defaults.conf**:  
  ```spark.jars.packages org.elasticsearch:elasticsearch-spark-20_2.11:7.7.0```
  - добавить jar файл в **spark-defaults.conf**:  
  ```spark.jars /path/to/elasticsearch-spark-20_2.11-7.7.0.jar```
  - в коде через [`spark.sparkContext.addJar`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@addJar(path:String):Unit)
  
### Использование в коде
Конфиги источника задаются одним из способов:
- через **spark-submit**:  
```spark-submit --conf spark.es.nodes=localhost:9200```
- в **spark-defaults.conf**:  
```spark.es.nodes localhost:9200```
- в коде через **SparkSession**:
  + ```spark.conf.set("spark.es.nodes", "localhost:9200")```
- в коде при чтении:  
  + ```val df = spark.read.format("elastic").option("es.nodes", "localhost:9200")...```
  + ```val df = spark.read.format("elastic").options(Map("es.nodes" -> "localhost:9200"))...```
- в коде при записи:  
  + ```df.write.format("elastic").option("es.nodes", "localhost:9200")...```
  + ```df.write.format("elastic").options(Map("es.nodes" -> "localhost:9200"))...```
  
### Выводы:
- Spark позволяет работать с болшим количеством источников
- Поддержка источника всегда добавлеяется на уровне JVM (даже для pyspark) путем добавления в `java classpath` нужного класса
- Добавить поддержку источника можно по-разному, однако в большинстве случаев следует избегать "хардкода"

## Текстовые форматы

Spark позволяет хранить данные в текстовом виде в форматах `text`, `json`, `csv`
- `json` - JSON строки (не массив JSON документов, а именно раздельные строки, разделенные `\n`)  
- `csv` - плоские данные с разделителем  
- `text` просто текстовые строки, вычитываются как DF с единственной колонкой `value: String`  

### Преимущества:
- простота интеграции
- поддержка партиционирования и сжатия

### Недостатки:
- отсутствие оптимизаций 
- низкая скорость чтения сжатых данных
- слабая типизация

Прочитаем датасет [Airport Codes](https://datahub.io/core/airport-codes):

In [1]:
import sys.process._

println("ls -al /tmp/datasets/airport-codes.csv".!!)

-rw-r--r--  1 t3nq  wheel  6117301 Feb  7 18:03 /tmp/datasets/airport-codes.csv



In [4]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")
// val airports = 
//     spark.read
//         .format("csv")
//         .option("path", "/tmp/datasets/airport-codes.csv")
//         .options(csvOptions).load()

airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                 

csvOptions = Map(header -> true, inferSchema -> true)
airports = [ident: string, type: string ... 10 more fields]


lastException: Throwable = null


[ident: string, type: string ... 10 more fields]

In [3]:
airports.rdd.getNumPartitions

2

Запишем его в формате `csv`. Запись данных происходит в директорию, внутри которой будут файлы с данными. Это свойство является общим для всех файловых форматов в Spark

In [5]:
// airports.write.mode("overwrite").csv("/tmp/datasets/airports-2.csv")
airports.write.format("csv").mode("overwrite").option("path", "/tmp/datasets/airports-2.csv").save

In [6]:
println("ls -al /tmp/datasets/airports-2.csv".!!)

total 12360
drwxr-xr-x  8 t3nq  wheel      256 Feb  7 19:34 .
drwxr-xr-x  5 t3nq  wheel      160 Feb  7 19:34 ..
-rw-r--r--  1 t3nq  wheel        8 Feb  7 19:34 ._SUCCESS.crc
-rw-r--r--  1 t3nq  wheel    33540 Feb  7 19:34 .part-00000-1774eeb3-dc7b-4ddf-aab8-fa4a1e84ed23-c000.csv.crc
-rw-r--r--  1 t3nq  wheel    15440 Feb  7 19:34 .part-00001-1774eeb3-dc7b-4ddf-aab8-fa4a1e84ed23-c000.csv.crc
-rw-r--r--  1 t3nq  wheel        0 Feb  7 19:34 _SUCCESS
-rw-r--r--  1 t3nq  wheel  4291982 Feb  7 19:34 part-00000-1774eeb3-dc7b-4ddf-aab8-fa4a1e84ed23-c000.csv
-rw-r--r--  1 t3nq  wheel  1974943 Feb  7 19:34 part-00001-1774eeb3-dc7b-4ddf-aab8-fa4a1e84ed23-c000.csv



Если мы попытаемся прочитать его с помощью `spark.read`, используя старый код, получим ошибку - в качестве схемы Spark взял одну из строк, содержащую данные.

In [3]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airports-2.csv")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

lastException = null


org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/datasets/airports-2.csv;

Поищем шапку в сырых данных - ее там не будет:

In [8]:
spark.read.text("/tmp/datasets/airports-2.csv").filter('value contains "elevation_ft").count

0

Если прочитать с `header=false`, названия колонок будут автоматически сгенерированы:

In [9]:
val csvOptions = Map("header" -> "false", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airports-2.csv")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)

-RECORD 0----------------------------------
 _c0  | 00A                                
 _c1  | heliport                           
 _c2  | Total Rf Heliport                  
 _c3  | 11                                 
 _c4  | NA                                 
 _c5  | US                                 
 _c6  | US-PA                              
 _c7  | Bensalem                           
 _c8  | 00A                                
 _c9  | null                               
 _c10 | 00A                                
 _c11 | -74.93360137939453, 40.07080078125 

csvOptions = Map(header -> false, inferSchema -> true)
airports = [_c0: string, _c1: string ... 10 more fields]


[_c0: string, _c1: string ... 10 more fields]

Имея шапку в виде строки, мы можем создать схему самостоятельно:

In [10]:
import org.apache.spark.sql.types._

val firstLine = "head -n 1 /tmp/datasets/airport-codes.csv".!!
val schema = StructType(firstLine.split(",", -1).map(x => StructField(x, StringType)))
val csvOptions = Map("header" -> "false", "inferSchema" -> "true")
val airports = spark.read.schema(schema).options(csvOptions).csv("/tmp/datasets/airports-2.csv")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates
: string (nullable = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                 

firstLine = 
schema = 
csvOptions = Map(header -> false, inf...


"ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
"
StructType(StructField(ident,StringType,true), StructField(type,StringType,true), StructField(name,StringType,true), StructField(elevation_ft,StringType,true), StructField(continent,StringType,true), StructField(iso_country,StringType,true), StructField(iso_region,StringType,true), StructField(municipality,StringType,true), StructField(gps_code,StringType,true), StructField(iata_code,StringType,true), StructField(local_code,StringType,true), StructField(coordinates
,StringType,true))


Map(header -> false, inf...

Сохраним данные в `csv` с включенной компрессией `gzip`:

In [14]:
airports.repartition(1).write.mode("overwrite").option("codec", "gzip").csv("/tmp/datasets/airports-3.csv")

In [18]:
println("ls -alh /tmp/datasets/airports-3.csv".!!)

total 31144
drwxr-xr-x  8 t3nq  wheel   256B Feb  7 19:42 .
drwxr-xr-x  6 t3nq  wheel   192B Feb  7 19:40 ..
-rw-r--r--  1 t3nq  wheel     8B Feb  7 19:42 ._SUCCESS.crc
-rw-r--r--  1 t3nq  wheel    16K Feb  7 19:40 .part-00000-82cd0df5-8903-4ef2-9a12-94574e8082f5-c000.csv.gz.crc
-rw-r--r--  1 t3nq  wheel   104K Feb  7 19:42 .part-00000-98ddd864-6e94-46e6-9e78-3d30b4401e7b-c000.json.crc
-rw-r--r--  1 t3nq  wheel     0B Feb  7 19:42 _SUCCESS
-rw-r--r--  1 t3nq  wheel   2.1M Feb  7 19:40 part-00000-82cd0df5-8903-4ef2-9a12-94574e8082f5-c000.csv.gz
-rw-r--r--  1 t3nq  wheel    13M Feb  7 19:42 part-00000-98ddd864-6e94-46e6-9e78-3d30b4401e7b-c000.json



In [16]:
val df = spark.read.format("csv").option("path", "/tmp/datasets/airports-3.csv").load
df.rdd.getNumPartitions

df = [_c0: string, _c1: string ... 10 more fields]


1

In [17]:
airports
    .repartition(1).write.mode("append").option("codec", "gzip").json("/tmp/datasets/airports-3.csv")

Данные стали занимать меньше места, но у этого решения есть существенный минус - при чтении каждый сжатый файл превращается ровно в 1 партицию в DF. При работе с большими датасетами это означает:
- если файлов мало и они большие, то воркерам может не хватить памяти для их чтения (тк один сжатый файл нельзя разбить на несколько партиций
- если файлов много и они маленькие - мы получаем увеличенный расход памяти в heap HDFS NameNode (память расходуется пропорционально количеству файлов на HDFS из расчета 1 ГБ памяти на 1 000 000 файлов)

Сохраним датасет в формате `json` с партиционирование по колонкам `iso_region` и `iso_country`:

In [19]:
airports.write.mode("overwrite").partitionBy("iso_region", "iso_country").json("/tmp/datasets/airports-1.json")

In [22]:
println("ls -alh /tmp/datasets/airports-1.json/iso_region=AD-04/iso_country=AD".!!)

total 16
drwxr-xr-x  4 t3nq  wheel   128B Feb  7 19:44 .
drwxr-xr-x  3 t3nq  wheel    96B Feb  7 19:44 ..
-rw-r--r--  1 t3nq  wheel    12B Feb  7 19:44 .part-00000-2114957a-21b2-4db9-a4f0-fd1ca52c7c98.c000.json.crc
-rw-r--r--  1 t3nq  wheel   144B Feb  7 19:44 part-00000-2114957a-21b2-4db9-a4f0-fd1ca52c7c98.c000.json



In [30]:
import org.apache.spark.sql.types._

val mySchemaString = """`continent` STRING,`coordinates` STRING,`elevation_ft` BIGINT,`gps_code` STRING,`iata_code` STRING,`ident` STRING,`local_code` STRING,`municipality` STRING,`name` STRING,`type` STRING,`iso_region` STRING,`iso_country` STRING"""

val mySchema = DataType.fromDDL(mySchemaString).asInstanceOf[StructType]

mySchemaString = `continent` STRING,`coordinates` STRING,`elevation_ft` BIGINT,`gps_code` STRING,`iata_code` STRING,`ident` STRING,`local_code` STRING,`municipality` STRING,`name` STRING,`type` STRING,`iso_region` STRING,`iso_country` STRING
mySchema = StructType(StructField(continent,StringType,true), StructField(coordinates,StringType,true), StructField(elevation_ft,LongType,true), StructField(gps_code,StringType,true), StructField(iata_code,StringType,true), StructField(ident,StringType,true), StructField(local_code,StringType,true), StructField(municipality,StringType,true), StructField(name,StringType,true), StructField(type,StringType,true), StructField(iso_region,StringType,true), StructField(iso_c...


StructType(StructField(continent,StringType,true), StructField(coordinates,StringType,true), StructField(elevation_ft,LongType,true), StructField(gps_code,StringType,true), StructField(iata_code,StringType,true), StructField(ident,StringType,true), StructField(local_code,StringType,true), StructField(municipality,StringType,true), StructField(name,StringType,true), StructField(type,StringType,true), StructField(iso_region,StringType,true), StructField(iso_c...

In [39]:
val df = spark.read.schema(mySchema).json("/tmp/datasets/airports-1.json")
df.filter('iso_region === "AD-04" and 'iso_country === "AD").explain
df.rdd.getNumPartitions

== Physical Plan ==
*(1) FileScan json [continent#875,coordinates#876,elevation_ft#877L,gps_code#878,iata_code#879,ident#880,local_code#881,municipality#882,name#883,type#884,iso_region#885,iso_country#886] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tmp/datasets/airports-1.json], PartitionCount: 1, PartitionFilters: [isnotnull(iso_region#885), isnotnull(iso_country#886), (iso_region#885 = AD-04), (iso_country#88..., PushedFilters: [], ReadSchema: struct<continent:string,coordinates:string,elevation_ft:bigint,gps_code:string,iata_code:string,i...


df = [continent: string, coordinates: string ... 10 more fields]


105

In [37]:
df.write.json("/tmp/foo/bar")

In [38]:
"ls -alh /tmp/foo/bar".!!

"total 28032
drwxr-xr-x  214 t3nq  wheel   6.7K Feb  7 19:53 .
drwxr-xr-x    3 t3nq  wheel    96B Feb  7 19:53 ..
-rw-r--r--    1 t3nq  wheel     8B Feb  7 19:53 ._SUCCESS.crc
-rw-r--r--    1 t3nq  wheel    38K Feb  7 19:53 .part-00000-9e8e9bdd-c629-4d3f-98df-fa72e4948cee-c000.json.crc
-rw-r--r--    1 t3nq  wheel    16K Feb  7 19:53 .part-00001-9e8e9bdd-c629-4d3f-98df-fa72e4948cee-c000.json.crc
-rw-r--r--    1 t3nq  wheel   8.0K Feb  7 19:53 .part-00002-9e8e9bdd-c629-4d3f-98df-fa72e4948cee-c000.json.crc
-rw-r--r--    1 t3nq  wheel   4.9K Feb  7 19:53 .part-00003-9e8e9bdd-c629-4d3f-98df-fa72e4948cee-c000.json.crc
-rw-r--r--    1 t3nq  wheel   3.6K Feb  7 19:53 .part-00004-9e8e9bdd-c629-4d3f-98df-fa72e4948cee-c000.json.crc
-rw-r--r--    1 t3nq  wheel   3.0K Feb  7 19:53 .p...


In [42]:
import org.apache.spark.sql.functions._

val df = spark.range(1).select(schema_of_json(lit("""{ "foo" : "bar" }""")))
df.as[String].collect.head

df = [schemaofjson({ "foo" : "bar" }): string]


struct<foo:string>

In [24]:
df.schema.toDDL

`continent` STRING,`coordinates` STRING,`elevation_ft` BIGINT,`gps_code` STRING,`iata_code` STRING,`ident` STRING,`local_code` STRING,`municipality` STRING,`name` STRING,`type` STRING,`iso_region` STRING,`iso_country` STRING

Такой формат хранения позволит использовать `partition pruning` и быстро фильтровать данные по колонкам `iso_region` и `iso_country`

Теперь сохраним датасет в формат `text`. Для этого нам необходимо подгтовить DF, в котором будет единственная колонка `value: String`

In [None]:
airports
    .select('ident.alias("value"))
    .write
    .mode("overwrite")
    .format("text")
    .save("/tmp/datasets/airports-1.txt")

In [None]:
println("ls -alh /tmp/datasets/airports-1.txt".!!)

Файловые форматы не имеют автоматической валидации данных при записи, поэтому достаточно легко ошибиться и записать данные в другом формате. Такая запись пройдет без ошибок:

In [None]:
airports
    .write
    .mode("append")
    .json("/tmp/datasets/airports-1.txt")

При попытке чтения данных с помощью `text` мы получим все данные, тк форма `json` сохраняет все в виде JSON строк. Однако, если прочитать данные с помощью `json`, часть данных будут помечены как невалидные и помещены в колонку `_corrupt_record`

In [None]:
val airports = spark.read.json("/tmp/datasets/airports-1.txt")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

Отобразим невалидные JSON строки:

In [None]:
// Начиная со Spark 2.3 нельзя выбирать одну колонку _corrupt_record, поэтому мы добавим к выводу ident
airports.na.drop("all", Seq("_corrupt_record")).select($"_corrupt_record", $"ident").show(20, false)

### Режимы записи
Spark позволяет нам выбирать режим записи данных с помощью метода `mode()`. Данный метод принимает один из параметров:
- `overwrite` - перезаписывает всю директорию целиком (или партицию, если используется партиционирование)
- `append` - дописывает новые файлы к текущим
- `ignore` - не выполняет запись (no op режим)
- `error` или `errorifexists` - возвращает ошибку, если директория уже существует

### Семплирование

Форматы `csv` и `json` позволяют автоматически выводить схему из данных. При этом по-умолчанию Spark прочитает все данные и составит подходящую схему. Однако, если мы работаем с большим датасетом, это может занять продолжительное время. Решить это можно с помощью опции `samplingRatio`:

In [43]:
spark.time { 
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true", "samplingRatio" -> "0.1")
    val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")
    airports.printSchema
}

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

Time taken: 263 ms


In [44]:
spark.time { 
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true", "samplingRatio" -> "1.0")
    val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")
    airports.printSchema
}

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

Time taken: 265 ms


### Выводы
- Spark позволяет работать с текстовыми файлами `json`, `csv`, `text`
- При чтении и записи поддерживаются кодеки сжатия данных, это создает дополнительные накладные расходы
- При записи данных в текстовые форматы Spark **не выполняет** валидацию схемы и формата
- При включенном выведении схемы из источника чтение из текстовых форматов происходит дольше

## Orc и Parquet
В отличие от обычных текстовых форматов, ORC и Parquet изначально спроектированы под распределенные системы хранения и обработки. Они являются колоночными - в них есть колонки и схема, как в таблицах БД и бинарными - прочитать обычным текстовым редактором их не получится. Форматы имеют похожие показатели производительности и архитектуру, но Parquet используется чаще

### Преимущества
- наличие схемы данных
- блочная компрессия 
- для каждого блока для каждой колонки вычисляется max и min, что позволяет ускорять чтение

### Недостатки:
- нельзя дописывать/менять данные в существующих файлах
- необходимо делать compaction

Подробнее о Parquet:  
[Фёдор Лаврентьев, Moscow Spark #5: Как класть Parquet](https://youtu.be/VHsvr10b63c?t=512)

По аналогии с текстовыми форматами, при записи, Spark создает директорию и пишет туда все непустые партиции.  Обратите внимание на последовательность форматов записи - `snappy.parquet` вместо, скажем, `json.gz`. При использовании компрессии сам parquet файл не помещается в сжатый контейнер. Вместо этого, компрессии подлежат блоки с данными. Это полностью снимает ограничение, из-за которого чтение сжатых текстовых файлов происходит в 1 поток в 1 партицию.

In [46]:
airports.write.mode("overwrite").parquet("/tmp/datasets/airports-1.parquet")

In [47]:
println("ls -alh /tmp/datasets/airports-1.parquet".!!)

total 8384
drwxr-xr-x  8 t3nq  wheel   256B Feb  7 20:23 .
drwxr-xr-x  8 t3nq  wheel   256B Feb  7 20:23 ..
-rw-r--r--  1 t3nq  wheel     8B Feb  7 20:23 ._SUCCESS.crc
-rw-r--r--  1 t3nq  wheel    17K Feb  7 20:23 .part-00000-6ff24119-ba57-4148-b862-894ccfe5cca0-c000.snappy.parquet.crc
-rw-r--r--  1 t3nq  wheel   7.8K Feb  7 20:23 .part-00001-6ff24119-ba57-4148-b862-894ccfe5cca0-c000.snappy.parquet.crc
-rw-r--r--  1 t3nq  wheel     0B Feb  7 20:23 _SUCCESS
-rw-r--r--  1 t3nq  wheel   2.1M Feb  7 20:23 part-00000-6ff24119-ba57-4148-b862-894ccfe5cca0-c000.snappy.parquet
-rw-r--r--  1 t3nq  wheel   994K Feb  7 20:23 part-00001-6ff24119-ba57-4148-b862-894ccfe5cca0-c000.snappy.parquet



In [48]:
spark.read.parquet("/tmp/datasets/airports-1.parquet").printSchema

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



### Schema evolution

При работе с ORC/Parquet, часто возникает вопрос эволюции схемы - изменения структуры данных относительно первоначальных файлов. Создадим два DF с разными схемами и запишем их в одну директорию:

In [49]:
case class Apple(size: Int, color: String)

defined class Apple


In [64]:
List(Apple(1, "green")).toDS.write.mode("overwrite").parquet("/tmp/datasets/apples.parquet")

lastException: Throwable = null


In [51]:
case class PriceApple(size: Int, color: String, price: Double)

defined class PriceApple


In [52]:
List(PriceApple(1, "green", 2.0)).toDS.write.mode("append").parquet("/tmp/datasets/apples.parquet")

Несмотря на то, что файлы имеют разную схему, Spark корректно читает файлы, используя обобщенную схему:

In [56]:
val df = spark.read.parquet("/tmp/datasets/apples.parquet")
df.show

+----+-----+-----+
|size|color|price|
+----+-----+-----+
|   1|green|  2.0|
|   1|green| null|
+----+-----+-----+



df = [size: int, color: string ... 1 more field]


[size: int, color: string ... 1 more field]

Однако, это работает только тогда, когда мы добавляем новые колонки к нашей схеме. Если мы запишем новый файл, изменив тип уже существующей колонки, мы получим ошибку:

In [59]:
case class AppleBase(size: Double)

defined class AppleBase


In [67]:
val df1 = spark.read.format("parquet").option("path", "/tmp/datasets/apples.parquet").load
df1.schema

df1 = [size: int, color: string]


StructType(StructField(size,IntegerType,true), StructField(color,StringType,true))

In [None]:
val path = "/tmp/datasets/apples.parquet"
val toWrite = List(PriceApple(1, "green", 2.0)).toDS

val mySchema = spark.read.parquet(path)
if (mySchema == toWrite.schema) { 
    toWrite.write.mode("append").parquet("/tmp/datasets/apples.parquet")
}

In [60]:
List(AppleBase(3.0)).toDS.write.mode("append").parquet("/tmp/datasets/apples.parquet")

In [63]:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val df = spark.read.parquet("/tmp/datasets/apples.parquet")
df.show

lastException = null


org.apache.spark.SparkException: Failed merging schema:
root
 |-- size: double (nullable = false)


Посмотрим все доступные опции для работы с Parquet:

In [57]:
%%dataframe --limit 20
spark.sql("SET -v").filter('key contains "parquet")

key,value,meaning
spark.sql.parquet.binaryAsString,false,"Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems."
spark.sql.parquet.columnarReaderBatchSize,4096,The number of rows to include in a parquet vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.
spark.sql.parquet.compression.codec,snappy,"Sets the compression codec used when writing Parquet files. If either `compression` or `parquet.compression` is specified in the table-specific options/properties, the precedence would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd."
spark.sql.parquet.enableVectorizedReader,true,Enables vectorized parquet decoding.
spark.sql.parquet.filterPushdown,true,Enables Parquet filter push-down optimization when set to true.
spark.sql.parquet.int64AsTimestampMillis,false,"(Deprecated since Spark 2.3, please set spark.sql.parquet.outputTimestampType.) When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the extended type. In this mode, the microsecond portion of the timestamp value will betruncated."
spark.sql.parquet.int96AsTimestamp,true,"Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems."
spark.sql.parquet.int96TimestampConversion,false,"This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark."
spark.sql.parquet.mergeSchema,false,"When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available."
spark.sql.parquet.outputTimestampType,INT96,"Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value."


### Parquet Tools
Для диагностики и решения проблем, связанных с parquet, можно использовать утилиту `parquet-tools`

Она позволяет:
- получить схему файла
- вывести содержимое файла в консоль
- объединить несколько файлов в один

https://github.com/apache/parquet-mr/tree/master/parquet-tools

### Сравнение скорости обработки запросов

Подготовим датасеты:

In [68]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")

csvOptions = Map(header -> true, inferSchema -> true)
airports = [ident: string, type: string ... 10 more fields]


[ident: string, type: string ... 10 more fields]

In [69]:
1 to 50 foreach { x =>
    airports.repartition(1).write.mode("append").parquet("/tmp/datasets/a1.parquet")
    airports.repartition(1).write.mode("append").json("/tmp/datasets/a1.json")
    airports.repartition(1).write.mode("append").orc("/tmp/datasets/a1.orc")
}

In [70]:
import org.apache.spark.sql.Dataset
case class DatasetFormat[T](ds: Dataset[T], format: String)

val datasets = 
    DatasetFormat(spark.read.orc("/tmp/datasets/a1.orc"), "orc") ::
    DatasetFormat(spark.read.parquet("/tmp/datasets/a1.parquet"), "parquet") ::
    DatasetFormat(spark.read.json("/tmp/datasets/a1.json"), "json") ::
    Nil

defined class DatasetFormat
datasets = List(DatasetFormat([ident: string, type: string ... 10 more fields],orc), DatasetFormat([ident: string, type: string ... 10 more fields],parquet), DatasetFormat([continent: string, coordinates: string ... 10 more fields],json))


List(DatasetFormat([ident: string, type: string ... 10 more fields],orc), DatasetFormat([ident: string, type: string ... 10 more fields],parquet), DatasetFormat([continent: string, coordinates: string ... 10 more fields],json))

Сравним скорость работы фильтраци:

In [71]:
datasets.foreach { x => 
    println(s"Running ${x.format}")
    spark.time {
        println(x.ds.filter($"iso_country" === "RU" and $"elevation_ft" > 300).count)
    }
}

Running orc
20500
Time taken: 713 ms
Running parquet
20500
Time taken: 524 ms
Running json
20500
Time taken: 2394 ms


Сравним скорость подсчета количества строк:

In [72]:
datasets.foreach { x => 
    println(s"Running ${x.format}")
    spark.time {
        println(x.ds.count)
    }
}

Running orc
2811300
Time taken: 175 ms
Running parquet
2811300
Time taken: 129 ms
Running json
2811300
Time taken: 2010 ms


### Выводы:
- Форматы `orc` и `parquet` позволяют эффективно работать со структурированными данными
- Производительность `orc` и `parquet` на порядок выше обычных текстовых файлов
- Данные форматы поддерживают сжатие на блочном уровне, что позволяет избегать проблем с многопоточным чтением
- Форматы поддерживают добавление новых колонок в схему, но не изменение текущих

## Elastic
Документориентированная распределенная база данных.

### Преимущества:
- Удобный графический интерфейс Kibana
- Полнотекстовый поиск по любым колонкам
- Встроенная поддержка timeseries
- Поддержка вложенных структур
- Возможность записи данных с произвольной схемой
- Возможность перезаписывать данные по ключу документа

### Недостатки:
- Ассиметричная архитектура
- Скорость записи ограничена самой медленным узлом
- Большие накладные расходы CPU на индексирование
- Ротация шардов не всегда проходит гладко

https://www.elastic.co

### Запуск в docker
https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#docker-cli-run-dev-mode  
https://www.elastic.co/guide/en/kibana/current/docker.html#_running_kibana_on_docker_for_development  

### Spark connector
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20  
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html  

В Elastic есть несколько основных сущностей:

**Index** - представляет собой "таблицу с данными", если проводить аналогию с реляционными БД. Данные в elastic обычно хранятся в виде индексов, разбитых на сутки (foo-2020-05-30, foo-2020-05-29 и т. д.). У каждого документа в индексе есть ключ `_id` и может быть метка времени, по которой в Kibana строятся визуализации

**Template** - шаблон с параметрами, с которыми создается новый индекс. Пример шаблона представлен ниже:
```shell
PUT _template/airports
{
  "index_patterns": ["airports-*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_doc": {
      "dynamic": true,
      "_source": {
        "enabled": true
      },
      "properties": {
        "ts": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        }
      }
    }
  }
}
```
**Shard** - индексы в elastic делятся ~~почкованием~~ на шарды. Это позволяет хранить индекс на нескольких узлах кластера

**Index Pattern** - шаблон, применяемый к индексу на уровне Kibana. Позволяет настраивать форматирование и подсветку полей

Перед тем, как начать писать в elastic с помощью Spark, нам необходимо создать шаблон, иначе индекс будет создан с параметрами по умолчанию и построить красивый pie chart в Kibana у нас не получится. Это можно сделать с помощью Dev Tools в Kibana.

In [73]:
import org.apache.spark.sql.functions._

val esOptions = 
    Map(
        "es.nodes" -> "localhost:9200", 
        "es.batch.write.refresh" -> "false",
        "es.nodes.wan.only" -> "true"   
    )

airports
    .withColumn("ts", current_timestamp())
    .withColumn("date", current_date())
    .withColumn("foo", lit("foo"))
    .write.format("es").options(esOptions).save("airports-{date}/_doc")

esOptions = Map(es.nodes -> localhost:9200, es.batch.write.refresh -> false, es.nodes.wan.only -> true)


Map(es.nodes -> localhost:9200, es.batch.write.refresh -> false, es.nodes.wan.only -> true)

In [74]:
val esDf = spark.read.format("es").options(esOptions).load("airports-*")
esDf.printSchema
esDf.show(1, 200, true)

root
 |-- continent: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- date: long (nullable = true)
 |-- elevation_ft: long (nullable = true)
 |-- foo: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- type: string (nullable = true)

-RECORD 0-------------------------------
 continent    | SA                      
 coordinates  | -73.868056, 9.942222    
 date         | 1644181200000           
 elevation_ft | 256                     
 foo          | foo                     
 gps_code     | null                    
 iata_code    | null                    
 ident        | SK-164                  
 iso_country  | CO              

esDf = [continent: string, coordinates: string ... 13 more fields]


[continent: string, coordinates: string ... 13 more fields]

Количество партций в DF совпадает с общим числом шардов индексов, которые мы указали в `load()`. Поскольку у нас 1 индекс и 1 шард (см. шаблон), данный DF имеет 1 партицию:

In [75]:
esDf.rdd.getNumPartitions

1

К применяемым фильтрам применяется оптимизация `filter pushdown`

In [76]:
esDf.filter('iso_region contains "RU").explain()

== Physical Plan ==
*(1) Filter (isnotnull(iso_region#5195) && Contains(iso_region#5195, RU))
+- *(1) Scan ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.wan.only -> true, es.resource -> airports-*, es.batch.write.refresh -> false),org.apache.spark.sql.SQLContext@10c75cde,None) [continent#5186,coordinates#5187,date#5188L,elevation_ft#5189L,foo#5190,gps_code#5191,iata_code#5192,ident#5193,iso_country#5194,iso_region#5195,local_code#5196,municipality#5197,name#5198,ts#5199,type#5200] PushedFilters: [IsNotNull(iso_region), StringContains(iso_region,RU)], ReadSchema: struct<continent:string,coordinates:string,date:bigint,elevation_ft:bigint,foo:string,gps_code:st...


In [77]:
esDf
    .filter(
        'ts between (
                    lit("2020-06-01 16:57:30.000").cast("timestamp"), 
                    lit("2020-06-01 16:59:30.000").cast("timestamp")
        )
    ).explain(true)

== Parsed Logical Plan ==
'Filter (('ts >= cast(2020-06-01 16:57:30.000 as timestamp)) && ('ts <= cast(2020-06-01 16:59:30.000 as timestamp)))
+- Relation[continent#5186,coordinates#5187,date#5188L,elevation_ft#5189L,foo#5190,gps_code#5191,iata_code#5192,ident#5193,iso_country#5194,iso_region#5195,local_code#5196,municipality#5197,name#5198,ts#5199,type#5200] ElasticsearchRelation(Map(es.nodes -> localhost:9200, es.nodes.wan.only -> true, es.resource -> airports-*, es.batch.write.refresh -> false),org.apache.spark.sql.SQLContext@10c75cde,None)

== Analyzed Logical Plan ==
continent: string, coordinates: string, date: bigint, elevation_ft: bigint, foo: string, gps_code: string, iata_code: string, ident: string, iso_country: string, iso_region: string, local_code: string, municipality: string, name: string, ts: timestamp, type: string
Filter ((ts#5199 >= cast(2020-06-01 16:57:30.000 as timestamp)) && (ts#5199 <= cast(2020-06-01 16:59:30.000 as timestamp)))
+- Relation[continent#5186,coor

### Выводы:
- Elastic - удобное распределенное хранилище документов, не накладывающее строгих ограничений на схему документов
- Elastic позволяет делать сложные запросы, включая полнотекстовые
- При работе с elastic, Spark часто использует `filter pushdown`
- Spark отлично подходит для того, чтобы писать в elastic. Однако чтение работает не очень быстро.

## Cassandra
Распределенная табличная база данных

### Преимущества
- Высокая доступность данных
- Мжожно строить гео-кластера
- Высокая скорость записи и чтения
- Скорость ограничена самым быстрым узлом
- Линейная масштабируемость
- Возможность хранить БОЛЬШИЕ объемы данных
- Возможность быстро получать строку по ключу на любом объеме данных

### Недостатки
- слабая согласованность (eventual)
- Бедный SQL (в кассандре он называется CQL)
- Отсутствие транзакций (не совсем)

https://cassandra.apache.org

Cassandra имеет симметричную архитектуру. Каждый узел отвечает за хранение данных, обработку запросов и состояние кластера. 

Расположение данных определяется значением хеш функции от Partition key.

Высокая доступность данных обеспечивается за счет репликации.

![Cassandra Architecture](https://cassandra.apache.org/doc/latest/_images/ring.svg)
Источник: https://cassandra.apache.org/doc/latest/architecture/dynamo.html#dataset-partitioning-consistent-hashing

### Запуск в docker

Запуск инстанса:
```shell
docker run --rm --name cass -p 9042:9042 -e CASSANDRA_BROADCAST_ADDRESS=127.0.0.1 cassandra:latest
```

Подключение к cassandra:
```shell
docker run -it --rm cassandra:latest cqlsh host.docker.internal
```

В Cassandra есть:
- `keyspace` - аналог database - логическое объединение таблиц. На уровне keyspace устанавливается фактор репликации
- `table` - таблицы, как в обычной БД

### Spark connector
https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector  
https://github.com/datastax/spark-cassandra-connector#documentation

Для того, чтобы записать данные в cassandra, нам необходимо создать keyspace, используя утилиту `cqlsh`:
```shell
CREATE KEYSPACE IF NOT EXISTS airports 
WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3};
```

Теперь нужно создать таблицу со схемой:

In [78]:
val typesMap = Map("string" -> "text", "int" -> "int")

val primaryKey = "ident"

val ddlColumns = airports.schema.fields.map { x =>
    if(x.name == primaryKey) {
        s"${x.name} ${typesMap(x.dataType.simpleString)} PRIMARY KEY"
    }
    else {
        s"${x.name} ${typesMap(x.dataType.simpleString)}"
    }
}.mkString(",")

val ddlQuery = s"CREATE TABLE IF NOT EXISTS airports.codes ($ddlColumns);"

println(ddlQuery)

CREATE TABLE IF NOT EXISTS airports.codes (ident text PRIMARY KEY,type text,name text,elevation_ft int,continent text,iso_country text,iso_region text,municipality text,gps_code text,iata_code text,local_code text,coordinates text);


typesMap = Map(string -> text, int -> int)
primaryKey = ident
ddlColumns = ident text PRIMARY KEY,type text,name text,elevation_ft int,continent text,iso_country text,iso_region text,municipality text,gps_code text,iata_code text,local_code text,coordinates text
ddlQuery = CREATE TABLE IF NOT EXISTS airports.codes (ident text PRIMARY KEY,type text,name text,elevation_ft int,continent text,iso_country text,iso_region text,municipality text,gps_code text,iata_code text,local_code text,coordinates text);


CREATE TABLE IF NOT EXISTS airports.codes (ident text PRIMARY KEY,type text,name text,elevation_ft int,continent text,iso_country text,iso_region text,municipality text,gps_code text,iata_code text,local_code text,coordinates text);

Настроим параметры подключения к БД:

In [79]:
import org.apache.spark.sql.cassandra._

spark.conf.set("spark.cassandra.connection.host", "127.0.0.1")
spark.conf.set("spark.cassandra.output.consistency.level", "ANY")
spark.conf.set("spark.cassandra.input.consistency.level", "ONE")

val tableOpts = Map("table" -> "codes","keyspace" -> "airports")

tableOpts = Map(table -> codes, keyspace -> airports)


Map(table -> codes, keyspace -> airports)

Теперь мы можем записать датасет в БД:

In [80]:
airports
    .write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(tableOpts)
    .save()

In [81]:
val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(tableOpts)
  .load()

df.show(1, 200, true)

-RECORD 0-------------------------------
 ident        | VE-0107                 
 continent    | SA                      
 coordinates  | -65.533333, 3.166667    
 elevation_ft | 375                     
 gps_code     | SVLE                    
 iata_code    | null                    
 iso_country  | VE                      
 iso_region   | VE-Z                    
 local_code   | null                    
 municipality | La Esmeralda            
 name         | Aeropuerto La Esmeralda 
 type         | small_airport           
only showing top 1 row



df = [ident: string, continent: string ... 10 more fields]


[ident: string, continent: string ... 10 more fields]

Скорость чтения ОЧЕНЬ сильно зависит от структуры таблицы и запроса. Если мы сделаем запрос по колонке `ident`, которая является ключом, то будет применена оптимизация `filter pushdown` и запрос отработает очень быстро.

In [82]:
df.filter('ident === "22WV").explain()

spark.time { 
    df.filter('ident === "22WV").show(1, 200, true)
}

== Physical Plan ==
*(1) Filter isnotnull(ident#5302)
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@267e6964 [ident#5302,continent#5303,coordinates#5304,elevation_ft#5305,gps_code#5306,iata_code#5307,iso_country#5308,iso_region#5309,local_code#5310,municipality#5311,name#5312,type#5313] PushedFilters: [IsNotNull(ident), *EqualTo(ident,22WV)], ReadSchema: struct<ident:string,continent:string,coordinates:string,elevation_ft:int,gps_code:string,iata_cod...
-RECORD 0--------------------------------------------
 ident        | 22WV                                 
 continent    | NA                                   
 coordinates  | -80.0291976928711, 39.34360122680664 
 elevation_ft | 1271                                 
 gps_code     | 22WV                                 
 iata_code    | null                                 
 iso_country  | US                                   
 iso_region   | US-WV                                
 local_code   | 22WV              

Если же мы сделаем запрос по другой колонке, то он не будет так же эффективен, хотя `filter pushdown` тоже отработает. Это происходит из-за того, что зная ключ, Cassandra знает, на каком хосте и где находятся данные. Когда мы фильтруем по колонке, которая не является ключом, БД приходится искать эти данные на всем кластере.

In [83]:
df.filter('iso_region === "RU").explain()

spark.time {
    df.filter('iso_region === "RU").show(1, 200, true)
}

== Physical Plan ==
*(1) Filter (isnotnull(iso_region#5309) && (iso_region#5309 = RU))
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@267e6964 [ident#5302,continent#5303,coordinates#5304,elevation_ft#5305,gps_code#5306,iata_code#5307,iso_country#5308,iso_region#5309,local_code#5310,municipality#5311,name#5312,type#5313] PushedFilters: [IsNotNull(iso_region), EqualTo(iso_region,RU)], ReadSchema: struct<ident:string,continent:string,coordinates:string,elevation_ft:int,gps_code:string,iata_cod...
(0 rows)

Time taken: 2120 ms


Если сделать запрос более сложным, то `filter pushdown` не отработает и Spark прочитает всю таблицу целиком:

In [84]:
import org.apache.spark.sql.functions._

df.filter(lower('iso_region) === "ru").explain(true)

spark.time {
    df.filter(lower('iso_region) === "ru").show(1, 200, true)
}

== Parsed Logical Plan ==
'Filter (lower('iso_region) = ru)
+- Relation[ident#5302,continent#5303,coordinates#5304,elevation_ft#5305,gps_code#5306,iata_code#5307,iso_country#5308,iso_region#5309,local_code#5310,municipality#5311,name#5312,type#5313] org.apache.spark.sql.cassandra.CassandraSourceRelation@267e6964

== Analyzed Logical Plan ==
ident: string, continent: string, coordinates: string, elevation_ft: int, gps_code: string, iata_code: string, iso_country: string, iso_region: string, local_code: string, municipality: string, name: string, type: string
Filter (lower(iso_region#5309) = ru)
+- Relation[ident#5302,continent#5303,coordinates#5304,elevation_ft#5305,gps_code#5306,iata_code#5307,iso_country#5308,iso_region#5309,local_code#5310,municipality#5311,name#5312,type#5313] org.apache.spark.sql.cassandra.CassandraSourceRelation@267e6964

== Optimized Logical Plan ==
Filter (lower(iso_region#5309) = ru)
+- Relation[ident#5302,continent#5303,coordinates#5304,elevation_ft#5305,gps_c

Необходимо понимать, что структура данной таблицы не отражает реального паттерна работой с данной базой - редко ключом является только одна колонка. В реальности ключ является составным и состоит из нескольких колонок, что позволяет делать более сложные запросы, которые будут обрабатываться также быстро, как первый запрос с фильтрацией по колонке "ident"

### Выводы
- cassandra - одна из немногих БД, которая способна эффективно хранить большие объемы данных
- в cassandra структура таблицы формируется, исходя из запросов, которые будут выполняться, а не наоборот
- в данной БД данные обычно хранятся в денормализованном виде (если утрировать - то по таблице на каждый запрос)
- Spark отлично подходит для чтения и записи данных в cassandra, но ее придется настроить под данный профиль нагрузки

## PostgreSQL
Классическая РБД

### Преимущества:
- это PostgreSQL

### Недостатки
- нет их

### Запуск в docker
```shell
docker run --rm -p 5432:5432 --name test_postgre -e POSTGRES_PASSWORD=12345 postgres:latest
```

Подключение с помощью psql:
```shell
docker run -it --rm postgres psql -h host.docker.internal -U postgres
```

### Spark connector
https://mvnrepository.com/artifact/org.postgresql/postgresql
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Для работы с БД создадим database:
```shell
CREATE DATABASE airports
```
Подготовим DDL для создания таблицы:

In [8]:
val typesMap = Map("string" -> "VARCHAR (100)", "int" -> "INTEGER")

val primaryKey = "ident"

val ddlColumns = airports.schema.fields.map { x =>
    if(x.name == primaryKey) {
        s"${x.name} ${typesMap(x.dataType.simpleString)} PRIMARY KEY"
    }
    else {
        s"${x.name} ${typesMap(x.dataType.simpleString)}"
    }
}.mkString(",")

val ddlQuery = s"CREATE TABLE IF NOT EXISTS codes ($ddlColumns);"

val jdbcUrl = "jdbc:postgresql://localhost/airports?user=postgres&password=12345"

println(ddlQuery)

CREATE TABLE IF NOT EXISTS codes (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100));


typesMap = Map(string -> VARCHAR (100), int -> INTEGER)
primaryKey = ident
ddlColumns = ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100)
ddlQuery = CREATE TABLE IF NOT EXISTS codes (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100));


jdbcUrl: ...


CREATE TABLE IF NOT EXISTS codes (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100));

Запишем данные в БД:

In [9]:
airports.write.format("jdbc").option("url", jdbcUrl).option("dbtable", "codes").mode("append").save

In [10]:
val df = spark
    .read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "codes")
    .load()

df.printSchema
df.show(2, 200, true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

-RECORD 0----------------------------------------------
 ident        | OK94                                   
 type         | small_airport                          
 name         | Sand Ridge Airpark Inc Airport         
 elevation_ft | 675                                    
 continent    | NA                                     
 iso_country  | US                                     
 iso_region   | US-OK                                  
 municipality | Collinsville                           


df = [ident: string, type: string ... 10 more fields]


[ident: string, type: string ... 10 more fields]

При использовании параметром по умолчанию мы получаем всего 1 партицию в DF:

In [88]:
df.rdd.getNumPartitions

1

Исправить это можно, используя параметры `partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`. Для этого нам понадобится добавиь новую колонку в нашу таблицу:

In [89]:
val ddlColumns = airports.schema.fields.map { x =>
    if(x.name == primaryKey) {
        s"${x.name} ${typesMap(x.dataType.simpleString)} PRIMARY KEY"
    }
    else {
        s"${x.name} ${typesMap(x.dataType.simpleString)}"
    }
} :+ "id INTEGER" mkString(",")

val ddlQuery = s"CREATE TABLE IF NOT EXISTS codes_x ($ddlColumns);"

println(ddlQuery)

CREATE TABLE IF NOT EXISTS codes_x (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100),id INTEGER);


ddlColumns = ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100),id INTEGER
ddlQuery = CREATE TABLE IF NOT EXISTS codes_x (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100),id INTEGER);


CREATE TABLE IF NOT EXISTS codes_x (ident VARCHAR (100) PRIMARY KEY,type VARCHAR (100),name VARCHAR (100),elevation_ft INTEGER,continent VARCHAR (100),iso_country VARCHAR (100),iso_region VARCHAR (100),municipality VARCHAR (100),gps_code VARCHAR (100),iata_code VARCHAR (100),local_code VARCHAR (100),coordinates VARCHAR (100),id INTEGER);

Перезапишем данные в новую таблицу:

In [90]:
import org.apache.spark.sql.functions._

airports
    .withColumn("id", round(rand() * 10000).cast("int"))
    .write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "codes_x")
    .mode("append").save

Прочитаем таблицу, установив дополнительные параметры:

In [94]:
val df = spark
    .read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "codes_x")
    .option("partitionColumn", "id")
    .option("lowerBound", "0")
    .option("upperBound", "100000")
    .option("numPartitions", "200")
    .load()

df.printSchema
df.show(2, 200, true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- id: integer (nullable = true)

-RECORD 0-------------------------------------
 ident        | OK99                          
 type         | closed                        
 name         | Unity Health Center Heliport  
 elevation_ft | 1073                          
 continent    | NA                            
 iso_country  | US                            
 iso_region   | US-OK                         
 municipality | Shawnee                       
 gps_code     | null                          

df = [ident: string, type: string ... 11 more fields]


[ident: string, type: string ... 11 more fields]

Проверим, сколько партиций получилось:

In [95]:
df.rdd.getNumPartitions

200

Проверим распределение данных по партициям:

In [96]:
import org.apache.spark.sql.functions._

df.groupBy(spark_partition_id()).count().show(200, false)

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|12                  |2864 |
|1                   |2811 |
|13                  |2867 |
|6                   |2831 |
|16                  |2753 |
|3                   |2870 |
|20                  |4    |
|5                   |2890 |
|19                  |2819 |
|15                  |2740 |
|9                   |2817 |
|17                  |2920 |
|4                   |2846 |
|8                   |2769 |
|7                   |2819 |
|10                  |2707 |
|11                  |2782 |
|14                  |2653 |
|2                   |2908 |
|0                   |2813 |
|18                  |2743 |
+--------------------+-----+



In [None]:
SELECT * FROM table WHERE 1 = 0 LIMIT 0;

In [None]:
SELECT * FROM table WHERE id > 0 AND id < 10;
SELECT * FROM table WHERE id > 10 AND id < 20;

### Выводы:
- Spark позволяет работать с PostgreSQL через JDBC коннектор
- При использовании `jdbc` настройка партиционирования задается вручную