In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType
from pyspark.sql.functions import explode, expr, from_json, col, to_timestamp

In [2]:
# Local SparkSession with the Kafka connector (Scala 2.12 / Spark 3.3.0 build)
# and the Postgres JDBC driver jar on the classpath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Write to multiple sinks")
    # let running streaming queries finish their current batch on JVM shutdown
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Structured Streaming Kafka source
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    # org.postgresql.Driver, used by the JDBC sink and the read-back cell
    .config("spark.jars", "/home/jovyan/.ivy2/jars/postgresql-42.3.6.jar")
    # keep shuffles small for a single-machine session
    .config("spark.sql.shuffle.partitions", 4)
    .getOrCreate()
)

In [3]:
# Allow file-based streaming sources to infer their schema.
# (The key was previously misspelled "schemaInterface", which Spark silently ignores.)
spark.conf.set("spark.sql.streaming.schemaInference", True)
spark

In [4]:
# Streaming read of the `device-data` topic, starting from the earliest offset.
kafka_df = spark.readStream.format("kafka").options(**{
    "kafka.bootstrap.servers": "spark_kafka-kafka:29092",
    "subscribe": "device-data",
    "startingOffsets": "earliest",
}).load()

In [5]:
# Kafka source schema: binary key/value plus topic/partition/offset/timestamp metadata.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Kafka delivers `value` as binary; cast it to a string so it can be parsed as JSON.
# Note: a streaming DataFrame cannot be inspected with `.show()`; its rows are
# displayed inside the foreachBatch sink instead.
kafka_df_json = kafka_df.withColumn("value", col("value").cast("string"))

In [7]:
# Schema of the JSON payload carried in each Kafka message value.

# One element of the `data.devices` array.
devices_element_schema = (
    StructType()
    .add('deviceId', StringType(), True)
    .add('measure', StringType(), True)
    .add('status', StringType(), True)
    .add('temperature', LongType(), True)
)

devices_schema = ArrayType(devices_element_schema, containsNull=True)

data_schema = StructType().add('devices', devices_schema, True)

# Top-level event envelope.
json_schema = (
    StructType()
    .add('customerId', StringType(), True)
    .add('data', data_schema, True)
    .add('eventId', StringType(), True)
    .add('eventOffset', LongType(), True)
    .add('eventPublisher', StringType(), True)
    .add('eventTime', StringType(), True)
)

In [8]:
# Parse the JSON string and promote its fields to top-level columns.
streaming_df = (
    kafka_df_json
    .select(from_json(col("value"), json_schema).alias("values_json"))
    .select("values_json.*")
)

In [9]:
# Check the parsed structure: `data.devices` is an array of device structs.
streaming_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [10]:
# One row per element of `data.devices`; the `data` struct is then dropped.
exploded_df = (
    streaming_df
    .withColumn('devices', explode(col('data.devices')))
    .drop('data')
)

In [11]:
# After explode: each row carries a single `devices` struct.
exploded_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [12]:
# Lift each field of the `devices` struct into its own top-level column,
# then drop the struct itself.
flattened_df = exploded_df.select(
    "*",
    col("devices.deviceId").alias("device_id"),
    col("devices.measure").alias("measure"),
    col("devices.status").alias("status"),
    col("devices.temperature").alias("temperature"),
).drop("devices")

In [13]:
# Final flat schema handed to the foreachBatch sinks.
flattened_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [14]:
# function to write each streaming micro-batch to multiple sinks
def device_data_output(df, batch_id):
    """Write one streaming micro-batch to Parquet and to Postgres, then show it.

    Intended as the ``foreachBatch`` callback of the streaming query: Spark
    calls it once per micro-batch with a static (non-streaming) DataFrame.

    Sinks, in order:
      1. Parquet files appended under ``data/output/device_data.parquet/``,
         using the batch's original column names, with ``eventTime`` still a string.
      2. The Postgres table ``device_events`` (appended via JDBC), with the
         event columns renamed to snake_case and ``eventTime`` parsed to a
         timestamp. Credentials are read from the ``DEVICE_DB_USER`` /
         ``DEVICE_DB_PASSWORD`` environment variables and default to ``admin``.

    Args:
        df: the micro-batch DataFrame (same columns as ``flattened_df``).
        batch_id: the micro-batch id supplied by Spark.
    """
    print("Batch id: " + str(batch_id))

    # The batch feeds several actions (two writes and a show). Cache it so the
    # batch is not recomputed (re-read from Kafka and re-parsed) for each one.
    df.persist()
    try:
        df.printSchema()

        # write to parquet
        df.write.format("parquet").mode("append").save("data/output/device_data.parquet/")

        # eventTime arrives as a string; parse it for the database sink.
        df_with_ts = df.withColumn("eventTime", to_timestamp("eventTime", "yyyy-MM-dd HH:mm:ss.SSSSSS"))

        df_transformed = (
            df_with_ts
            .withColumnRenamed("customerId", "customer_id")
            .withColumnRenamed("eventId", "event_id")
            .withColumnRenamed("eventOffset", "event_offset")
            .withColumnRenamed("eventPublisher", "event_publisher")
            .withColumnRenamed("eventTime", "event_time")
        )

        # write to JDBC Postgres
        (
            df_transformed.write
            .mode("append")
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", "jdbc:postgresql://spark_kafka-db:5432/spark")
            .option("user", os.environ.get("DEVICE_DB_USER", "admin"))
            .option("password", os.environ.get("DEVICE_DB_PASSWORD", "admin"))
            .option("dbtable", "device_events")
            .save()
        )

        # Display data
        df_with_ts.show()
    finally:
        # Release the cached batch even if one of the writes fails.
        df.unpersist()


In [15]:
# Write the output to multiple sinks using foreachBatch
(flattened_df
 .writeStream
 .foreachBatch(device_data_output)
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation", "checkpoint_dir_kafka")
 .start()
 .awaitTermination())

Batch id: 87
root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)

+----------+--------------------+-----------+--------------+--------------------+---------+-------+-------+-----------+
|customerId|             eventId|eventOffset|eventPublisher|           eventTime|device_id|measure| status|temperature|
+----------+--------------------+-----------+--------------+--------------------+---------+-------+-------+-----------+
|   CI00103|e3cb26d3-41b2-49a...|      10001|        device|2023-01-05 11:13:...|     D001|      C|  ERROR|         15|
|   CI00103|e3cb26d3-41b2-49a...|      10001|        device|2023-01-05 11:13:...|     D002|      C|SUCCESS|         16|
+----------+-

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:

# Read back what the foreachBatch sink has written to Postgres.
# Credentials come from DEVICE_DB_USER / DEVICE_DB_PASSWORD, defaulting to "admin".
data = (
    spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://spark_kafka-db:5432/spark")
    .option("user", os.environ.get("DEVICE_DB_USER", "admin"))
    .option("password", os.environ.get("DEVICE_DB_PASSWORD", "admin"))
    .option("dbtable", "device_events")
    .load()
)
    

In [17]:
# NOTE(review): the stored table reports event_time as `date` and
# event_offset/temperature as `integer`, while the stream writes a timestamp and
# longs, so the time-of-day is lost on write. Confirm the DDL of device_events.
data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_offset: integer (nullable = true)
 |-- event_publisher: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- event_time: date (nullable = true)



In [18]:
# NOTE(review): the output shows repeated rows for the same event_id/device_id.
# foreachBatch gives at-least-once delivery, so replayed batches (or republished
# events) append duplicates; consider deduplicating on (event_id, device_id).
data.show()

+-----------+--------------------+------------+---------------+---------+-------+-------+-----------+----------+
|customer_id|            event_id|event_offset|event_publisher|device_id|measure| status|temperature|event_time|
+-----------+--------------------+------------+---------------+---------+-------+-------+-----------+----------+
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D001|      C|  ERROR|         15|2023-01-05|
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D002|      C|SUCCESS|         16|2023-01-05|
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D001|      C|  ERROR|         15|2023-01-05|
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D002|      C|SUCCESS|         16|2023-01-05|
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D001|      C|  ERROR|         15|2023-01-05|
|    CI00103|e3cb26d3-41b2-49a...|       10001|         device|     D002|      C|SUCCESS|       

### 