# Воркшоп по Kafka и Spark Structured Streaming

Есть бэкенд система, которая обрабатывает покупки. Эта система в режиме реального времени отправляет данные в топик в Kafka в формате JSON. Например:

```json
{
   "InvoiceNo":"536365",
   "StockCode":"85123A",
   "Description":"WHITE HANGING HEART T-LIGHT HOLDER",
   "Quantity":"6",
   "InvoiceDate":"12/1/2010 8:26",
   "UnitPrice":"2.55",
   "CustomerID":"17850",
   "Country":"United Kingdom"
}
```

Есть запрос создать страницу в личном кабинете каждого клиента, где бы отображалась вся история его покупок. Допустим, что эти данные будут поступать на фронтенд через API.

Задача - написать пайплайн, который в потоковом режиме будет преобразовывать сообщения о покупках таким образом, чтобы API смог забрать данные по каждому клиенту с актуальным списком покупок. 

Базовый стек: 
- Spark Structured Streaming
- MongoDB

Можно добавить любую другую технологию или базу данных, если это необходимо. Цель - сделать так, чтобы данные о покупке поступали как можно скорее на API.

## Задание 0

Все логи по умолчанию пишутся в консоль. Чтобы увидеть их в ноутбуке, необходимо выполнить следующие действия:
 - В консоли докера с `pyspark` выполнить команду `ipython profile create`;
 - В файле `.ipython/profile_default/ipython_kernel_config.py` раскомментировать строку `c.IPKernelApp.capture_fd_output = True`;
 - Перезапустить `kernel` в ноутбуке.

## Задание 1

Спроектировать пайплан. Можно нарисовать схему с базами данных, топиками Kafka и процессами Spark. Также можно опустить часть того, каким образом данные отправляются через API на фронтенд - это сейчас не так важно. 

## Задание 2

Подключиться к топику с помощью `Spark DataFrameStreamReader`

In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.streaming.listener import StreamingListener

In [2]:
spark = SparkSession.builder.appName('yp-kafka-workshop') \
  .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/spark-3.3.0-bin-hadoop3/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a156b88b-7e1f-4885-a16a-54245b58cc96;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.2 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.mongodb.spark#mongo-spark-connector;10.0.2 in central
	found org.mongodb#mongodb-driver-sync;4.5.1 in central
	[4.5.1] 

22/10/14 13:04:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/14 13:04:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Настройка `ReadStream`:

In [3]:
kafka_user = 'de-student'
kafka_pass = 'ltcneltyn'
topic_name = 'yp.workshop.kafka.retail_data'

df_retail = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('kafka.sasl.jaas.config', f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{kafka_user}" password="{kafka_pass}";') \
    .option('kafka.partition.assignment.strategy', 'org.apache.kafka.clients.consumer.RoundRobinAssignor') \
    .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
    .option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-17-openjdk-amd64/lib/security/cacerts') \
    .option('kafka.ssl.truststore.password', 'changeit') \
    .option('maxOffsetsPerTrigger', "100") \
    .option('subscribe', topic_name) \
    .option("startingOffsets", "earliest") \
    .load()

Проверяем загрузку данных:

In [5]:
sampleQuery = df_retail.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()
sampleQuery.awaitTermination(7)
sampleQuery.stop()

22/10/14 13:04:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9dbd6519-8ebc-49ed-85dc-818ff9a0a632. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/10/14 13:04:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
+--------------------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...|
|{"InvoiceNo":"536...

## Задание 3

Написать непосредственно преобразование данных. Это преобразование будет выполняться в функции `foreachBatch`:
  - Парсинг JSON. Для этого необходима схема сообщения во формате `StructType`;
  - Фильтрация, group by, сортировка;
  - Запись в базу данных, файл;
  - ...
  
Также необходимо выбрать один из триггеров: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.trigger.html

In [None]:
# Схема данных
retail_schema = StructType([ \
    StructField('InvoiceNo',StringType(),True), \
    StructField('StockCode',StringType(),True), \
    StructField('Description',StringType(),True), \
    StructField('Quantity', IntegerType(), True), \
    StructField('InvoiceDate', StringType(), True), \
    StructField('UnitPrice', StringType(), True), \
    StructField('CustomerID', StringType(), True), \
    StructField('Country', StringType(), True) \
])

In [None]:
mongo_config = {
    "connection.uri": "mongodb://mongodb:27017/",
    "database": "my_database",
    "collection": "retail_data"
}

In [None]:
# Функция, которая будет выполняться в forEachBatch
def process_retail_data(batch_df, batch_id):
    print(batch_df.count())
    """ 
      Написать логику здесь:
          1. Десереализация столбца value
          2. Парсинг строк JSON в схему Spark
          3. Группируем строки по CustomerID, Country
          4. Аггрегация, где собираем все покупки одного клиента в один список
    """
    res = batch_df \
      .select(F.col('value').cast('string')) \
      .select(F.from_json(F.col('value'), retail_schema).alias('ParsedValue')) \
      .select(F.col('ParsedValue.*')) \
      .groupBy('CustomerID', 'Country') \
      .agg(F.collect_list(F.struct('Description', 'Quantity', 'InvoiceDate')).alias('PurchaseDescription')) 
    
    # Запись в Mongo с помощью MongoSpark
    res.write \
      .format("mongodb") \
      .mode("append") \
      .options(**mongo_config) \
      .save()


In [None]:
""" 
  Непосредственно обработка потока данных:
    1. Определяем папку checkpoints, куда Spark будет записывать свой прогреcc
    2. Добавляем функцию в foreachBatch
"""
retail_query = df_retail \
  .writeStream \
  .option("checkpointLocation", "file:///home/jovyan/checkpoints/") \
  .foreachBatch(process_retail_data) \
  .start()

In [None]:
# Остановить обработку:
retail_query.stop()