# Pyspark Structured Streaming

---

In cadrul acestui referat voi prezenta Structured Streaming, care este un mod de a scrie aplicatii de streaming scalabile si fault-tolerante folosind Apache Spark SQL, care permite scrierea de cod similar cu cel folosit pentru batch processing si care beneficiaza de aceleasi optimizari ca acesta. Spark Structured Streaming poate fi folosit in oricare dintre limbajele suportate de Spark, cum ar fi Scala, Java, Python sau R. Structured Streaming este construit pe baza API-ului DataFrame si Dataset, care ofera o abordare mai simpla si mai puternica pentru a lucra cu datele de streaming. Strucred streaming este un sistem de procesare a datelor de streaming care ofera garantii de livrare a datelor (end-to-end-exactly-once), toleranta la erori si scalabilitate.
In cadrul referatului voi prezenta cum putem citi datele de streaming din surse precum socket-uri, fisiere sau Kafka, cum putem scrie datele in sink-uri precum console, fisiere sau Kafka, cum putem face operatii de windowing si cum putem folosi shuffle partition pentru a imbunatati performanta aplicatiei noastre. De asemnea, voi prezenta putin o parte de optimizare pentru a intelege mai bine cum functioneaza Spark in spate. Limbajul folosit va fi Python, iar mediul de lucru va fi Jupyter Notebook.

---

**Table of Contents**

1. [Setting Up The Environment For Windows (Optional)](#crearea-unui-enviroment-de-lucru-local-pentru-windows)
1. [Introduction](#introduction)
1. [Reading Data](#reading-data)
   1. [Reading From Sockets](#reading-from-sockets)
   1. [Spark UI](#spark-ui)
   1. [Reading From Files](#reading-from-files)
1. [Writing Data And Windowing](#writing-data-and-windowing)
   1. [Complete Mode](#complete-mode)
   1. [Update Mode](#update-mode)
   1. [Window Operations And Append Mode](#window-operations-and-append-mode)
   1. [Session Window](#session-window)
   1. [Writing To Files](#writing-to-files)
   1. [Triggers](#triggers)
1. [Shuffle Partition](#shuffle-partition)
1. [Kafka](#kafka)
   1. [Overview](#overview)
   1. [Local Setup](#local-setup)
   1. [Hello World Kafka](#hello-world-kafka)
   1. [Kafka With Spark](#kafka-with-spark)
      1. [Reading From Kafka](#reading-from-kafka)
      1. [Writing To Kafka](#writing-to-kafka)
1. [Writing To Multiple Sinks](#writing-to-multiple-sinks)
   1. [For Each Batch](#foreachbatch)
   1. [Setup The Environment](#setup-the-environment)
1. [Bibliografie](#bibliografie)


## Setting Up The Environment For Windows


Inainte sa incep prezentarea continutului efectiv, as dori sa arat un mod de a crea un mediu de lucru local pentru Windows cu ajutorul unui container Docker. Acest lucru este necesar deoarece Spark nu este disponibil in mod nativ pe Windows, iar celelalte metode de instalare sunt destul de complicate si pot cauza probleme (mai ales in lucurul cu fisierele).

Inainte de toate, trebuie sa aveti instalat Docker Desktop pe calculator. Daca nu il aveti, il puteti descarca de [aici](https://www.docker.com/products/docker-desktop). Eu personal voi folosi editorul de text [Visual Studio Code](https://code.visualstudio.com/), dar puteti folosi orice editor doriti sau Jupyter Notebook.

Dupa ce s-a instalat Docker Desktop, creati un fisier numit `docker-compose.yml` in care sa scrieti urmatorul cod:

```yaml
services:
  bd-jupyter-notebook:
    image: jupyter/pyspark-notebook:latest
    user: root
    container_name: bd-jupyter-notebook-lab
    ports:
      - 8888:8888
      - 4040:4040
    environment:
      JUPYTER_PORT: 8888
      SPARK_UI_PORT: 4040
      GRANT_SUDO: yes
    volumes:
      - <cale catre un fisier dorit>:/home/jovyan:rw
      - streaming_data:/data:rw

  bd-zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: bd-zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  bd-kafka:
    image: confluentinc/cp-kafka:latest
    container_name: bd-kafka
    depends_on:
      - bd-zookeeper
    ports:
      - 9092:9092
    volumes:
      - streaming_data:/data:rw
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: bd-zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://bd-kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  bd-postgres:
    image: postgres:latest
    container_name: bd-postgres
    environment:
      POSTGRES_DB: mydb
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
    ports:
      - 5432:5432
    volumes:
      - posgres_bd:/var/lib/postgresql/data
  bd-pgadmin4:
    image: dpage/pgadmin4
    container_name: bd-pgadmin4
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@admin.com
      PGADMIN_DEFAULT_PASSWORD: admin
    ports:
      - 5050:80
    depends_on:
      - bd-postgres

volumes:
  streaming_data:
  posgres_bd:
```

In acest fisier, am definit patru servicii: un container cu Jupyter Notebook impreuna cu PySpark, un container cu Zookeeper, un container cu Kafka, care depinde de Zookeeper si doua containere pentru baza de date Postgres si PgAdmin4. Linia care contine `<cale catre un fisier dorit>:/home/jovyan:rw` este optionala, dar daca doriti sa aveti un loc unde sa salvati fisierele, in afara de cel unde Docker face acest lucru by defualt, puteti adauga calea catre un folder dorit.

Dupa ce ati creat fisierul, deschideti un terminal in folderul unde se afla acesta si rulati comanda `docker-compose up`. Pentru a conecta VS Code cu containerul, deschideti VS Code si instalati extensia pentru [Jupyter](https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter). Dupa ce ati instalat extensia, creati un fisier Jupyter Notebook si in partea drepta sus a editorului apasati `Select Kernel`. Dupa aceea, selectati `Existing Jupyter Environment` si introduceti `http://localhost:8888/tree` in campul care apare. Dupa ce ati facut acest pas va va aprea un prompt pentru parola. Pentru a afla parola mergeti in logurile containerului `bd-jupyter-notebook-lab`, unde ar trebuie sa gasiti:

![Setup Logs Image](./images/setup/logsSetup.png)

Copiati tokenul, ie valoarea lui `?token=`, si introduceti-l in prompt. Dupa ce ati facut acest pas, ar trebui ca totul sa fie gata.

Pentru mai multe detalii pentru conectarea unui Jupyter Notebook cu un container, puteti accesa [acest link](https://medium.com/@FredAsDev/connect-vs-code-jupyter-notebook-to-a-jupyter-container-a63293f29325).


## Introduction


Structured Streaming este un mod de a scrie aplicatii de streaming scalabile si fault-tolerante folosind Apache Spark SQL, care permite scrierea de cod similar cu cel folosit pentru batch processing si care beneficiaza de aceleasi optimizari ca acesta. Componenta de streaming poate fi folosita in oricare dintre limbajele suportate de Spark, cum ar fi Scala, Java, Python sau R. Structured Streaming este construit pe baza API-ului DataFrame si Dataset, care ofera o abordare mai simpla si mai puternica pentru a lucra cu datele de streaming. Strucred streaming este un sistem de procesare a datelor de streaming care ofera garantii de livrare a datelor (end-to-end-exactly-once), toleranta la erori si scalabilitate.

Intern Structured Streaming este construit pe baza conceptului de `micro-batch` processing, unde datele de streaming sunt impartite in micro-batch-uri ('bucati mici'), care sunt procesate folosind API-ul DataFrame si Dataset si care reusesc sa aiba o latenta de pana la 100 milisecunde si o garantie a rezistenti la erori si a livrarii exacte a datelor.
In Spark 2.3, s-a introdus suportul pentru `continuous processing`, care permite procesarea continua a datelor de streaming, fara a fi nevoie de micro-batch-uri, insa acesta nu este inca valabil in varianta finala, fiind inca in stadiu experimental si are si anumite dezavantaje:

- Lanseaza multiple taskuri lungi (long-running tasks) care citesc in mod continuu datele, le proceseaza si le scriu in sink. Numarul de taskuri este determinat de numarul de partitii ale datelor care pot fi citite in parallel. Asadar, ininte de a folosi aceasta metoda, trebuie sa va asigurati ca aveti suficiente resurse pentru a rula aceste taskuri in paralel. De exemplu, daca aveti un stream de date care are 100 de partitii, atunci veti avea 100 de taskuri care ruleaza in paralel.
- Nu sunt disponibile sisteme de garantare a livrarii exacte a datelor si a rezistentei la erori, asa ca trebuie sa va asigurati ca datele sunt scrise in mod atomic in sink-ul dorit.
- Nu suporta agregarile de date

In Spark Structured Streaming, streamul de intrare este tratat ca un tabel indefinit care poate fi interogat folosind SparkSQL. Fieacre micro-batch este tratat ca un now 'chunk' de linii in acest tabel, iar motorul SparkSQL genereaza interogari pentru acestea ca si pentru liniile statice. Tabelul rezultat este updatat in mod continuu.

<div style="background-color:white;">
    <img src="https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png" alt="Structured Streaming">
</div>

Important de retinut este ca Structured Streaming nu materializeaza intregul tabel, el citeste cele mai recente date disponibile din sursa, le proceseaza, apoi actualizeaza rezultatul si sterge datele sursa vechi. Tine minte doar un numar minimal de informatii necesare pentru a procesa incremental noile date.


## Reading Data


### Reading From Sockets


Un mod simplu de a crea un stream de date este de a citi datele de la un socket. Pentru a face acest lucru, putem folosi metoda `readStream` a obiectului `SparkSession` si metoda `format` pentru a specifica formatul in care sunt datele, in cazul nostru `socket`, si metoda `option` pentru a specifica hostul si portul de la care se citesc datele. In final, putem apela metoda `load` pentru a crea un DataFrame de streaming.

In continuare, inainte de a prezenta modurile de scriere vom afisa in consola folosind metoda `writeStream` a obiectului DataFrame cu outputMode-ul `append`. Acesta va fi vizibil in logurile containerului `bd-jupyter-notebook-lab`. Pentru a inchide stream-ul, vom opri celula de Jupyter Notebook.

```python
writeStream.format("console").outputMode("append").start().awaitTermination()
```


In [1]:
# crearea sesiunii Spark

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Reading from Sockets")
    .master("local[*]")
    .getOrCreate()
)

spark

Pentru a crea socketul, ne vom conecta la containerul `bd-jupyter-notebook-lab`, deschizand un terminal si ruland comanda

```bash
docker exec -it bd-jupyter-notebook-lab bash
```

Dupa ce ne-am conectat la container, rulam comanda pentru instalarea [Netcat](https://en.wikipedia.org/wiki/Netcat)

```bash
sudo apt-get update && sudo apt-get install -y netcat
```

Ca sa deschidem un socket pentru portul 9999, rulam comanda

```bash
nc -l 9999
```


In [2]:
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

print(df)
print(df.isStreaming)
print(df.printSchema())

DataFrame[value: string]
True
root
 |-- value: string (nullable = true)

None


Dupa cum putem observa, `df` este un simplu DataFrame, dar care are `streaming` setat pe `True`. Acesta nu contine inca date, deoarece nu am pornit stream-ul.

**Important**: Nu se poate apela metoda `show` pe un DataFrame de streaming, deoarece aceasta metoda nu este suportata. In schimb, putem apela metoda `printSchema` pentru a afisa schema DataFrame-ului.


In [4]:
df.isStreaming

True

In [5]:
query = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Odata ce s-a pornit stream-ul, putem observa ca datele sunt citite si afisate in consola 'imediat' ce sunt introduse in socket.
De exemplu, daca in socket au fost introduse datele:

```bash
ana are mere
ma duc la magazin
```

In outup se va afisa:

```bash
 Batch: 0
 -------------------------------------------
 +-----+
 |value|
 +-----+
 +-----+

 -------------------------------------------
 Batch: 1
 -------------------------------------------
 +------------+
 |       value|
 +------------+
 |ana are mere|
 +------------+

 -------------------------------------------
 Batch: 2
 -------------------------------------------
 +-----------------+
 |            value|
 +-----------------+
 |ma duc la magazin|
 +-----------------+
```

Batch 0 este gol, deoarece nu am introdus date in socket inainte de a porni stream-ul.


In [6]:
# stop the query
query.stop()

Datele introduse in socket nu trebuie sa fie neaparat string-uri, ci pot fi si alte tipuri de date, de exemplu json. Aceste tipuri de date sunt citite ca string-uri si trebuie sa fie parsate inainte de a fi folosite.

Opriti socketul prin comanda `Ctrl + C` si redeschideti-l pentru a putea rula urmatoarul exemplu.


In [7]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

json_schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("age", T.IntegerType()),
])

df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)
df.printSchema()

root
 |-- value: string (nullable = true)



In [8]:
json_df = df.select(F.from_json("value", json_schema).alias("parsed_value"))
json_df.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)



In [9]:
final_df = json_df.select("parsed_value.*")
final_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [11]:
query = (
    final_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca scriem in socket datele:

```bash
{"name": "Ana", "age": 20}
{"name": "Maria", "age": 30}
```

Vom obtine in output:

```bash
Batch: 0
-------------------------------------------
+----+---+
|name|age|
+----+---+
+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+---+
|name|age|
+----+---+
| Ana| 20|
+----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+---+
| name|age|
+-----+---+
|Maria| 30|
+-----+---+
```


### Spark UI


Pentru fiecare micro-batch, Spark creeaza un nou job pentru procesarea datelor. Daca mergem in Spark UI, la sectiunes [Jobs](http://localhost:4040/jobs/), putem observa joburile create de Spark pentru procesare. Ar trebui sa vedem 6 joburi, 3 pentru primul exemplu si 3 pentru al doilea exemplu.

![Jobs Image](./images/sparkUI/jobsReading.png)

Numarul de taskuri folosite este 16, adica numarul de core-uri disponibile. Acest numar se poate vedea ruland

```python
spark.sparkContext.defaultParallelism
```

Odata ce cel putin o operatie de tip stream este rulata in cadrul Spark UI, va fi disponibila sectiune [Structured Streaming](http://localhost:4040/StreamingQuery/) unde putem vedea informatii despre streamuri. In cazul nostru ar trebuie sa se vada pagina

[![Streaming Query Image](./images/sparkUI/streamingInfo.png)](http://localhost:4040/StreamingQuery/)

Putem observa ca avem un stream activ si unul terminat, deoarece pentru al doilea stream nu am apelat inca `query.stop()` pentru a-l opri. De asemena, daca se apasa pe fiecare stream se vor putea vedea date statistice ale acestuia.


In [13]:
query.stop()

### Reading From Files


In continuare, voi prezenta cum putem citi datele de streaming din fisiere. Pentru a face acest lucru, vom folosi metoda `readStream` a obiectului `SparkSession`. ReadStream se comporta similar cu metoda `read` a obiectului `SparkSession`, dar in loc sa returneze un DataFrame static, returneaza un DataFrame de streaming. In cazul nostru, voi folosi un fisier de tip json multiline. Datele din acesta pot fi gasite in folderul `data/devices` din acest repository.

Pentru a citi din fisiere, Spark va urmari un director si va citi toate fisierele din acel director, o data ce sunt adaugate.

Schema fisierelor poate fi data in mod manual, sau se poate activa optiunea de inferare a acesteia, setand configurarea: `spark.conf.set("spark.sql.streaming.schemaInference", True)`. Pentru ca aceasta configurare sa functioneze trebuie sa existe cel putin un fisier in directorul de citire.

Fisierele pe care le vom citi sunt de tip json multiline, asa ca trebuie sa setam optiunea `multiline` pe `True`, iar un exemplu de intrare este:

```json
{
  "eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502",
  "eventOffset": 10001,
  "eventPublisher": "device",
  "customerId": "CI00103",
  "data": {
    "devices": [
      {
        "deviceId": "D001",
        "temperature": 15,
        "measure": "C",
        "status": "ERROR"
      },
      {
        "deviceId": "D002",
        "temperature": 16,
        "measure": "C",
        "status": "SUCCESS"
      }
    ]
  },
  "eventTime": "2023-01-05 11:13:53.643364"
}
```


In [2]:
# spark.conf.set("spark.sql.streaming.schemaInference", True)

In [14]:
import os
data_path = os.path.join(os.getcwd(), "referat")
data_path

'/home/jovyan/referat'

In [15]:
import pyspark.sql.types as T

device_schema = T.StructType([
    T.StructField("deviceId", T.StringType(), True),
    T.StructField("temperature", T.IntegerType(), True),
    T.StructField("measure", T.StringType(), True),
    T.StructField("status", T.StringType(), True)
])

json_schema = T.StructType([
    T.StructField("eventId", T.StringType(), True),
    T.StructField("eventOffset", T.IntegerType(), True),
    T.StructField("eventPublisher", T.StringType(), True),
    T.StructField("customerId", T.StringType(), True),
    T.StructField("data", T.StructType([
        T.StructField("devices", T.ArrayType(device_schema), True)
    ]), True),
    T.StructField("eventTime", T.TimestampType(), True)
])

In [16]:
def create_final_df(df):
    return df.withColumn(
        "data_devices", F.explode('data.devices'))\
        .withColumn('deviceId', F.col('data_devices.deviceId'))\
        .withColumn('measure', F.col('data_devices.measure'))\
        .withColumn('status', F.col('data_devices.status'))\
        .withColumn('temperature', F.col('data_devices.temperature'))\
        .drop('data_devices').drop('data')

In [17]:
import pyspark.sql.functions as F

read_df = spark.readStream\
    .json(data_path+'/input/devices/', multiLine=True, schema=json_schema)


final_df = create_final_df(read_df)


final_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: integer (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: integer (nullable = true)



In [18]:
query = (
    final_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [19]:
query.stop()

Daca vom copia prima data fisierul `device_01.json` in directorul de citire, iar apoi fisierele `device_02.json` si `device_03.json` impreuna, vom observa ca by default Spark nu citeste fiser cu fiser, ci citeste toate fisierele din directorul de citire, odata ce sunt adaugate.

Astfel, in logurile `bd-jupyter-notebook-lab` vom avea 2 batchuri, unul pentru fisierul `device_01.json` si unul pentru fisierele `device_02.json` si `device_03.json`:

```bash
Batch: 0
-------------------------------------------
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|             eventId|eventOffset|eventPublisher|customerId|           eventTime|deviceId|measure| status|temperature|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|e3cb26d3-41b2-49a...|      10001|        device|   CI00103|2023-01-05 11:13:...|    D001|      C|  ERROR|         15|
|e3cb26d3-41b2-49a...|      10001|        device|   CI00103|2023-01-05 11:13:...|    D002|      C|SUCCESS|         16|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|             eventId|eventOffset|eventPublisher|customerId|           eventTime|deviceId|measure| status|temperature|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D004|      C|SUCCESS|         20|
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D004|      C|SUCCESS|          1|
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D002|      C|SUCCESS|         21|
|aa90011f-3967-496...|      10003|        device|   CI00108|2023-01-05 11:13:...|    D004|      C|SUCCESS|         16|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
```

Daca dorim sa citim cate un numar specific indiferent de cate fisiere adaugam deodata putem folosi optiunea `maxFilesPerTrigger` pentru a specifica cate fisiere sa fie citite la fiecare trigger. De exemplu, daca dorim sa citim cate un fisier la fiecare trigger, putem seta optiunea `maxFilesPerTrigger` pe 1.

Inainte de a ilustra acest exemplu, trebuie sa stergem fisierele din directorul de citire.


In [22]:
read_df = spark.readStream.option("maxFilesPerTrigger", 1)\
    .json(data_path+'/input/devices/', multiLine=True, schema=json_schema)

final_df = create_final_df(read_df)

query = (
    final_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [23]:
query.stop()

Daca repetam actiunile anterioare vom vedea ca de data aceasta vor fi afisate 3 batch-uri, unul pentru fiecare fisier:

```bash
Batch: 0
-------------------------------------------
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|             eventId|eventOffset|eventPublisher|customerId|           eventTime|deviceId|measure| status|temperature|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|e3cb26d3-41b2-49a...|      10001|        device|   CI00103|2023-01-05 11:13:...|    D001|      C|  ERROR|         15|
|e3cb26d3-41b2-49a...|      10001|        device|   CI00103|2023-01-05 11:13:...|    D002|      C|SUCCESS|         16|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|             eventId|eventOffset|eventPublisher|customerId|           eventTime|deviceId|measure| status|temperature|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|aa90011f-3967-496...|      10003|        device|   CI00108|2023-01-05 11:13:...|    D004|      C|SUCCESS|         16|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|             eventId|eventOffset|eventPublisher|customerId|           eventTime|deviceId|measure| status|temperature|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D004|      C|SUCCESS|         20|
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D004|      C|SUCCESS|          1|
|1450324a-c546-417...|      10038|        device|   CI00101|2023-01-05 11:13:...|    D002|      C|SUCCESS|         21|
+--------------------+-----------+--------------+----------+--------------------+--------+-------+-------+-----------+
```


De asemenea, la citire se poate specifica si optiunea `cleanSoruce`, care poate avea urmatoarele valori:

- "off" - este default si nu modifica directorul de citire
- "delete" - sterge fisierele citite
- "archive" - muta fisierele citite in folderul specificat de optiunea `sourceArchiveDir`. Mutarea se face doar in momentul in care sunt introduse fisere noi. De exemplu, daca introducem `device_01.json` si-l citim, acesta nu va fi mutat decat dupa ce am citit si `device_02.json`.

Inainte de a ilustra acest exemplu, trebuie sa stergem fisierele din directorul de citire.

In continuare vom vedea optiunea `archive`:


In [24]:
sourceArchiveDir = data_path+'/output/archive/devices'
sourceArchiveDir

'/home/jovyan/referat/output/archive/devices'

In [25]:
read_df = spark.readStream.option("maxFilesPerTrigger", 1)\
    .option("cleanSource", "archive")\
    .option("sourceArchiveDir", sourceArchiveDir)\
    .json(data_path+'/input/devices/', multiLine=True, schema=json_schema)

final_df = create_final_df(read_df)

query = (
    final_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca vom aduaga fisierele unul cate unul, dupa adaugarea urmatorului, la refresh se poate vedea ca cel dinainte a disparut din folderul de cititre, iar acesta va aparea in cel de arhivare.


In [26]:
query.stop()

## Writing Data And Windowing


In continuare, voi prezenta cum putem scrie datele de streaming in sink. Pentru a face acest lucru, vom folosi metoda `writeStream` a obiectului DataFrame. WriteStream se comporta similar cu metoda `write`, dar in loc sa scrie datele in mod static, le scrie in mod continuu.

Metodele de scriere ale output-ului sunt:

- _Complete Mode_ : intregul rezultat al tabelului este scris in sink la fiecare trigger. Acest mod este folosit pentru agregari, deoarece rezultatul complet al agregarii este necesar pentru a calcula agregarile corect.
- _Append Mode_ : doar noile linii adaugate in tabel sunt scrise in sink la fiecare trigger. Acest mod este folosit pentru a scrie datele in sink fara a le modifica si suporta doar agregarile care au un watermark pentru a sti cand datele devin nemodificabile. Acest mod este compatibil cu sinkurile care suporta doar adaugarea de date fara a le modifica pe cele existente.
- _Update Mode_ : doar liniile modificate in tabel sunt scrise in sink la fiecare trigger. Acest mod este similar cu Append Mode, daca nu se fac agregari, dar daca se fac agregari, atunci se scriu doar liniile modificate. Acest mod, este compatibil doar cu sinkurile care suporta actualizarea si stergerea datelor in mod atomic.

Pentru a ilustra modurile de scriere, vom folosi sink-ul `console`, care va afisa datele in loggurile containerului `bd-jupyter-notebook-lab`. Datele de intrare vor fi citite din socket, si vor putea fi gasite in folderul `data/words` din acest repository.

Un element de intrare este:

```json
{ "timestamp": "2024-04-06 10:05:00", "word": "Streaming" }
```


In [27]:
word_schema = T.StructType([
    T.StructField("word", T.StringType(), True),
    T.StructField("timestamp", T.TimestampType(), True)
])

In [28]:
read_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumn("parsed_value", F.from_json("value", word_schema))
    .select('parsed_value.*')
    .filter(F.col('word').isNotNull())
    .groupBy('word')
    .agg(F.count('word').alias('count'))
)

read_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



### Complete Mode


In [30]:
queryComplete = (
    read_df.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

queryComplete.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca in socket avem intrarile :

```bash
{"timestamp":"2024-04-06 10:05:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:40:00","word":"Hello"}
{"timestamp":"2024-04-06 10:20:00","word":"Spark"}
{"timestamp":"2024-04-06 10:48:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:14:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:16:00","word":"Hadoop"}
```

In logurile containerului vom vedea cele 7 batchuri, care vor contine toate datele indiferent de liniile modificate:

```bash
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|Streaming|    1|
+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|    Hello|    1|
|Streaming|    1|
+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|    Hello|    1|
|Streaming|    1|
|    Spark|    1|
+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|    Hello|    1|
|Streaming|    2|
|    Spark|    1|
+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|    Hello|    1|
|Streaming|    3|
|    Spark|    1|
+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|    Hello|    1|
|Streaming|    3|
|    Spark|    1|
|   Hadoop|    1|
+---------+-----+
```


In [31]:
queryComplete.stop()

### Update Mode


In [32]:
queryUpdate = (
    read_df.writeStream
    .format("console")
    .outputMode("update")
    .start()
)

queryUpdate.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca vom introduce in socket aceleasi intrari vom vedea ca in loguri vom avea dor liniile care s-au modificat:

```bash
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|Streaming|    1|
+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|Hello|    1|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|Spark|    1|
+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|Streaming|    2|
+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+---------+-----+
|     word|count|
+---------+-----+
|Streaming|    3|
+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|Hadoop|    1|
+------+-----+
```


In [33]:
queryUpdate.stop()

### Window Operations And Append Mode


Inainte de a prezenta modul de scriere `update`, vom vorbi despre operatiile de tip `window`:

- _Tumbling Window_: este un tip de fereastra care se imparte in bucati de o anumita durata, de exemplu, daca avem o fereastra de 5 minute, aceasta va imparti datele dupa o coloana temporala in grupuri de 5 minute. Daca avem 3 inputuri la ora 10:00, 10:01, 10:06, se vor crea doua ferestre de tipul [10:00, 10:05) si [10:05, 10:10).Primele doua inputuri vor fi in prima fereastra si cel de-al treilea in cea de-a doua.
- _Sliding Window_: este un tip de fereastra care se imparte in bucati de o anumita durata, dar care se suprapun. De exemplu, daca avem o fereastra de 5 minute si un slide de 2 minute, aceasta se va imparti datele dupa o coloana temporal in grupuri de 5 minute care se suprapun. Daca avem 3 inputuri la ora 10:00, 10:01, 10:06, se vor crea trei ferestre de tipul [10:00, 10:05), [10:02, 10:07) si [10:04, 10:09). Primele doua inputuri vor fi in prima fereastra, cel de-al doilea in a doua si cel de-al treilea in a treia.
- _Watermark_: este un mecanism care ne ajuta sa gestionam datele care sunt in intarziere. Daca un eveniment ajunge in sistem cu o intarziere mai mare decat watermark-ul, acesta nu va fi luat in considerare. De exemplu, daca avem un watermark de 5 minute atunci un eveniment intarziat va fi procesat doar daca ajunge mai devreme de max(timestamp evenimente)-5 minute. De asemenea, Spark va folosi watermarkul pentru a sterge din memorie ferestrele care nu mai pot fi actualizate.
- _Session Window_: este un tip de fereastra care se imparte in bucati de o anumita durata, dar care se extind in functie de timpul de inactivitate al datelor. Daca nu se primesc date timp de o anumita durata, fereastra se inchide. Daca se primesc date dupa ce fereastra s-a inchis, se va deschide o noua fereastra. Exista restrictii suplimentare: `update mode` nu este suportat ca output, iar in groupBy trebuie sa existe cel putin inca o coloana in afara de `session_window`


In [34]:
# Thumbling window

read_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumn("parsed_value", F.from_json("value", word_schema))
    .select('parsed_value.*')
    .filter(F.col('word').isNotNull())
    .groupBy('word', F.window('timestamp', '15 minute'))
    .agg(F.count('word').alias('count'))
)
read_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [35]:
final_df = read_df.select('window.*', 'word', 'count')

final_df.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [36]:
thumbling_query = (
    final_df.writeStream
    .format("console")
    .outputMode("update")
    .start()
)

thumbling_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca in socket introducem inputurile in ordine:

```bash
{"timestamp":"2024-04-06 10:05:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:14:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:20:00","word":"Spark"}
{"timestamp":"2024-04-06 10:40:00","word":"Hello"}
{"timestamp":"2024-04-06 10:16:00","word":"Hadoop"}
{"timestamp":"2024-04-06 10:48:00","word":"Streaming"}
```

Vom obtine in loguri:

```bash
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:00:00|2024-04-06 10:15:00|Streaming|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:00:00|2024-04-06 10:15:00|Streaming|    2|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------------------+-----+-----+
|              start|                end| word|count|
+-------------------+-------------------+-----+-----+
|2024-04-06 10:15:00|2024-04-06 10:30:00|Spark|    1|
+-------------------+-------------------+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+-------------------+-----+-----+
|              start|                end| word|count|
+-------------------+-------------------+-----+-----+
|2024-04-06 10:30:00|2024-04-06 10:45:00|Hello|    1|
+-------------------+-------------------+-----+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-------------------+------+-----+
|              start|                end|  word|count|
+-------------------+-------------------+------+-----+
|2024-04-06 10:15:00|2024-04-06 10:30:00|Hadoop|    1|
+-------------------+-------------------+------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:45:00|2024-04-06 11:00:00|Streaming|    1|
+-------------------+-------------------+---------+-----+
```


Observam ca desi penultima intrare are ora `10:16:00`, iar cea dinaintea ei are ora `10:40:00`, in cadrul Batch 5 Spark actualizeaza corect a doua fereastra, chiar daca aceasta este in 'urma'.


In [37]:
thumbling_query.stop()

In [38]:
# Sliding window
read_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumn("parsed_value", F.from_json("value", word_schema))
    .select('parsed_value.*')
    .filter(F.col('word').isNotNull())
    # 15 minute window, sliding every 5 minutes, astfel fiecare event va fi in exact 3 windows (15/5=3)
    .groupBy('word', F.window('timestamp', '15 minute', '5 minute'))
    .agg(F.count('word').alias('count'))
)
read_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [39]:
final_df = read_df.select('window.*', 'word', 'count')

final_df.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [41]:
sliding_query = (
    final_df.writeStream
    .format("console")
    .outputMode("update")
    .start()
)

sliding_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca introducem inputurile de mai devreme putem observa ca fiecare element este in 3 ferestre, deoarece acestea se suprapun, iar lungimea unei fersetre este de 15 minute si slidingul este de 5 minute, iar 15/5 = 3. Astfel outputul va fi:

```bash
Batch: 0
-------------------------------------------
+-----+---+----+-----+
|start|end|word|count|
+-----+---+----+-----+
+-----+---+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 09:55:00|2024-04-06 10:10:00|Streaming|    1|
|2024-04-06 10:05:00|2024-04-06 10:20:00|Streaming|    1|
|2024-04-06 10:00:00|2024-04-06 10:15:00|Streaming|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:20:00|Streaming|    2|
|2024-04-06 10:10:00|2024-04-06 10:25:00|Streaming|    1|
|2024-04-06 10:00:00|2024-04-06 10:15:00|Streaming|    2|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------------------+-----+-----+
|              start|                end| word|count|
+-------------------+-------------------+-----+-----+
|2024-04-06 10:15:00|2024-04-06 10:30:00|Spark|    1|
|2024-04-06 10:20:00|2024-04-06 10:35:00|Spark|    1|
|2024-04-06 10:10:00|2024-04-06 10:25:00|Spark|    1|
+-------------------+-------------------+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+-------------------+-----+-----+
|              start|                end| word|count|
+-------------------+-------------------+-----+-----+
|2024-04-06 10:30:00|2024-04-06 10:45:00|Hello|    1|
|2024-04-06 10:35:00|2024-04-06 10:50:00|Hello|    1|
|2024-04-06 10:40:00|2024-04-06 10:55:00|Hello|    1|
+-------------------+-------------------+-----+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-------------------+------+-----+
|              start|                end|  word|count|
+-------------------+-------------------+------+-----+
|2024-04-06 10:15:00|2024-04-06 10:30:00|Hadoop|    1|
|2024-04-06 10:10:00|2024-04-06 10:25:00|Hadoop|    1|
|2024-04-06 10:05:00|2024-04-06 10:20:00|Hadoop|    1|
+-------------------+-------------------+------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:40:00|2024-04-06 10:55:00|Streaming|    1|
|2024-04-06 10:35:00|2024-04-06 10:50:00|Streaming|    1|
|2024-04-06 10:45:00|2024-04-06 11:00:00|Streaming|    1|
+-------------------+-------------------+---------+-----+
```


In [42]:
sliding_query.stop()

In [43]:
# Watermarking

read_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumn("parsed_value", F.from_json("value", word_schema))
    .select('parsed_value.*')
    .filter(F.col('word').isNotNull())
    # 30 minute watermark, este important sa se apeleze withWatermark inainte de groupBy, iar coloana dupa care se face windowing si cea a watermarkului trebuie sa fie aceeasi
    .withWatermark("timestamp", "30 minutes")
    .groupBy('word', F.window('timestamp', '15 minute'))
    .agg(F.count('word').alias('count'))
)
read_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [44]:
final_df = read_df.select('window.*', 'word', 'count')

final_df.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [46]:
waterMark_query = (
    final_df.writeStream
    .format("console")
    .outputMode("update")
    .start()
)

waterMark_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

_**Atentie, folosind watermaking, in consola pot aparea batchuri goale, lipsite de sens.**_

Watermark Boundry = Max Event Time - Watermark Duration. Daca o fereastra are capatul superior mai mic decat acesta boundry ea va fi stearsa automat de catre spark.

Punand in socket urmatoarele date de intrare:

```bash
{"timestamp":"2024-04-06 10:05:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:40:00","word":"Hello"}
{"timestamp":"2024-04-06 10:20:00","word":"Spark"}
{"timestamp":"2024-04-06 10:48:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:14:00","word":"Streaming"}
{"timestamp":"2024-04-06 10:16:00","word":"Hadoop}
```

Vom obtine in loguri:

Batch: 0

| start | end | word | count |
| ----- | --- | ---- | ----- |
|       |     |      |       |

Batch: 1

| start               | end                 | word      | count |
| ------------------- | ------------------- | --------- | ----- |
| 2024-04-06 10:00:00 | 2024-04-06 10:15:00 | Streaming | 1     |

WaterMark Boundry = 10:05 - 30 minutes = 9:35

Batch: 2

| start               | end                 | word  | count |
| ------------------- | ------------------- | ----- | ----- |
| 2024-04-06 10:30:00 | 2024-04-06 10:45:00 | Hello | 1     |

WaterMark Boundry = 10:40 - 30 minutes = 10:10

Batch: 3

| start               | end                 | word  | count |
| ------------------- | ------------------- | ----- | ----- |
| 2024-04-06 10:15:00 | 2024-04-06 10:30:00 | Spark | 1     |

WaterMark Boundry = 10:40 - 30 minutes = 10:10 . Ramane neschimbat, deoarece eventul introdus nu este cel mai tarziu.

Batch: 4

| start               | end                 | word      | count |
| ------------------- | ------------------- | --------- | ----- |
| 2024-04-06 10:45:00 | 2024-04-06 11:00:00 | Streaming | 1     |

WaterMark Boundry = 10:48 - 30 minutes = 10:18. In acest moment prima fereastra cea de la 10:00-10:15 va fi stearsa doarece capatul superior este mai mic decat boundry.

Evenimentul `{"timestamp":"2024-04-06 10:14:00","word":"Streaming"}` nu a fost luat in considerare deoarece a ajuns prea tarziu, iar fereastra a fost stearsa.

Batch: 5

| start               | end                 | word   | count |
| ------------------- | ------------------- | ------ | ----- |
| 2024-04-06 10:15:00 | 2024-04-06 10:30:00 | Hadoop | 1     |

WaterMark Boundry = 10:40 - 30 minutes = 10:18 . Ramane neschimbat, deoarece eventul introdus nu este cel mai tarziu.


In [47]:
waterMark_query.stop()

In [49]:
waterMarkAppend_query = (
    final_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

waterMarkAppend_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca rulam aceleasi inputuri de mai sus,dar avem modul de afisare setat pe `append`, vom vedea ca doar prima fereastra, doar dupa ce a fost stearsa din cache va fi printata. Acesta se datoreaza faptului ca `append` trebuie sa fie sigur ca fereastra nu va mai fi modificata inainte sa o scrie.

Batch: 0

| start | end | word | count |
| ----- | --- | ---- | ----- |
|       |     |      |       |

Evenimentul de la 10:48 va modifica WaterMark Boundry = 10:48 - 30 minutes = 10:18. In acest moment prima fereastra de la 10:00 - 10:15 va fi stearsa din cache, deci nu va mai putea fi modificata, astfel ca aceasta va aparea in consola, devenind imutabila.

Batch: 1

| start               | end                 | word      | count |
| ------------------- | ------------------- | --------- | ----- |
| 2024-04-06 10:00:00 | 2024-04-06 10:15:00 | Streaming | 1     |


In [50]:
waterMarkAppend_query.stop()

### Session Window


In [51]:
# Session Window

import pyspark.sql.functions as F

session_window = (
    F.session_window(F.col('timestamp'),
                     F.when(F.col('word') == "Spark", "20 minutes").otherwise("10 minutes"))
)

read_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .withColumn("parsed_value", F.from_json("value", word_schema))
    .select('parsed_value.*')
    .filter(F.col('word').isNotNull())
    .groupBy('word', session_window)
    .agg(F.count('word').alias('count'))
)
read_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- session_window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [52]:
final_df = read_df.select('session_window.*', 'word', 'count')

final_df.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [54]:
sessionWindow_query = (
    final_df.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

sessionWindow_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca in socket vom pune ca input:

```bash
{ "timestamp" : "2024-04-06 10:05:00","word":"Streaming" }
{ "timestamp" : "2024-04-06 10:14:00","word":"Streaming" }
{ "timestamp" : "2024-04-06 10:25:00","word":"Spark" }
{ "timestamp" : "2024-04-06 10:44:00","word":"Streaming" }
{ "timestamp" : "2024-04-06 10:44:00","word":"Streaming" }
{ "timestamp" : "2024-04-06 10:44:00", "word":"Spark" }
```

Avand modul the afiasare `complete` vom avea outputurile:

```bash
Batch: 0
-------------------------------------------
+-----+---+----+-----+
|start|end|word|count|
+-----+---+----+-----+
+-----+---+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:15:00|Streaming|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:24:00|Streaming|    2|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:24:00|Streaming|    2|
|2024-04-06 10:25:00|2024-04-06 10:45:00|    Spark|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:24:00|Streaming|    2|
|2024-04-06 10:44:00|2024-04-06 10:54:00|Streaming|    1|
|2024-04-06 10:25:00|2024-04-06 10:45:00|    Spark|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:24:00|Streaming|    2|
|2024-04-06 10:44:00|2024-04-06 10:54:00|Streaming|    2|
|2024-04-06 10:25:00|2024-04-06 10:45:00|    Spark|    1|
+-------------------+-------------------+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------------------+-------------------+---------+-----+
|              start|                end|     word|count|
+-------------------+-------------------+---------+-----+
|2024-04-06 10:05:00|2024-04-06 10:24:00|Streaming|    2|
|2024-04-06 10:44:00|2024-04-06 10:54:00|Streaming|    2|
|2024-04-06 10:25:00|2024-04-06 11:04:00|    Spark|    2|
+-------------------+-------------------+---------+-----+
```

Astfel, observam ca pt cuvantul `Spark` avem un window de 20 de minute, iar pentru cuvantul `Streaming` (orice cuvant diferit de Spark) avem un window de 10 minute.


In [55]:
sessionWindow_query.stop()

### Writing To Files


Scrierea streamului in fisiere este foarte asemanatoare cu cea a unui DataFrame static. Metoda `writeStream` este asemanatoarea cu cea a metodei `write` pentru DataFrame-uri statice. Diferenta este ca metoda `writeStream` returneaza un obiect `DataStreamWriter` care are metode pentru configurarea streamului. Pentru scrierea in fisiere este obligatoriu sa avem un checkpoint directory, astfel incat Spark sa poata asigura consecventa datelor si unicitatea lor chiar si in cazul unei probleme. Modul de scriere in fisiere este by default cel `append`. In continuare, vom reveni la datele device pe care le vom citi din fisere json si le vom scrie in format csv.


In [60]:
sourceArchiveDirJson = data_path+'/output/archive/devices/json'
sourceArchiveDirJson

'/home/jovyan/referat/output/archive/devices/json'

In [61]:

read_df = spark.readStream\
    .option("sourceArchiveDir", sourceArchiveDirJson)\
    .json(data_path+'/input/devices/', multiLine=True, schema=json_schema)


final_df = create_final_df(read_df)


final_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: integer (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: integer (nullable = true)



In [64]:
query = (
    final_df.writeStream
    .format("csv")
    .option("cleanSource", "archive")
    .option("checkpointLocation", data_path+'/checkPointDir/devices/json')
    .option("path", data_path+'/output/devices/')
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca punem fisirele `device_01.json`, `device_02.json` si `device_03.json` pe rand vom observa ca in directorul de output apar pentru fiecare fisier doua fisiere, de fapt apar 2 \* (nr partitii), dar in cazul nostru avem doar o partitie. Primul fisier este pentru metadate, iar ce-l de-al doilea este output-ul efectiv. Daca incercam sa recitim un fisier deja citit vom observa ca in output nu se schimba nimic, deoarece Spark recunoaste numele si stie ca acel fisier a fost deja citit, astfel pastrand unicitatea outputului. Configurarea fisierului se face cu ajutorul `checkpointLocation`. Este recomandat sa nu se atinga acest fisier, deoarece Spark nu mai poate garanta consitenta datelor.


In [65]:
query.stop()

### Triggers


Optiunea de trigger defineste cand vor fi procesate datele primite.

In Spark Structured Streaming sunt 5 tipruri de triggeri:

1. _Defaukt_: Procesarea este declansata de fiecare data cand un nou batch de date este disponibil.
1. _Fixed interval micro-batches_: Procesarea este declansata la intervale fixe de timp.
   - Daca micro-batch-ul anterior a fost procesat mai devreme, atunci sistemul asteapta pana cand intervalul de timp este atins.
   - Daca micro-batch-ul anterior a fost procesat mai tarziu, atunci sistemul va procesa micro-batch-ul curent imediat.
   - Daca nu sunt date noi, atunci niciun nou micro-batch nu va fi inceput.
   ```python
   trigger(processingTime='2 seconds')
   ```
1. _One-time micro-batch_ (deprecated): Procesarea este declansata o singura data, dupa care se opreste automat.
   ```python
   trigger(once=True)
   ```
1. _Available -now micro-batch_: Este la fel ca One-time micro-batch, dar procesarea datelor nu se realizeaza intr-un singur batch, ci in mai multe.
   ```python
   trigger(availableNow=True)
   ```
1. _Continuous with fixed checkpoint interval_: Procesarea este declansata la intervale fixe de timp, dar nu se bazeaza pe micro-batch-uri. In schimb, datele sunt procesate in mod continuu, fara a fi grupate in micro-batch-uri. (**Nu accepta agregari!**)
   ```python
   trigger(continuous='1 second')
   ```


In [66]:
# Exemplu pentru processing time 1 minute

streaming_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

)

procQuery = (
    streaming_df.writeStream
    .format("console")
    .trigger(processingTime='1 minute')
    .start()
)

procQuery.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [67]:
procQuery.stop()

Daca pornim streamul iar dupa aceea scriem in netcat. _Important_ : al doilea output trebuie scris dupa ce primul a fost afisat:

```bash
ana are mere
mihai are pere
```

Vom observa ca outputurile sunt la distanta de 1 minut, mai putin intre primul si al doilea, unde spark a 'taiat' din timp deoarece acesta nu ia in considerare timpul de 'pornire'.


## Shuffle Partition


Mai intai, un **job** este o unitate de lucru care contine o serie de **stages**. Un **stage** este o unitate de lucru care contine o serie de **taskuri**. Un **task** este o unitate de lucru care este executata pe un singur executor. Un **executor** este un proces care ruleaza pe un nod si care executa taskuri. Un **executor** poate avea mai multe **cores**. Un **core** este un thread care executa taskuri.

Daca ne uitam la unul dintre joburile asocitate unei citiri pentru un input pentru datele `words` ale queriurilor anterioare:

![Words Job](./images/sparkUI/wordsJob.png)

Vom observa 216 taskuri: acest 216 = spark.sparkContext.defaultParallelism + shufflePartitions.

- spark.sparkContext.defaultParallelism = 16, deoarece avem un master `local[*]`, iar aceasta inseamna ca-i dam lui spark voie sa foloseasca toate coreurile(virtuale) disponibile. Daca am avea `local[2]` acest numar ar fi egal cu 2. Acestea vor fi folosite pentru citirea datelor.
- shufflePartitions = 200 by default si este numarul de partitii pe care spark le va folosi pentru operatiile de shuffle. Acest numar poate fi modificat prin `spark.sql.shuffle.partitions`. Operatia de shuffle este o operatie care implica redistribuirea datelor intre partitii, de exemplu un `groupBy` sau un `join`, in cazul nostru `groupBy`.

Putem vedea aceasta distribuire intrand pe taskul respectiv si dand scroll pana la finalul paginii la sectiunea `Completed Stages`. Acolo vom vedea doua stagii:

1. Unul pentru citirea datelor
2. Unul de shuffle

![Task Stages](./images/sparkUI/taskStages.png)

Se poate vedea ca primul task **138** pune in Shuffle Write 99B, iar al doilea citeste acest Shuffle Write. Acest lucur se poate observa si din DAG-ul generat de spark, unde exista un Exchange.

![Task DAG](./images/sparkUI/dagStages.png)

Daca intram pe primul Stage vom vedea ca doar unul din cele 16 taskuri a fost folosit pentru citirea datelor, iar restul nu au facut nimic, acest lucru se datoreaza faptului ca datele vin din socket, iar Spark nu poate optimiza automat citirea.


Daca am citi 3 fisiere deodata, atunci am vedea ca 3 din cele 16 taskuri vor fi instantiate pentru citire, insa tot 200 vor fi folosite pentru shuffle.

Vom pune intr-un folder input pentru `words` cele trei fisier disponibile `words.txt`, `words_copy_01.txt`, `words_copy_02.txt` si le vom citi concomitent.


In [68]:
data_path

'/home/jovyan/referat'

In [71]:
df = (spark.readStream
      .text(data_path+'/input/words/')
      .withColumn('parsedValue', F.from_json(F.col('value'), word_schema))
      .select('parsedValue.*')
      .filter(F.col('word').isNotNull())
      .groupBy('word')
      .agg(F.count('word').alias('count'))
      )


query = (
    df.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(availableNow=True)
    .start()
)

query.awaitTermination()

In [72]:
query.stop()

Jobul asociat citrii celor 3 fisiere se poate observa ca are 203 taskuri nu 216, deci spark a putut optimiza citirea.

![Files Read Job](./images/sparkUI/filesReadJob.png)

Daca intram pe acest Job, se poate vedea clar ca optimizarea a avut loc la citire, deoarce avem 3 fisere si le citim concomitent, deci Spark ne ofera 3 taskuri pentru citire.

![File Read Stages](./images/sparkUI/fileReadStages.png)

Mai departe, intrand pe stageul de citire putem vedea ca fiecare task a lucrat si nu exista taskuri care nu au facut nimc.

![Reading File Stage Tasks](./images/sparkUI/readingFileStageTasks.png)

In schimb daca intram pe stageul de shuffle putem vedea ca majoritatea taskurilor nu sunt utile, ele nefacand ceva in shuffle.

In general trebuie sa avem grija cand schimbam configurarea `spark.sql.shuffle.partitions`, deoarece putem incetini seminficativ partitionarea si chiar sa ramanem fara memorie in executori daca numarul este prea mic pentru datele transmise. Insa in cazul nostru stim ca numarul de date este mic (chiar infim pentru spark), asa ca vom incerca sa setam numarul de partitii la 8.


In [73]:
spark.conf.set("spark.sql.shuffle.partitions", "8")

In [74]:
df = (spark.readStream
      .text(data_path+'/input/words/')
      .withColumn('parsedValue', F.from_json(F.col('value'), word_schema))
      .select('parsedValue.*')
      .filter(F.col('word').isNotNull())
      .groupBy('word')
      .agg(F.count('word').alias('count'))
      )


query = (
    df.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(availableNow=True)
    .start()
)

query.awaitTermination()
query.stop()

Daca rulam aceeasi citire a celor 3 fisiere putem observa o diferenta considerabile in timpul de rulare. Timpul a scazut de la **2s** la **0.1s**.
Numarul de taskuri utilizat va fi 11 = 3 citire + 8 shuffle.

![Files Read Job Shuffle](./images/sparkUI/filesReadJobShuffle.png)

Intrand in stagiul de shuffle, observam ca 3 din cele 8 taskuri sunt utilizate, ceea ce reflecta modul in care Spark optimizeaza gestionarea resurselor. Acest numar de **3** nu este aleatoriu. Spark isi adapteaza comportamentul de procesare a datelor in functie de volumul si structura acestora:

- **Partitionare Adaptiva**: Initial, datele sunt impartite in 3 partitii in faza de citire. Spark foloseste aceasta informatie pentru a ajusta numarul de taskuri active in etapa de shuffle. Prin evaluarea dimensiunii si distributiei datelor in aceste partitii, Spark determina ca 3 taskuri sunt suficiente pentru a procesa eficient datele, evitand astfel risipa de resurse computationale.

- **Reducerea Overhead-ului**: Utilizand un numar mai mic de taskuri decat numarul maxim disponibil, Spark reduce overhead-ul asociat cu gestionarea unui numar mare de taskuri si comunicatiile de retea inter-node care pot deveni costisitoare, mai ales daca datele sunt deja bine partitionate.

- **Optimizarea Performantei**: Aceasta abordare nu doar ca optimizeaza utilizarea resurselor dar si imbunatateste performanta generala a jobului de Spark. Taskurile sunt dimensionate adecvat pentru a balansa incarcarea de lucru si a minimiza timpul de asteptare intre taskuri.


In [75]:
spark.stop()

## Kafka


### Overview


**Apache Kafka** este o platforma distribuita de streaming de evenimente/mesaje care este utilizata in construirea 'pipelineurilor' de streaming pentru aplicatii. Este conceput pentru gestionarea volumelor mari de date intr-o maniera scalabila si rezistenta la erori, facandu-l ideal pentru programarea reactiva si arhitecturi de tipul event driven. La baza Kafka este un sistem de mesaje distribuite avand arhitectura _publish-subcribe_, asemanator programarii reactive. Datele/Mesajele din Kafka sunt trimise de catre _producer_ intr-un _topic_, iar apoi sunt receptionate de catre _consumerii_ abonati la topicurile respective, permitand procesarea paralela a datelor si replicarea acestora pe mai multi _brokeri_.

In cadrul Kafka nu trebuie sa ne gandim prima data la date, ci la evenimentele asociate cu acestea. Fiecare mesaj este la urma urmei un state al unui eveniment. Datele din eveniment descriu ce s-a intamplat, cand s-a itamplat si cine a fost implicat. Kafka proceseaza evenimentele din topicuri in mod cronologic, iar spre deosebire de alte techologii de tip _message que_ precum RabbitMQ, un mesaj din Kafka nu este sters odata ce este consumat, ci este sters fie dupa o perioada de timp, fie cand memoria depaseste un anumit prag.

![Kafka Cluster](https://static.javatpoint.com/tutorial/kafka/images/apache-kafka-tutorial-1.png)

**Terminologie**
Kafka este un sistem distribuit bazat pe arhitectura de tip cluster, iar comunicarea dintre clienti si servere se realizeaza prin intermediul [TCP](https://www.fortinet.com/resources/cyberglossary/tcp-ip).

1. _**Brokers**_ : se refra la un server din tierul de stocare al evenimentelor din Kafka pentru una sau mai multe surse. Un cluster din Kafka este de obicei compus din mai multi brokeri. Fiecare broker dintr-un cluster este si un server bootsrap, insemnand ca daca se realizeaza conexiunea cu un server, atunci se va realiza cu toti. In special, acest tip de server are rolul de oferi clientului metadatele despre cluster. In final, un borker este doar un mod de conectare al consumatorilor de producatori.

1. _**Topics**_ : modul de organizare al evenimentelor din Kafka. Un topic este ca o categorie. Un topic este un 'log' (jurnal) de evenimente. Un topic are urmatoarele caracteristici:

   - Este 'append only'. Cand un nou mesaj este scris, acesta este adugat la coada.
   - Mesajele din topic sunt imutabile.
   - Un consumator citeste un topic de la un anumit offset pana la capat. Primul elemnt are offestul 0, al doilea il are 1 etc.
   - Este multi-producer si multi-subscriber. Un topic poate avea zero sau mai multi produceri care scriu in el si zero sau mai multi consumeri care citesc din el.

   Spre deosebire de sistemele de cozi de mesaje evenimentele dintr-un topic pot fi citite de mai multe ori si nu sunt sterse dupa prima consumare. In schimb, ele sunt sterse fie cand au atins un threshold de timp sau cand topicul a atins o anumita dimensiune.

1. _**Producers**_ : sunt clientii care scriu mesaje in Kafka. Un producer scrie mesaje catre anumite topicuri. De asemnea, acestia au posibilitatea de a partitiona scrierea.

1. _**Consumers**_ : sunt clientii care citestc mesajele din Kafka. Acestia sunt responsabili de a-si acutaliza singuri offseturile. Un consumator poate sa treaca si la un offset din trecut, daca este nevoie, atat timp cat datele mai sunt in acel topic. Marele avantaj al lui Kafka este ca fiecare consumator este independent.

1. _**Partitions**_ : datele sau mesajele sunt impartite in parti de mici dimensiuni. Fiecare partitie va avea un offset propriu. Datele sunt mereu scrise in ordine secventiala. Nu se poate garanta in ce partitie va fi scrisa o bucata dintr-un mesaj.

1. _**Zookeper**_ : are rolul de a stoca informatii despre cluster si despre consumers. Acesta gestioneaza brokerii prin mentinarea unei liste, de asemnea Zookeper este responsabil si pentru alegerea unui lider al partitiilor. Daca apar modificari precum schimbarea brokerilor, introducerea de topicuri, Zookeper notifica nodurile clusterului Kafka. Un consumator nu interactioneaza direct cu Zookeper, ci el interactioneaza cu borkerii (bootsrap server).

![Kafka Cluster Arhitecture](./images/kafka/kafkaCluster.png)


### Local Setup


In environementul nostru local:

```yml
bd-zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: bd-zookeeper
  ports:
    - 2181:2181
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

bd-kafka:
  image: confluentinc/cp-kafka:latest
  container_name: bd-kafka
  depends_on:
    - bd-zookeeper
  ports:
    - 9092:9092
  volumes:
    - streaming_data:/data:rw
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: bd-zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://bd-kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

avem un cluster Kafka cu un singur broker.

Pentru _Zookepeer_ (bd-zookeeper) avem configurararile:

- Expunerea portului `2181` pentru conectarea brokerilor
- `ZOOKEEPER_CLIENT_PORT: 2181` : are rolul de a inregistra portul (default) catre broker pentru conexiune.
- `ZOOKEEPER_TICK_TIME: 2000` : este unitatea de timp in ms folosita pentru 'heartbeaturi' ie semnale periodice trimise intre Zookerp si borker pentru a confirma validitatea conexiunii.

Pentru _Broker_ (bd-kafka) avem configurarile:

- Expunerea portului `9092` pentru conectarea clientilor
- `KAFKA_BROKER_ID: 1` : id-ul brokerului. Fiecare broker in Kafka trebuie neparat sa aiba un id unic in cluster.
- `KAFKA_ZOOKEEPER_CONNECT: bd-zookeeper:2181` : Conectarea la Zookeepr prin portul expus de catrea acesta, anume 2181.
- `KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://bd-kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092` : specifica faptul ca modul de interactiune atat in networkul din Docker pe portul 29092 cat si in localhost din masina pe portul 9092 sa fie facut prin PLAINTEXT.
- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT` : specifica faptul ca intre clienti si borkeri comunicarile nu vor fi criptate ie PLAINTEXT ramane nemodificat.
- `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1` : in Kafka, ca in multe sisteme distribuite precum Hadoop, datele pot fi replicate pe mai multe noduri. In cazul nostru, avem un singur broker daca am seta factorul de replicare mai mare decat 1, am replica pe aceeasi masina, ceea ce combate scopul replicarii, asadar am setat acest numar la 1.


### Hellow World Kafka


Pentru a lucra cu primul nostru topic, mai intai sa pornim clusterul, iar apoi sa ne conectam la broker prin:

```bash
docker exec -it bd-kafka bash
```

Mai intai sa vedem daca exista topicuri in cluster:

```bash
kafka-topics --list --bootstrap-server localhost:29092
```

Comanda listeaza toate topicurile disponibile pe cluster, folosim localhost:29092, deoarece suntem conectati la broker, iar acesta expune in mediul Docker portul 29092.
Ca output vom avea:

```bash
__consumer_offsets
```

Acesta este un topic generat de Kafka folosit pentru a memora informatii despre offseturile clientilor cu privire la fiecare topic folosit de catre acestia si respectiv fiecare partitie a topicurilor folosite. Este _indicat_ sa nu se modifice acest topic.

Pentru a ne crea propriul nostru topic vom folosi comanda:

```bash
kafka-topics --create --topic big-data-topic --bootstrap-server localhost:29092
```

daca totul a mers ok vom primi

```bash
Created topic big-data-topic.
```

Asadar, am creat un topic numit _big-data-topic_ in cadrul clusterului nostru.

Pentru a vedea informatiile despre acest topic vom scrie:

```bash
kafka-topics --describe --topic big-data-topic --bootstrap-server localhost:29092
```

Outputul va fi:

```bash
Topic: big-data-topic   TopicId: Jz0qfweDT4KfzFuI17-Jaw PartitionCount: 1       ReplicationFactor: 1    Configs:
      Topic: big-data-topic   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
```

Cum nu am specificat numarul de partitii si numarul de replici, topicul nostru are 1 partie si 1 replica. (In cazul nostru este inutila specificarea acestora, avand doar un broker). Leader este borkerul 'principal' al partitiei, fiind responsabil de scrierea si citirea datelor din acea partitie. Daca Leaderul nu este valabil intr-un cluster, unul din ceilalti brokeri followeri ai partitiei vor fi desemnati lideri in mod automat. ISR(In Sync Replicas) este numarul de replici care respecta ultima instanta a mesajelor din topic.

Daca incercam sa crem un topic cu numarul de replici mai mare decat numarul de brokeri, vom primi o eroare deoarece Kafka nu permita existenta a doua replici identice pe acelasi broker. Asadar, in cazul nostru, replication factor trebuie sa fie neparat 1, insa putem partitiona datele in cadrul brokerului:

```bash
kafka-topics --create --topic big-data-topic-2 --partitions 3 --replication-factor 1 --bootstrap-server localhost:29092
```

Comanda va crea un topic numit _big-data-topic-2_ cu 3 partitii si cu replication factor dat explicit ca 1 (chiar daca este defaultul).
Descrierea noului topic:

```bash
kafka-topics --describe --topic big-data-topic-2 --bootstrap-server localhost:29092
```

```bash
Topic: big-data-topic-2 TopicId: Lr_FoUwaT9qE-434Lr05Ig PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: big-data-topic-2 Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: big-data-topic-2 Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: big-data-topic-2 Partition: 2    Leader: 1       Replicas: 1     Isr: 1
```

Dupa cum am mai mentionat si anterior, toate replicile vor avea acelasi lider avand doar un broker. In cazul mai multor broker se va incerca o distribuire a acestora.
Pentru a vedea fiecare offset ('lungimea') al partitilor create vom folosi:

```bash
kafka-get-offsets --topic big-data-topic-2 --bootstrap-server localhost:29092
```

Ca output vom avea:

```bash
big-data-topic-2:0:0
big-data-topic-2:1:0
big-data-topic-2:2:0
```

Outputul are forma <topic name>:<replica number>:<offset number>. Cum numele topicului nostru este big-data-topic-2 si avem trei replici, 0, 1 si 2, iar nu avem niciun mesaj in topic, constatam ca outputul reflecta realitatea.
Pentru a publica mesaje in topic vom deschide un nou terminal in cadrul `bd-kafka` si vom scrie:

```bash
kafka-console-producer --topic big-data-topic-2 --bootstrap-server localhost:29092
```

Vom crea un producer. Interactiunea cu acesta este asemanatoare cu cea a unui socket Netcat. Daca introducem mesajele:

```bash
big data
typescript should be used
```

si apoi afisam din nou offseturile, folosind celalalt terminal, vom obtine:

```bash
big-data-topic-2:0:0
big-data-topic-2:1:0
big-data-topic-2:2:2
```

Putem vedea ca mesajele au fost introduse in cadrul unei partitii, anume a treia, aceasta nu este o regula. (In cazul nostru nu este o intamplare deloc, deoarece avem acelasi lider al replicilor, deci ele sunt redundante, asadar Kafka insereaza doar in una din replici pentru sesiunea consumerului.)
Pentru a crea un consumer, din nou vom crea inca un terminal conectat la containerul de kafka si vom scrie:

```bash
kafka-console-consumer --topic big-data-topic-2 --bootstrap-server localhost:29092 --partition 2 --offset earliest
```

In cadrul comenzii am specificat partitia dorita, in cazul nostru a treia, iar offsetul ca earliest (acesta poate fi de exemplu si 1 ie de la al doilea mesaj inclusiv), adica de la inceput. Daca rulam vom vedea cele doua mesaje:

```bash
big data
typescript should be used
```

Daca dorim sa accesam mesajele noi din topic indiferent de partitie, vom rula:

```bash
kafka-console-consumer --topic big-data-topic-2 --bootstrap-server localhost:29092
```

La inceput nu vom vedea mesajele vechi intrucat nespecificand partitia nu avem un singur offset, ci avem trei diferite pentru partitii, asadar vom vedea doar mesajele nou introduse ie cele introduse dupa abonarea la topic a consumerului.
Acum, daca in terminalul producer vom introduce mesaje acestea vor aparea 'live' in terminalul de consumer.

Daca mai introducem de exemplu inca doua mesaje si reafisam offseturile, vom obtine:

```bash
big-data-topic-2:0:0
big-data-topic-2:1:2
big-data-topic-2:2:2
```

Vom inchide terminalele de producer si consumer si apoi vom sterge topicul folosind:

```bash
kafka-topics --delete --topic big-data-topic-2 --bootstrap-server localhost:29092
```

Daca relistam topicurile vom vedea ca _big-data-topic-2_ a disparut.


### Kafka With Spark


#### Reading From Kafka


Pentru a conecta o aplicatie Pyspark la Kafka trebuie sa folosim un jar care contine dependentele necesare pentru a realiza aceasta conexiune. In cazul nostru vom folosi jarul `org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0` care poate fi gasit in directorul jars. Acesta contine toate dependentele necesare pentru a realiza conexiunea intre Kafka si Spark. Avem:

- `org.apache.spark` : este grupul care a creat jarul
- `spark-sql-kafka-0-10_2.12` : este numele artefactului
- `3.5.0` : este versiunea jarului si a Spark-ului


In [1]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Spark with Kafka")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
    .config("spark.sql.shuffle.partitions", 16)
    .master("local[*]")
    .getOrCreate()
)

Pentru a citi datele din Kafka vom folosi metoda `readStream` a obiectului `spark` si metoda `format` pentru a specifica formatul de citire, in cazul nostru `kafka`. Pentru a specifica serverele de bootstrapping vom folosi metoda `option` si cheia `kafka.bootstrap.servers`. Pentru a specifica topicul vom folosi metoda `option` si cheia `subscribe`. In cazul nostru vom citi din topicul dorit.


In [2]:
# adresa brokerului in networkul din Docker
KAFKA_BOOTSTRAP_SERVER = 'bd-kafka:29092'
TOPIC = 'bd-topic'

In [3]:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)  # brokerul
    .option("subscribe", TOPIC)  # topicul
    .option("startingOffsets", "earliest")  # incepem de la primul mesaj
    .load()
)

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Un stream citit din Kafka este un DataFrame cu urmatoarele coloane:

| Column            | Type      | Description            |
| ----------------- | --------- | ---------------------- |
| key               | binary    | Cheia mesajului        |
| value             | binary    | Valoarea mesajului     |
| topic             | string    | Numele topicului       |
| partition         | int       | Numarul partitiei      |
| offset            | long      | Offset-ul mesajului    |
| timestamp         | timestamp | Timestamp-ul mesajului |
| timestampType     | int       | Tipul timestamp-ului   |
| headers(optional) | array     | Headers ale mesajului  |

Mesajul efectiv se afla in coloana `value`.

Mai intai vom trimite cuvinte in Kafka folosind un producer din terminalul containerului de Kafka, iar apoi doar vom numara cuvintele din mesaje.


In [4]:
count_df = (
    df.selectExpr("CAST(value AS STRING) as word")
    .groupBy("word")
    .count()
)
count_df.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



Inainte de a porni streamul trebuie sa crem topicul `bd-topic` in Kafka, iar apoi trebuie sa ne conectam la acesta folosind un producer:

```bash
kafka-topics --create --topic bd-topic --bootstrap-server localhost:9092
```

Dupa care:

```bash
kafka-console-producer --topic bd-topic --bootstrap-server localhost:9092
```


In [7]:
query = (
    count_df.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [8]:
query.stop()

Vom vedea ca query-ul se comporta ca si cum am scrie in socket. Pe Spark nu-l intereseaza de unde vin datele si acesta este un avantaj al lui Spark Streaming.

Daca repornim queryul vedem ca reapar mesajele din topic, acest fapt se datoreaza faptului ca Kafka nu sterge mesajele dupa ce acestea sunt consumate, ci le sterge in mod implicit in 7 zile daca nu am specificat nimic.


In [9]:
query = (
    count_df.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [10]:
query.stop()

Dupa cum am specificat si in introducere **Consumerul este responsabil de mesajele pe care le doreste**, insa folosind Spark putem utiliza checkpointing pentru a pastra offseturile si pentru a nu pierde datele in cazul unui esec.


In [11]:
import os
data_path = os.path.join(os.getcwd(), "referat")
data_path

'/home/jovyan/referat'

In [12]:
kafka_bd_topic_dir = os.path.join(data_path, "checkPointDir", "kafka", "words")
kafka_bd_topic_dir

'/home/jovyan/referat/checkPointDir/kafka/words'

La prima apelare cu checkpoint dir vom vedea in continuare mesajele, insa daca oprim queryul si il repornim vom vedea ca nu mai apar mesajele, deoarece Spark a pastrat offseturile si a citit doar mesajele noi.


In [15]:
query_check = (
    count_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", kafka_bd_topic_dir)
    .start()
)

query_check.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
query_check.stop()

Repornind queryul vom vedea ca nu mai apar mesajele vechi, ci doar cele noi.


In [17]:
query_check = (
    count_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", kafka_bd_topic_dir)
    .start()
)

query_check.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [18]:
query_check.stop()

Sa crem un nou topic pentru a explora si celelalte atribute ale datelor din Kafka.
Vom crea un nou topic `bd-topic-2` si doar vom afisa mesajele din el pentru inceput.

**OBS** Pentru a folosi outputmode-ul `complete` cu Kafka trebuie sa avem o agregare intrucat se poate ajunge foare usor la un volum mare de date, iar Spark va ramane fara memorie.


In [19]:
TOPIC_2 = 'bd-topic-2'

In [20]:
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", TOPIC_2)
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca in producer introducem o valoare, de exemplu `big data`, vom optiune un output:

```bash
Batch: 1
-------------------------------------------
+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|NULL|[62 69 67 20 64 6...|bd-topic-2|        0|     0|2024-04-12 13:21:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+
```

- `key` : este null deoarece am folosit console producer si nu am specificat vreo cheie anume
- `value` : este mesajul in sine, in format binar
- `topic` : numele topicului
- `partition` : numarul partitiei, folosind configurarile default, automat avem doar o partitie
- `offset` : offsetul mesajului
- `timestamp` : timestampul mesajului
- `timestampType`: Acesta poate fi 0 sau 1, 0 reprezinta un timestamp de tip _CreateTime_ adica momentul in care a fost adugat in borker, fiind cel default, iar 1 reprezinta un timestamp de tip _LogAppendTime_ adica producerul a specificat un timestamp.


In [21]:
query.stop()

Dupa cum am mentionat anterior, mesajul nu trebuie sa fie un simplu string, el poate fi orice standard. In cele ce urmeaza vom crea un script pentru a trimite cuvinte in Kafka in format JSON si apoi vom citi aceste cuvinte.
Pentru aceasta vom folosi un producer in Python care va trimite cuvinte in format JSON in Kafka folosind libraria [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python).
Scripturile aferente se pot gasi in Notebookul `KafkaProducers.ipynb`.

Pentru a instala libraria folosim in containerul de jupyter:

```bash
conda install conda-forge::python-confluent-kafka
```

Vom trimite mesaje cu key si headers pentru a ilustra mai bine atributele puse la dispozitie de Kafka.

Mai intai vom crea un topic numit `bd-words` **acest topic** trebuie se fie cel din producerul de Python.


Avand headers trebuie sa le parsam, deoarece ele sunt in format binar. Pentru a face acest lucru vom crea un UDF care va face aceasta conversie. Headersul va fi un array de structuri, fiecare structura avand doua campuri: `key` de tipul string si `value` de tipul binary.


In [22]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

header_schema = T.ArrayType(T.StructType([
    T.StructField("key", T.StringType(), True),
    T.StructField("value", T.StringType(), True)
]))


@F.udf(returnType=header_schema)
def parse_headers(headers):
    if headers is None:
        return []
    return [{'key': header.key, 'value': header.value.decode('utf-8')} for header in headers]

In [23]:
KAFKA_BOOTSTRAP_SERVER = 'bd-kafka:29092'
TOPIC_WORDS = 'bd-words'
sourceArchiveDir = os.path.join(
    data_path, "checkPointDir", "kafka", "wordsJson")

In [24]:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", TOPIC_WORDS)
    .option("startingOffsets", "earliest")
    .option("includeHeaders", True)
    .load()
)

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)



In [25]:
word_schema = T.StructType([
    T.StructField("word", T.StringType(), True),
    T.StructField("createdAt", T.TimestampType(), True)
])

In [26]:
parsed_df = (
    df.selectExpr("CAST(value AS STRING) as stringVal",
                  "CAST(key AS STRING) as keyKafka", "headers")
    .withColumn("parsedValue", F.from_json(F.col("stringVal"), word_schema))
    .withColumn("parsedHeaders", parse_headers(F.col("headers")))
    .select("keyKafka", "parsedValue", F.explode("parsedHeaders").alias("header"))
    .select("keyKafka", "parsedValue.*", "header.*")
)
parsed_df.printSchema()

root
 |-- keyKafka: string (nullable = true)
 |-- word: string (nullable = true)
 |-- createdAt: timestamp (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [27]:
query = (
    parsed_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", sourceArchiveDir)
    .option("truncate", False)
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

**PENTRU A VEDEA REZULATE NU UITATI SA PORNITI PRODUCERUL**

Pentru un mesaj vom avea un output de forma:

```bash
+--------------------------------+------+-------------------+------------+----------------+
|keyKafka                        |word  |createdAt          |key         |value           |
+--------------------------------+------+-------------------+------------+----------------+
|89dbf672029e4675abf9a378232e5044|radish|2024-04-06 10:05:00|content-type|application/json|
+--------------------------------+------+-------------------+------------+----------------+
```

- `keyKafka` : cheia mesajului pe care am trimis-o din producer, un uuid
- `word` : cuvantul trimis in mesaj
- `createdAt` : timestamp-ul mesajului trimis din producer
- `key` : cheia headerului trimis din producer, in cazul nostru _content-type_
- `value` : valoarea headerului trimis din producer, in cazul nostru _application/json_


In [28]:
query.stop()

#### Writing To Kafka


Pana acum doar am folosit Kafka ca si consumer, insa o aplicatie Spark poate sa scrie in Kafka la fel de usor. Pentru a face acest lucru vom folosi metoda `writeStream` a obiectului `DataFrame` si metoda `format` pentru a specifica formatul de scriere, in cazul nostru `kafka`. Pentru a specifica serverele de bootstrapping vom utiliza metoda `option` si cheia `kafka.bootstrap.servers`. Pentru a specifica topicul vom folosi metoda `option` si cheia `topic`. Nu vom specifica `outputMode` deoarece Kafka nu suporta `update` si `complete`, ci doar `append`. Este obligatoriu sa specificam `checkpointLocation` pentru a pastra offseturile si pentru a nu pierde datele in cazul unui esec.

**OBS** Pana acum am creat topicurile folosind comanda `kafka-topics`, insa putem crea topicuri si direct din Spark, insa acest lucru nu este recomandat in productie, deoarece nu putem specifica toate configurarile necesare pentru un topic. Asadar, pentru a ne invata cu best practices vom crea un topic utilizand kafka-topics in terminal.

Vom folosi codul anteriror pentru citire din Kafka si vom scrie rezultatul intr-un nou topic numit `bd-words-output`. Pentru a vedea in mod live rezultatele vom lansa un consumer in terminal.

Pentru ca in consumer sa vedem datele efective, acestea trebuie sa fie in coloana `value`, asadar vom crea un nou DataFrame care va avea aceasta coloana.

**NU UITATI SA PORNITI PRODUCERUL SI DUPA CREAREA TOPICULUI SA PORNITI CONSUMERUL**


In [29]:
OUTPUT_TOPIC = 'bd-words-output'
sourceArchiveDirOutput = os.path.join(
    data_path, "checkPointDir", "kafka", "wordsJsonOutput")

In [30]:
@F.udf(returnType=T.StringType())
def create_json(keyKafka, word, createdAt):
    return f'{{"keyKafka": "{keyKafka}", "word": "{word}", "createdAt": "{createdAt}"}}'

In [31]:
output_df = (
    parsed_df
    .withColumn("value", create_json(F.col("keyKafka"), F.col("word"), F.col("createdAt")))
    .select("value")
)
output_df.printSchema()

root
 |-- value: string (nullable = true)



In [32]:
query = (
    output_df
    .withColumnRenamed("key", "keyHeader")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("topic", OUTPUT_TOPIC)
    .option("checkpointLocation", sourceArchiveDirOutput)
    .start()
)

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In consumerul din terminal vom vedea mesajele sub forma:

```bash
{"keyKafka": "be76d1338be44ef995ddc1c403e37684", "word": "olive", "createdAt": "2024-04-06 10:48:00"}
{"keyKafka": "63aa2397c6264556869978da90ee6f59", "word": "eggplant", "createdAt": "2024-04-06 10:14:00"}
{"keyKafka": "5b03b2d54edd40dbb9dc39d832eb5500", "word": "shallot", "createdAt": "2024-04-06 10:14:00"}
{"keyKafka": "9fc1667cde514d5b887f9e793fed6c20", "word": "banana", "createdAt": "2024-04-06 10:16:00"}
{"keyKafka": "38abe98e727a42faa3b20e66d1ae4736", "word": "mulberry", "createdAt": "2024-04-06 10:40:00"}
{"keyKafka": "1ba61be15e134133bb96f460f5611ea0", "word": "cherry", "createdAt": "2024-04-06 10:48:00"}
```


In [33]:
query.stop()

In [34]:
spark.stop()

## Writing To Multiple Sinks


### ForeachBatch


Operatia `foreachBatch` ne permite sa utilizam logica customizata la scrierea datelor pentru fiecare _micro-batch_. Aceasta operatie ne permite sa executam o functie pentru fiecare _micro-batch_ in parte si primeste doi paremetri: batchul in sine si id-ul batchului/epocii. Este utila in special pentru a scrie in surse care nu au implementatat in mod nativ scrierea pentru streaming. In cazul nostru vom folosi aceasta operatie pentru a scrie in Kafka si intr-o baza de date SQL, anume Postgres. Postgres nu dispune in mod nativ de un format de streaming. (Daca suntem familiari cu conceptul de R2DBC din Spring vom putea zice ca scriem datele in mod _reactiv_ .)


### Setup The Environment


Pentru a ne putea conecta la Postgres ne trebuie un jar specific. In cazul nostru vom folosi `org.postgresql:postgresql:42.7.3` care poate fi gasit in directorul jars. Acesta contine toate dependentele necesare pentru a realiza conexiunea intre Spark si Postgres. Unde:

- `org.postgresql` : este grupul care a creat jarul
- `postgresql` : este numele artefactului
- `42.7.3` : este versiunea jarului si a Postgres-ului

Vom pune acest jar in volumul containerului de jupyter la calea `/home/jovyan/.ivy2/jars`. **ATENTIE IN ACEL FOLDER SUNT SI ALTE JARURI FOLOSITE DE SPARK, ASADAR NU MODIFICATI ALTCEVA DECAT SA ADAUGATI JARUL NOU.**

Dupa ce adaugam jarul, va trebui sa recream sesiunesa Spark. Daca dupa recreare intampinati probleme, reproniti containerul si dupa creati noua sesiune.


In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Writing To Multiple Sources")
    # kafka
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
    # jarul nou introdus, atentie la calea data
    .config('spark.jars', '/home/jovyan/.ivy2/jars/postgresql-42.7.3.jar')
    .config("spark.sql.shuffle.partitions", 16)
    .master("local[*]")
    .getOrCreate()
)

In [2]:
# recrearea codului anterior pentru cuvintele din kafka

import pyspark.sql.functions as F
import pyspark.sql.types as T
import os

data_path = os.path.join(os.getcwd(), "referat")

KAFKA_BOOTSTRAP_SERVER = 'bd-kafka:29092'
TOPIC_WORDS = 'bd-words'
OUTPUT_TOPIC = 'bd-words-output'
sourceArchiveDirOutput = os.path.join(
    data_path, "checkPointDir", "kafka", "wordsJsonOutput")

word_schema = T.StructType([
    T.StructField("word", T.StringType(), True),
    T.StructField("createdAt", T.TimestampType(), True)
])

header_schema = T.ArrayType(T.StructType([
    T.StructField("key", T.StringType(), True),
    T.StructField("value", T.StringType(), True)
]))


@F.udf(returnType=header_schema)
def parse_headers(headers):
    if headers is None:
        return []
    return [{'key': header.key, 'value': header.value.decode('utf-8')} for header in headers]


@F.udf(returnType=T.StringType())
def create_json(keyKafka, word, createdAt):
    return f'{{"keyKafka": "{keyKafka}", "word": "{word}", "createdAt": "{createdAt}"}}'


parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", TOPIC_WORDS)
    .option("startingOffsets", "earliest")
    .option("includeHeaders", True)
    .load()
    .selectExpr("CAST(value AS STRING) as stringVal",
                "CAST(key AS STRING) as keyKafka", "headers")
    .withColumn("parsedValue", F.from_json(F.col("stringVal"), word_schema))
    .withColumn("parsedHeaders", parse_headers(F.col("headers")))
    .select("keyKafka", "parsedValue", F.explode("parsedHeaders").alias("header"))
    .select("keyKafka", "parsedValue.*", "header.*")
)

parsed_df.printSchema()

root
 |-- keyKafka: string (nullable = true)
 |-- word: string (nullable = true)
 |-- createdAt: timestamp (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



Inainte de a continua cu codul din Pyspark sa cream o baza de date si un tabel in aceasta pentru a salva informatia. Structura tabelului va fi:

| Column    | Type         |
| --------- | ------------ |
| keyKafka  | VARCHAR(255) |
| word      | Text         |
| createdAt | TIMESTAMP    |
| keyHeader | VARCHAR(255) |
| value     | Text         |

Pentru a deschide **PgAdmin** vom merge la adresa `localhost:5050` si ne vom loga cu userul specificat in container, anume:

- `email` : admin@admin.com
- `password` : admin

Vom adauga un nou server cu urmatoarele configurari:

- `General` :
  - `Name` : referat
- `Connection` : 
  - `Host name/address` : bd-postgres 
  - `Port` : 5432 - `Maintenance database` : postgres 
  - `Username` : myuser 
  - `Password` : mypassword
  Restul setarilor le lasam default.

Dupa ce ne conectam la server, daca apasam pe `referat` vom vedea `databases`. Click dreapta pe `databases` -> `Create` -> `Database` si vom crea o baza de date cu numele `mydb`. Dupa ce am creat baza de date, click dreapta pe `mydb` -> `Query Tool` si vom crea tabelul cu urmatorul cod SQL:

```sql
CREATE TABLE word_table (
    keyKafka VARCHAR(255),
    word TEXT,
    createdAt TIMESTAMP,
    keyHeader VARCHAR(255),
    value TEXT
);
```

In acest moment avem setata baza de date pentru a scrie.

In continuare vom crea functia pentru _forEachBatch_ pentru a scrie atat in Kafka cat si in Postgres.


In [3]:
def write_to_kafka_and_postgres(batch_df, batch_id):
    print(f"Writing batch {batch_id}")
    batch_df.persist()
    try:
        # batch_df.show() # putem apela show
        batch_df.withColumn("value", create_json(F.col("keyKafka"), F.col("word"), F.col("createdAt"))) \
            .select("value") \
            .write.format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
            .option("topic", OUTPUT_TOPIC) \
            .save()
        print("Datele introduse in Kafka.")

        (batch_df.withColumnRenamed("key", "keyHeader")
            .write.format("jdbc")
            .option("url", "jdbc:postgresql://bd-postgres:5432/mydb")
            .option("dbtable", "word_table")
            .option("user", "myuser")
            .option("password", "mypassword")
            .option('driver', 'org.postgresql.Driver')
            .mode("append")
            .save())
        print("Datele introduse in PostgreSQL.")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        batch_df.unpersist()

De observat este ca nu am folosit `writeStream` ci `write`deoarece functia `forEachBatch` va apela callback-ul pentru fiecare _micro-batch_ in parte si nu pentru intregul stream.
Am folosit `batch_df.persist()` pentru a pastra datele in memorie si a le folosi in ambele operatii, iar apoi am folosit `batch_df.unpersist()` pentru a elibera memoria.

Pentru a scrie in Postgres am folosit `write.format("jdbc")` si am specificat urmatoarele:

- `url` : adresa bazei de date, atentie `mydb` este baza de date pe care am creat-o anterior
- `dbtable` : numele tabelului in care vom scrie
- `user` : userul pentru baza de date
- `password` : parola pentru baza de date
- `driver` : driverul pentru Postgres
- `mode` : modul in care vom scrie, in cazul nostru `append` pentru a adauga datele la cele existente

Pentru a introduce mesaje in topicul de citire din Kafka vom rula producerul pentru cuvinte. Pentru a vedea rezultatele in Kafka vom folosi un console consumer, iar pentru a vedea rezultatele in Postgres vom folosi PgAdmin.


In [4]:
query = parsed_df.writeStream \
    .foreachBatch(write_to_kafka_and_postgres) \
    .option("checkpointLocation", sourceArchiveDirOutput) \
    .start()

query.awaitTermination()

Writing batch 77
Datele introduse in Kafka.
Datele introduse in PostgreSQL.
Writing batch 78
Datele introduse in Kafka.
Datele introduse in PostgreSQL.
Writing batch 79
Datele introduse in Kafka.
Datele introduse in PostgreSQL.
Writing batch 80
Datele introduse in Kafka.
Datele introduse in PostgreSQL.
Writing batch 81
Datele introduse in Kafka.
Datele introduse in PostgreSQL.
Writing batch 82
Datele introduse in Kafka.
Datele introduse in PostgreSQL.


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Daca suntem conectati la topicul de output, _bd-words-output_, vom vedea mesajele cum apar in timp real, iar daca facem query pe tabelul `word_table` din PgAdmin vom vedea cum datele au fost introduse in baza de date.

De exemplu, streamul a parcurs 5 date. In Kafka vom avea:

![Kafka BD Sink](./images/kafka/kafkaBdSink.png)

Iar in Postgres , ruland

```sql
SELECT * FROM word_table;
```

vom avea:

![Postgres Output Words](./images/postgres/postgresOutputWords.png)


In [5]:
query.stop()

## Bibliografie

- Partea introductiva
  - https://subhamkharwal.medium.com/pyspark-structured-streaming-read-from-files-c46fa0ce8888
  - https://www.projectpro.io/recipes/perform-window-operations-during-spark-structured-streaming
  - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- Partea de shuffle
  - https://spark.apache.org/docs/latest/sql-performance-tuning.html
- Partea de Kafka
  - https://subhamkharwal.medium.com/pyspark-structured-streaming-read-from-kafka-64c40767155f
  - https://www.javatpoint.com/apache-kafka
  - https://docs.confluent.io/kafka/introduction.html
