In [1]:
'''
    Error:
        malformed data: payload is not in correct format or have missing fields

        Each microbatch splited to two dataframes: correct records, malformed records
        
    Exceptions:
        runtime error: db connection, network..

        In case of exception we will write the whole batch in a location
    
    
'''

'''
    postgresql: create table device_data_error (key varchar(100), value varchar(max), eventtimestamp timestamp, batchid int);
'''

'\n    postgresql: create table device_data_error (key varchar(100), value varchar(max), eventtimestamp timestamp, batchid int);\n'

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.
    builder.
    appName("Handle error and exceptions").
    config("spark.streaming.stopGracefullyOnShutdown", True).
    config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0').
    config("spark.jars", '/home/jovyan/jars/postgresql-42.2.20.jar').
    config("spark.shuffle.partitions", 4).
    master("local[*]").
    getOrCreate()
    
)

spark

In [3]:
kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("subscribe", "device-data")
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
from pyspark.sql.functions import expr, lit, explode, col
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
from pyspark.sql.functions import from_json, current_timestamp

def flatten_dataframe(df_message_kafka):

    # convert binary message to string
    streaming_df = df_message_kafka.withColumn("value", expr('cast(value as string)'))

    # define expected schema and parse message
    json_schema = (
        StructType(
        [StructField('customerId', StringType(), True), 
        StructField('data', StructType(
            [StructField('devices', 
                         ArrayType(StructType([ 
                            StructField('deviceId', StringType(), True), 
                            StructField('measure', StringType(), True), 
                            StructField('status', StringType(), True), 
                            StructField('temperature', LongType(), True)
                        ]), True), True)
            ]), True), 
        StructField('eventId', StringType(), True), 
        StructField('eventOffset', LongType(), True), 
        StructField('eventPublisher', StringType(), True), 
        StructField('eventTime', StringType(), True)
        ])
    )
    streaming_df = streaming_df.withColumn("value_json", from_json("value", json_schema))

    # filter error messages: not have customerid or device data
    error_df = (
                    streaming_df.
                    select("key", "value").
                    withColumn("eventtimestamp", lit(current_timestamp())).
                    where("value_json.customerId is null or size(value_json.data.devices) == 0")
                )

    # filter correct data
    correct_df = (
                streaming_df.
                where("value_json.customerId is not null and size(value_json.data.devices) > 0").
                selectExpr("value_json.*")
            )
    explode_device_data = correct_df.withColumn("data_devices", explode("data.devices"))
    explode_device_data.show(truncate=False)
    flatten_correct_data = (
                                explode_device_data.
                                drop("data").
                                withColumn("deviceId", col("data_devices.deviceId")).
                                withColumn("measure", col("data_devices.measure")).
                                withColumn("status", col("data_devices.status")).
                                withColumn("temperature", col("data_devices.temperature")).
                                drop("data_devices")
                            )
    return flatten_correct_data, error_df

In [5]:
def write_df_to_db(df, table_name):

    # write to postgresql
    (
        df.
        write.
        format("jdbc").
        mode("append").
        option("driver", "org.postgresql.Driver").
        option("url", "jdbc:postgresql://source_postgresql:5432/source_pg").
        option("dbtable", table_name).
        option("user", "root").
        option("password", "root").
        save()
    )

In [6]:
# python function that will run for each batch
def write_micro_batch(df, batch_id):
    print(f"Batch_di: {batch_id}")

    try:
        # split the new data to error and correct
        correct_df_raw, error_df_raw = flatten_dataframe(df)
    
        # add batchid in error df
        error_df_raw = error_df_raw.withColumn("batchid", lit(batch_id))
    
        # wirte dataframes to db
        write_df_to_db(correct_df_raw, "public.device_data")
        write_df_to_db(error_df_raw, "public.device_data_error")
        
        correct_df_raw.show()
        error_df_raw.show()
        
    except Exception as e:
        print(e)
        df.write.format("parquet").mode("append").save("/home/jovyan/data/device_data_error.parquet/")

In [7]:
(
    kafka_df.
    writeStream.
    foreachBatch(write_micro_batch).
    trigger(processingTime='10 seconds').
    option("checkpointLocation", "/home/jovyan/data/checkpoint_kafka").
    start().
    awaitTermination()
)
    
    
 

Batch_di: 10
+----------+--------------------------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|customerId|data                      |eventId                             |eventOffset|eventPublisher|eventTime                 |data_devices          |
+----------+--------------------------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|CI00108   |{[{D004, C, SUCCESS, 16}]}|aa90011f-3967-496c-b94b-a0c8de19a3d3|10003      |device        |2023-01-05 11:13:53.643364|{D004, C, SUCCESS, 16}|
+----------+--------------------------+------------------------------------+-----------+--------------+--------------------------+----------------------+

+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
|customerId|             eventId|eventOffset|eventPublisher|           eventTime|deviceId|measure

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 