# 1. Load LIB

In [1]:
import sys.process._

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset

import org.apache.log4j._

# 2. Load function & params

In [2]:
def killAll() = {
    SparkSession
        .active
        .streams
        .active
        .foreach { x =>
                    val desc = x.lastProgress.sources.head.description
                    x.stop
                    println(s"Stopped ${desc}")
        }               
}

killAll: ()Unit


In [3]:
def createConsoleSink(df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds")) // раз в 10 секунд, а по умолчанию раз в 1 секунду
    .option("truncate", "false")
    .option("numRows", "20")
}


createConsoleSink: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [4]:
def createParquetSink(df: DataFrame, 
                      fileName: String) = {
    df
    .writeStream
    .format("parquet")
    .option("path", s"/tmp/$fileName")
    .option("checkpointLocation", s"/tmp/$fileName")
    //.trigger(Trigger.ProcessingTime("10 seconds"))
}

createParquetSink: (df: org.apache.spark.sql.DataFrame, fileName: String)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [5]:
val sdf_rate = spark
    .readStream
    .format("rate")
    .load
sdf_rate.printSchema
sdf_rate.explain(true)

Waiting for a Spark session to start...

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@10988909, rate, [timestamp#0, value#1L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@10988909, rate, [timestamp#0, value#1L]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@10988909, rate, [timestamp#0, value#1L]

== Physical Plan ==
StreamingRelation rate, [timestamp#0, value#1L]


sdf_rate = [timestamp: timestamp, value: bigint]


[timestamp: timestamp, value: bigint]

In [6]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("airport-codes.csv")

airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                 

csvOptions = Map(header -> true, inferSchema -> true)
airports = [ident: string, type: string ... 10 more fields]


[ident: string, type: string ... 10 more fields]

In [7]:
// Когда JSON в ячейке в столбце col датафрейма, функцией получаем схему начинки JSON 
def col_to_schema(sdf_tmp: DataFrame, col: String): org.apache.spark.sql.types.StructType = {

    val row_data: String = sdf_tmp.select( col ).collect()(0)(0).toString
    val schema = spark.read.json(
        SparkSession.active.sparkContext.parallelize(List(row_data))
    ).schema
    schema
}

col_to_schema: (sdf_tmp: org.apache.spark.sql.DataFrame, col: String)org.apache.spark.sql.types.StructType


In [8]:
// Периодически нужно чистить Кэш
def tmp_clear(chkName: String) = {
    println(s"hadoop fs -rm -r /tmp/$chkName".!!)
}

tmp_clear: (chkName: String)Unit


# 3. example1: Console Sink

In [9]:
val sink_console = createConsoleSink(sdf_rate)
val sq_console = sink_console.start

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+



sink_console = org.apache.spark.sql.streaming.DataStreamWriter@196f859a
sq_console = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6fbcb059


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6fbcb059

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2023-03-12 20:19:14.037|0    |
|2023-03-12 20:19:16.037|2    |
|2023-03-12 20:19:18.037|4    |
|2023-03-12 20:19:15.037|1    |
|2023-03-12 20:19:17.037|3    |
+-----------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2023-03-12 20:19:19.037|5    |
|2023-03-12 20:19:21.037|7    |
|2023-03-12 20:19:23.037|9    |
|2023-03-12 20:19:25.037|11   |
|2023-03-12 20:19:27.037|13   |
|2023-03-12 20:19:20.037|6    |
|2023-03-12 20:19:22.037|8    |
|2023-03-12 20:19:24.037|10   |
|2023-03-12 20:19:26.037|12   |
|2023-03-12 20:19:28.037|14   |
+-----------------------+-----+



In [10]:
// After 10-20 seconds
killAll()

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


# 4. example2: Parquet Sink1

In [11]:
tmp_clear("tmp_01.parquet")

23/03/12 20:19:40 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/tmp_01.parquet' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/tmp_01.parquet1678641580325


In [12]:
val sink_pq = createParquetSink(sdf_rate, "tmp_01.parquet")
val sq_pq = sink_pq.start

sink_pq = org.apache.spark.sql.streaming.DataStreamWriter@5e197325
sq_pq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6a9865e4


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6a9865e4

In [13]:
// After 10-20 seconds
killAll()

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [14]:
println("hadoop fs -ls /tmp/tmp_01.parquet".!!)

Found 21 items
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:19 /tmp/tmp_01.parquet/_spark_metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:19 /tmp/tmp_01.parquet/commits
-rw-r--r--   3 dinar.sadykov hdfs         45 2023-03-12 20:19 /tmp/tmp_01.parquet/metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:19 /tmp/tmp_01.parquet/offsets
-rw-r--r--   3 dinar.sadykov hdfs        790 2023-03-12 20:19 /tmp/tmp_01.parquet/part-00000-047a4b6e-ce2c-4eb5-867a-63e4cb98fca8-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs        790 2023-03-12 20:19 /tmp/tmp_01.parquet/part-00000-04e01d79-31dc-4595-9116-b2c892aebc6f-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs        790 2023-03-12 20:19 /tmp/tmp_01.parquet/part-00000-0ebdc5b7-ba78-4222-997a-af5170910f53-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs        790 2023-03-12 20:19 /tmp/tmp_01.parquet/part-00000-1b32f0dd-7112-43eb-942c-ebd1d60aaa67-c000.snappy.parquet
-rw-r--r--   3 d

In [15]:
val rates = spark
    .read
    .parquet("/tmp/tmp_01.parquet")
println(rates.count)
rates.printSchema
rates.show(5, false)

15
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2023-03-12 20:19:40.852|0    |
|2023-03-12 20:19:41.852|1    |
|2023-03-12 20:19:42.852|2    |
|2023-03-12 20:19:43.852|3    |
|2023-03-12 20:19:44.852|4    |
+-----------------------+-----+
only showing top 5 rows



rates = [timestamp: timestamp, value: bigint]


[timestamp: timestamp, value: bigint]

# 5. example3: Parquet Sink1 + ident

In [16]:
// Берем 10 случайных кодов Аэропорта
val idents = airports.select("ident").limit(10).distinct.as[String].collect

val ident_sdf_rate = sdf_rate.withColumn("ident"
                              , shuffle( // для каждой строки будет перемешивание вутри массива
                                  array(
                                      idents.map(lit(_)):_*)
                              )(0)) // берем первый элемент массива

idents = Array(SSMK, SSML, SSMM, SSMN, SSMP, SSMQ, SSMR, SSMS, SSMT, SSMU)
ident_sdf_rate = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [17]:
tmp_clear("tmp_02.parquet")

23/03/12 20:20:42 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/tmp_02.parquet' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/tmp_02.parquet1678641642821


In [18]:
val ident_sq_pq_sink = createParquetSink(ident_sdf_rate, "tmp_02.parquet")
val ident_sq_pq = ident_sq_pq_sink.start

ident_sq_pq_sink = org.apache.spark.sql.streaming.DataStreamWriter@2fb0bc5f
ident_sq_pq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@79016317


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@79016317

In [19]:
// After 10-20 seconds
killAll()

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [20]:
println("hadoop fs -ls /tmp/tmp_02.parquet/*".!!)

Found 11 items
-rw-r--r--   3 dinar.sadykov hdfs        257 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/0
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/1
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/10
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/2
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/3
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/4
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/5
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/6
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_spark_metadata/7
-rw-r--r--   3 dinar.sadykov hdfs        258 2023-03-12 20:20 /tmp/tmp_02.parquet/_s

In [21]:
val ident_pq = spark.read
    .parquet("/tmp/tmp_02.parquet/part-00000-d4f345f6-4a9a-40a2-a29a-853a20dff8e8-c000.snappy.parquet")

println(ident_pq.count)
ident_pq.printSchema
ident_pq.show(5, false)

0
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)

+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+



ident_pq = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [22]:
val ident_pq = spark.read
    .parquet("/tmp/tmp_02.parquet/part-00000-d4f345f6-4a9a-40a2-a29a-853a20dff8e8-c000.snappy.parquet")

println(ident_pq.count)
ident_pq.printSchema
ident_pq.show(5, false)

1
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)

+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:20:49.349|6    |SSML |
+-----------------------+-----+-----+



ident_pq = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

# 6. Работа с Kafka с помощь Static Dataframe

In [23]:
val ident_pq = spark.read
    .parquet("/tmp/tmp_02.parquet/")

println(ident_pq.count)
ident_pq.printSchema
ident_pq.show(5, false)

10
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)

+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:20:43.349|0    |SSMR |
|2023-03-12 20:20:44.349|1    |SSMU |
|2023-03-12 20:20:45.349|2    |SSML |
|2023-03-12 20:20:46.349|3    |SSMT |
|2023-03-12 20:20:47.349|4    |SSMU |
+-----------------------+-----+-----+
only showing top 5 rows



ident_pq = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [24]:
// def writeKafka[T](topic: String, data: Dataset[T]): Unit = {
//     val kafkaParams = Map(
//         "kafka.bootstrap.servers" -> "spark-master-1.newprolab.com:6667"
//     )
    
//     data
//         .toJSON
//         .withColumn("topic", lit(topic))
//         .write
//         .format("kafka")
//         .options(kafkaParams)
//         .save
// }

// writeKafka("test_topic0", ident_pq)

Name: Syntax Error.
Message: 
StackTrace: 

In [25]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "spark-master-1.newprolab.com:6667",
        "subscribe" -> "test_topic0"
    )


val sdf_kafka0 = spark.read
    .format("kafka")
    .options(kafkaParams)
    .load

sdf_kafka0.printSchema
sdf_kafka0.show(3)
sdf_kafka0.count

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   310|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   311|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   312|2023-03-08 23:48:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+
only showing top 3 rows



kafkaParams = Map(kafka.bootstrap.servers -> spark-master-1.newprolab.com:6667, subscribe -> test_topic0)
sdf_kafka0 = [key: binary, value: binary ... 5 more fields]


81

Чтение из Kafka имеет несколько особенностей:
- по умолчанию читается все содержимое топика. Поскольку обычно в нем много данных, эта операция может создать большую нагрузку на кластер Kafka и Spark приложение
- колонки `value` и `key` имеют тип `binary`, который необходимо десереализовать

Чтобы прочитать только определенную часть топика, нам необходимо задать минимальный и максимальный оффсет для чтения с помощью параметров `startingOffsets` , `endingOffsets`. Возьмем два случайных события:

In [26]:
// На основании этих событий подготовим параметры startingOffsets и endingOffsets

sdf_kafka0
    .sample(0.1)
    .limit(10)
    .select('topic, 'partition, 'offset)
    .show

+-----------+---------+------+
|      topic|partition|offset|
+-----------+---------+------+
|test_topic0|        0|   313|
|test_topic0|        0|   316|
|test_topic0|        0|   319|
|test_topic0|        0|   325|
|test_topic0|        0|   331|
|test_topic0|        0|   336|
|test_topic0|        0|   358|
|test_topic0|        0|   362|
|test_topic0|        0|   377|
|test_topic0|        0|   383|
+-----------+---------+------+



In [27]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "spark-master-1.newprolab.com:6667",
        "subscribe" -> "test_topic0",
        "startingOffsets" -> """ { "test_topic0": { "0": 313 } } """,
        "endingOffsets" -> """ { "test_topic0": { "0": 383 } }  """//,
        //"failOnDataLoss" -> "false"
    )


val sdf_kafka1 = spark
    .read
    .format("kafka")
    .options(kafkaParams)
    .load

sdf_kafka1.printSchema
sdf_kafka1.show(20)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   313|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   314|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   315|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   316|2023-03-08 23:48:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|   317| 2023-03-08 23:48

kafkaParams = Map(kafka.bootstrap.servers -> spark-master-1.newprolab.com:6667, subscribe -> test_topic0, startingOffsets -> " { "test_topic0": { "0": 313 } } ", endingOffsets -> " { "test_topic0": { "0": 383 } }  ")
sdf_kafka1 = [key: binary, value: binary ... 5 more fields]


[key: binary, value: binary ... 5 more fields]

По умолчанию параметр `startingOffsets` имеет значение `earliest`, а `endingOffsets` - `latest`. Поэтому, когда мы не указывали эти параметры, Spark прочитал содержимое всего топика

Чтобы получить наши данные, которые мы записали в топик, нам необходимо их десереализовать. В нашем случае достаточно использовать `.cast("string")`, однако это работает не всегда, т.к. формат данных может быть произвольным.

In [28]:
val sdf_kafka1_json = sdf_kafka1
    .select('value.cast("string"))
    .as[String]

sdf_kafka1_json.show(3, false)

val sdf_kafka1_json_parsed = spark.read
    .json(sdf_kafka1_json)

sdf_kafka1_json_parsed.printSchema
sdf_kafka1_json_parsed.show(3, false)

+-----------------------------------------------------------------------+
|value                                                                  |
+-----------------------------------------------------------------------+
|{"timestamp":"2022-10-31T19:53:44.336+03:00","value":6,"ident":"00AS"} |
|{"timestamp":"2022-10-31T19:53:46.336+03:00","value":8,"ident":"01ID"} |
|{"timestamp":"2022-10-31T19:53:48.336+03:00","value":10,"ident":"00II"}|
+-----------------------------------------------------------------------+
only showing top 3 rows

root
 |-- ident: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- value: long (nullable = true)

+-----+-----------------------------+-----+
|ident|timestamp                    |value|
+-----+-----------------------------+-----+
|00AS |2022-10-31T19:53:44.336+03:00|6    |
|01ID |2022-10-31T19:53:46.336+03:00|8    |
|00II |2022-10-31T19:53:48.336+03:00|10   |
+-----+-----------------------------+-----+
only showing top 3 rows



sdf_kafka1_json = [value: string]
sdf_kafka1_json_parsed = [ident: string, timestamp: string ... 1 more field]


[ident: string, timestamp: string ... 1 more field]

# 7. Работа с Kafka с помощью Streaming DF

При создании SDF из Kafka необходимо помнить, что:
- `startingOffsets` по умолчанию имеет значение `latest`
- `endingOffsets` использовать нельзя
- количество сообщений за батч можно (и нужно) ограничить параметром `maxOffsetPerTrigger` (по умолчанию он не задан и первый батч будет содержать данные всего топика

In [29]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "spark-master-1.newprolab.com:6667",
        "subscribe" -> "test_topic0",
        "startingOffsets" -> """earliest""",
        "maxOffsetsPerTrigger" -> "2"
    )

val sdf_kafka2 = spark
    .readStream // <- TOBE, AS IS = .read
    .format("kafka")
    .options(kafkaParams)
    .load

val sdf_kafka2_parsed = sdf_kafka2
    .select('value.cast("string")
            , 'topic
            , 'partition
            , 'offset)

val sdf_kafka2_sink = createConsoleSink(sdf_kafka2_parsed)

val sdf_kafka2_sq = sdf_kafka2_sink.start

kafkaParams = Map(kafka.bootstrap.servers -> spark-master-1.newprolab.com:6667, subscribe -> test_topic0, startingOffsets -> earliest, maxOffsetsPerTrigger -> 2)
sdf_kafka2 = [key: binary, value: binary ... 5 more fields]
sdf_kafka2_parsed = [value: string, topic: string ... 2 more fields]
sdf_kafka2_sink = org.apache.spark.sql.streaming.DataStreamWriter@15a76d13
sdf_kafka2_sq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1edef287


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1edef287

-------------------------------------------
Batch: 0
-------------------------------------------
+----------------------------------------------------------------------+-----------+---------+------+
|value                                                                 |topic      |partition|offset|
+----------------------------------------------------------------------+-----------+---------+------+
|{"timestamp":"2022-10-31T19:53:38.336+03:00","value":0,"ident":"00KS"}|test_topic0|0        |310   |
|{"timestamp":"2022-10-31T19:53:40.336+03:00","value":2,"ident":"00MO"}|test_topic0|0        |311   |
+----------------------------------------------------------------------+-----------+---------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------------------------------------------------------------------+-----------+---------+------+
|value                                                                 |topic      |partiti

Если мы перезапустим этот стрим, он повторно прочитает все данные. Чтобы обеспечить сохранение состояния стрима после обработки каждого батча, нам необходимо добавить параметр `checkpointLocation` в опции `writeStream`:

In [30]:
def createConsoleSinkWithCheckpoint(chkName: String, df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

createConsoleSinkWithCheckpoint: (chkName: String, df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [31]:
tmp_clear("tmp_03")

23/03/12 20:22:32 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/tmp_03' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/tmp_031678641752333


In [32]:
val sdf_kafka2_sink_2 = createConsoleSinkWithCheckpoint("tmp_03", sdf_kafka2_parsed)
val sdf_kafka2_sq_2 = sdf_kafka2_sink_2.start

sdf_kafka2_sink_2 = org.apache.spark.sql.streaming.DataStreamWriter@5a8eba8d
sdf_kafka2_sq_2 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@12bf6387


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@12bf6387

-------------------------------------------
Batch: 0
-------------------------------------------
+----------------------------------------------------------------------+-----------+---------+------+
|value                                                                 |topic      |partition|offset|
+----------------------------------------------------------------------+-----------+---------+------+
|{"timestamp":"2022-10-31T19:53:38.336+03:00","value":0,"ident":"00KS"}|test_topic0|0        |310   |
|{"timestamp":"2022-10-31T19:53:40.336+03:00","value":2,"ident":"00MO"}|test_topic0|0        |311   |
+----------------------------------------------------------------------+-----------+---------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------------------------------------------------------------------+-----------+---------+------+
|value                                                                 |topic      |partiti

In [33]:
//After 10-20 second
killAll

Stopped KafkaV2[Subscribe[test_topic0]]
Stopped KafkaV2[Subscribe[test_topic0]]


In [34]:
println("hadoop fs -ls /tmp/tmp_03/".!!)

Found 4 items
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:22 /tmp/tmp_03/commits
-rw-r--r--   3 dinar.sadykov hdfs         45 2023-03-12 20:22 /tmp/tmp_03/metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:22 /tmp/tmp_03/offsets
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:22 /tmp/tmp_03/sources



In [35]:
println("hadoop fs -ls /tmp/tmp_03/offsets/".!!)

Found 2 items
-rw-r--r--   3 dinar.sadykov hdfs        432 2023-03-12 20:22 /tmp/tmp_03/offsets/0
-rw-r--r--   3 dinar.sadykov hdfs        432 2023-03-12 20:22 /tmp/tmp_03/offsets/1



In [36]:
println("hadoop fs -head /tmp/tmp_03/offsets/1/".!!)

v1
{"batchWatermarkMs":0,"batchTimestampMs":1678641760002,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"test_topic0":{"0":314}}



# 8. Выводы:

- Работать с Kafka можно как с использованием Static DF, так и с помощью Streaming DF
- Чтобы стрим запоминал свое состояние после остановки, необходимо использовать checkpoint - директорию на HDFS (или локальной ФС), в которую будет сохранятся состояние стрима после каждого батча
- Apache Kafka - распределенная система, обеспечивающая передачу потока данных в слабосвязанных системах

# 9. laba04b

In [37]:
val topic1 = "dinar.sadykov"

topic1 = dinar.sadykov


dinar.sadykov

In [38]:
val sdf_buy1 = spark
    .read
    .json("visits/buy")
    .filter("uid is not NULL")

val sdf_view1 = spark
    .read
    .json("visits/view")
    .filter("uid is not NULL")

val sdf_visit1 = sdf_buy1.union(sdf_view1)
    .repartitionByRange(200, 'uid)
    .cache()

sdf_visit1.printSchema
sdf_visit1.show(numRows = 10, truncate = 10)

root
 |-- category: string (nullable = true)
 |-- date: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: long (nullable = true)
 |-- uid: string (nullable = true)
 |-- p_date: integer (nullable = true)

+----------+--------+----------+----------+----------+----------+--------+
|  category|    date|event_type|   item_id|item_price|       uid|  p_date|
+----------+--------+----------+----------+----------+----------+--------+
|Kitchen...|20200426|       buy|Kitchen...|      1245|036ab2e...|20200426|
|Enterta...|20200426|       buy|Enterta...|       252|032df29...|20200426|
|Enterta...|20200206|       buy|Enterta...|      1815|0342da4...|20200206|
|Kitchen...|20200206|       buy|Kitchen...|      3691|0355d72...|20200206|
|Enterta...|20200206|       buy|Enterta...|      3432|038168f...|20200206|
|Enterta...|20200306|       buy|Enterta...|      2211|039331b...|20200306|
|Mobile-...|20200220|       buy|Mobile-...|     

sdf_buy1 = [category: string, date: string ... 5 more fields]
sdf_view1 = [category: string, date: string ... 5 more fields]
sdf_visit1 = [category: string, date: string ... 5 more fields]


[category: string, date: string ... 5 more fields]

In [39]:
////////////////////////////////////////////////////////
////////////////////////////////////////////////////////
val buy_list = sdf_visit1
    .filter(" event_type == 'buy' ")
    .select('uid, 'item_id, 'item_price)
    .limit(200)
    .distinct
    .toJSON
    .as[String].collect

////////////////////////////////////////////////////////
val view_list = sdf_visit1
    .filter(" event_type == 'view' ")
    .select('uid, 'item_id, 'item_price)
    .limit(200)
    .distinct
    .toJSON
    .as[String].collect

////////////////////////////////////////////////////////
////////////////////////////////////////////////////////

// timestamp -> timestamp
// value -> offset
// ident -> value
// topic -> topic
// randint -> partition

val buy_rate = sdf_rate
        .withColumn("ident"
                  , shuffle( // для каждой строки будет перемешивание вутри массива
                      array(
                          buy_list.map(lit(_)):_*)
                  )(0)) // берем первый элемент массива
        .withColumn("topic", lit(topic1))
        .withColumnRenamed("value","offset")
        .withColumn("partition", (round( org.apache.spark.sql.functions.rand()*(10)+5,0)).cast("int") )
        .withColumnRenamed("ident","value")

buy_rate.printSchema

////////////////////////////////////////////////////////
val view_rate = sdf_rate
        .withColumn("ident"
                  , shuffle( // для каждой строки будет перемешивание вутри массива
                      array(
                          view_list.map(lit(_)):_*)
                  )(0)) // берем первый элемент массива
        .withColumn("topic", lit(topic1))
        .withColumnRenamed("value","offset")
        .withColumn("partition", (round( org.apache.spark.sql.functions.rand()*(10)+5,0)).cast("int") )
        .withColumnRenamed("ident","value")

view_rate.printSchema

root
 |-- timestamp: timestamp (nullable = true)
 |-- offset: long (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = false)
 |-- partition: integer (nullable = true)

root
 |-- timestamp: timestamp (nullable = true)
 |-- offset: long (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = false)
 |-- partition: integer (nullable = true)



buy_list = Array({"uid":"03c5ffaa-8252-41ac-b3bc-ad01e153c9e9","item_id":"Kitchen-appliances-8","item_price":4944}, {"uid":"0512ba78-fa67-4de6-a73a-ef82080c565d","item_id":"Cameras-3","item_price":4292}, {"uid":"0502209d-9832-4645-9d83-ab34500ba12c","item_id":"Kitchen-appliances-14","item_price":3253}, {"uid":"0544a761-1ade-4438-9bf2-b889293dbd05","item_id":"Household-furniture-8","item_price":3114}, {"uid":"050707ba-9503-4f11-bdcf-77c92e582aaa","item_id":"Cameras-17","item_price":1286}, {"uid":"053e2c8d-3968-4dcb-a102-8ea3f8946a03","item_id":"Computers-3","item_price":1298}, {"uid":"03fb8ac3-2b5c-4dae-8b8b-5ee929e5c5a3","item_id":"Shoes-8","item_price":4012}, {"uid":"03b22e4e-358d-41c9-9a52-8985addedfb1","item_id":"Clothing-17","item_price":3880}, {"uid":"03f26d66-ce54-4...


Array({"uid":"03c5ffaa-8252-41ac-b3bc-ad01e153c9e9","item_id":"Kitchen-appliances-8","item_price":4944}, {"uid":"0512ba78-fa67-4de6-a73a-ef82080c565d","item_id":"Cameras-3","item_price":4292}, {"uid":"0502209d-9832-4645-9d83-ab34500ba12c","item_id":"Kitchen-appliances-14","item_price":3253}, {"uid":"0544a761-1ade-4438-9bf2-b889293dbd05","item_id":"Household-furniture-8","item_price":3114}, {"uid":"050707ba-9503-4f11-bdcf-77c92e582aaa","item_id":"Cameras-17","item_price":1286}, {"uid":"053e2c8d-3968-4dcb-a102-8ea3f8946a03","item_id":"Computers-3","item_price":1298}, {"uid":"03fb8ac3-2b5c-4dae-8b8b-5ee929e5c5a3","item_id":"Shoes-8","item_price":4012}, {"uid":"03b22e4e-358d-41c9-9a52-8985addedfb1","item_id":"Clothing-17","item_price":3880}, {"uid":"03f26d66-ce54-4...

In [None]:
// val sink_tmp = createConsoleSink(view_rate)
// val sq_tmp = sink_tmp.start

//After 10-20 second
// killAll()

In [40]:
tmp_clear("tmp_04_b.parquet")

23/03/12 20:23:27 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/tmp_04_b.parquet' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/tmp_04_b.parquet1678641807358


In [41]:
tmp_clear("tmp_04_v.parquet")

23/03/12 20:23:28 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/tmp_04_v.parquet' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/tmp_04_v.parquet1678641808983


In [42]:
val buy_sq_pq_sink = createParquetSink(buy_rate, "tmp_04_b.parquet")
val buy_sq_pq = buy_sq_pq_sink.start

buy_sq_pq_sink = org.apache.spark.sql.streaming.DataStreamWriter@2993d355
buy_sq_pq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6bcd8e18


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6bcd8e18

In [43]:
val view_sq_pq_sink = createParquetSink(view_rate, "tmp_04_v.parquet")
val view_sq_pq = view_sq_pq_sink.start

view_sq_pq_sink = org.apache.spark.sql.streaming.DataStreamWriter@51d0b7e
view_sq_pq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@732cf391


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@732cf391

In [44]:
// After 10-20 second
killAll()

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [45]:
println("hadoop fs -ls /tmp/tmp_04_b.parquet".!!)

Found 20 items
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_b.parquet/_spark_metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_b.parquet/commits
-rw-r--r--   3 dinar.sadykov hdfs         45 2023-03-12 20:23 /tmp/tmp_04_b.parquet/metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_b.parquet/offsets
-rw-r--r--   3 dinar.sadykov hdfs       2263 2023-03-12 20:23 /tmp/tmp_04_b.parquet/part-00000-043ab707-e180-4115-b52f-3ce58b35d773-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2362 2023-03-12 20:23 /tmp/tmp_04_b.parquet/part-00000-0e0cd228-847b-4d23-a0e5-7ef969b1f527-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2344 2023-03-12 20:23 /tmp/tmp_04_b.parquet/part-00000-293e97ea-882d-4827-aaab-3a949408a5a1-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2272 2023-03-12 20:23 /tmp/tmp_04_b.parquet/part-00000-3029f663-4664-4445-bc00-d59daed3074b-c000.snappy.parquet


In [46]:
val sdf_tmp = spark.read
    .parquet("/tmp/tmp_04_b.parquet/part-00000-5447b29a-c654-48aa-9fb4-3e4d58da05e5-c000.snappy.parquet")
sdf_tmp.count()
sdf_tmp.printSchema
sdf_tmp.show(3)

root
 |-- timestamp: timestamp (nullable = true)
 |-- offset: long (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)

+--------------------+------+--------------------+-------------+---------+
|           timestamp|offset|               value|        topic|partition|
+--------------------+------+--------------------+-------------+---------+
|2023-03-12 20:23:...|     2|{"uid":"0507c191-...|dinar.sadykov|       13|
+--------------------+------+--------------------+-------------+---------+



sdf_tmp = [timestamp: timestamp, offset: bigint ... 3 more fields]


[timestamp: timestamp, offset: bigint ... 3 more fields]

In [47]:
println("hadoop fs -ls /tmp/tmp_04_v.parquet".!!)

Found 19 items
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_v.parquet/_spark_metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_v.parquet/commits
-rw-r--r--   3 dinar.sadykov hdfs         45 2023-03-12 20:23 /tmp/tmp_04_v.parquet/metadata
drwxr-xr-x   - dinar.sadykov hdfs          0 2023-03-12 20:23 /tmp/tmp_04_v.parquet/offsets
-rw-r--r--   3 dinar.sadykov hdfs       2254 2023-03-12 20:23 /tmp/tmp_04_v.parquet/part-00000-026319e8-f764-4a5c-ab7e-e14ac90565d0-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2245 2023-03-12 20:23 /tmp/tmp_04_v.parquet/part-00000-22410b63-a586-42ff-b92f-2a9e12d5ff13-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2290 2023-03-12 20:23 /tmp/tmp_04_v.parquet/part-00000-352041ff-c2a2-4848-9fbf-08d5864a58d9-c000.snappy.parquet
-rw-r--r--   3 dinar.sadykov hdfs       2396 2023-03-12 20:23 /tmp/tmp_04_v.parquet/part-00000-52fd0127-b6b6-4089-85eb-b88724a91bcc-c000.snappy.parquet


In [48]:
val sdf_tmp = spark.read
    .parquet("/tmp/tmp_04_v.parquet/part-00000-5e4307ef-37ef-4019-9a5a-a6ac44d11c5b-c000.snappy.parquet")
sdf_tmp.count()
sdf_tmp.printSchema
sdf_tmp.show(3)

root
 |-- timestamp: timestamp (nullable = true)
 |-- offset: long (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)

+--------------------+------+--------------------+-------------+---------+
|           timestamp|offset|               value|        topic|partition|
+--------------------+------+--------------------+-------------+---------+
|2023-03-12 20:23:...|     1|{"uid":"0355d721-...|dinar.sadykov|       11|
+--------------------+------+--------------------+-------------+---------+



sdf_tmp = [timestamp: timestamp, offset: bigint ... 3 more fields]


[timestamp: timestamp, offset: bigint ... 3 more fields]

In [49]:
val schema = col_to_schema(sdf_tmp, "value")

// Зная схему - получаем JSON -> Columns
val sdf_tmp_parse = sdf_tmp
    .withColumn("root", from_json('value, schema) )
    .select( 'timestamp, 'offset, 'topic, 'partition, col("root.*") )

sdf_tmp_parse.printSchema
sdf_tmp_parse.show(3)

root
 |-- timestamp: timestamp (nullable = true)
 |-- offset: long (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: long (nullable = true)
 |-- uid: string (nullable = true)

+--------------------+------+-------------+---------+-------+----------+--------------------+
|           timestamp|offset|        topic|partition|item_id|item_price|                 uid|
+--------------------+------+-------------+---------+-------+----------+--------------------+
|2023-03-12 20:23:...|     1|dinar.sadykov|       11|Shoes-0|       418|0355d721-ad22-473...|
+--------------------+------+-------------+---------+-------+----------+--------------------+



schema = StructType(StructField(item_id,StringType,true), StructField(item_price,LongType,true), StructField(uid,StringType,true))
sdf_tmp_parse = [timestamp: timestamp, offset: bigint ... 5 more fields]


[timestamp: timestamp, offset: bigint ... 5 more fields]

# 10. Что такое stateful streaming


**stateful streaming** - это вид поточной обработки данных, при которой при обработке батча с данными используются данные из предыдущих батчей

Все операции с использованием select, filter, withColumn (кроме операций с плавающими окнами) являются stateless. На практике это означает:
- стрим не выполняет операций, требующих работы с данными из разных батчей
- после обработки батча стрим "забывает" про него
- высокую пропускную способность
- небольшое количество файлов и общий объем чекпоинта
- возможность вносить существенные правки в код стрима без пересоздания чекпоинта

Если при обработке стрима используются такие методы, как `join()`, `groupBy()`, `dropDuplicates()` или функции над плавающими окнами, то:
- в стриме должна быть колонка с временной меткой, на основе которой можно определить `watermark`
- стрим будет работать медленней, чем вы ожидаете
- в чекпоинте будет МНОГО файлов
- при внесении изменений в код стрима с большой вероятностью придется пересоздавать чекпоинт

Подготовим функции для управления стримами:

In [50]:
def randomIdent() = {
    
    val idents = airports.select('ident).limit(5).distinct.as[String].collect
    val columnArray = idents.map( x => lit(x) )
    val sparkArray = array(columnArray:_*)
    val shuffledArray = shuffle(sparkArray)

    shuffledArray(0)
}

randomIdent: ()org.apache.spark.sql.Column


In [52]:
def createConsoleSink2(chkName: String, df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

createConsoleSink2: (chkName: String, df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


# 11. Удаление дубликатов

Spark позволяет удалять дубликаты данных в стриме. Это можно сделать двумя способами:
- без использования `watermark`
- с использованием `watermark`

### Без использования watermark
- Хеш каждого элемента будет сохраняться в чекпоинте
- В стриме полностью исключаются дубликаты
- Со временем начнется деградация стрима

In [53]:
val sdfWithDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

sdfWithDuplicates.explain()

== Physical Plan ==
*(1) Project [timestamp#1391, value#1392L, shuffle([00A,00AA,00AK,00AL,00AR], Some(-5644576063220082580))[0] AS ident#1399]
+- StreamingRelation rate, [timestamp#1391, value#1392L]


sdfWithDuplicates = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [54]:
val sdfWithoutDuplicates = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .dropDuplicates(Seq("ident"))

sdfWithoutDuplicates.explain()

== Physical Plan ==
StreamingDeduplicate [ident#1413], state info [ checkpoint = <unknown>, runId = f8a5e5df-00ed-4376-aad0-7f7339556cf3, opId = 0, ver = 0, numPartitions = 200], 0
+- Exchange hashpartitioning(ident#1413, 200)
   +- *(1) Project [timestamp#1405, value#1406L, shuffle([SSMK,SSML,SSMM,SSMN,SSMP], Some(4977013574783073570))[0] AS ident#1413]
      +- StreamingRelation rate, [timestamp#1405, value#1406L]


sdfWithoutDuplicates = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [55]:
tmp_clear("state1_sdr")

23/03/12 20:24:40 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state1_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state1_sdr1678641880960


In [56]:
tmp_clear("state2_sdr")

23/03/12 20:24:42 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state2_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state2_sdr1678641882587


In [57]:
createConsoleSinkWithCheckpoint("state1_sdr", sdfWithDuplicates).start

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+



org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7704181d

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:24:46.911|0    |00AL |
|2023-03-12 20:24:48.911|2    |00AL |
|2023-03-12 20:24:47.911|1    |00AK |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:24:49.911|3    |00AR |
|2023-03-12 20:24:51.911|5    |00AA |
|2023-03-12 20:24:53.911|7    |00AL |
|2023-03-12 20:24:55.911|9    |00AR |
|2023-03-12 20:24:57.911|11   |00AR |
|2023-03-12 20:24:50.911|4    |00AK |
|2023-03-12 20:24:52.911|6    |00AK |
|2023-03-12 20:24:54.911|8    |00AK |
|2023-03-12 20:24:56.911|10   |00AK |
|2023-03-12 20:24:58.911|12   |00AK |
+-----------------------+-----+-----+

------

In [58]:
createConsoleSinkWithCheckpoint("state2_sdr", sdfWithoutDuplicates).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@2824d477

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:24:48.456|1    |SSMN |
|2023-03-12 20:24:47.456|0    |SSMK |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:24:56.456|9    |SSMP |
|2023-03-12 20:24:53.456|6    |SSMM |
|2023-03-12 20:24:51.456|4    |SSML |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+---------+-----+--

In [59]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


### С использованием watermark
- Хеш старых событий удаляется из чекпоинта
- Появление дубликатов возможно, если они приходят c задержкой N > watermark
- Стрим не деградирует со временем
- Колонка, по которой делается `watermark`, должна быть включена в `dropDuplicates`

In [60]:
val sdfWithoutDuplicates_w = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(Seq("ident", "timestamp"))

sdfWithoutDuplicates_w = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [61]:
tmp_clear("state3_sdr")

23/03/12 20:25:29 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state3_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state3_sdr1678641929380


In [62]:
createConsoleSinkWithCheckpoint("state3_sdr", sdfWithoutDuplicates_w).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@202a0e18

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:25:30.886|1    |00A  |
|2023-03-12 20:25:31.886|2    |00AR |
|2023-03-12 20:25:29.886|0    |00AR |
+-----------------------+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-----+
|timestamp              |value|ident|
+-----------------------+-----+-----+
|2023-03-12 20:25:32.886|3    |00AK |
|2023-03-12 20:25:34.886|5    |00AR |
|2023-03-12 20:25:33.886|4    |00AA |
|2023-03-12 20:25:35.886|6    |00A  |
|2023-03-12 20:25:38.886|9    |00AL |
|2023-03-12 20:25:37.886|8    |00A  |
|20

In [63]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


Поскольку в нашем стриме колонка `timestamp` имеет уникальное значение для каждой строки, то фактически дедупликация здесь работать не будет. Поэтому, дедупликация применима только для следующих случаев:
- вы хотите избавиться от полностью идентичных событий, у которых ключ И метка времени одинаковые
- вы делаете округление колонки с меткой времени

Рассмотрим второй вариант подробнее

In [64]:
import org.apache.spark.sql.Column

def round(thisCol: Column, value: Int) = { 
    ((thisCol.cast("long") / value).cast("long") * value).cast("timestamp")
}

round: (thisCol: org.apache.spark.sql.Column, value: Int)org.apache.spark.sql.Column


In [65]:
val sdfWithoutDuplicates2 = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("timestamp", round(col("timestamp"), 60*10))
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(Seq("ident", "timestamp"))

sdfWithoutDuplicates2 = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [66]:
val sq_state4 = createConsoleSinkWithCheckpoint("state4_sdr", sdfWithoutDuplicates2).start

sq_state4 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@15462415


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@15462415

-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2023-03-12 19:40:00|2241 |SSMM |
|2023-03-12 19:40:00|2240 |SSML |
|2023-03-12 19:40:00|2238 |SSMK |
|2023-03-12 19:40:00|2245 |SSMN |
|2023-03-12 19:40:00|2242 |SSMP |
+-------------------+-----+-----+

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-----+-----+
|timestamp          |value|ident|
+-------------------+-----+-----+
|2023-03-12 20:00:00|3381 |SSMP |
|2023-03-12 19:50:00|2791 |SSMK |
|2023-03-12 20:10:00|3979 |SSMP |
|2023-03-12 19:50:00|2779 |SSMM |
|2023-03-12 20:20:00|4579 |SSML |
|2023-03-12 20:20:00|4587 |SSMP |
|2023-03-12 19:50:00|2783 |SSMN |
|2023-03-12 20:10:00|3981 |SSMN |
|2023-03-12 20:20:00|4584 |SSMM |
|2023-03-12 20:00:00|3412 |SSMM |
|2023-03-12 20:10:00|3978 |SSMM |
|2023-03-12 19:50:00|

In [67]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [68]:
println(sq_state4.lastProgress)

{
  "id" : "a836a40d-1e52-4aa4-ac0d-dd799cdefbb2",
  "runId" : "ea6dce69-ed3d-49fa-9c10-bba4fc98e295",
  "name" : null,
  "timestamp" : "2023-03-12T17:26:10.000Z",
  "batchId" : 15,
  "numInputRows" : 2700,
  "inputRowsPerSecond" : 320.3987184051264,
  "processedRowsPerSecond" : 752.2986904430203,
  "durationMs" : {
    "addBatch" : 3491,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 13,
    "setOffsetRange" : 0,
    "triggerExecution" : 3589,
    "walCommit" : 30
  },
  "eventTime" : {
    "avg" : "2023-03-12T16:58:47.999Z",
    "max" : "2023-03-12T17:20:00.000Z",
    "min" : "2023-03-12T16:40:00.000Z",
    "watermark" : "2023-03-12T16:30:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 30,
    "numRowsUpdated" : 20,
    "memoryUsedBytes" : 89079,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 200,
      "loadedMapCacheMissCount" : 200,
      "stateOnCurrentVersionSizeBytes" : 26359
    }
  } ],
  "sources" : [ {
    "description" : "RateStream

### Выводы:
- Spark позволяет удалять дубликаты из стрима
- Для стабильной работы требуется использовать `watermark`

# 12. Агрегаты

При построении агрегатов на стриме важно задать правильный `outputMode`, который может иметь три значения:
- `append`
- `update`
- `complete`

### Complete mode

In [69]:
def createConsoleSinkMode(chkName: String, mode: String, df: DataFrame) = {
    df
    .writeStream
    .outputMode(mode)
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"/tmp/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

createConsoleSinkMode: (chkName: String, mode: String, df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [70]:
val grouped = sdfWithDuplicates.groupBy('ident).count

grouped.explain

== Physical Plan ==
*(4) HashAggregate(keys=[ident#1399], functions=[count(1)])
+- StateStoreSave [ident#1399], state info [ checkpoint = <unknown>, runId = ed8dd955-367b-4333-8d59-44e4cbede365, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
   +- *(3) HashAggregate(keys=[ident#1399], functions=[merge_count(1)])
      +- StateStoreRestore [ident#1399], state info [ checkpoint = <unknown>, runId = ed8dd955-367b-4333-8d59-44e4cbede365, opId = 0, ver = 0, numPartitions = 200], 2
         +- *(2) HashAggregate(keys=[ident#1399], functions=[merge_count(1)])
            +- Exchange hashpartitioning(ident#1399, 200)
               +- *(1) HashAggregate(keys=[ident#1399], functions=[partial_count(1)])
                  +- *(1) Project [shuffle([00A,00AA,00AK,00AL,00AR], Some(2189780357686438522))[0] AS ident#1399]
                     +- StreamingRelation rate, [timestamp#1391, value#1392L]


grouped = [ident: string, count: bigint]


[ident: string, count: bigint]

In [71]:
tmp_clear("state4_mode_sdr")

23/03/12 20:26:24 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state4_mode_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state4_mode_sdr1678641984457


In [72]:
createConsoleSinkMode("state4_mode_sdr", "complete", grouped).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d3fabae

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00AK |1    |
|00A  |2    |
|00AL |2    |
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00AK |3    |
|00AA |3    |
|00A  |4    |
|00AR |3    |
|00AL |2    |
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00AK |3    |
|00AA |4    |
|00A  |7    |
|00AR |6    |
|00AL |5    |
+-----+-----+



In [73]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


### Update mode

In [74]:
tmp_clear("state4_mode_sdr2")

23/03/12 20:27:00 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state4_mode_sdr2' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state4_mode_sdr21678642020884


In [75]:
createConsoleSinkMode("state4_mode_sdr2", "update", grouped).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7efdc717

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00AA |2    |
|00A  |2    |
|00AR |2    |
|00AL |2    |
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|ident|count|
+-----+-----+
|00AK |3    |
|00AA |6    |
|00A  |3    |
|00AR |3    |
|00AL |3    |
+-----+-----+



In [76]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


error:
  unrecoverable error
     while compiling: <console>
        during phase: globalPhase=erasure, enteringPhase=posterasure
     library version: version 2.11.12
    compiler version: version 2.11.12
  last tree to typer: TypeTree(class $iw)
       tree position: line 7 of <console>
            tree tpe: $iw
              symbol: class $iw
   symbol definition: class $iw extends Serializable (a ClassSymbol)
      symbol package: $line203
       symbol owners: class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $iw -> class $read
           call site: package <root> in <none>
<Cannot read source file>
lastException: Throwable = null


###  Агрегаты на плавающих окнах
Плавающее окно позволяет сгруппировать события в окна определенного размера (по времени). При этом, поскольку каждое событие может находится одновременно в нескольких окнах, то общий размер агрегата существенно увеличивается

Окно задается при создании агрегата с помощью функции `window` внутри `groupBy`. В параметрах указывается длина окна и расстояние между двумя точкой начала текущего и следующего окна.

<img align="center" width="1000" height="1000" src="https://spark.apache.org/docs/latest/img/structured-streaming-window.png">

Создадим стрим с использованием `window` и `watermark` в `update` режиме. В данном случае watermark позволяет игнорировать события, которые приходят с запозданием (`latency` > `max_event_timestamp` + `watermark_value`)

In [77]:
tmp_clear("state10_sdr")

23/03/12 20:27:46 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state10_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state10_sdr1678642066150


In [78]:
val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    .withColumn("timestamp", date_sub('timestamp, 1).cast("timestamp"))

createConsoleSinkMode("state10_sdr", "append", oldData).start

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-----+
|timestamp|value|ident|
+---------+-----+-----+
+---------+-----+-----+



oldData = [timestamp: timestamp, value: bigint ... 1 more field]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3c8241b9

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-----+--------+
|timestamp          |value|ident   |
+-------------------+-----+--------+
|2023-03-11 00:00:00|0    |OLD DATA|
|2023-03-11 00:00:00|2    |OLD DATA|
|2023-03-11 00:00:00|1    |OLD DATA|
+-------------------+-----+--------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-----+--------+
|timestamp          |value|ident   |
+-------------------+-----+--------+
|2023-03-11 00:00:00|3    |OLD DATA|
|2023-03-11 00:00:00|5    |OLD DATA|
|2023-03-11 00:00:00|7    |OLD DATA|
|2023-03-11 00:00:00|9    |OLD DATA|
|2023-03-11 00:00:00|11   |OLD DATA|
|2023-03-11 00:00:00|4    |OLD DATA|
|2023-03-11 00:00:00|6    |OLD DATA|
|2023-03-11 00:00:00|8    |OLD DATA|
|2023-03-11 00:00:00|10   |OLD DATA|
|2023-03-11 00:00:00|12   |OLD DATA|
+-------------------+-----+--------+



In [79]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [80]:
tmp_clear("state5_sdr")

23/03/12 20:28:03 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state5_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state5_sdr


In [81]:
val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

val oldData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", lit("OLD DATA"))
    .withColumn("timestamp", date_sub('timestamp, 1))

val uData = newData
    .union(oldData)
    .withWatermark("timestamp", "1 minutes")
    .groupBy(window($"timestamp", "1 minutes"), 'ident)
    .count

newData = [timestamp: timestamp, value: bigint ... 1 more field]
oldData = [timestamp: date, value: bigint ... 1 more field]
uData = [window: struct<start: timestamp, end: timestamp>, ident: string ... 1 more field]


[window: struct<start: timestamp, end: timestamp>, ident: string ... 1 more field]

In [82]:
val sq_tmp = createConsoleSinkMode("state5_sdr", "complete", uData).start

sq_tmp = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@44ff7a42


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@44ff7a42

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+-----+
|window|ident|count|
+------+-----+-----+
+------+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+--------+-----+
|window                                    |ident   |count|
+------------------------------------------+--------+-----+
|[2023-03-11 00:00:00, 2023-03-11 00:01:00]|OLD DATA|5    |
|[2023-03-12 20:28:00, 2023-03-12 20:29:00]|SSMP    |1    |
|[2023-03-12 20:28:00, 2023-03-12 20:29:00]|SSMM    |2    |
|[2023-03-12 20:28:00, 2023-03-12 20:29:00]|SSML    |2    |
+------------------------------------------+--------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+--------+-----+
|window                                    |ident   |count|
+----------------------

In [83]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [84]:
println(sq_tmp.lastProgress)

{
  "id" : "5dbc6731-5772-4312-89a9-38b21250b68c",
  "runId" : "bbddb334-6388-494f-8e3a-26e0a2b6d706",
  "name" : null,
  "timestamp" : "2023-03-12T17:28:20.000Z",
  "batchId" : 2,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 2.0,
  "processedRowsPerSecond" : 5.083884087442806,
  "durationMs" : {
    "addBatch" : 3846,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 26,
    "setOffsetRange" : 0,
    "triggerExecution" : 3934,
    "walCommit" : 39
  },
  "eventTime" : {
    "avg" : "2023-03-11T19:14:06.940Z",
    "max" : "2023-03-12T17:28:18.380Z",
    "min" : "2023-03-10T21:00:00.000Z",
    "watermark" : "2023-03-12T17:27:08.380Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 5,
    "numRowsUpdated" : 5,
    "memoryUsedBytes" : 81295,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 800,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 18479
    }
  } ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, r

Создадим стрим с использованием `window` и `watermark` в `append` режиме. В `append` режиме в синк будут записаны только завершенные окна с данными в момент `window_right_bound` + `watermark_value`

In [85]:
tmp_clear("state6_sdr")

rm: `/tmp/state6_sdr': No such file or directory


Name: java.lang.RuntimeException
Message: Nonzero exit value: 1
StackTrace:   at scala.sys.package$.error(package.scala:27)
  at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:132)
  at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:102)
  at tmp_clear(<console>:42)

In [86]:
val newData = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

val uData = newData
    .withWatermark("timestamp", "20 seconds")
    .groupBy(window($"timestamp", "20 seconds", "10 seconds"))
    .count

val uData2 = newData
    .withWatermark("timestamp", "30 seconds")
    .groupBy(window($"timestamp", "30 seconds", "10 seconds"))
    .count

val uDataAll = uData.union( uData2 )

createConsoleSinkMode("state6_sdr", "append", uDataAll).start

lastException = null


Name: org.apache.spark.sql.AnalysisException
Message: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Union
:- Aggregate [window#2034-T20000ms], [window#2034-T20000ms AS window#2028-T20000ms, count(1) AS count#2033L]
:  +- Filter ((timestamp#2016-T20000ms >= window#2034-T20000ms.start) && (timestamp#2016-T20000ms < window#2034-T20000ms.end))
:     +- Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#2016-T20000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#2016-T20000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#2016-T20000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#2016-T20000ms, TimestampType, LongTyp

### Выводы
- Spark позволяет строить агрегаты на SDF в разных режимах
- Вотермарки поддерживаются при использовании `append` и `update` режимов
- Плавающее окно имеет два параметра - размер окна и сдвиг текущего окна относительно следующего

In [87]:
spark.streams.active.foreach { x => x.stop }

lastException: Throwable = null


In [88]:
killAll

In [89]:
spark

org.apache.spark.sql.SparkSession@2251dfe0

# 13. Соединения

### Spark позволяет делать:
- stream - static join
  + inner
  + left outer
  + left anti
- stream - stream join
  + inner
  + left outer
  + right outer

### Stream-Static join

Может использоваться для:
- обогащения стрима фактами (left outer)
- фильтрации по blacklist (left anti)
- фильтрации по whitelist (inner)

#### Left outer join

In [90]:
val ident_rate = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())

ident_rate = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [91]:
val right = airports

val result = ident_rate
    .join(right
         , Seq("ident")
         , "left")
    .select( 'ident
            , 'name
            , 'elevation_ft
            , 'iso_country)

result.printSchema
result.explain(true)

createConsoleSinkMode("state7_sdr", "append", result).start

root
 |-- ident: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- iso_country: string (nullable = true)

== Parsed Logical Plan ==
'Project [unresolvedalias('ident, None), unresolvedalias('name, None), unresolvedalias('elevation_ft, None), unresolvedalias('iso_country, None)]
+- Project [ident#2058, timestamp#2050, value#2051L, type#16, name#17, elevation_ft#18, continent#19, iso_country#20, iso_region#21, municipality#22, gps_code#23, iata_code#24, local_code#25, coordinates#26]
   +- Join LeftOuter, (ident#2058 = ident#15)
      :- Project [timestamp#2050, value#2051L, shuffle(array(SSMK, SSML, SSMM, SSMN, SSMP), Some(-5444629639441506601))[0] AS ident#2058]
      :  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@47992e23, rate, [timestamp#2050, value#2051L]
      +- Relation[ident#15,type#16,name#17,elevation_ft#18,continent#19,iso_country#20,iso_region#21,municipality#22,gps_co

right = [ident: string, type: string ... 10 more fields]
result = [ident: string, name: string ... 2 more fields]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4d8e32ae

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+----------------------------+------------+-----------+
|ident|name                        |elevation_ft|iso_country|
+-----+----------------------------+------------+-----------+
|SSMN |Kururuzinho Airport         |1640        |BR         |
|SSMK |El Dorado Airport           |20          |BR         |
|SSMK |El Dorado Airport           |20          |BR         |
|SSMK |El Dorado Airport           |20          |BR         |
|SSMN |Kururuzinho Airport         |1640        |BR         |
|SSMP |Fazenda TrÃªs Minas Airport |1391        |BR         |
|SSMM |MACAÃ Heliport             |1247        |BR         |
|SSMP |Fazenda TrÃªs Minas Airport |1391        |BR         |
|SSML |Fazenda Maria LuÃ­za Airport|1083        |BR         |
|SSMP |Fazenda TrÃªs Minas Airport |1391        |BR         |
|SSMP |Fazenda TrÃªs Minas Airport |1391        |BR         |
|SSMM |MACAÃ Heliport             

In [92]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


#### Inner join

In [93]:
tmp_clear("state8_sdr")

23/03/12 20:28:59 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state8_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state8_sdr1678642139725


In [96]:
val right = Vector("SSMN", "SSMP", "00A").toDF.withColumnRenamed("value", "ident")
right.show

val result = ident_rate.join(right
                             , Seq("ident")
                             , "inner")

createConsoleSinkMode("state8_sdr", "append", result).start

+-----+
|ident|
+-----+
| SSMN|
| SSMP|
|  00A|
+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|SSMP |2023-03-12 20:29:23.287|23   |
|SSMN |2023-03-12 20:29:25.287|25   |
|SSMN |2023-03-12 20:29:33.287|33   |
|SSMN |2023-03-12 20:29:37.287|37   |
|SSMN |2023-03-12 20:29:47.287|47   |
|SSMP |2023-03-12 20:29:49.287|49   |
|SSMN |2023-03-12 20:29:51.287|51   |
|SSMN |2023-03-12 20:29:53.287|53   |
|SSMP |2023-03-12 20:30:03.287|63   |
|SSMP |2023-03-12 20:30:07.287|67   |
|SSMP |2023-03-12 20:30:09.287|69   |
|SSMP |2023-03-12 20:30:11.287|71   |
|SSMP |2023-03-12 20:30:15.287|75   |
|SSMN |2023-03-12 20:30:19.287|79   |
|SSMP |2023-03-12 20:30:23.287|83   |
|SSMN |2023-03-12 20:30:25.287|85   |
|SSMP |2023-03-12 20:30:29.287|89   |
|SSMP |2023-03-12 20:30:33.287|93   |
|SSMP |2023-03-12 20:30:39.287|99   |
|SSMN |202

right = [ident: string]
result = [ident: string, timestamp: timestamp ... 1 more field]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@494c3bf

-------------------------------------------
Batch: 4
-------------------------------------------
+-----+---------+-----+
|ident|timestamp|value|
+-----+---------+-----+
+-----+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|SSMN |2023-03-12 20:30:51.287|111  |
|SSMN |2023-03-12 20:30:57.287|117  |
+-----+-----------------------+-----+



In [97]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


lastException: Throwable = null


#### Left anti join

In [98]:
tmp_clear("state9_sdr")

23/03/12 20:31:12 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state9_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state9_sdr


In [99]:
val right = Vector( "SSMN", "00A", "00FD").toDF.withColumnRenamed("value", "ident")
val result = ident_rate.join(right, Seq("ident"), "left_anti")

createConsoleSinkMode("state9_sdr", "append", result).start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+-----+
|ident|timestamp|value|
+-----+---------+-----+
+-----+---------+-----+



right = [ident: string]
result = [ident: string, timestamp: timestamp ... 1 more field]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@717f6d3f

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|SSML |2023-03-12 20:31:14.236|0    |
|SSMP |2023-03-12 20:31:16.236|2    |
|SSML |2023-03-12 20:31:18.236|4    |
|SSMP |2023-03-12 20:31:15.236|1    |
+-----+-----------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----------------------+-----+
|ident|timestamp              |value|
+-----+-----------------------+-----+
|SSMK |2023-03-12 20:31:19.236|5    |
|SSML |2023-03-12 20:31:21.236|7    |
|SSMP |2023-03-12 20:31:23.236|9    |
|SSMP |2023-03-12 20:31:25.236|11   |
|SSML |2023-03-12 20:31:27.236|13   |
|SSMK |2023-03-12 20:31:20.236|6    |
|SSMP |2023-03-12 20:31:22.236|8    |
|SSMP |2023-03-12 20:31:24.236|10   |
|SSMM |2023-03-12 20:31:26.236|12   |
|SSMM |2023-03-12 20:31:28.236|14   |
+-----+

In [100]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


In [101]:
ident_rate.join(right, Seq("ident"), "left_anti").explain

== Physical Plan ==
*(1) Project [ident#2058, timestamp#2050, value#2051L]
+- *(1) BroadcastHashJoin [ident#2058], [ident#2292], LeftAnti, BuildRight
   :- *(1) Project [timestamp#2050, value#2051L, shuffle([SSMK,SSML,SSMM,SSMN,SSMP], Some(-5435056235164864367))[0] AS ident#2058]
   :  +- StreamingRelation rate, [timestamp#2050, value#2051L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [ident#2292]


### Stream-Stream join

Для соединения двух стримов нам необходимо добавить к условию соединения равенство двух окон или сравнение двух временных меток

In [102]:
val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "1 minutes")
    .withColumn("window", window('timestamp, "20 second")).as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "2 minutes")
    .withColumn("window", window('timestamp, "10 second")).as("right")

val joinExpr = expr("""left.value = right.value""")

val joined = left
        .join( right, joinExpr, "inner" )
        .select( $"left.window"
                , $"right.window"
                , $"left.value"
                , $"left"
                , $"right")

joined.explain(true)

== Parsed Logical Plan ==
'Project [unresolvedalias('left.window, None), unresolvedalias('right.window, None), unresolvedalias('left.value, None), unresolvedalias('left, None), unresolvedalias('right, None)]
+- Join Inner, (value#2363L = value#2388L)
   :- SubqueryAlias `left`
   :  +- Project [timestamp#2362-T60000ms, value#2363L, ident#2370, left#2374, window#2381-T60000ms AS window#2380]
   :     +- Filter isnotnull(timestamp#2362-T60000ms)
   :        +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#2362-T60000ms, TimestampType, LongType) - 0) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#2362-T60000ms, TimestampType, LongType) - 0) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#2362-T60000ms, TimestampType, LongType) - 0) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precise

left = [timestamp: timestamp, value: bigint ... 3 more fields]
right = [timestamp: timestamp, value: bigint ... 3 more fields]
joinExpr = (left.value = right.value)
joined = [window: struct<start: timestamp, end: timestamp>, window: struct<start: timestamp, end: timestamp> ... 3 more fields]


[window: struct<start: timestamp, end: timestamp>, window: struct<start: timestamp, end: timestamp> ... 3 more fields]

In [103]:
tmp_clear("state10_sdr")

23/03/12 20:31:39 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state10_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state10_sdr1678642299007


In [104]:
createConsoleSinkMode("state10_sdr", "append", joined).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1dd7d225

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+-----+----+-----+
|window|window|value|left|right|
+------+------+-----+----+-----+
+------+------+-----+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+------------------------------------------+-----+----+-----+
|window                                    |window                                    |value|left|right|
+------------------------------------------+------------------------------------------+-----+----+-----+
|[2023-03-12 20:31:40, 2023-03-12 20:32:00]|[2023-03-12 20:31:40, 2023-03-12 20:31:50]|0    |left|right|
|[2023-03-12 20:31:40, 2023-03-12 20:32:00]|[2023-03-12 20:31:40, 2023-03-12 20:31:50]|7    |left|right|
|[2023-03-12 20:31:40, 2023-03-12 20:32:00]|[2023-03-12 20:31:40, 2023-03-12 20:31:50]|6    |left|right|
|[2023-03-12 20:31:40, 2023-03-12 20:32:00]

#### Использование timestamp

In [105]:
val left = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("left", lit("left"))
    .withWatermark("timestamp", "1 minutes").as("left")

val right = spark
    .readStream
    .format("rate")
    .load
    .withColumn("ident", randomIdent())
    .withColumn("right", lit("right"))
    .withWatermark("timestamp", "2 minutes").as("right")

val joinExpr = expr("""left.value = right.value and left.timestamp <= right.timestamp + INTERVAL 1 minutes """)

val joined = left.join(right, joinExpr, "inner").select($"left.value", $"left.timestamp", $"right.timestamp", $"left", $"right")

joined.explain(true)

== Parsed Logical Plan ==
'Project [unresolvedalias('left.value, None), unresolvedalias('left.timestamp, None), unresolvedalias('right.timestamp, None), unresolvedalias('left, None), unresolvedalias('right, None)]
+- Join Inner, ((value#2491L = value#2508L) && (timestamp#2490-T60000ms <= cast(timestamp#2507-T120000ms + interval 1 minutes as timestamp)))
   :- SubqueryAlias `left`
   :  +- EventTimeWatermark timestamp#2490: timestamp, interval 1 minutes
   :     +- Project [timestamp#2490, value#2491L, ident#2498, left AS left#2502]
   :        +- Project [timestamp#2490, value#2491L, shuffle(array(00A, 00AA, 00AK, 00AL, 00AR), Some(-9065781549738897253))[0] AS ident#2498]
   :           +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@4fef5c6a, rate, [timestamp#2490, value#2491L]
   +- SubqueryAlias `right`
      +- EventTimeWatermark timestamp#2507: timestamp, interval 2 minutes
         +- Project [timestamp#2507, value#2508L, ident#2515, rig

left = [timestamp: timestamp, value: bigint ... 2 more fields]
right = [timestamp: timestamp, value: bigint ... 2 more fields]
joinExpr = ((left.value = right.value) AND (left.timestamp <= (right.timestamp + interval 1 minutes)))
joined = [value: bigint, timestamp: timestamp ... 3 more fields]


[value: bigint, timestamp: timestamp ... 3 more fields]

In [106]:
tmp_clear("state11_sdr")

23/03/12 20:32:05 INFO fs.TrashPolicyDefault: Moved: 'hdfs://spark-master-1.newprolab.com:8020/tmp/state11_sdr' to trash at: hdfs://spark-master-1.newprolab.com:8020/user/dinar.sadykov/.Trash/Current/tmp/state11_sdr


In [107]:
createConsoleSinkMode("state11_sdr", "append", joined).start

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@388706d5

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+---------+----+-----+
|value|timestamp|timestamp|left|right|
+-----+---------+---------+----+-----+
+-----+---------+---------+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----------------------+-----------------------+----+-----+
|value|timestamp              |timestamp              |left|right|
+-----+-----------------------+-----------------------+----+-----+
|0    |2023-03-12 20:32:05.664|2023-03-12 20:32:05.685|left|right|
|7    |2023-03-12 20:32:12.664|2023-03-12 20:32:12.685|left|right|
|6    |2023-03-12 20:32:11.664|2023-03-12 20:32:11.685|left|right|
|9    |2023-03-12 20:32:14.664|2023-03-12 20:32:14.685|left|right|
|5    |2023-03-12 20:32:10.664|2023-03-12 20:32:10.685|left|right|
|1    |2023-03-12 20:32:06.664|2023-03-12 20:32:06.685|left|right|
|10   |2023-03-12 20:32:15.664|2023-03-12 20:3

In [108]:
killAll

Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default
Stopped RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default


### Выводы:
- Spark позволяет соединять SDF со статическим DF, используя разные виды соединений: left outer, inner, left anti
- Допускается использование соединений двух стримов, для этого требуется использовать вотермарки и (опционально) плавающие окна

In [109]:
spark.stop()