<h1><center>Spark Streaming Read from Kafka | Real time streaming from Kafka</center></h1>
<hr><hr><hr>

- Spark Streaming connects to a Kafka broker and subscribes to a topic in order to receive data from that topic.
- By default, Spark creates one partition for each partition of the Kafka topic being read, so the number of tasks that process each micro-batch equals the number of partitions of the topic in Kafka.

#### NOTE:
------------
- Thus, **to tune a Kafka streaming job that runs slowly, the topic must be split into an appropriate number of partitions in Kafka, so that Spark can achieve parallelism while reading data from Kafka.**

- Required jar files for running the Kafka-related code in this notebook, for Spark version `3.3.2`:
    - `spark-sql-kafka-0-10_2.12-3.3.2`
    - `kafka-clients-3.7.1`
    - `spark-streaming-kafka-0-10-assembly_2.12-3.3.2`
    - `spark-token-provider-kafka-0-10_2.12-3.3.2`
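
Instead of copying the jars by hand, the Kafka connector can usually be pulled in through the `spark.jars.packages` setting, which resolves the connector and its transitive dependencies from Maven. The sketch below only builds the Maven coordinate string. The helper name `kafka_connector_coordinate` is hypothetical, and the Scala and Spark versions are assumed to match the jar names listed above.

```python
def kafka_connector_coordinate(scala_version: str = "2.12", spark_version: str = "3.3.2") -> str:
    """Build the Maven coordinate (group:artifact:version) of the Spark Kafka source."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


coordinate = kafka_connector_coordinate()
print(coordinate)

# With pyspark installed, the coordinate could be passed while building the session:
# spark = (
#     SparkSession.builder
#     .config("spark.jars.packages", coordinate)
#     .getOrCreate()
# )
```

The version suffix has to match the installed Spark and Scala build, otherwise the connector typically fails to load.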

In [11]:
import findspark
findspark.init()

In [12]:
import ipynbname
import os

notebook_name = ipynbname.name()
print(notebook_name)

005-kafka_streaming_data_read


In [13]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName( notebook_name )
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.sql.shuffle.partitions", 8)
    .getOrCreate()
)

spark

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [15]:
KAFKA_SERVER = "localhost:9092"
KAFKA_TOPIC = "device_data2"

## Batch Code for Kafka data processing:
--------------------------------------------

In [9]:
kafka_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)
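
The read above maps each Kafka partition to one Spark partition. When the topic cannot be repartitioned, the Kafka source also accepts a `minPartitions` option, which asks Spark to split large Kafka partitions into smaller pieces; the Spark documentation describes it as a hint, so the resulting task count is approximate. The sketch below only assembles the options dictionary. The helper name `kafka_read_options` and the value `8` are assumptions made for illustration.

```python
from typing import Dict, Optional


def kafka_read_options(
    bootstrap_servers: str,
    topic: str,
    min_partitions: Optional[int] = None,
) -> Dict[str, str]:
    """Collect Kafka source options; add 'minPartitions' only when requested."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
    }
    if min_partitions is not None:
        options["minPartitions"] = str(min_partitions)
    return options


options = kafka_read_options("localhost:9092", "device_data2", min_partitions=8)
print(options)

# With a live SparkSession and broker, the options could be applied like this:
# kafka_df = spark.read.format("kafka").options(**options).load()
# print(kafka_df.rdd.getNumPartitions())
```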

In [10]:
kafka_df.show()

+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|                 key|               value|       topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|[65 37 32 36 37 3...|[7B 22 65 76 65 6...|device_data2|        0|     0|2024-07-23 13:07:...|            0|
|[39 66 34 65 65 3...|[7B 22 65 76 65 6...|device_data2|        0|     1|2024-07-23 13:07:...|            0|
|[66 33 31 31 30 3...|[7B 22 65 76 65 6...|device_data2|        0|     2|2024-07-23 13:07:...|            0|
|[65 31 62 37 34 3...|[7B 22 65 76 65 6...|device_data2|        0|     3|2024-07-23 13:07:...|            0|
|[61 35 64 66 31 6...|[7B 22 65 76 65 6...|device_data2|        0|     4|2024-07-23 13:07:...|            0|
|[34 65 37 36 35 3...|[7B 22 65 76 65 6...|device_data2|        0|     5|2024-07-23 13:07:...|            0|
|[38 37 36 31 30 3.

In [11]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
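
The schema shows that `key` and `value` arrive as `binary`, which is why `show()` prints them as hex bytes. The plain-Python sketch below decodes the visible (truncated) hex prefixes from the first row of the output above, to illustrate what casting the column to a string does. It needs no Spark session, and the variable names are illustrative.

```python
# Hex prefixes copied from the first row of the truncated show() output.
key_hex_prefix = "65 37 32 36 37"
value_hex_prefix = "7B 22 65 76 65"

# bytes.fromhex ignores the spaces between byte pairs.
key_prefix = bytes.fromhex(key_hex_prefix).decode("utf-8")
value_prefix = bytes.fromhex(value_hex_prefix).decode("utf-8")

print(key_prefix)    # e7267
print(value_prefix)  # {"eve
```

The value prefix is the start of a JSON document (`{"eve...`), consistent with the `eventId` field that appears once the column is cast to a string.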



In [12]:
kafka_df.select( F.col("value").cast("string") ).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                     |
+---------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# Taking one value as string to get the schema:
json_string = """{"eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502", "eventOffset": 10001, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}"""

In [15]:
(
    kafka_df
    .withColumn( "schema", F.schema_of_json( json_string ) )
    .select("schema")
    .show(1, truncate=False)
)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|schema                                                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|STRUCT<customerId: STRING, data: STRUCT<devices: ARRAY<STRUCT<deviceId: STRING, measure: STRING, status: STRING, temperature: BIGINT>>>, eventId: STRING, eventOffset: BIGINT, eventPublisher: STRING, eventTime: STRING>|
+-----------------------------------------------------------------------------------------------------------------------

In [16]:
json_schema = "STRUCT<customerId: STRING, data: STRUCT<devices: ARRAY<STRUCT<deviceId: STRING, measure: STRING, status: STRING, temperature: BIGINT>>>, eventId: STRING, eventOffset: BIGINT, eventPublisher: STRING, eventTime: STRING>"
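
As a quick sanity check on the inferred schema, the same sample message can be parsed with Python's `json` module. This is only a sketch for comparing field names and value types against the DDL string above, and it does not replace `schema_of_json`. In the output shown above, the inferred fields appear in alphabetical order, which the sorted lists below mirror.

```python
import json

# Same sample message that was passed to schema_of_json.
json_string = """{"eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502", "eventOffset": 10001, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}"""

sample = json.loads(json_string)

# Field names at the top level and inside each element of data.devices.
top_level_fields = sorted(sample)
device_fields = sorted(sample["data"]["devices"][0])

print(top_level_fields)
print(device_fields)
print(type(sample["eventOffset"]).__name__)  # integer in JSON, BIGINT in the schema
```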

In [17]:
parsed_json_df = (
    kafka_df
    .withColumn( "value", F.col("value").cast("string") )
    .withColumn( "json_value", F.from_json( F.col("value"), json_schema ) )
    # .drop("value")
    .select( "json_value.*" )
)

In [18]:
parsed_json_df.show(truncate=False)

+----------+--------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|customerId|data                                                                      |eventId                             |eventOffset|eventPublisher|eventTime                 |
+----------+--------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|CI00102   |{[{null, C, ERROR, 14}, {D002, C, STANDBY, 15}, {D001, C, STANDBY, 10}]}  |d1c50fc5-d6ef-42ee-a94b-c68f0c1b22c3|10000      |device        |2024-07-23 13:07:40.751972|
|CI00105   |{[{D003, C, SUCCESS, 9}, {D002, C, SUCCESS, 12}]}                         |728a1974-43a7-47c6-a67e-76e7d578a798|10001      |device        |2024-07-23 13:07:42.907318|
|CI00101   |{[{null, C, STANDBY, 7}, {D004, C, null, 15}]}                            |902b9b46-b665-4f5f

In [19]:
parsed_json_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [20]:
flat_json_df = ( 
    parsed_json_df.withColumn( "data", F.explode( "data.devices" ) )
    .withColumn( "deviceId", F.col("data.deviceId") )
    .withColumn( "temperature", F.col("data.temperature") )
    .withColumn( "measure", F.col("data.measure") )
    .withColumn( "status", F.col("data.status") )
    # .drop("data")
    .select( "eventId", "eventOffset", "eventPublisher", "customerId", "deviceId", "temperature", "measure", "status", "eventTime" )
)
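
The flattening step relies on `F.explode`, which emits one row per array element and drops events whose `devices` array is empty or null (`F.explode_outer` would keep such events with null device columns). The plain-Python sketch below mimics that behaviour on two hand-written events. The function name `flatten_event` and the sample values are illustrative, not taken from the topic.

```python
from typing import Any, Dict, List


def flatten_event(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one flat row per entry of event['data']['devices'] (like F.explode)."""
    rows = []
    for device in event["data"]["devices"]:
        rows.append({
            "eventId": event["eventId"],
            "customerId": event["customerId"],
            "deviceId": device.get("deviceId"),
            "temperature": device.get("temperature"),
            "measure": device.get("measure"),
            "status": device.get("status"),
        })
    return rows


event_with_devices = {
    "eventId": "event-1",
    "customerId": "CI00103",
    "data": {"devices": [
        {"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"},
        {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"},
    ]},
}
event_without_devices = {"eventId": "event-2", "customerId": "CI00108", "data": {"devices": []}}

rows = flatten_event(event_with_devices)
empty_rows = flatten_event(event_without_devices)
print(len(rows), len(empty_rows))  # 2 0
```

An event with an empty `devices` array therefore contributes no rows to the flattened DataFrame.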

In [22]:
flat_json_df.show(truncate=False)

+------------------------------------+-----------+--------------+----------+--------+-----------+-------+-------+--------------------------+
|eventId                             |eventOffset|eventPublisher|customerId|deviceId|temperature|measure|status |eventTime                 |
+------------------------------------+-----------+--------------+----------+--------+-----------+-------+-------+--------------------------+
|d1c50fc5-d6ef-42ee-a94b-c68f0c1b22c3|10000      |device        |CI00102   |null    |14         |C      |ERROR  |2024-07-23 13:07:40.751972|
|d1c50fc5-d6ef-42ee-a94b-c68f0c1b22c3|10000      |device        |CI00102   |D002    |15         |C      |STANDBY|2024-07-23 13:07:40.751972|
|d1c50fc5-d6ef-42ee-a94b-c68f0c1b22c3|10000      |device        |CI00102   |D001    |10         |C      |STANDBY|2024-07-23 13:07:40.751972|
|728a1974-43a7-47c6-a67e-76e7d578a798|10001      |device        |CI00105   |D003    |9          |C      |SUCCESS|2024-07-23 13:07:42.907318|
|728a1974-43a

In [23]:
flat_json_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- temperature: long (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- eventTime: string (nullable = true)



## Streaming Code for Kafka data processing:
----------------------------------------------

In [16]:
BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK = f"./checkpoints/{notebook_name}"

def update_checkpoint_dir( BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK ):
    """
    For each run of the notebook streaming job, the directory to be used follows the pattern "{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/1", "{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/2", ... and so on.
    For each run, the code below finds the highest existing numbered folder inside "BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK". If that folder is empty (contains no checkpoint files), it is used as the checkpoint directory for the current run; otherwise a new folder numbered (1 + highest existing number) is created and used as the checkpoint directory.
    """
    # Check whether the base directory for saving checkpoints of streaming jobs exists. If it does not exist, create it.
    try:
        os.listdir( BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK )
        print(f"Base checkpoint directory '{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}' already exists.")
    except FileNotFoundError:
        # os.makedirs also creates missing parent folders such as './checkpoints'
        os.makedirs(BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK)
        print(f"Base checkpoint directory did not exist previously, so it has been created with the relative path: '{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}'")
    
    # get list of all folders inside the 'BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK'
    past_checkpoint_dirs_of_this_notebook = os.listdir(BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK)
    
    
    if len(past_checkpoint_dirs_of_this_notebook) == 0:
        # If 'BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK' is empty, then create a folder named '1' inside it, and select this '1' named folder path as 'CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN'
        os.mkdir(f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/1")
        CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN = f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/1"
    else:
        # If 'BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK' is not empty, then we need to get the highest numbered folder.
    
        # Converting folder number strings to integers, to correctly get the folder name having the highest number
        checkpoint_folder_names_converted_to_integers = [int(i) for i in past_checkpoint_dirs_of_this_notebook]
        highest_existing_folder_number = max(checkpoint_folder_names_converted_to_integers)  # max() gives the largest integer value in this list of folder names
    
        # checking if the highest numbered folder is empty, or it contains checkpoint related files inside it.
        # This check is important as it might happen that a new folder is created but not used for storing any checkpoint data. Then, without creating another new folder with higher number, we will use that existing highest numbered empty folder.
        if len( os.listdir( f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/{highest_existing_folder_number}" ) ) == 0:
            # If existing highest numbered folder is empty, then its path will be set to 'CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN'
            CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN = f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/{highest_existing_folder_number}"
        else:
            # If existing highest numbered folder is not empty(contains checkpoint files), then a new folder with number 1 higher than highest number will be created, and its path will be set to 'CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN'
            new_folder_number = highest_existing_folder_number + 1
            os.mkdir(f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/{new_folder_number}")
            CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN = f"{BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK}/{new_folder_number}"

    return CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN
    
print(f"The checkpoint directory to be utilised for current execution of spark streaming job: '{update_checkpoint_dir( BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK )}'")

Base checkpoint directory './checkpoints/005-kafka_streaming_data_read' already exists.
The checkpoint directory to be utilised for current execution of spark streaming job: './checkpoints/005-kafka_streaming_data_read/3'
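
The folder-numbering rule can be checked in isolation against a temporary directory. The block below is a compact `pathlib` variant written for illustration only, not the notebook's function. The name `next_checkpoint_dir` is hypothetical, and unlike the function above it skips entries whose names are not numeric.

```python
import tempfile
from pathlib import Path


def next_checkpoint_dir(base_dir: str) -> str:
    """Reuse the highest numbered folder if it is empty, else create the next one."""
    base = Path(base_dir)
    base.mkdir(parents=True, exist_ok=True)
    # Only consider sub-folders whose names are plain integers.
    numbered = sorted(int(p.name) for p in base.iterdir() if p.is_dir() and p.name.isdigit())
    if numbered:
        latest = base / str(numbered[-1])
        if not any(latest.iterdir()):
            return str(latest)  # empty folder: reuse it
        target = base / str(numbered[-1] + 1)
    else:
        target = base / "1"
    target.mkdir()
    return str(target)


with tempfile.TemporaryDirectory() as tmp:
    first = next_checkpoint_dir(tmp)       # creates <tmp>/1
    second = next_checkpoint_dir(tmp)      # <tmp>/1 is still empty, so it is reused
    (Path(second) / "offsets").mkdir()     # simulate checkpoint files being written
    third = next_checkpoint_dir(tmp)       # <tmp>/1 is now in use, so <tmp>/2 is created
    run_names = [Path(first).name, Path(second).name, Path(third).name]

print(run_names)  # ['1', '1', '2']
```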


In [17]:
CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN = update_checkpoint_dir( BASE_CHECKPOINT_DIR_FOR_CURRENT_NOTEBOOK )
# CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN = "./checkpoints/005-kafka_streaming_data_read/2"
print(CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN)

Base checkpoint directory './checkpoints/005-kafka_streaming_data_read' already exists.
./checkpoints/005-kafka_streaming_data_read/3


In [18]:
kafka_streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

In [19]:
json_schema = "STRUCT<customerId: STRING, data: STRUCT<devices: ARRAY<STRUCT<deviceId: STRING, measure: STRING, status: STRING, temperature: BIGINT>>>, eventId: STRING, eventOffset: BIGINT, eventPublisher: STRING, eventTime: STRING>"

In [20]:
parsed_streaming_json_df = (
    kafka_streaming_df
    .withColumn( "value", F.col("value").cast("string") )
    .withColumn( "json_value", F.from_json( F.col("value"), json_schema ) )
    # .drop("value")
    .select( "json_value.*" )
)

In [29]:
flat_streaming_json_df = ( 
    parsed_streaming_json_df.withColumn( "data", F.explode( "data.devices" ) )
    .withColumn( "deviceId", F.col("data.deviceId") )
    .withColumn( "temperature", F.col("data.temperature") )
    .withColumn( "measure", F.col("data.measure") )
    .withColumn( "status", F.col("data.status") )
    # .drop("data")
    .select( "eventId", "eventOffset", "eventPublisher", "customerId", "deviceId", "temperature", "measure", "status", "eventTime" )
)

In [76]:
stream_write_query = (
    flat_streaming_json_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN)
    .start()
)

In [None]:
stream_write_query.awaitTermination()

#### Applying Triggers:-
------------------------------
- `.trigger(once=True)`: Processes all the data available when the query starts in a single micro-batch and then stops, so the streaming job behaves like a batch job. It is used when we want to process all the available data with our streaming code and shut down the pipeline after the processing is complete (the job does not continue to run indefinitely).

- `.trigger(availableNow=True)`: Available from Spark 3.3.0. Like `.trigger(once=True)`, it processes all the data available at start and then stops, but it can split the work across multiple micro-batches (respecting source rate-limit options such as `maxOffsetsPerTrigger`) instead of using one large batch.

- `.trigger(processingTime="15 seconds")`: Makes the pipeline run in micro-batch mode, with the interval between triggers specified by us. If we pass `"15 seconds"`, the pipeline processes data incrementally by triggering itself once every 15 seconds. **Using this trigger mode, we mainly define the time interval at which incoming incremental data gets processed and written to the sink**.

- `.trigger(continuous="15 seconds")`: An experimental mode (introduced in Spark 2.3 and still experimental in 3.3.x). It does not support all sources and sinks.
    - The `continuous` trigger mode does not process data in micro-batches. It processes data continuously with very low latency (on the order of milliseconds). The time passed to this trigger mode is the interval at which it writes the data offsets, metadata, etc., into the checkpoint directory.
    - For example, `.trigger(continuous="15 seconds")` means the data will be **processed continuously, within milliseconds of arrival**, while the checkpoint data is written every 15 seconds, so **checkpoints will be created once every 15 seconds**.
    - It supports only some data sinks, such as `memory`.
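
Each of the trigger modes above corresponds to exactly one keyword argument of `DataStreamWriter.trigger` (`once`, `availableNow`, `processingTime` or `continuous`). A minimal sketch of that mapping, assuming a small hypothetical helper named `trigger_kwargs` that only builds the keyword dictionary and never touches Spark:

```python
from typing import Dict, Optional, Union


def trigger_kwargs(mode: str, interval: Optional[str] = None) -> Dict[str, Union[str, bool]]:
    """Return the single keyword argument to pass to DataStreamWriter.trigger."""
    if mode in ("once", "availableNow"):
        # Run-to-completion modes take a boolean flag and no interval.
        return {mode: True}
    if mode in ("processingTime", "continuous"):
        # Interval-based modes need a duration string such as "15 seconds".
        if not interval:
            raise ValueError(f"trigger mode '{mode}' requires an interval")
        return {mode: interval}
    raise ValueError(f"unknown trigger mode: {mode}")


micro_batch = trigger_kwargs("processingTime", "15 seconds")
run_once = trigger_kwargs("availableNow")
print(micro_batch, run_once)

# With a streaming DataFrame, the dictionary could be unpacked into the writer:
# df.writeStream.format("console").trigger(**micro_batch).start()
```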

In [60]:
(
    flat_streaming_json_df
    .writeStream
    .format("console")
    .outputMode("append")
    .queryName("Kafka_writeStream_trigger_once")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN)
    # .trigger(once=True)
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

In [None]:
(
    flat_streaming_json_df
    .writeStream
    .format("console")
    .outputMode("append")
    .queryName("Kafka_writeStream_trigger_processingTime_15_seconds")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN)
    .trigger(processingTime="15 seconds")
    .start()
    .awaitTermination()
)
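
The trigger examples in this section all pass the same `CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN`. A checkpoint records the offsets a query has already processed, so a query started on a checkpoint written by another query resumes from those offsets rather than from `startingOffsets`. One possible way to avoid this, sketched under the assumption that one sub-folder per query name is acceptable (the helper `checkpoint_dir_for_query` is hypothetical):

```python
def checkpoint_dir_for_query(run_checkpoint_dir: str, query_name: str) -> str:
    """Derive a separate checkpoint folder for each streaming query."""
    return f"{run_checkpoint_dir.rstrip('/')}/{query_name}"


run_dir = "./checkpoints/005-kafka_streaming_data_read/3"
once_dir = checkpoint_dir_for_query(run_dir, "Kafka_writeStream_trigger_once")
continuous_dir = checkpoint_dir_for_query(run_dir, "Kafka_writeStream_trigger_continuos_15_seconds")

print(once_dir)
print(continuous_dir)

# Each writer would then receive its own location, for example:
# .option("checkpointLocation", once_dir)
```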

In [22]:
(
    parsed_streaming_json_df
    .writeStream
    .queryName("Kafka_writeStream_trigger_continuos_15_seconds")
    .format("memory")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIRECTORY_FOR_CURRENT_RUN)
    .trigger(continuous="15 seconds")
    .start()
    .awaitTermination()
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "D:\Softwares\Apache_Spark\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "D:\Softwares\Apache_Spark\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\Debanjan Sarkar\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

### Reading data from memory:
--------------------------------
- When a Spark streaming query writes data to the `memory` sink, the query name can be used as a table name to view the records, using `spark.sql('SELECT * FROM <query_name>')`.

In [23]:
# The in-memory table written by the streaming query can be queried as a Spark SQL table under the query name.
# The cell above blocks on awaitTermination(), so kill the running job from the Spark UI (or interrupt the cell) and then run this code.
spark.sql("SELECT * FROM Kafka_writeStream_trigger_continuos_15_seconds").show()

+----------+--------------------+--------------------+-----------+--------------+--------------------+
|customerId|                data|             eventId|eventOffset|eventPublisher|           eventTime|
+----------+--------------------+--------------------+-----------+--------------+--------------------+
|   CI00102|{[{null, C, ERROR...|d1c50fc5-d6ef-42e...|      10000|        device|2024-07-23 13:07:...|
|   CI00105|{[{D003, C, SUCCE...|728a1974-43a7-47c...|      10001|        device|2024-07-23 13:07:...|
|   CI00101|{[{null, C, STAND...|902b9b46-b665-4f5...|      10002|        device|2024-07-23 13:07:...|
|   CI00108|                {[]}|145328aa-d8cb-42c...|      10003|        device|2024-07-23 13:07:...|
|   CI00109|                {[]}|f3082e6b-78f7-465...|      10004|        device|2024-07-23 13:07:...|
|   CI00117|                {[]}|798e55fa-c9d9-49b...|      10005|        device|2024-07-23 13:07:...|
|   CI00116|{[{D003, C, STAND...|43208442-74cd-42f...|      10006|       