<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Structured Streaming II
**Андрей Титов**  
tenke.iu8@gmail.com  

## На этом занятии
+ Что такое stateful streaming
+ Удаление дубликатов
+ Агрегаты
+ Соединения

## Что такое stateful streaming

**stateful streaming** - это вид поточной обработки данных, при которой при обработке батча с данными используются данные из предыдущих батчей

Все операции с использованием select, filter, withColumn (кроме операций с плавающими окнами) являются stateless. На практике это означает:
- стрим не выполняет операций, требующих работы с данными из разных батчей
- после обработки батча стрим "забывает" про него
- высокую пропускную способность
- небольшое количество файлов и общий объем чекпоинта
- возможность вносить существенные правки в код стрима без пересоздания чекпоинта

Если при обработке стрима используются такие методы, как `join()`, `groupBy()`, `dropDuplicates()` или функции над плавающими окнами, то:
- в стриме должна быть колонка с временной меткой, на основе которой можно определить `watermark`
- стрим будет работать медленней, чем вы ожидаете
- в чекпоинте будет МНОГО файлов
- при внесении изменений в код стрима с большой вероятностью придется пересоздавать чекпоинт

Подготовим функции для управления стримами:

In [None]:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

def createConsoleSink(chkName: String, df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/chk/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

In [None]:
import org.apache.spark.sql.SparkSession

def killAll() = {
    SparkSession
        .active
        .streams
        .active
        .foreach { x =>
                    val desc = x.lastProgress.sources.head.description
                    x.stop
                    println(s"Stopped ${desc}")
        }               
}

In [None]:
import org.apache.spark.sql.functions._

def airportsDf() = {
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
    spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")
}

def randomIdent() = {
    
    val idents = airportsDf().select('ident).limit(20).distinct.as[String].collect

    val columnArray = idents.map( x => lit(x) )
    val sparkArray = array(columnArray:_*)
    val shuffledArray = shuffle(sparkArray)

    shuffledArray(0)
}

## Удаление дубликатов

Spark позволяет удалять дубликаты данных в стриме. Это можно сделать двумя способами:
- без использования `watermark`
- с использованием `watermark`

### Без использования watermark
- Хеш каждого элемента будет сохраняться в чекпоинте
- В стриме полностью исключаются дубликаты
- Со временем начнется деградация стрима

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

In [None]:
val sdfWithDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

In [None]:
createConsoleSink("state1", sdfWithDuplicates).start

In [None]:
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .dropDuplicates(Seq("ident"))

In [None]:
createConsoleSink("state2", sdfWithoutDuplicates).start

In [None]:
killAll

### С использованием watermark
- Хеш старых событий удаляется из чекпоинта
- Появление дубликатов возможно, если они приходят c задержкой N > watermark
- Стрим не деградирует со временем
- Колонка, по которой делается `watermark`, должна быть включена в `dropDuplicates`

In [None]:
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(Seq("ident", "timestamp"))

createConsoleSink("state3", sdfWithoutDuplicates).start

### Выводы:
- Spark позволяет удалять дубликаты из стрима
- Для стабильной работы требуется использовать `watermark`

## Агрегаты
При построении агрегатов на стриме важно задать правильный `outputMode`, который может иметь три значения:
- `append`
- `update`
- `complete`

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

In [None]:
killAll

### Complete mode

In [None]:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

def createConsoleSink(chkName: String, mode: String, df: DataFrame) = {
    df
    .writeStream
    .outputMode(mode)
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/chk/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

In [None]:
import org.apache.spark.sql.functions._
val grouped = 
    sdfWithDuplicates.groupBy('ident).count

In [None]:
createConsoleSink("state3", "complete", grouped).start

In [None]:
killAll

### Update mode

In [None]:
createConsoleSink("state4", "update", grouped).start

In [None]:
killAll

###  Агрегаты на плавающих окнах
Плавающее окно позволяет сгруппировать события в окна определенного размера (по времени). При этом, поскольку каждое событие может находится одновременно в нескольких окнах, то общий размер агрегата существенно увеличивается

Окно задается при создании агрегата с помощью функции `window` внутри `groupBy`. В параметрах указывается длина окна и расстояние между двумя точкой начала текущего и следующего окна.

<img align="center" width="1000" height="1000" src="https://spark.apache.org/docs/latest/img/structured-streaming-window.png">

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

Создадим стрим с использованием `window` и `watermark` в `update` режиме. В данном случае watermark позволяет игнорировать события, которые приходят с запозданием (`latency` > `max_event_timestamp` + `watermark_value`)

In [None]:
val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    .withColumn("timestamp", date_sub('timestamp, 1))

createConsoleSink("state10", "append", oldData).start

In [None]:
import org.apache.spark.sql.functions._

val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    .withColumn("timestamp", date_sub('timestamp, 1))

val uData = newData
    .union(oldData)
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window($"timestamp", "10 minutes"), 'ident)
    .count

In [None]:
createConsoleSink("state5", "update", uData).start

In [None]:
killAll

Создадим стрим с использованием `window` и `watermark` в `append` режиме. В `append` режиме в синк будут записаны только завершенные окна с данными в момент `window_right_bound` + `watermark_value`

In [None]:
import org.apache.spark.sql.functions._

val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

val uData = newData
    .withWatermark("timestamp", "1 minutes")
    .groupBy(window($"timestamp", "1 minutes", "1 minutes"))
    .count

createConsoleSink("state6", "append", uData).start

### Выводы
- Spark позволяет строить агрегаты на SDF в разных режимах
- Вотермарки поддерживаются при использовании `append` и `update` режимов
- Плавающее окно имеет два параметра - размер окна и сдвиг текущего окна относительно следующего

In [None]:
killAll

## Соединения
Spark позволяет делать:
- stream - static join
  + inner
  + left outer
  + left anti
- stream - stream join
  + inner
  + left outer
  + right outer
  
### Stream-Static join
Может использоваться для:
- обогащения стрима фактами (left outer)
- фильтрации по blacklist (left anti)
- фильтрации по whitelist (inner)

#### Left outer join

In [None]:
val identStream = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

In [None]:
val right = airportsDf()

val result = identStream.join(right, Seq("ident"), "left").select('ident, 'name, 'elevation_ft, 'iso_country)

result.printSchema

result.explain(true)

createConsoleSink("state7", result).start

In [None]:
killAll

#### Inner join

In [None]:
val right = Vector("00FA", "00IG", "00FD").toDF.withColumnRenamed("value", "ident")
val result = identStream.join(right, Seq("ident"), "inner")

createConsoleSink("state8", result).start

In [None]:
killAll

#### Left anti join

In [None]:
val right = Vector("00FA", "00IG", "00FD").toDF.withColumnRenamed("value", "ident")
val result = identStream.join(right, Seq("ident"), "left_anti")

createConsoleSink("state9", result).start

In [None]:
killAll

### Stream-Stream join
Для соединения двух стримов нам необходимо добавить к условию соединения равенство двух окон или сравнение двух временных меток

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

#### Использование window

In [None]:
import org.apache.spark.sql.functions._

val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "2 hours")
    .withColumn("window", window('timestamp, "1 minute")).as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "3 hours")
    .withColumn("window", window('timestamp, "1 minute")).as("right")

val joinExpr = expr("""left.value = right.value and left.window = right.window""")

val joined = left.join(right, joinExpr, "inner").select($"left.window", $"left.value", $"left", $"right")

// joined.explain(true)

In [None]:
createConsoleSink("state10", joined).start

In [None]:
killAll

#### Использование timestamp

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

In [None]:
import org.apache.spark.sql.functions._

val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "2 hours").as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "3 hours").as("right")

val joinExpr = expr("""left.value = right.value and left.timestamp <= right.timestamp + INTERVAL 1 hour """)

val joined = left.join(right, joinExpr, "inner").select($"left.value", $"left.timestamp", $"left", $"right")

joined.explain(true)

In [None]:
createConsoleSink("state11", joined).start

In [None]:
killAll

### Выводы:
- Spark позволяет соединять SDF со статическим DF, используя разные виды соединений: left outer, inner, left anti
- Допускается использование соединений двух стримов, для этого требуется использовать вотермарки и (опционально) плавающие окна

In [None]:
spark.stop()