In [15]:
# Create the Spark Session
from pyspark.sql import SparkSession

# spark = (
#     SparkSession 
#     .builder 
#     .appName("Writing to Multiple Sinks") 
#     .config("spark.streaming.stopGracefullyOnShutdown", True) 
#     .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
#     .config('spark.jars', '/home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar')
#     .config("spark.sql.shuffle.partitions", 8)
#     .master("local[*]")
#     .getOrCreate()
# )

spark = (
    SparkSession 
    .builder 
    .appName("Writing to Multiple Sinks") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.postgresql:postgresql:42.2.20')
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]")
    .getOrCreate()
)

spark

In [16]:
# Create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("subscribe", "weather-data")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false") 
    .load()
)

In [17]:
# View schema for raw kafka_df
kafka_df.printSchema()
#kafka_df.show()
#kafka_df.rdd.getNumPartitions()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
# Parse value from binay to string into kafka_json_df
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [19]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType, DoubleType, IntegerType

json_schema = StructType([
    StructField("device_id", StringType()),
    StructField("location", StructType([
        StructField("country", StringType()),
        StructField("city", StringType())
    ])),
    StructField("metrics", StructType([
        StructField("temperature", DoubleType()),
        StructField("humidity", IntegerType())
    ])),
    StructField("timestamp", StringType())
])

In [20]:
# Apply the schema to payload to read the data
from pyspark.sql.functions import from_json,col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [21]:
# To the schema of the data, place a sample json file and change readStream to read 
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- device_id: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- city: string (nullable = true)
 |-- metrics: struct (nullable = true)
 |    |-- temperature: double (nullable = true)
 |    |-- humidity: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [23]:
# Flatten the exploded df
from pyspark.sql.functions import col, to_timestamp, from_unixtime

# Flatten
flattened_df = (
    streaming_df
    .withColumn("device_id", col("device_id"))
    .withColumn("country", col("location.country"))
    .withColumn("city", col("location.city"))
    .withColumn("temperature", col("metrics.temperature"))
    .withColumn("humidity", col("metrics.humidity"))
    .withColumn("timestamp", from_unixtime(col("timestamp")).cast("timestamp"))
    .drop("location", "metrics")
)

In [24]:
# Check the schema of the flattened_df, place a sample json file and change readStream to read 
flattened_df.printSchema()
#flattened_df.show(truncate=False)

root
 |-- device_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: integer (nullable = true)



In [25]:
# Write flattened_df to a new Kafka topic
from pyspark.sql.functions import to_json, struct

flattened_kafka_output = (
    flattened_df
    .select(
        to_json(
            struct(
                "device_id",
                "country",
                "city",
                "temperature",
                "humidity",
                "timestamp"
            )
        ).alias("value")   # Kafka butuh kolom 'value'
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("topic", "weather-data-flattened")
    .option("checkpointLocation", "checkpoint_flattened_kafka")
    .start()
)

In [26]:
from pyspark.sql.functions import col, avg, window
from pyspark.sql.types import DecimalType

agg_df = (
    flattened_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy("device_id", "country", "city", window("timestamp", "1 minute"))
    .agg(
        avg("temperature").cast(DecimalType(5, 2)).alias("avg_temp"),
        avg("humidity").cast(DecimalType(5, 2)).alias("avg_humidity")
    )
    .select(
        col("device_id"),
        col("country"),
        col("city"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("avg_temp"),
        col("avg_humidity")
    )
)


In [27]:
agg_df.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- avg_temp: decimal(5,2) (nullable = true)
 |-- avg_humidity: decimal(5,2) (nullable = true)



In [None]:
# Python function to write to multiple sinks
def weather_data_output(df, batch_id):
    print("Batch id: "+ str(batch_id))
    
    # Write to parquet
    df.write.format("parquet").mode("append").save("data/weather_data.parquet/")
    
    
    # Write to JDBC Postgres
    (
        df.write
        .mode("append")
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://riyandi_postgres:5432/my-postgres")
        .option("dbtable", "weather_data")
        .option("user", "my-postgres")
        .option("password", "my-postgres")
        .save()
    
    )
    
    # Display
    df.show()
    

In [29]:
# Running foreachBatch
# Write the output to Multiple Sinks

(agg_df
 .writeStream
 .foreachBatch(weather_data_output)
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation", "/data/checkpoint_dir_kafka")
 .start()
 .awaitTermination())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 