In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import *
import time

# Create the Spark session
spark = SparkSession.builder \
    .appName("KafkaWeatherConsumer") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1,"
                                    "org.postgresql:postgresql:42.2.5") \
    .master("local[*]") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-49a88542-092b-4e69-a864-b4c4009c9c14;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.postgresql#postgresql;42.2.5 in central
:: resolution report :: resolve 504ms :: artifacts dl 19ms
	:: modules in use:
	com.github.lu

In [2]:
# Define the data schema
schema = StructType([
    StructField("date_UTC", StringType()),
    StructField("temperature_2m", DoubleType()),
    StructField("relative_humidity_2m", DoubleType()),
    StructField("dew_point_2m", DoubleType()),
    StructField("apparent_temperature", DoubleType()),
    StructField("precipitation", DoubleType()),
    StructField("weather_code", IntegerType()),
    StructField("wind_speed_10m", DoubleType()),
    StructField("wind_speed_100m", DoubleType()),
    StructField("wind_direction_10m", DoubleType()),
    StructField("wind_direction_100m", DoubleType()),
    StructField("is_day", IntegerType()),
    StructField("sunshine_duration", DoubleType()),
    StructField("location_id", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("elevation", DoubleType()),
    StructField("city", StringType()),
    StructField("timezone", StringType()),
    StructField("UtcOffsetSeconds", IntegerType()),
    StructField("date", StringType())
])

In [3]:
kafka_topic = "jp-weather"
kafka_bootstrap_servers = "broker:29092"  # Adjust according to your settings

kafka_properties = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "startingOffsets": "earliest"  # It can be "latest" if you are only interested in new messages
}

In [4]:
# Configuring the connection to PostgreSQL
pg_url = "jdbc:postgresql://postgres:5432/staging_db"
pg_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
    "dbtable": "weather_data"
}

In [5]:
def write_to_postgres(batch_df, epoch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/staging_db") \
        .option("dbtable", "weather_data") \
        .option("user", "admindb") \
        .option("password", "M4st3rP4ssw0rd.") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

In [None]:
df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_properties) \
    .load()

df = df.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), schema).alias("data")
).select("key", "data.*")

df = df.withColumn("date_UTC", to_timestamp("date_UTC")) \
       .withColumn("date", to_timestamp("date"))

pg_query = df.writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_postgres) \
    .start()

pg_query.awaitTermination()

24/04/15 02:15:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-10181b6d-3e65-4fba-8cde-6edabe0101c1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/15 02:15:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/04/15 02:15:57 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/04/15 02:15:57 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/04/15 02:15:57 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/04/15 02:15:57 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con