# Spark Structured Streaming

In [1]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "Natasha Pritykovskaya Streaming app") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark

In [4]:
from pyspark.sql.functions import *

KAFKA_BOOTSTRAP = "bd-master.newprolab.com:6667"

### Test dataset

Подготовим тестовый датасет:

In [5]:
test_data = \
"""{ "name":"Moscow", "country":"Russia", "continent": "Europe", "population": 12380664}
{ "name":"Madrid", "country":"Spain" }
{ "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936}
{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105}
{ "name":"Barselona", "country":"Spain", "continent": "Europe" }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }
{ "name":"New York, "country":"USA","""

splited = list(map(lambda x: (x,), test_data.split("\n")))

cities_df = spark.createDataFrame(splited, schema="""value: string""")

cities_df.printSchema()

cities_df.show(10, 100)

root
 |-- value: string (nullable = true)

+-------------------------------------------------------------------------------------+
|                                                                                value|
+-------------------------------------------------------------------------------------+
|{ "name":"Moscow", "country":"Russia", "continent": "Europe", "population": 12380664}|
|                                               { "name":"Madrid", "country":"Spain" }|
| { "name":"Paris", "country":"France", "continent": "Europe", "population" : 2196936}|
|{ "name":"Berlin", "country":"Germany", "continent": "Europe", "population": 3490105}|
|                     { "name":"Barselona", "country":"Spain", "continent": "Europe" }|
| { "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }|
| { "name":"Cairo", "country":"Egypt", "continent": "Africa", "population": 11922948 }|
|                                                 { "name":"New York, "countr

## Working with kafka using static DF API

Работать с кафкой можно как с использованием обычных статических датафреймов, так и стримовых. Начнем со статических.

### Produce static DF to Kafka
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka

Запись данных в кафку с точки зрения API мало чем отличается от записи в файл. Достаточно указать нужный `format` и задать параметры подключения к кафке через `option` или `options`.

Одним из требований является структура датафрейма - в нем должны быть колонки `value` (обязательно) и `key`, `topic` (опционально). В нашем датафрейме одна единственная колонка `value`, которая содержит JSON строку

In [6]:
cities_df \
    .withColumn("topic", lit("test_topic_100")) \
    .write.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
    .save()

print("Data has been written to Kafka")

Data has been written to Kafka


### Consume static DF from Kafka
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

Чтение выполняется аналогично записи: необходимо указать `format` и параметры подключения к топику. Датафрейм, созданный на основе топика (топиков) кафки всегда имеет одну структуру и набор колонок:

In [7]:
static_from_kafka = \
    spark \
        .read.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
        .option("subscribe", "test_topic_100") \
        .option("startingOffsets", "earliest") \
        .load()

static_from_kafka.printSchema()

static_from_kafka.show(5, 25)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+-------------------------+--------------+---------+------+-----------------------+-------------+
| key|                    value|         topic|partition|offset|              timestamp|timestampType|
+----+-------------------------+--------------+---------+------+-----------------------+-------------+
|null|[7B 20 22 6E 61 6D 65 ...|test_topic_100|        0|     0|2020-12-01 12:47:19.588|            0|
|null|[7B 20 22 6E 61 6D 65 ...|test_topic_100|        0|     1|2020-12-01 12:47:19.685|            0|
|null|[7B 20 22 6E 61 6D 65 ...|test_topic_100|        0|     2|2020-12-01 12:47:19.701|            0|
|null|[7B 20 22 6E 61 6D 65 ...|test_topic_100|        0|     3|2020-12-01 12:47:19.601|           

При этом необходимо помнить, что мы сейчас работаем с кафкой, используя статический датафрейм и оффсеты здесь никак не используются. При каждой запуске `static_from_kafka.show(5, 25)` мы будем читать одни и те же данные

### Run queries on static DF from Kafka

В части выполнения SQL запросов здесь нет никаких отличий от других источников:

In [8]:
# Deserialize binary value to string
deserialized = \
    static_from_kafka \
        .select(col("value").cast("string").alias("value")) \

deserialized.show(5, 50)

+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|{ "name":"Moscow", "country":"Russia", "contine...|
|{ "name":"Barselona", "country":"Spain", "conti...|
|{ "name":"Cairo", "country":"Egypt", "continent...|
|            { "name":"Madrid", "country":"Spain" }|
|{ "name":"Cairo", "country":"Egypt", "continent...|
+--------------------------------------------------+
only showing top 5 rows



Одна из частых задач при работе с кафкой - это десериализация `value` (и иногда `key`) сообщения. Кафка не знает ничего о типах данных в ваших сообщениях, которые хранятся в ней в бинарном виде. При чтении из кафки мы десериализуем данные в нужный формат, а при записи - сериализуем. В нашем случае мы используем JSON.

Для работа с JSON строкой в спарке есть 3 функции: `json_tuple`, `from_json`, `get_json_object`:

In [9]:
# Parse JSON strings using different methods

# json_tuple
parsed = \
    deserialized \
        .select(json_tuple(col("value"), "continent", "country", "name", "population")
                .alias("continent", "country", "name", "population"))

# from_json
parsed = \
    deserialized \
        .select(col("value").cast("string").alias("value")) \
        .select(from_json(col("value"), """continent string, country string, name string, population long""")
                .alias("value")) \
        .select(col("value.*"))

parsed = \
    deserialized \
        .select(
            get_json_object(col("value"), "$.continent").alias("continent"),
            get_json_object(col("value"), "$.country").alias("country"),
            get_json_object(col("value"), "$.name").alias("name"),
            get_json_object(col("value"), "$.population").alias("population"),
        )

parsed.show(5, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Russia |Moscow   |12380664  |
|Europe   |Spain  |Barselona|null      |
|Africa   |Egypt  |Cairo    |11922948  |
|null     |Spain  |Madrid   |null      |
|Africa   |Egypt  |Cairo    |11922948  |
+---------+-------+---------+----------+
only showing top 5 rows



# Cleanse data

После парсинга JSON мы можем очистить наши данные:

In [10]:
clean = \
    parsed \
        .na.drop("all") \
        .dropDuplicates() \
        .na.fill({"continent": "n/a", "population": 0})

clean.show(10, False)

+---------+-------+---------+----------+
|continent|country|name     |population|
+---------+-------+---------+----------+
|Europe   |Germany|Berlin   |3490105   |
|Africa   |Egypt  |Cairo    |11922948  |
|n/a      |Spain  |Madrid   |0         |
|Europe   |Spain  |Barselona|0         |
|Europe   |Russia |Moscow   |12380664  |
|Europe   |France |Paris    |2196936   |
+---------+-------+---------+----------+



Посчитаем базовый агрегат:

In [11]:
# Run aggregation query
agg = \
    clean \
        .groupBy("country") \
        .agg(count("*").alias("count"), sum("population").cast("long").alias("total_population"))

agg.show(10, False)

+-------+-----+----------------+
|country|count|total_population|
+-------+-----+----------------+
|Russia |1    |12380664        |
|Germany|1    |3490105         |
|France |1    |2196936         |
|Spain  |2    |0               |
|Egypt  |1    |11922948        |
+-------+-----+----------------+



## Working with kafka using Streaming DF API

До этого мы работали с кафкой, используя статические датафреймы. Данный подход удобен, когда вам нужно изучить данные, которые хранятся в кафке или выполнить одноразовую задачу, к которой вы не вернетесь в будущем. Однако для построения стриминг пайплайна используются стриминг датафреймы

In [12]:
streaming_from_kafka = \
    spark \
        .readStream.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
        .option("subscribe", "test_topic_100") \
        .option("startingOffsets", "earliest") \
        .load()

print(type(streaming_from_kafka))
print(streaming_from_kafka.isStreaming)

streaming_from_kafka.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
True
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Создание стриминг датафрейма и применение SQL к нему является ленивой операцией. Фактического чтения данных из кафки не происходит.

In [13]:
clean = \
    streaming_from_kafka \
        .select(col("value").cast("string").alias("value")) \
        .select(json_tuple(col("value"), "continent", "country", "name", "population")
                    .alias("continent", "country", "name", "population")) \
        .na.drop("all") \
            .dropDuplicates() \
            .na.fill({"continent": "n/a", "population": 0})

Для того, чтобы запустить наш стрим, необходимо создать `StreamingQuery` с помощью `writeStream` и `start()`.
В нашем случае мы используем `triger(once=True)`. Это означает, что стрим прочитает один батч (некоторое количество данных из кафки) и завершит свою работу.

In [14]:
sq = clean \
        .writeStream \
        .format("console") \
        .option("truncate", "false") \
        .trigger(once=True) \
        .start()

# .trigger(once=True) means that spark will trigger only one batch and then stop the stream
# this is mode is useful when writing tests

sq

<pyspark.sql.streaming.StreamingQuery at 0x7f2212eb2a58>

In [15]:
sq.isActive

True

Этот вариант удобен для написать тестов и исследования данных. Для того, чтобы создать стрим, который не будет завершаться после обработки первого батча, необходимо использовать другой `trigger`. Создадим новый стрим на базе генератора значений `rate`. Наш стрим будет генерировать данные на основе счетчика и записывать их в топик кафки. Батчи будут запускаться каждые 20 сек. Поскольку мы создаем стрим `rate` > `kafka`, то никакого видимого результата в консоли мы не увидим

In [16]:
from pyspark.sql.functions import *
test_df = spark.range(0,10).withColumn("foo", lit("foo")).withColumn("bar", lit("bar"))
test_df.printSchema()


print(test_df.columns)

root
 |-- id: long (nullable = false)
 |-- foo: string (nullable = false)
 |-- bar: string (nullable = false)

['id', 'foo', 'bar']


In [17]:
from_rate = spark.readStream.format("rate").load()

from_rate.printSchema()

to_kafka = \
    from_rate \
        .select(to_json(struct(*from_rate.columns)).alias("value")) \
        .withColumn("topic", lit("test_topic_200")) \
        .writeStream.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
        .option("checkpointLocation", "chk_1") \
        .trigger(processingTime="20 seconds") \
        .start()

# .trigger(processingTime="20 seconds") means that spark will trigger batch every 
# 20 seconds and process new data. This mode ensure your stream will never end unless error
# will occur
# 
# .option("checkpointLocation", "chk_1") specifies location to persist stream state which 
# allows stream to be restarted from the point it stopped after intentional shut down or failure
# If checkpoint does not exist, stream will start with initial options like "earliest" or "latest" offset

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)



In [18]:
from pprint import pprint

for i in spark.streams.active:
    pprint(i.status)

{'isDataAvailable': False,
 'isTriggerActive': True,
 'message': 'Getting offsets from KafkaV2[Subscribe[test_topic_100]]'}
{'isDataAvailable': False,
 'isTriggerActive': False,
 'message': 'Initializing sources'}


In [22]:
# for i in spark.streams.active:
#     if "KafkaV2" in i.lastProgress["sources"][0]["description"]:
#         i.stop()
        

In [26]:
# df = spark.read.parquet("output/test_parquet/*")
# df.printSchema()
# df.show()
# df.count()

Создадим еще один стрим `kafka` > `console`. Данные стримы работают независимо друг от друга в рамках одного спарк приложения

In [19]:
from_kafka = \
    spark \
        .readStream.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
        .option("subscribe", "test_topic_200") \
        .option("startingOffsets", "earliest") \
        .load()

to_console = \
    from_kafka \
        .select(col("value").cast("string").alias("value")) \
        .select(json_tuple(col("value"), "value", "timestamp")
                .alias("value", "timestamp")) \
        .writeStream.format("console") \
        .option("checkpointLocation", "chk_2") \
        .option("truncate", "false") \
        .start()

In [20]:
to_parquet = \
    from_kafka \
    .select(col("value").cast("string").alias("value")) \
    .select(json_tuple(col("value"), "value", "timestamp")
        .alias("value", "timestamp")) \
    .writeStream.format("parquet") \
    .option("path", "data_1") \
    .option("checkpointLocation", "chk_3") \
    .start()

In [24]:
!hdfs dfs -ls data_1

Found 44 items
drwxr-xr-x   - natalya.pritykovskaya natalya.pritykovskaya          0 2020-12-01 18:22 data_1/_spark_metadata
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya        887 2020-12-01 18:19 data_1/part-00000-01acba55-86e8-417c-9ccf-e8c28c674666-c000.snappy.parquet
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya        889 2020-12-01 18:20 data_1/part-00000-043c6e69-5ac1-40e5-9f57-3dc5353265d9-c000.snappy.parquet
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya        827 2020-12-01 18:22 data_1/part-00000-07c7b928-2508-4215-bd96-caef42220be6-c000.snappy.parquet
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya        818 2020-12-01 18:17 data_1/part-00000-0ff02336-feae-454b-a563-bdf8c7974ea2-c000.snappy.parquet
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya        950 2020-12-01 18:20 data_1/part-00000-1a97d611-25c1-4cd9-8545-4062c5109f76-c000.snappy.parquet
-rw-r--r--   2 natalya.pritykovskaya natalya.pritykovskaya     17

Таким образом мы можем создать произвольное количество стримов. Одним из недостатков данного API является то, что если мы сделали один стриминг датафрейм из кафки и создали несколько стримов с записью в разные синки (от англ. sink), мы фактически создадим несколько назависимых стримов и будем читать кафку несколько раз. В большинстве случаев это непозволительная роскошь, т.к. такой подход созает лишнюю нагрузку на кафку и сеть. Для решения этой проблемы можно использовать синк `foreachBatch`

In [25]:
def batch_func(batch_df, batch_id):
    batch_df.write.mode("append").parquet("test_parquet/{id}".format(id=batch_id))
    batch_df.write.mode("append").orc("test_orc/{id}".format(id=batch_id))


to_parquet_2 = \
    from_kafka \
        .select(col("value").cast("string").alias("value")) \
        .select(json_tuple(col("value"), "value", "timestamp")
                .alias("value", "timestamp")) \
        .writeStream.foreachBatch(batch_func) \
        .option("checkpointLocation", "chk_4") \
        .start()

Убедимся в корректности записанных данных:

In [27]:
orc_file = spark.read.orc("test_orc/*")
orc_file.printSchema()
orc_file.show(10, False)

root
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)

+-----+-----------------------------+
|value|timestamp                    |
+-----+-----------------------------+
|0    |2020-12-01T12:47:56.252+03:00|
|1    |2020-12-01T12:47:57.252+03:00|
|2    |2020-12-01T12:47:58.252+03:00|
|3    |2020-12-01T12:47:59.252+03:00|
|6    |2020-12-01T12:48:02.252+03:00|
|5    |2020-12-01T12:48:01.252+03:00|
|4    |2020-12-01T12:48:00.252+03:00|
|7    |2020-12-01T12:48:03.252+03:00|
|10   |2020-12-01T12:48:06.252+03:00|
|8    |2020-12-01T12:48:04.252+03:00|
+-----+-----------------------------+
only showing top 10 rows



In [29]:
parquet_file = spark.read.parquet("test_parquet/*")
parquet_file.printSchema()
parquet_file.show(10, False)

root
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)

+-----+-----------------------------+
|value|timestamp                    |
+-----+-----------------------------+
|0    |2020-12-01T12:47:56.252+03:00|
|1    |2020-12-01T12:47:57.252+03:00|
|2    |2020-12-01T12:47:58.252+03:00|
|3    |2020-12-01T12:47:59.252+03:00|
|6    |2020-12-01T12:48:02.252+03:00|
|5    |2020-12-01T12:48:01.252+03:00|
|4    |2020-12-01T12:48:00.252+03:00|
|7    |2020-12-01T12:48:03.252+03:00|
|10   |2020-12-01T12:48:06.252+03:00|
|8    |2020-12-01T12:48:04.252+03:00|
+-----+-----------------------------+
only showing top 10 rows



### Useful functions

In [30]:
# Stop all running streaming queries
for s in spark.streams.active:
    s.stop()
print("All streams has been stopped!")

All streams has been stopped!


In [33]:
from pyspark.sql.functions import *

from_rate_2 = spark.readStream.format("rate").load()
with_pmod = from_rate_2.withColumn("pm", expr("""pmod(value, 3)""")).groupBy(col("pm")).count()
plus_one = with_pmod.withColumn("pm", col("pm")).filter(col("pm") != 1)

sq_2 = plus_one \
    .writeStream.format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .outputMode("update") \
    .option("checkpointLocation", "chk_13") \
    .start()

In [34]:
import json

# Get status of all streaming queries
for s in spark.streams.active:
    print(json.dumps(s.status))

{"message": "Processing new data", "isDataAvailable": true, "isTriggerActive": true}


In [37]:
import pprint

# Get last progress of each streaming query
for s in spark.streams.active:
    print("#" * 50)
    pprint.pprint(s.lastProgress)
    print("#" * 50)

##################################################
{'batchId': 10,
 'durationMs': {'addBatch': 5119,
                'getBatch': 1,
                'getEndOffset': 0,
                'queryPlanning': 35,
                'setOffsetRange': 0,
                'triggerExecution': 5226,
                'walCommit': 30},
 'id': 'ce38a9f2-92d8-4bd1-909e-022b8709f3ba',
 'inputRowsPerSecond': 1.1545122185876466,
 'name': None,
 'numInputRows': 6,
 'processedRowsPerSecond': 1.148105625717566,
 'runId': 'e7b992eb-1b4e-495d-889f-f904ffb5fc38',
 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2b32e5a9'},
 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, '
                             'rampUpTimeSeconds=0, numPartitions=default',
              'endOffset': 88,
              'inputRowsPerSecond': 1.1545122185876466,
              'numInputRows': 6,
              'processedRowsPerSecond': 1.148105625717566,
              'startOffset': 82}],
 'stateOpera

In [38]:
spark.stop()