In [1]:
# Create the Spark Session
import os

from pyspark.sql import SparkSession

# Local Spark session with the Kafka connector (resolved from Maven via
# spark.jars.packages) and the Postgres JDBC driver jar on the classpath.
spark_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("Kafka Multiple Sink")
)
for conf_key, conf_value in (
    ("spark.streaming.stopGracefullyOnShutdown", True),
    ("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2"),
    ("spark.jars", "/opt/spark/jars/postgresql-42.2.20.jar"),
    ("spark.sql.shuffle.partitions", 8),
):
    spark_builder = spark_builder.config(conf_key, conf_value)

spark = spark_builder.getOrCreate()

spark

In [2]:
# Streaming source: subscribe to the device-data topic from the earliest
# available offset. Columns are the raw Kafka ones (key/value are binary).
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "device-data",
    "startingOffsets": "earliest",
}

raw_kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)


In [3]:
# Define the logic that parses Kafka records and separates out error records
from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, expr, explode, current_timestamp, lit, size
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

def flatten_data(df):
    """Parse a raw Kafka micro-batch and split it into flattened and error rows.

    ``value`` is cast to string and parsed as JSON with the device-event schema
    below. A record is valid when it has a ``customerId`` and a non-empty
    ``data.devices`` array. Every other record is routed to the error frame,
    including one whose JSON cannot be parsed, because its ``customerId`` comes
    out NULL. Each input record therefore ends up in exactly one output.

    Args:
        df: DataFrame with the Kafka source columns ``key`` and ``value``.

    Returns:
        ``(flattened_df, error_df)`` where

        * ``flattened_df`` has one row per element of ``data.devices``. Its
          columns are the top-level event fields (``customerId``, ``eventId``,
          ``eventOffset``, ``eventPublisher``, ``eventTime``) plus
          ``deviceId``, ``measure``, ``status`` and ``temperature``;
        * ``error_df`` holds the rejected records' ``key`` and string
          ``value``, plus an ``eventtimestamp`` column set to
          ``current_timestamp()``.
    """
    device_schema = StructType([
        StructField('deviceId', StringType(), True),
        StructField('measure', StringType(), True),
        StructField('status', StringType(), True),
        StructField('temperature', LongType(), True),
    ])
    json_schema = StructType([
        StructField('customerId', StringType(), True),
        StructField('data', StructType([
            StructField('devices', ArrayType(device_schema, True), True),
        ]), True),
        StructField('eventId', StringType(), True),
        StructField('eventOffset', LongType(), True),
        StructField('eventPublisher', StringType(), True),
        StructField('eventTime', StringType(), True),
    ])

    parsed_df = (
        df
        .withColumn("value", expr("cast(value as string)"))
        .withColumn("value_json", from_json(col("value"), json_schema))
    )

    # size() of a NULL array is -1 (legacy sizeOfNull) or NULL, depending on
    # config. The original "= 0" / "> 0" filters therefore dropped records with
    # a missing devices array from BOTH outputs. The coalesce makes the flag
    # strictly true/false, so `is_valid` and `~is_valid` partition the batch.
    is_valid = expr(
        "coalesce(value_json.customerId IS NOT NULL "
        "AND size(value_json.data.devices) > 0, false)"
    )

    # Filter first, then project. The filter needs value_json, which is not
    # part of the projected error columns.
    error_df = (
        parsed_df
        .where(~is_valid)
        .select("key", "value")
        .withColumn("eventtimestamp", current_timestamp())
    )

    # One output row per device, with the device struct's fields promoted to
    # top-level columns.
    flattened_df = (
        parsed_df
        .where(is_valid)
        .select("value_json.*")
        .withColumn("data_devices", explode("data.devices"))
        .withColumn("deviceId", col("data_devices.deviceId"))
        .withColumn("measure", col("data_devices.measure"))
        .withColumn("status", col("data_devices.status"))
        .withColumn("temperature", col("data_devices.temperature"))
        .drop('data', 'data_devices')
    )
    return flattened_df, error_df

In [7]:
# Function to write the dataframe to JDBC (Postgres)

def write_on_postgres(df, table_name, url=None, user=None, password=None):
    """Append ``df`` to the Postgres table ``table_name`` through Spark's JDBC sink.

    Connection settings are resolved in this order:

    1. Explicit arguments.
    2. The ``POSTGRES_URL`` / ``POSTGRES_USER`` / ``POSTGRES_PASSWORD``
       environment variables.
    3. The local sqlpad defaults this notebook was written against.

    This keeps credentials out of the notebook while preserving the original
    behaviour when nothing is configured.

    Args:
        df: DataFrame (batch, not streaming) to write.
        table_name: Target table, passed as the JDBC ``dbtable`` option.
        url: JDBC URL; see resolution order above.
        user: Database user; see resolution order above.
        password: Database password; see resolution order above.
    """
    if url is None:
        url = os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/sqlpad")
    if user is None:
        user = os.environ.get("POSTGRES_USER", "sqlpad")
    if password is None:
        password = os.environ.get("POSTGRES_PASSWORD", "sqlpad")

    (
        df
        .write
        .format("jdbc")
        .mode("append")
        .option("driver", "org.postgresql.Driver")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .save()
    )

In [9]:
# Handle errors and exceptions per micro-batch and write the results to JDBC
from pyspark.sql.functions import lit

def write_multiple_sink(df, batch_id):
    """``foreachBatch`` handler: flatten one micro-batch and write both outputs to Postgres.

    Valid records (from ``flatten_data``) are appended to ``device_data``.
    Rejected records, tagged with ``batch_id``, are appended to
    ``device_data_error``. If any step raises, the exception is printed and
    the whole raw micro-batch is appended to a parquet dead-letter path so it
    can be reprocessed later.

    Args:
        df: The micro-batch DataFrame supplied by ``foreachBatch``.
        batch_id: The micro-batch id supplied by ``foreachBatch``.
    """
    print("Processing batch Id: " + str(batch_id))

    # The batch is read several times below: two JDBC writes, two show()s and
    # possibly the parquet fallback. Persist it so the Kafka source is not
    # re-read for every action.
    df.persist()
    try:
        # Get the flattened and error DataFrames.
        flattened_df, error_df_raw = flatten_data(df)

        # Tag error rows with the micro-batch they came from.
        error_df_raw = error_df_raw.withColumn("batch_id", lit(batch_id))

        # Write the flattened DataFrame to JDBC.
        write_on_postgres(flattened_df, 'device_data')

        # Error rows have a different schema (key/value/eventtimestamp/batch_id)
        # from device_data, so they go to their own table. Spark's JDBC sink
        # creates the table on first append if it does not exist.
        write_on_postgres(error_df_raw, 'device_data_error')

        # Display both DataFrames for confirmation.
        flattened_df.show(truncate=False)
        error_df_raw.show(truncate=False)

    except Exception as e:
        # Deliberate best-effort: park the raw batch instead of failing the query.
        # NOTE(review): if the first JDBC write succeeded before the failure,
        # replaying this parquet will duplicate those rows in device_data.
        print(e)
        (df
         .write
         .format("parquet")
         .mode("append")
         .save("data/07_handling_errors_and_exceptions/output/unprocessed_device_data.parquet"))
    finally:
        df.unpersist()

In [None]:
# Running foreachBatch
# Write the output to multiple sinks. Each micro-batch of the *raw* Kafka
# stream is handed to write_multiple_sink, which calls flatten_data itself.
# (stream_flattened_df only exists inside flatten_data, so it cannot be used here.)
streaming_query = (
    raw_kafka_df
    .writeStream
    .foreachBatch(write_multiple_sink)
    .option("checkpointLocation", "checkpoint_dir")
    .trigger(processingTime='10 seconds')
    .start()
)

# Block the cell until the query stops (or fails).
streaming_query.awaitTermination()