<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Structured Streaming II
**Andrey Titov**  
tenke.iu8@gmail.com  

## In this session
+ What stateful streaming is
+ Removing duplicates
+ Aggregations
+ Joins

## What stateful streaming is

**Stateful streaming** is a kind of stream processing in which handling a batch of data also uses data from previous batches.

All operations built from `select`, `filter` and `withColumn` (except operations over sliding windows) are stateless. In practice this means:
- the stream performs no operations that need data from different batches
- once a batch has been processed, the stream "forgets" about it
- high throughput
- a small number of files and a small total size of the checkpoint
- significant changes can be made to the stream code without recreating the checkpoint

If the stream uses methods such as `join()`, `groupBy()`, `dropDuplicates()` or functions over sliding windows, then:
- the stream must have a timestamp column on which a `watermark` can be defined; otherwise the state grows without bound
- the stream will run slower than you expect
- the checkpoint will contain MANY files
- after changing the stream code, you will most likely have to recreate the checkpoint
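
To make the difference concrete, here is a minimal plain-Scala sketch (a toy model, not Spark code; the function names and sample idents are hypothetical) in which a stream is a sequence of micro-batches. The stateless function handles each batch in isolation, while the stateful one carries a running count per key from batch to batch, roughly what `groupBy(...).count` has to keep in its state. Note how the carried state grows with every new key.

```scala
// Toy model, not Spark code: a stream is a sequence of micro-batches of keys.

// Stateless: every batch is handled on its own, nothing is kept between batches.
def processStateless(batches: Seq[Seq[String]]): Seq[Seq[String]] =
  batches.map(batch => batch.filter(_.startsWith("00A")))

// Stateful: a running count per key is carried from one batch to the next.
// Returns the state snapshot after each batch.
def processStateful(batches: Seq[Seq[String]]): Seq[Map[String, Long]] =
  batches
    .scanLeft(Map.empty[String, Long]) { (state, batch) =>
      batch.foldLeft(state) { (acc, key) =>
        acc.updated(key, acc.getOrElse(key, 0L) + 1L)
      }
    }
    .tail // drop the empty initial state

val demoBatches = Seq(Seq("00AA", "00CN"), Seq("00AA", "00GE"), Seq("00AK"))
println(processStateless(demoBatches))
println(processStateful(demoBatches).map(_.size)) // prints "List(2, 3, 4)"
```

In a real stateful query this growing map is what ends up in the state store and, through it, in the checkpoint.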

Let's prepare helper functions for managing the streams:

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

// Console sink with a 10-second trigger and a separate checkpoint per query.
// Returns a DataStreamWriter; call .start to launch the query.
def createConsoleSink(chkName: String, df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/chk/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}
```


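```scala
import org.apache.spark.sql.SparkSession

// Stops every active streaming query in the current session
def killAll() = {
    SparkSession
        .active
        .streams
        .active
        .foreach { x =>
            // lastProgress is null until the query has completed its first micro-batch
            val desc = Option(x.lastProgress)
                .flatMap(_.sources.headOption)
                .map(_.description)
                .getOrElse(x.id.toString)
            x.stop()
            println(s"Stopped ${desc}")
        }
}
```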


```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for 'ident and .as[String]; notebooks usually import it automatically

// Static dataset with airport codes
def airportsDf() = {
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
    spark.read.options(csvOptions).csv("/tmp/datasets/airport-codes.csv")
}

// Column expression that returns a random ident out of a fixed set of (up to) 20 values
def randomIdent() = {

    val idents = airportsDf().select('ident).limit(20).distinct.as[String].collect

    val columnArray = idents.map( x => lit(x) )
    val sparkArray = array(columnArray:_*)
    val shuffledArray = shuffle(sparkArray)

    shuffledArray(0)
}
```


## Removing duplicates

Spark can remove duplicate records from a stream. There are two ways to do it:
- without a `watermark`
- with a `watermark`

### Without a watermark
- Every unique key (the values of the `dropDuplicates` columns) is kept in the state, which is saved in the checkpoint
- Duplicates are eliminated from the stream completely
- The state only grows, so over time the stream degrades
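
A minimal sketch of this behaviour in plain Scala rather than Spark (a toy model; the function name and sample data are hypothetical): deduplication without a watermark has to remember every key it has ever emitted, so nothing is ever evicted and the state size never decreases.

```scala
// Toy model of dropDuplicates(Seq("ident")) without a watermark (not Spark's implementation).
// Returns the rows emitted for each batch and the state size after each batch.
def dedupWithoutWatermark(batches: Seq[Seq[String]]): (Seq[Seq[String]], Seq[Int]) = {
  val init = (Set.empty[String], Vector.empty[(Seq[String], Int)])
  val (_, outputs) = batches.foldLeft(init) { case ((seen, acc), batch) =>
    val fresh = batch.distinct.filterNot(seen) // keys never seen in any earlier batch
    val newSeen = seen ++ fresh                // nothing is ever evicted
    (newSeen, acc :+ (fresh -> newSeen.size))
  }
  (outputs.map(_._1), outputs.map(_._2))
}

val (emitted, stateSizes) = dedupWithoutWatermark(
  Seq(Seq("00AA", "00CN", "00AA"), Seq("00CN", "00GE"), Seq("00AA", "00AK"))
)
println(emitted)    // each key is emitted only the first time it appears
println(stateSizes) // grows with every new key and never shrinks
```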

```scala
import sys.process._
"rm -rf /tmp/chk".!!
```


```scala
val sdfWithDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
```

```scala
createConsoleSink("state1", sdfWithDuplicates).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:28:15.892|0    |00II |
|2022-08-01 19:28:16.892|1    |00AS |
|2022-08-01 19:28:17.892|2    |00CN |
|2022-08-01 19:28:18.892|3    |00GE |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:28:19.892|4    |00GA |
|2022-08-01 19:28:25.892|10   |00CO |
|2022-08-01 19:28:20.892|5    |00FL |
|2022-08-01 19:28:26.892|11   |00A  |
|2022-08-01 19:28:21.892|6    |00AR |
...
```

```scala
// Same stream, but each ident is emitted only once for the whole lifetime of the query
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .dropDuplicates(Seq("ident"))
```

```scala
sdfWithoutDuplicates.explain()
```

```
== Physical Plan ==
StreamingDeduplicate [ident#148], state info [ checkpoint = <unknown>, runId = 2973b064-aa0a-4ddc-9470-d2842cb28e01, opId = 0, ver = 0, numPartitions = 200], 0
+- Exchange hashpartitioning(ident#148, 200)
   +- *(1) Project [timestamp#106, value#107L, shuffle([00A,00AA,00AK,00AL,00AR,00AS,00AZ,00CA,00CL,00CN,00CO,00FA,00FD,00FL,00GA,00GE,00HI,00ID,00IG,00II], Some(4760656567083611271))[0] AS ident#148]
      +- StreamingRelation rate, [timestamp#106, value#107L]
```


```scala
createConsoleSink("state2", sdfWithoutDuplicates).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:31:52.249|1    |00FA |
|2022-08-01 19:31:58.249|7    |00AK |
|2022-08-01 19:31:57.249|6    |00FD |
|2022-08-01 19:31:53.249|2    |00AZ |
|2022-08-01 19:31:56.249|5    |00AR |
|2022-08-01 19:31:55.249|4    |00AL |
|2022-08-01 19:31:51.249|0    |00CN |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:32:03.249|12   |00FL |
|2022-08-01 19:32:06.249|15   |00ID |
...
```

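A worked example of how a watermark decides which events are too late. MOT is the maximum event time observed in the previous batches, the watermark delay is 6, and an event is late when MOT - 6 > event time:

```
Watermark delay = 6

Batch 0, MOT = None (no watermark yet, all events are accepted)
3
4
1
9

Batch 1, MOT = 9, threshold = 9 - 6 = 3
20  # 3 > 20? No  -> accepted
1   # 3 > 1?  Yes -> late, dropped
4   # 3 > 4?  No  -> accepted
5   # 3 > 5?  No  -> accepted
8   # 3 > 8?  No  -> accepted

Batch 2, MOT = 20
```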
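
The rule from the example above can be checked with a small plain-Scala simulation. This is a sketch under simplifying assumptions, not Spark's implementation: event times are plain numbers, the threshold for batch n is the maximum event time of batches 0..n-1 minus the delay, and an event strictly below the threshold counts as late, as in the notes (Spark's exact handling of the boundary value may differ). The function name is hypothetical, and the third batch (`15, 13`) is made-up data added for illustration.

```scala
// Returns, for each batch, the events that are dropped as late.
def lateEvents(batches: Seq[Seq[Long]], delay: Long): Seq[Seq[Long]] = {
  val init = (Option.empty[Long], Vector.empty[Seq[Long]])
  val (_, late) = batches.foldLeft(init) { case ((maxSeen, acc), batch) =>
    val threshold = maxSeen.map(_ - delay) // None: no watermark before the first data
    val dropped = threshold match {
      case Some(t) => batch.filter(_ < t)
      case None    => Seq.empty[Long]
    }
    // the max event time never decreases, so the watermark only moves forward
    val newMax = (maxSeen.toSeq ++ batch).reduceOption(_ max _)
    (newMax, acc :+ dropped)
  }
  late
}

val watermarkBatches = Seq(
  Seq(3L, 4L, 1L, 9L),      // batch 0: no watermark yet
  Seq(20L, 1L, 4L, 5L, 8L), // batch 1: threshold 9 - 6 = 3
  Seq(15L, 13L)             // batch 2 (hypothetical): threshold 20 - 6 = 14
)
println(lateEvents(watermarkBatches, delay = 6))
```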

```scala
killAll
```

```
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
```


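### With a watermark
- Keys of old events are removed from the state (and therefore from the checkpoint)
- Deduplication is guaranteed only for duplicates that arrive within the watermark delay; a duplicate that arrives with a delay N > watermark is not guaranteed to be removed
- The stream does not degrade over time
- The column used for the `watermark` must be included in `dropDuplicates`, otherwise the old state cannot be cleaned up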
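
The same kind of toy model (plain Scala, hypothetical names and data, not Spark's implementation) shows why the watermark keeps the state bounded when deduplicating on `(ident, timestamp)`: keys older than the watermark are evicted, and rows older than the watermark are treated as late. In this model a late duplicate is dropped rather than re-emitted; the only guarantee it illustrates is that duplicates arriving within the delay are suppressed.

```scala
// Toy model of withWatermark(...).dropDuplicates(Seq("ident", "timestamp")).
// Rows are (ident, eventTime); the watermark of batch n comes from batches 0..n-1.
def dedupWithWatermark(
    batches: Seq[Seq[(String, Long)]],
    delay: Long
): (Seq[Seq[(String, Long)]], Seq[Int]) = {
  val init = (Option.empty[Long], Set.empty[(String, Long)], Vector.empty[(Seq[(String, Long)], Int)])
  val (_, _, outputs) = batches.foldLeft(init) { case ((maxSeen, state, acc), batch) =>
    val watermark = maxSeen.map(_ - delay)
    def notLate(ts: Long): Boolean = watermark.forall(ts >= _)
    val onTime = batch.filter { case (_, ts) => notLate(ts) }               // late rows are dropped
    val fresh = onTime.distinct.filterNot(state)                            // duplicates are suppressed
    val newState = (state ++ fresh).filter { case (_, ts) => notLate(ts) }  // old keys are evicted
    val newMax = (maxSeen.toSeq ++ batch.map(_._2)).reduceOption(_ max _)
    (newMax, newState, acc :+ (fresh -> newState.size))
  }
  (outputs.map(_._1), outputs.map(_._2))
}

val (dedupOut, dedupStateSizes) = dedupWithWatermark(
  Seq(
    Seq("00AA" -> 100L, "00AA" -> 100L, "00CN" -> 101L), // exact duplicate inside one batch
    Seq("00AA" -> 100L, "00GE" -> 115L),                 // watermark 91: duplicate suppressed
    Seq("00AA" -> 100L, "00AK" -> 130L)                  // watermark 105: 00AA@100 is late
  ),
  delay = 10
)
println(dedupOut)
println(dedupStateSizes) // the state shrinks once old keys fall behind the watermark
```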

```scala
// Deduplicate by ident and event time; state older than the 10-minute watermark is dropped
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(Seq("ident", "timestamp"))

createConsoleSink("state3", sdfWithoutDuplicates).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:56:24.672|0    |00FL |
|2022-08-01 19:56:26.672|2    |00A  |
|2022-08-01 19:56:27.672|3    |00AS |
|2022-08-01 19:56:28.672|4    |00HI |
|2022-08-01 19:56:25.672|1    |00FD |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2022-08-01 19:56:37.672|13   |00IG |
|2022-08-01 19:56:32.672|8    |00GE |
|2022-08-01 19:56:31.672|7    |00GE |
|2022-08-01 19:56:35.672|11   |00IG |
...
```

Since the `timestamp` column in our stream has a unique value for every row, deduplication effectively does nothing here (in batch 2 above, `00GE` and `00IG` each appear twice because their timestamps differ). Therefore deduplication with a watermark is useful only in the following cases:
- you want to get rid of fully identical events, whose key AND timestamp are both the same
- you round the timestamp column

Let's look at the second option in more detail.

```scala
import org.apache.spark.sql.Column

// Rounds a timestamp down to a multiple of `value` seconds:
// timestamp -> epoch seconds -> integer division -> back to timestamp
def round(thisCol: Column, value: Int) = { 
    ((thisCol.cast("long") / value).cast("long") * value).cast("timestamp")
}
```
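
The effect of rounding can be seen without Spark. The sketch below (plain Scala; `roundDown` and the sample events are hypothetical) applies the same arithmetic as `round` to epoch seconds and counts distinct `(ident, timestamp)` pairs: with raw timestamps every row is unique, so there is nothing to deduplicate, while after rounding to 60 seconds, rows with the same ident within one minute collapse into a single key.

```scala
// Same arithmetic as the `round` Column function above, on epoch seconds.
// For non-negative values, Long division truncates, i.e. rounds down.
def roundDown(epochSeconds: Long, bucketSeconds: Int): Long =
  (epochSeconds / bucketSeconds) * bucketSeconds

val events = Seq(
  "00GE" -> 121L, "00GE" -> 125L, // same minute
  "00IG" -> 130L, "00IG" -> 179L, // same minute
  "00IG" -> 181L                  // next minute
)

val distinctRaw = events.distinct.size
val distinctRounded = events.map { case (id, ts) => (id, roundDown(ts, 60)) }.distinct.size
println(s"distinct keys: raw = $distinctRaw, rounded = $distinctRounded")
// prints "distinct keys: raw = 5, rounded = 3"
```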


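The same rule with real times: with a 10-minute watermark, an event with timestamp 00:15 that arrives when the maximum event time of the previous batches (MOT) is 00:17 is still accepted:

```
watermark = 10 minutes
this_event.timestamp = 00:15
MOT (batch n-1) = 00:17

MOT - watermark vs this_event.timestamp
  00:17 - 10 min = 00:07 <= 00:15  -> OK, the event is processed
  if MOT - watermark > this_event.timestamp -> NOT OK, the event is late
```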

```scala
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    // round event time down to the minute, so repeated idents within one minute share a key
    .withColumn("timestamp", round(col("timestamp"), 60))
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(Seq("ident", "timestamp"))

val sq = createConsoleSink("state34", sdfWithoutDuplicates).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:06:00|6    |00HI |
|2022-08-01 20:06:00|1    |00GA |
|2022-08-01 20:06:00|4    |00ID |
|2022-08-01 20:06:00|0    |00A  |
|2022-08-01 20:06:00|2    |00AA |
|2022-08-01 20:06:00|5    |00CA |
|2022-08-01 20:06:00|3    |00CN |
+-------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:06:00|14   |00FL |
|2022-08-01 20:06:00|7    |00AK |
|2022-08-01 20:06:00|9    |00AZ |
|2022-08-01 20:06:00|13   |00AS |
...

-------------------------------------------
Batch: 24
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:10:00|229  |00ID |
|2022-08-01 20:10:00|228  |00FA |
|2022-08-01 20:10:00|230  |00CL |
|2022-08-01 20:10:00|234  |00A  |
|2022-08-01 20:10:00|233  |00II |
|2022-08-01 20:10:00|235  |00AR |
|2022-08-01 20:10:00|236  |00IG |
+-------------------+-----+-----+

-------------------------------------------
Batch: 25
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:10:00|238  |00CO |
|2022-08-01 20:10:00|244  |00CN |
|2022-08-01 20:10:00|237  |00GE |
|2022-08-01 20:10:00|242  |00AA |
|2022-08-01 20:10:00|246  |00FL |
+-------------------+-----+-----+

-------------------------------------------
Batch: 26
-------------------------------------------
...

-------------------------------------------
Batch: 49
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:14:00|477  |00CA |
|2022-08-01 20:14:00|482  |00AL |
|2022-08-01 20:14:00|478  |00GA |
|2022-08-01 20:14:00|485  |00FD |
|2022-08-01 20:14:00|480  |00AA |
|2022-08-01 20:14:00|484  |00AZ |
|2022-08-01 20:14:00|486  |00IG |
|2022-08-01 20:14:00|483  |00CN |
+-------------------+-----+-----+

-------------------------------------------
Batch: 50
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2022-08-01 20:14:00|492  |00AR |
|2022-08-01 20:14:00|488  |00HI |
|2022-08-01 20:14:00|487  |00II |
|2022-08-01 20:14:00|489  |00AK |
+-------------------+-----+-----+

-------------------------------------------
Batch: 51
-------------------------------------------
+---------+-----+-----+
...
```


```scala
killAll
```

```
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
```


```scala
println(sq.lastProgress)
```

```
{
  "id" : "64f5cd0c-3f68-41d8-9235-67eb31607295",
  "runId" : "58fa09cb-db51-438a-89e1-2aebabf11469",
  "name" : null,
  "timestamp" : "2022-08-01T17:07:20.004Z",
  "batchId" : 7,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 3.749531308586427,
  "durationMs" : {
    "addBatch" : 2561,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 21,
    "setOffsetRange" : 0,
    "triggerExecution" : 2667,
    "walCommit" : 40
  },
  "eventTime" : {
    "avg" : "2022-08-01T17:07:00.000Z",
    "max" : "2022-08-01T17:07:00.000Z",
    "min" : "2022-08-01T17:07:00.000Z",
    "watermark" : "2022-08-01T16:57:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 31,
    "numRowsUpdated" : 5,
    "memoryUsedBytes" : 89887,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 1400,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 26607
    }
  } ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsP
...
```

In `eventTime`, the watermark (16:57:00) is exactly the maximum event time (17:07:00) minus the 10-minute delay.

### Conclusions:
- Spark can remove duplicates from a stream
- Stable long-running operation requires a `watermark`

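## Aggregations
When building aggregations on a stream, it is important to set the right `outputMode`, which can take one of three values:
- `append`: only rows that will never change again are written (for aggregations this requires a watermark)
- `update`: only the rows that changed since the previous trigger are written
- `complete`: the whole result table is written on every trigger (supported only for aggregations)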
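
For a running count per key, the difference between `complete` and `update` can be sketched in plain Scala (a toy model with hypothetical names and data, not Spark's implementation). `append` is left out because, for aggregations, it needs a watermark; it is shown later with windows.

```scala
// For each micro-batch returns (complete-mode output, update-mode output)
// of a running count per key, like groupBy('ident).count.
def outputModes(batches: Seq[Seq[String]]): Seq[(Map[String, Long], Map[String, Long])] = {
  val init = (Map.empty[String, Long], Vector.empty[(Map[String, Long], Map[String, Long])])
  val (_, outputs) = batches.foldLeft(init) { case ((state, acc), batch) =>
    // new totals only for the keys that occur in this batch
    val changed = batch.groupBy(identity).map { case (key, rows) =>
      key -> (state.getOrElse(key, 0L) + rows.size)
    }
    val newState = state ++ changed
    // complete: the whole result table; update: only the changed rows
    (newState, acc :+ (newState -> changed))
  }
  outputs
}

val modes = outputModes(Seq(Seq("00AZ", "00AZ", "00CN"), Seq("00AZ", "00GE")))
modes.zipWithIndex.foreach { case ((complete, update), i) =>
  println(s"batch $i complete=$complete update=$update")
}
```

In the second batch `complete` still contains `00CN`, although it did not change; `update` contains only `00AZ` and `00GE`.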

```scala
import sys.process._
"rm -rf /tmp/chk".!!
```


```scala
killAll
```

### Complete mode

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

// Same console sink as before, now with an explicit output mode
def createConsoleSink(chkName: String, mode: String, df: DataFrame) = {
    df
    .writeStream
    .outputMode(mode)
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/chk/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}
```


```scala
import org.apache.spark.sql.functions._

// Running count per ident over the whole stream (no watermark)
val grouped = 
    sdfWithDuplicates.groupBy('ident).count
```

```scala
createConsoleSink("state3", "complete", grouped).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00ID |1    |
|00AZ |2    |
|00AS |1    |
|00CN |2    |
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00FA |1    |
|00AK |2    |
|00ID |1    |
|00AA |1    |
|00AZ |3    |
|00A  |1    |
|00AS |1    |
|00CA |2    |
|00AL |1    |
|00GE |1    |
|00CN |2    |
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00CL |1    |
|00FA |2    |
|00AK |2    |
|00ID |2    |
|00AA |1    |
|00FD |1    |
|00AZ |4    |
|00HI |2    |
|00A  |1    |
|00AS |1    |
|00CA |3    |
|00AR |1    |
|00AL |1    |
...
```

```scala
killAll
```

```
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
```


### Update mode

```scala
createConsoleSink("state4", "update", grouped).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00FA |2    |
|00ID |1    |
|00AA |1    |
|00CO |1    |
|00IG |1    |
|00AR |2    |
|00AL |1    |
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00ID |2    |
|00AA |2    |
|00FD |1    |
|00AZ |1    |
|00AR |3    |
|00AL |2    |
|00GE |4    |
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00CL |1    |
|00II |2    |
|00AK |3    |
|00ID |3    |
|00AZ |3    |
|00GE |5    |
+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
...
```

```scala
killAll
```

```
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
```


### Aggregations over sliding windows
A sliding window groups events into windows of a fixed duration. Because each event can belong to several windows at once (windowDuration / slideDuration of them when the slide evenly divides the window length), the total size of the aggregate grows substantially.

The window is defined when building the aggregate, with the `window` function inside `groupBy`. Its parameters are the window length and the slide, i.e. the distance between the start of one window and the start of the next.
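
Which windows a single event falls into can be sketched in plain Scala. This is a hypothetical helper, not Spark's code; it assumes times in seconds and window starts aligned to multiples of the slide (as with `window`'s default zero `startTime`). With a 10-minute window and a 5-minute slide every event lands in 2 windows; with a tumbling window (slide equal to the length) it lands in exactly one.

```scala
// All windows [start, end) that contain event time t, for the given length and slide.
def windowsFor(t: Long, length: Long, slide: Long): List[(Long, Long)] = {
  val lastStart = Math.floorDiv(t, slide) * slide // latest window start <= t
  Iterator
    .iterate(lastStart)(_ - slide)                // step back one slide at a time
    .takeWhile(start => start + length > t)       // stop once a window ends at or before t
    .map(start => (start, start + length))
    .toList
    .reverse
}

// 10-minute windows sliding by 5 minutes, event at 00:07 (t = 420 s)
println(windowsFor(420, 600, 300)) // prints "List((0,600), (300,900))"
// tumbling 10-minute windows
println(windowsFor(420, 600, 600)) // prints "List((0,600))"
```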

<img align="center" width="1000" height="1000" src="https://spark.apache.org/docs/latest/img/structured-streaming-window.png">

```scala
import sys.process._
"rm -rf /tmp/chk".!!
```


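Let's create a stream that uses `window` and `watermark` in `update` mode. Here the watermark lets Spark ignore events that arrive too late, i.e. events whose timestamp is older than `max_event_timestamp - watermark_value`, where `max_event_timestamp` is the maximum event time seen so far. First, let's look at the "old" data on its own: its timestamps are shifted one day back.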

```scala
// "Old" events: date_sub returns a date, so after the cast every timestamp is midnight of the previous day
val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    .withColumn("timestamp", date_sub('timestamp, 1).cast("timestamp"))

createConsoleSink("state10", "append", oldData).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-----+--------+
|timestamp          |value|ident   |
+-------------------+-----+--------+
|2022-07-31 00:00:00|0    |OLD DATA|
|2022-07-31 00:00:00|6    |OLD DATA|
|2022-07-31 00:00:00|1    |OLD DATA|
|2022-07-31 00:00:00|7    |OLD DATA|
|2022-07-31 00:00:00|2    |OLD DATA|
|2022-07-31 00:00:00|8    |OLD DATA|
|2022-07-31 00:00:00|3    |OLD DATA|
|2022-07-31 00:00:00|9    |OLD DATA|
|2022-07-31 00:00:00|4    |OLD DATA|
|2022-07-31 00:00:00|5    |OLD DATA|
+-------------------+-----+--------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-----+--------+
|timestamp          |value|ident   |
+-------------------+-----+--------+
|2022-07-31 00:00:00|10   |OLD DATA|
|2022-07-31 00:00:00|16   |OLD DATA|
|2022-07-31 00:00:00|11   |OLD DATA|
|2022-07-31 00:00:00|17   |OLD DATA|
...
```

```scala
import org.apache.spark.sql.functions._

// Current events with random idents
val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

// "Late" events: timestamps shifted one day back
val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    // cast back to timestamp so both sides of the union have the same schema
    .withColumn("timestamp", date_sub('timestamp, 1).cast("timestamp"))

// 10-minute windows per ident; events older than the watermark are ignored
val uData = newData
    .union(oldData)
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window($"timestamp", "10 minutes"), 'ident)
    .count
```

```scala
val sq = createConsoleSink("state5", "update", uData).start
```

```
-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |ident|count|
+------------------------------------------+-----+-----+
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00AR |11   |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00FA |11   |
|[2022-08-01 20:40:00, 2022-08-01 20:50:00]|00IG |4    |
|[2022-08-01 20:40:00, 2022-08-01 20:50:00]|00FA |4    |
|[2022-08-01 20:40:00, 2022-08-01 20:50:00]|00AA |9    |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00IG |8    |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00AK |12   |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00GE |11   |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00CN |9    |
|[2022-08-01 20:40:00, 2022-08-01 20:50:00]|00ID |4    |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00GA |6    |
|[2022-08-01 20:30:00, 2022-08-01 20:40:00]|00A  |15   |
...
```

```scala
killAll
```

```
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
```


```scala
println(sq.lastProgress)
```

```
{
  "id" : "c68668f9-73c2-40da-8b86-8f3f2bfd10fc",
  "runId" : "e3f467d8-2cf0-46b3-b658-0e661f153f35",
  "name" : null,
  "timestamp" : "2022-08-01T17:41:50.004Z",
  "batchId" : 8,
  "numInputRows" : 13,
  "inputRowsPerSecond" : 2.068087814190264,
  "processedRowsPerSecond" : 3.197245450073783,
  "durationMs" : {
    "addBatch" : 3926,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 50,
    "setOffsetRange" : 0,
    "triggerExecution" : 4066,
    "walCommit" : 45
  },
  "eventTime" : {
    "avg" : "2022-07-31T21:04:01.689Z",
    "max" : "2022-08-01T17:41:48.994Z",
    "min" : "2022-07-30T21:00:00.000Z",
    "watermark" : "2022-08-01T17:31:41.994Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 40,
    "numRowsUpdated" : 6,
    "memoryUsedBytes" : 93495,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 1000,
      "loadedMapCacheMissCount" : 200,
      "stateOnCurrentVersionSizeBytes" : 29311
    }
  } ],
  "sources" : [ {
    "description" : "RateStreamV2[
...
```

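Now let's create a stream with `window` and `watermark` in `append` mode. In `append` mode only finalized windows are written to the sink: a window is emitted once the watermark reaches its end, i.e. once the maximum observed event time reaches `window_right_bound + watermark_value`. The result appears in a subsequent micro-batch, so expect a noticeable delay before the first row.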
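
The timing of `append` output can be sketched with a toy model (plain Scala, hypothetical names and data, times in seconds, not Spark's implementation). It assumes tumbling windows, a watermark computed from the previous batches as `max event time - delay`, rows older than the watermark being dropped, and triggers that keep running without new data (the empty batches at the end); a window is emitted once the watermark reaches its end.

```scala
// Toy model of append mode for tumbling windows of length windowLen.
// Returns, for each batch, the windows ((start, end), count) written to the sink.
def appendOutput(batches: Seq[Seq[Long]], windowLen: Long, delay: Long): Seq[Seq[((Long, Long), Int)]] = {
  val init = (Option.empty[Long], Map.empty[Long, Int], Vector.empty[Seq[((Long, Long), Int)]])
  val (_, _, outputs) = batches.foldLeft(init) { case ((maxSeen, state, acc), batch) =>
    val watermark = maxSeen.map(_ - delay)
    val onTime = batch.filter(t => watermark.forall(t >= _)) // late rows are dropped
    // count each on-time event in its window, keyed by window start
    val counts = onTime.foldLeft(state) { (s, t) =>
      val start = Math.floorDiv(t, windowLen) * windowLen
      s.updated(start, s.getOrElse(start, 0) + 1)
    }
    // a window is final once the watermark has reached its end
    val (done, open) = counts.partition { case (start, _) => watermark.exists(_ >= start + windowLen) }
    val emitted = done.toSeq.sortBy(_._1).map { case (start, n) => ((start, start + windowLen), n) }
    val newMax = (maxSeen.toSeq ++ batch).reduceOption(_ max _)
    (newMax, open, acc :+ emitted)
  }
  outputs
}

// 1-minute windows, 1-minute watermark; the last two batches carry no new data
val appendRes = appendOutput(
  Seq(Seq(0L, 10L, 50L), Seq(61L, 70L), Seq(125L), Seq.empty[Long], Seq.empty[Long]),
  windowLen = 60,
  delay = 60
)
appendRes.zipWithIndex.foreach { case (out, i) => println(s"batch $i: $out") }
```

The window `[0, 60)` is written only in batch 3: the event at 125 raises the maximum event time enough for the watermark (125 - 60 = 65) to pass the window end, and that watermark takes effect in the following batch.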

```scala
import org.apache.spark.sql.functions._

val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

// 1-minute tumbling windows (window length = slide) with a 1-minute watermark
val uData = newData
    .withWatermark("timestamp", "1 minutes")
    .groupBy(window($"timestamp", "1 minutes", "1 minutes"))
    .count

createConsoleSink("state6", "append", uData).start
```

```
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 6
...

-------------------------------------------
Batch: 45
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 46
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 47
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 48
-------------------------------------------
+------+-----+
|window|count|
+------+-----+
+------+-----+

-------------------------------------------
Batch: 49
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2022-08-01 20:50:00, 2022-08-01 20:51:00]|60   |
+------------------------------------------+-----+
...
```

### Conclusions
- Spark can build aggregations on streaming DataFrames (SDF) in different output modes
- Watermarks take effect (old state is cleaned up) only in `append` and `update` modes; in `complete` mode the whole result table is kept
- A sliding window has two parameters: the window length and the slide between the starts of consecutive windows

```scala
spark.streams.active.foreach { x => x.stop }
```

Compile Error: <console>:90: error: not found: type $iw
val $iw = new $iw
              ^
<console>:24: error: object DataFrame is not a member of package org.apache.spark.sql
       ^
<console>:26: error: object DataFrame is not a member of package org.apache.spark.sql
       ^
<console>:39: error: object DataFrame is not a member of package org.apache.spark.sql
       ^
<console>:52: error: stable identifier required, but INSTANCE.type found.
val $line133$read: INSTANCE.type = INSTANCE
                                          ^
<console>:57: error: not found: value newData
       newData
       ^
<console>:62: error: not found: type $iw
val $iw = new $iw
              ^
<console>:64: error: not found: type $iw
val $iw = new $iw
              ^
<console>:66: error: not found: type $iw
val $iw = new $iw
              ^
<console>:68: error: not found: type $iw
val $iw = new $iw
              ^
<console>:70: error: not found: type $iw
val $iw = new $iw
              ^
<console>:72: error: not found: type $iw
val $iw = new $iw
              ^
<console>:74: error: not found: type $iw
val $iw = new $iw
              ^
<console>:76: error: not found: type $iw
val $iw = new $iw
              ^
<console>:78: error: not found: type $iw
val $iw = new $iw
              ^
<console>:80: error: not found: type $iw
val $iw = new $iw
              ^
<console>:82: error: not found: type $iw
val $iw = new $iw
              ^
<console>:84: error: not found: type $iw
val $iw = new $iw
              ^
<console>:86: error: not found: type $iw
val $iw = new $iw
              ^
<console>:88: error: not found: type $iw
val $iw = new $iw
              ^
<console>:4: error: object $eval inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in object $eval.);
 other members with override errors are: equals, toString, ==, !=, ##
object $eval {
       ^
<console>:5: error: class $read inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class $read.);
 other members with override errors are: equals, toString, ==, !=, ##
class $read extends Serializable {
      ^
<console>:8: error: class $iw inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class );
 other members with override errors are: equals, toString, ==, !=, ##
class $iw extends Serializable {
      ^
<console>:9: error: class $iw inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class );
 other members with override errors are: equals, toString, ==, !=, ##
class $iw extends Serializable {
      ^
<console>:11: error: class $iw inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class );
 other members with override errors are: equals, toString, ==, !=, ##
class $iw extends Serializable {
      ^
<console>:20: error: class $iw inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class );
 other members with override errors are: equals, toString, ==, !=, ##
class $iw extends Serializable {
      ^


In [49]:
killAll

Compile Error: <console>:5: error: class $read inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class $read.);
 other members with override errors are: equals, toString, ==, !=, ##
class $read extends Serializable {
      ^


In [50]:
spark

Compile Error: <console>:5: error: class $read inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in class $read.);
 other members with override errors are: equals, toString, ==, !=, ##
class $read extends Serializable {
      ^
<console>:103: error: object $read inherits conflicting members:
  method hashCode in class Any of type ()Int  and
  method hashCode in class Object of type ()Int
(Note: this can be resolved by declaring an override in object $read.);
 other members with override errors are: equals, toString, ==, !=, ##
object $read {
       ^


## Joins
Spark supports:
- stream-static joins
  + inner
  + left outer
  + left anti
- stream-stream joins
  + inner
  + left outer
  + right outer
  
### Stream-Static join
Typical uses:
- enriching the stream with attributes from a static table (left outer)
- filtering against a blacklist (left anti)
- filtering against a whitelist (inner)
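To make the three use cases concrete, here is a pure-Scala model (plain collections, not Spark code) of what each join type does to a single micro-batch. A stream-static join keeps no state: every micro-batch is joined against the whole static table. The names `StreamStaticJoinModel`, `Event`, `leftOuter`, `inner` and `leftAnti` are hypothetical. The static side is assumed to have unique keys (a `Map`). With duplicate keys, a real join would emit one row per match.

```scala
// Pure-Scala model of stream-static join semantics for one micro-batch.
// Hypothetical names; the static table is assumed to have unique keys.
object StreamStaticJoinModel {
  // One streaming row: the join key plus a payload
  final case class Event(ident: String, value: Long)

  // left outer: every streaming row survives; rows without a match get None (null in Spark)
  def leftOuter[A](batch: Seq[Event], table: Map[String, A]): Seq[(Event, Option[A])] =
    batch.map(e => (e, table.get(e.ident)))

  // inner: only rows whose key is present in the static table (whitelist)
  def inner[A](batch: Seq[Event], table: Map[String, A]): Seq[(Event, A)] =
    batch.flatMap(e => table.get(e.ident).map(a => (e, a)))

  // left anti: only rows whose key is absent from the static table (blacklist)
  def leftAnti[A](batch: Seq[Event], table: Map[String, A]): Seq[Event] =
    batch.filterNot(e => table.contains(e.ident))

  def main(args: Array[String]): Unit = {
    val airports = Map("00AK" -> "Lowell Field", "00AL" -> "Epps Airpark")
    val batch = Seq(Event("00AK", 0L), Event("00FA", 1L), Event("00AL", 2L))
    println(leftOuter(batch, airports))
    println(inner(batch, airports))
    println(leftAnti(batch, airports))
  }
}
```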

#### Left outer join


In [6]:
val identStream = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

identStream = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [8]:
val right = airportsDf()

val result = identStream.join(right, Seq("ident"), "left").select('ident, 'name, 'elevation_ft, 'iso_country)

result.printSchema

result.explain(true)

createConsoleSink("state7", "append", result).start

root
 |-- ident: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- iso_country: string (nullable = true)

== Parsed Logical Plan ==
'Project [unresolvedalias('ident, None), unresolvedalias('name, None), unresolvedalias('elevation_ft, None), unresolvedalias('iso_country, None)]
+- Project [ident#88, timestamp#46, value#47L, type#103, name#104, elevation_ft#105, continent#106, iso_country#107, iso_region#108, municipality#109, gps_code#110, iata_code#111, local_code#112, coordinates#113]
   +- Join LeftOuter, (ident#88 = ident#102)
      :- Project [timestamp#46, value#47L, shuffle(array(00A, 00AA, 00AK, 00AL, 00AR, 00AS, 00AZ, 00CA, 00CL, 00CN, 00CO, 00FA, 00FD, 00FL, 00GA, 00GE, 00HI, 00ID, 00IG, 00II), Some(-3935085281230065005))[0] AS ident#88]
      :  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1aa8bdce, rate, [timestamp#46, value#47L]
      +- Relation[ident#102,type#103,nam

right = [ident: string, type: string ... 10 more fields]
result = [ident: string, name: string ... 2 more fields]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@2c5f1d73

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----+------------+-----------+
|ident|name|elevation_ft|iso_country|
+-----+----+------------+-----------+
+-----+----+------------+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----------------------+------------+-----------+
|ident|name                   |elevation_ft|iso_country|
+-----+-----------------------+------------+-----------+
|00AK |Lowell Field           |450         |US         |
|00AL |Epps Airpark           |820         |US         |
|00CA |Goldstone /Gts/ Airport|3038        |US         |
|00AA |Aero B Ranch Airport   |3435        |US         |
|00FL |River Oak Airport      |35          |US         |
|00AZ |Cordes Airport         |3810        |US         |
|00FD |Ringhaver Heliport     |25          |US         |
|00CL |Williams Ag Airport    |87          |US         |
+-----+-------------------

In [None]:
killAll

#### Inner join

In [10]:
val right = Vector("00FA", "00IG", "00FD").toDF.withColumnRenamed("value", "ident")
val result = identStream.join(right, Seq("ident"), "inner")

createConsoleSink("state8", "append", result).start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+-----+
|ident|timestamp|value|
+-----+---------+-----+
+-----+---------+-----+



right = [ident: string]
result = [ident: string, timestamp: timestamp ... 1 more field]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6fc0021c

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+---------+-----+
|ident|timestamp|value|
+-----+---------+-----+
+-----+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|00IG |2022-08-01 21:05:39.055|1    |
|00FD |2022-08-01 21:05:47.055|9    |
+-----+-----------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|00IG |2022-08-01 21:05:56.055|18   |
|00FA |2022-08-01 21:05:53.055|15   |
+-----+-----------------------+-----+



In [11]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


#### Left anti join

In [12]:
val right = Vector("00FA", "00IG", "00FD").toDF.withColumnRenamed("value", "ident")
val result = identStream.join(right, Seq("ident"), "left_anti")

createConsoleSink("state9", "append", result).start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+-----+
|ident|timestamp|value|
+-----+---------+-----+
+-----+---------+-----+



right = [ident: string]
result = [ident: string, timestamp: timestamp ... 1 more field]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5991255d

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+----------------------+-----+
|ident|timestamp             |value|
+-----+----------------------+-----+
|00FL |2022-08-01 21:06:38.76|0    |
+-----+----------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+----------------------+-----+
|ident|timestamp             |value|
+-----+----------------------+-----+
|00AR |2022-08-01 21:06:39.76|1    |
|00GE |2022-08-01 21:06:45.76|7    |
|00GE |2022-08-01 21:06:40.76|2    |
|00AA |2022-08-01 21:06:46.76|8    |
|00AR |2022-08-01 21:06:41.76|3    |
|00AR |2022-08-01 21:06:47.76|9    |
|00CN |2022-08-01 21:06:42.76|4    |
|00AZ |2022-08-01 21:06:48.76|10   |
|00CO |2022-08-01 21:06:44.76|6    |
+-----+----------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+----------------------+-----+
|ide

In [13]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


### Stream-Stream join
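To join two streams, add either an equality of two windows or a comparison of two timestamps to the join condition, and define a watermark on each stream. Together, these let Spark determine when a buffered row can no longer find a match, so that it can drop the row from the state. Without them, the state of an inner join grows without bound, and outer joins are rejected altogether.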
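Why the time constraint matters: each side of a stream-stream join is buffered in state, and a buffered row can only be dropped once no future row from the other side can match it. The sketch below is a simplified pure-Scala model of that reasoning, not Spark's implementation. It assumes a condition of the form `right.ts + lower <= left.ts <= right.ts + upper`, where a missing bound means unbounded. It also assumes that rows arriving later on either side have an event time of at least `wm`, the query's event-time watermark (by default, Spark takes the minimum of the two inputs' watermarks). `JoinStateEviction`, `RangeCond`, `canDropLeft` and `canDropRight` are hypothetical names.

```scala
// Simplified model (not Spark code) of when a buffered row can leave
// stream-stream join state. Condition: right.ts + lower <= left.ts <= right.ts + upper.
// None means the bound is absent. All times are epoch millis.
object JoinStateEviction {
  final case class RangeCond(lower: Option[Long], upper: Option[Long])

  // A left row at time l matches only right rows with r in [l - upper, l - lower].
  // Future right rows are assumed to have r >= wm, so once l - lower < wm
  // nothing can match it any more. Without a lower bound it is never dropped.
  def canDropLeft(l: Long, wm: Long, c: RangeCond): Boolean =
    c.lower.exists(lo => l - lo < wm)

  // A right row at time r matches only left rows with l in [r + lower, r + upper].
  // Once r + upper < wm, no future left row can match it.
  def canDropRight(r: Long, wm: Long, c: RangeCond): Boolean =
    c.upper.exists(hi => r + hi < wm)
}
```

For the one-sided condition in the timestamp example below, `left.timestamp <= right.timestamp + INTERVAL 1 hour`, the model gives `RangeCond(None, Some(1 hour))`. Right rows eventually become droppable, but left rows never do. A two-sided condition such as `left.timestamp BETWEEN right.timestamp AND right.timestamp + INTERVAL 1 hour` (lower = 0, upper = 1 hour) bounds both sides.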

In [14]:
import sys.process._
"rm -rf /tmp/chk".!!

res29: String = ""


#### Using window

In [15]:
import org.apache.spark.sql.functions._

val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "2 hours")
    .withColumn("window", window('timestamp, "1 minute")).as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "3 hours")
    .withColumn("window", window('timestamp, "1 minute")).as("right")

val joinExpr = expr("""left.value = right.value and left.window = right.window""")

val joined = left.join(right, joinExpr, "inner").select($"left.window", $"left.value", $"left", $"right")

joined.explain(true)

== Parsed Logical Plan ==
'Project [unresolvedalias('left.window, None), unresolvedalias('left.value, None), unresolvedalias('left, None), unresolvedalias('right, None)]
+- Join Inner, ((value#590L = value#649L) && (window#641 = window#700))
   :- SubqueryAlias `left`
   :  +- Project [timestamp#589-T7200000ms, value#590L, ident#631, left#635, window#642-T7200000ms AS window#641]
   :     +- Filter isnotnull(timestamp#589-T7200000ms)
   :        +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#589-T7200000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#589-T7200000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#589-T7200000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimesta

left = [timestamp: timestamp, value: bigint ... 3 more fields]
right = [timestamp: timestamp, value: bigint ... 3 more fields]
joinExpr = ((left.value = right.value) AND (left.window = right.window))
joined = [window: struct<start: timestamp, end: timestamp>, value: bigint ... 2 more fields]


[window: struct<start: timestamp, end: timestamp>, value: bigint ... 2 more fields]
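With `left.window = right.window` in the condition, two rows match only when their timestamps fall into the same 1-minute tumbling window. A pure-Scala sketch of that window assignment follows. It assumes that `window('timestamp, "1 minute")`, with no slide duration and no start offset, yields epoch-aligned windows `[ts - ts mod 60 s, ts - ts mod 60 s + 60 s)`. `TumblingWindow`, `assign` and `sameWindow` are hypothetical names.

```scala
// Pure-Scala sketch of epoch-aligned tumbling windows (assumption: no slide, no start offset)
object TumblingWindow {
  // [start, end) of the window containing ts; all values in milliseconds
  def assign(ts: Long, sizeMs: Long): (Long, Long) = {
    val start = ts - Math.floorMod(ts, sizeMs)
    (start, start + sizeMs)
  }

  // The `left.window = right.window` part of the join condition, for two event times
  def sameWindow(leftTs: Long, rightTs: Long, sizeMs: Long): Boolean =
    assign(leftTs, sizeMs) == assign(rightTs, sizeMs)
}
```

For example, with a 1-minute window, event times 59999 ms and 60000 ms land in different windows and would not match, even though they are 1 ms apart. A timestamp-range condition, as in the next example, has no such boundary effect.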

In [16]:
createConsoleSink("state10", "append", joined).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1b084ca7

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+----+-----+
|window|value|left|right|
+------+-----+----+-----+
+------+-----+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+----+-----+
|window                                    |value|left|right|
+------------------------------------------+-----+----+-----+
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|7    |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|10   |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|9    |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|2    |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|5    |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|11   |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|3    |left|right|
|[2022-08-01 21:12:00, 2022-08-01 21:13:00]|4    |left|right|
|[2022-08-01 21:12:

In [17]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


#### Using timestamp

In [None]:
import sys.process._
"rm -rf /tmp/chk".!!

In [18]:
import org.apache.spark.sql.functions._

val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "2 hours").as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "3 hours").as("right")

val joinExpr = expr("""left.value = right.value and left.timestamp <= right.timestamp + INTERVAL 1 hour """)

val joined = left.join(right, joinExpr, "inner").select($"left.value", $"left.timestamp", $"left", $"right")

joined.explain(true)

== Parsed Logical Plan ==
'Project [unresolvedalias('left.value, None), unresolvedalias('left.timestamp, None), unresolvedalias('left, None), unresolvedalias('right, None)]
+- Join Inner, ((value#4826L = value#4877L) && (timestamp#4825-T7200000ms <= cast(timestamp#4876-T10800000ms + interval 1 hours as timestamp)))
   :- SubqueryAlias `left`
   :  +- EventTimeWatermark timestamp#4825: timestamp, interval 2 hours
   :     +- Project [timestamp#4825, value#4826L, ident#4867, left AS left#4871]
   :        +- Project [timestamp#4825, value#4826L, shuffle(array(00A, 00AA, 00AK, 00AL, 00AR, 00AS, 00AZ, 00CA, 00CL, 00CN, 00CO, 00FA, 00FD, 00FL, 00GA, 00GE, 00HI, 00ID, 00IG, 00II), Some(7260821119789204172))[0] AS ident#4867]
   :           +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@75950039, rate, [timestamp#4825, value#4826L]
   +- SubqueryAlias `right`
      +- EventTimeWatermark timestamp#4876: timestamp, interval 3 hours
         +- Project

left = [timestamp: timestamp, value: bigint ... 2 more fields]
right = [timestamp: timestamp, value: bigint ... 2 more fields]
joinExpr = ((left.value = right.value) AND (left.timestamp <= (right.timestamp + interval 1 hours)))
joined = [value: bigint, timestamp: timestamp ... 2 more fields]


[value: bigint, timestamp: timestamp ... 2 more fields]

In [19]:
createConsoleSink("state11", "append", joined).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5c6b32c6

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+----+-----+
|value|timestamp|left|right|
+-----+---------+----+-----+
+-----+---------+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----------------------+----+-----+
|value|timestamp              |left|right|
+-----+-----------------------+----+-----+
|0    |2022-08-01 21:14:30.694|left|right|
|7    |2022-08-01 21:14:37.694|left|right|
|6    |2022-08-01 21:14:36.694|left|right|
|9    |2022-08-01 21:14:39.694|left|right|
|5    |2022-08-01 21:14:35.694|left|right|
|1    |2022-08-01 21:14:31.694|left|right|
|10   |2022-08-01 21:14:40.694|left|right|
|3    |2022-08-01 21:14:33.694|left|right|
|8    |2022-08-01 21:14:38.694|left|right|
|2    |2022-08-01 21:14:32.694|left|right|
|4    |2022-08-01 21:14:34.694|left|right|
+-----+-----------------------+----+-----+



In [20]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


### Conclusions:
- Spark can join a streaming DataFrame (SDF) with a static DataFrame using several join types: left outer, inner and left anti.
- Two streams can be joined as well. To keep the join state bounded, both streams need watermarks, and the join condition needs a time constraint: either equal windows or a timestamp comparison.

In [None]:
spark.stop()

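In [None]:
// Worked example: windowed count with a watermark, update mode
// watermark delay = 3
// groupBy(id, window(ts, 10)).count
// outputMode = update

// Batch 1 input (id, event time):
// A 3
// B 2
// C 5
// Output (id, window, count):
// A, [0, 10], 1
// B, [0, 10], 1
// C, [0, 10], 1
// Watermark after batch 1: 5 - 3 = 2

// Batch 2 input:
// A 7
// B 8
// C 14
// Output:
// A, [0, 10], 2
// B, [0, 10], 2
// C, [10, 20], 1
// (C, [0, 10], 1 is not emitted: update mode outputs only rows changed in this batch)
// Watermark after batch 2: 14 - 3 = 11

// Batch 3 input:
// A 1
// B 1
// C 17
// A 1 and B 1 belong to window [0, 10], which ends at or before the watermark (10 <= 11),
// so they are dropped as late data
// Output:
// C, [10, 20], 2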
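The watermark example in the cell above (delay 3, window size 10, update mode) can be replayed with a small pure-Scala simulation (plain collections, not Spark). The following assumptions are modeling choices. Event time is a plain `Long`, and windows are tumbling `[start, start + 10)`. The watermark used while processing a batch is the one computed after the previous batch (max event time seen minus 3, never decreasing, starting at 0). Rows whose window already ended at or before that watermark are dropped, and such windows are evicted from state. `WatermarkUpdateModeSim` and `runBatch` are hypothetical names.

```scala
// Pure-Scala replay (not Spark code) of the watermark example above.
// Assumptions: event time is a Long, windows are tumbling [start, start + 10),
// the watermark starts at 0 and is advanced only after a batch finishes.
object WatermarkUpdateModeSim {
  type Key = (String, Long) // (id, window start)

  // Running counts per (id, window) and the watermark for the next batch
  final case class State(counts: Map[Key, Long], watermark: Long)

  val WindowSize = 10L // window(ts, 10)
  val Delay = 3L       // watermark delay = 3

  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, WindowSize)

  /** Processes one micro-batch; returns the new state and the rows emitted in update mode. */
  def runBatch(st: State, batch: Seq[(String, Long)]): (State, Seq[(String, Long, Long)]) = {
    val wm = st.watermark
    // Late rows: their window already ended at or before the watermark
    val onTime = batch.filter { case (_, ts) => windowStart(ts) + WindowSize > wm }
    // How many rows each (id, window) key received in this batch
    val increments: Map[Key, Long] =
      onTime.groupBy { case (id, ts) => (id, windowStart(ts)) }
        .map { case (k, rows) => k -> rows.size.toLong }
    // New totals for the touched keys only
    val updated: Map[Key, Long] =
      increments.map { case (k, n) => k -> (st.counts.getOrElse(k, 0L) + n) }
    // Update mode emits only the keys changed in this batch
    val emitted = updated.toSeq.map { case ((id, start), c) => (id, start, c) }.sorted
    // Evict windows that ended at or before the watermark
    val kept = (st.counts ++ updated).filter { case ((_, start), _) => start + WindowSize > wm }
    // Advance the watermark: max event time seen minus the delay, never moving back
    val nextWm = if (batch.isEmpty) wm else math.max(wm, batch.map(_._2).max - Delay)
    (State(kept, nextWm), emitted)
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(("A", 3L), ("B", 2L), ("C", 5L)),
      Seq(("A", 7L), ("B", 8L), ("C", 14L)),
      Seq(("A", 1L), ("B", 1L), ("C", 17L))
    )
    var st = State(Map.empty, 0L)
    for (b <- batches) {
      val (next, out) = runBatch(st, b)
      println(s"emitted: $out, next watermark: ${next.watermark}")
      st = next
    }
  }
}
```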

In [None]:
N seconds since epoch
N * 1000 * 1000 * 1000
java.lang.Long

UDF: java.sql.Timestamp
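The notes above concern timestamp units. In a Scala UDF, a `TimestampType` column is passed by default as `java.sql.Timestamp`, whose `Long` constructor takes milliseconds since the epoch. A value of N seconds must therefore be multiplied by 1000 before it becomes a `Timestamp`. The following minimal sketch assumes the input holds whole seconds since the epoch. `EpochToTimestamp` and its methods are hypothetical names.

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical helpers for converting between epoch seconds and java.sql.Timestamp
object EpochToTimestamp {
  // java.sql.Timestamp(Long) takes milliseconds since the epoch
  def fromEpochSeconds(n: Long): Timestamp = new Timestamp(n * 1000L)

  // The same conversion through java.time
  def fromEpochSecondsJavaTime(n: Long): Timestamp = Timestamp.from(Instant.ofEpochSecond(n))

  // Back to whole seconds; floorDiv keeps pre-1970 values correct
  def toEpochSeconds(ts: Timestamp): Long = Math.floorDiv(ts.getTime, 1000L)
}
```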


In [None]:
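// Number of shuffle partitions. In a stateful query it also sets the number of
// state partitions, and on restart Spark takes the value from the checkpoint.
spark.conf.get("spark.sql.shuffle.partitions")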