In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, schema_of_json

# Initialize Spark Session.
# NOTE(review): `format("kafka")` below also needs the spark-sql-kafka-0-10 connector on the
# classpath (e.g. via `spark.jars.packages` matching your Spark/Scala version). Only the
# PostgreSQL JDBC driver is added here; confirm your image provides the Kafka connector.
spark = (
    SparkSession.builder
    .appName("KafkaSparkPartitionConsumer")
    .config("spark.jars", "/usr/share/java/postgresql-42.7.4.jar")
    .getOrCreate()
)

# PostgreSQL connection parameters (the last path segment of the URL is the database name).
jdbc_url_target = "jdbc:postgresql://host.docker.internal:5432/postgres"
jdbc_properties = {
    # Credentials come from the environment so they need not be committed with the notebook.
    # The literal fallbacks keep the original local-dev setup working.
    # TODO: remove the password fallback once POSTGRES_PASSWORD is set everywhere.
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "2003"),
    "driver": "org.postgresql.Driver",
}

# Lazily define a batch read of a single Kafka message, used later to infer the JSON schema.
# Records with a null value (Kafka tombstones) are skipped so the sample always has a payload.
# Parenthesized chaining avoids backslash continuations, where stray trailing whitespace is a SyntaxError.
sample_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "postgres.public.patients1")
    .option("startingOffsets", "earliest")
    .load()
    .where(col("value").isNotNull())
    .limit(1)
)



In [None]:

# Extract a sample JSON string and infer the schema from it.
# `.first()` returns None when the topic has no messages yet, and the value may be null
# (tombstone). Fail with an explicit message instead of an opaque TypeError.
sample_row = sample_df.selectExpr("CAST(value AS STRING) as json_string").first()
if sample_row is None or sample_row["json_string"] is None:
    raise RuntimeError(
        "No non-null sample message found on Kafka topic 'postgres.public.patients1'; "
        "produce at least one record before inferring the schema."
    )
sample_json = sample_row["json_string"]
# The schema is inferred from ONE message: from_json ignores fields that the sample lacks,
# so such fields are dropped from later records.
inferred_schema = schema_of_json(sample_json)

# Streaming read of the same topic, from the earliest available offset.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "postgres.public.patients1")
    .option("startingOffsets", "earliest")
    .load()
)

# Parse the Kafka value as JSON with the inferred schema and flatten its top-level fields into columns.
df = (
    kafka_stream_df
    .selectExpr("CAST(value AS STRING) as json_string")
    .select(from_json(col("json_string"), inferred_schema).alias("data"))
    .select("data.*")
)

# Map from `cityid` value to the city name used in the target table name (patients_<City>).
# NOTE(review): keys are Python ints, so `cityid` must be parsed as an integral type
# (schema_of_json infers JSON numbers as bigint); string ids would never match.
partition_name_map = {
    0: "Tirunelveli",
    1: "Madurai",
    2: "Salem",
    3: "Coimbatore",
    4: "Tiruchirappalli",
    5: "Chennai"
}

# Function to write data to PostgreSQL
def write_to_postgres(batch_df, batch_id):
    """Append one streaming micro-batch to per-city PostgreSQL tables.

    Intended as the ``foreachBatch`` callback. Each row whose ``cityid`` is a key
    of ``partition_name_map`` is appended to ``patients_<CityName>`` over JDBC.
    Rows with any other ``cityid`` (including null) are not written.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch id supplied by Spark; not used here.
    """
    # head(1) stops after finding one row; count() would scan the entire batch.
    if not batch_df.head(1):
        return  # Skip empty batches

    # The batch is scanned once for its distinct city ids and once more per city write.
    # Without caching, every scan recomputes the batch from the Kafka source.
    batch_df.persist()
    try:
        unique_cities = batch_df.select("cityid").distinct().collect()

        for row in unique_cities:
            city_id = row["cityid"]
            if city_id not in partition_name_map:
                continue  # unmapped / null city id: nothing to write
            table_name = f"patients_{partition_name_map[city_id]}"
            partition_df = batch_df.filter(col("cityid") == city_id)
            partition_df.write.jdbc(
                url=jdbc_url_target,
                table=table_name,
                mode="append",
                properties=jdbc_properties,
            )
    finally:
        # Release the cached batch even if a JDBC write fails.
        batch_df.unpersist()

# Start streaming
query = df.writeStream \
    .foreachBatch(write_to_postgres) \
    .outputMode("append") \
    .start()

query.awaitTermination()
