In [None]:
# Create the Spark Session
import pyspark
from pyspark.sql import SparkSession

# Assemble the session configuration one setting at a time, then create
# (or reuse) the session.
session_builder = SparkSession.builder.appName("Handling errors and Exceptions")
session_builder = session_builder.config("spark.streaming.stopGracefullyOnShutdown", True)

# Kafka source package; its version is tied to the installed pyspark version.
session_builder = session_builder.config(
    'spark.jars.packages',
    f'org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}',
)

# Extra local jar (presumably the PostgreSQL JDBC driver used by the JDBC sink).
session_builder = session_builder.config('spark.jars', '/home/jovyan/postgresql.jar')
session_builder = session_builder.config("spark.sql.shuffle.partitions", 8)

# Point at the standalone cluster master and materialise the session.
spark = session_builder.master("spark://spark-master:7077").getOrCreate()

spark

In [None]:
# Create the kafka_df to read from kafka

# Streaming reader over the "device-data" Kafka topic, starting from the
# earliest available offsets. All source options are supplied in one call.
kafka_df = spark.readStream.format("kafka").options(
    **{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "device-data",
        "startingOffsets": "earliest",
    }
).load()


In [None]:
# Define the logic that flattens valid records and separates out error records
from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, expr, explode, current_timestamp, lit, size
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

def flatten_data(df):
    """Parse Kafka JSON payloads and split them into flattened and error rows.

    Args:
        df: DataFrame with at least the ``key`` and ``value`` columns, where
            ``value`` holds the JSON payload (cast to string here).

    Returns:
        A ``(flattened_df, error_df)`` tuple:

        * ``flattened_df`` - one row per device of every valid event, with the
          columns customerId, eventId, eventOffset, eventPublisher, eventTime,
          deviceId, measure, status and temperature.
        * ``error_df`` - ``key`` and ``value`` of every record that is not
          valid, plus an ``eventtimestamp`` column set to the current timestamp.

    A record is valid when ``customerId`` is present and ``data.devices`` is a
    non-empty array. Every other record (unparseable JSON, missing customerId,
    missing or empty devices array) goes to ``error_df``, so each input record
    lands in exactly one of the two outputs.
    """
    # Convert binary to string value column
    kafka_json_df = df.withColumn("value", expr("cast(value as string)"))

    # Schema of a single entry in data.devices
    device_schema = StructType([
        StructField('deviceId', StringType(), True),
        StructField('measure', StringType(), True),
        StructField('status', StringType(), True),
        StructField('temperature', LongType(), True),
    ])

    # Schema of the full event payload
    json_schema = StructType([
        StructField('customerId', StringType(), True),
        StructField('data', StructType([
            StructField('devices', ArrayType(device_schema, True), True),
        ]), True),
        StructField('eventId', StringType(), True),
        StructField('eventOffset', LongType(), True),
        StructField('eventPublisher', StringType(), True),
        StructField('eventTime', StringType(), True),
    ])

    # Expand JSON from the value column using the schema
    json_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema))

    # Validity predicate. The explicit "is not null" on the array matters:
    # size() of a NULL array is -1 (or NULL, depending on configuration), so
    # without it a record with a missing devices array satisfies neither
    # "size = 0" nor "size > 0" and would be dropped from both outputs.
    # Every term is null-safe, so the predicate is always TRUE or FALSE and
    # its negation is an exact complement.
    valid_condition = (
        "values_json.customerId is not null "
        "and values_json.data.devices is not null "
        "and size(values_json.data.devices) > 0"
    )

    # Error records: everything that is not valid. Filter first, while
    # values_json is still a column of the frame, then project.
    error_df = (
        json_df
        .where(f"not ({valid_condition})")
        .select("key", "value")
        .withColumn("eventtimestamp", current_timestamp())
    )

    # Valid records, reduced to the parsed payload fields
    streaming_df = json_df.where(valid_condition).selectExpr("values_json.*")

    # One row per device
    exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))

    # Promote the device fields to top-level columns and drop the nested ones
    flattened_df = (
        exploded_df
        .drop("data")
        .withColumn("deviceId", col("data_devices.deviceId"))
        .withColumn("measure", col("data_devices.measure"))
        .withColumn("status", col("data_devices.status"))
        .withColumn("temperature", col("data_devices.temperature"))
        .drop("data_devices")
    )

    # Return both Flattened & Error Dataframe
    return flattened_df, error_df
        

In [None]:
# Function to write the dataframe to JDBC (Postgres)

def postgres(df, table_name):
    """Append a DataFrame to a table of the ``streaming_db`` Postgres database.

    Args:
        df: DataFrame to write; its ``write`` interface is used with the
            ``jdbc`` format in ``append`` mode.
        table_name: Name of the destination table (passed as ``dbtable``).

    Returns:
        None. Any exception raised by the JDBC write propagates to the caller.
    """
    # NOTE(review): connection URL and credentials are hard-coded; consider
    # loading them from the environment or a secrets store.
    (
        df.write
        .mode("append")
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://postgres:5432/streaming_db")
        # Write to the table the caller asked for, not a fixed one.
        .option("dbtable", table_name)
        .option("user", "postgres")
        .option("password", "postgres")
        .save()
    )

In [None]:
# Handle Error and Exception and write to JDBC 
from pyspark.sql.functions import lit

def device_data_output(kafka_df, batch_id):
    """Process one micro-batch: flatten it, write the results, fall back on error.

    Intended as a ``foreachBatch`` callback.

    Args:
        kafka_df: DataFrame holding the records of the current micro-batch.
        batch_id: Identifier of the micro-batch; printed and stored in the
            ``batchid`` column of the error rows.

    Behaviour:
        * Flattened rows are passed to ``postgres`` with table name
          ``"device_data"``; error rows with ``"device_data_error"``.
        * Both DataFrames are displayed with ``show()`` afterwards.
        * If any step raises, the exception is printed and the raw batch is
          appended as parquet to the HDFS error location instead.
    """
    print("Batch id:" + str(batch_id))

    # The batch is consumed by several actions below (two writes, two show()
    # calls, possibly the fallback write). Cache it so the source data is not
    # re-read and re-parsed for each of them.
    kafka_df.persist()
    try:
        # Get the Flattened and Error Dataframe
        flattened_df, error_df_raw = flatten_data(kafka_df)

        # Add the batchid column in Error Dataframe
        error_df = error_df_raw.withColumn("batchid", lit(batch_id))

        # Write Flattened Dataframe to JDBC
        postgres(flattened_df, "device_data")

        # Write Error Dataframe to JDBC
        postgres(error_df, "device_data_error")

        # Display both Dataframes for confirmation
        flattened_df.show()
        error_df.show()
    except Exception as e:
        # Best-effort fallback: keep the raw batch so it is not lost.
        print(e)
        kafka_df.write.format("parquet").mode("append").save("hdfs://namenode:9000/output/streaming/06/device_data_error")
    finally:
        # Release the cached batch whether or not processing succeeded.
        kafka_df.unpersist()

In [None]:
# Running foreachBatch

# Configure the sink: every micro-batch goes to device_data_output,
# triggered on a 10-second processing-time interval.
stream_writer = kafka_df.writeStream.foreachBatch(device_data_output)
stream_writer = stream_writer.trigger(processingTime='10 seconds')

# Checkpoint directory named after the Spark application (spaces -> underscores).
checkpoint_dir = f"/home/jovyan/streaming_checkpoint_dir/{spark.sparkContext.appName.replace(' ', '_')}"
stream_writer = stream_writer.option("checkpointLocation", checkpoint_dir)

# Start the query and block this cell until it terminates.
streaming_query = stream_writer.start()
streaming_query.awaitTermination()