In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType



In [2]:
# Initialize SparkSession with Kafka and PostgreSQL packages
spark = SparkSession.builder \
    .appName("KafkaStructuredStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.4,org.postgresql:postgresql:42.2.24") \
    .getOrCreate()



24/06/03 14:27:15 WARN Utils: Your hostname, student-HP-Laptop-15s-gr0xxx resolves to a loopback address: 127.0.1.1; using 192.168.1.77 instead (on interface wlo1)
24/06/03 14:27:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/student/hadoop/spark/spark-3.3.4-bin-hadoop2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/student/.ivy2/cache
The jars for the packages stored in: /home/student/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bd68025d-8d08-4822-b69a-7fbc96220266;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.4 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apac

24/06/03 14:27:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Define the schema for the JSON data
schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("place", StringType(), True),
    StructField("country", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone number", StringType(), True)
])



In [4]:
# Create DataFrame representing the stream of input lines from Kafka
df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .option("startingOffsets", "earliest") \
  .load()



In [5]:
# Convert the value column from binary to string and log raw data from Kafka
raw_df = df.selectExpr("CAST(value AS STRING)")



In [6]:
# Parse the JSON data and apply the schema
value_df = raw_df.select(from_json(col("value"), schema).alias("data"))



In [7]:
# Select individual fields from the parsed JSON
ex_df = value_df.selectExpr(
    "data.name as Name",
    "data.id as Id",
    "data.age as Age",
    "data.gender as Gender",
    "data.place as Place",
    "data.country as Country",
    "data.email as Email",
    "data.`phone number` as Phone"  # Handle space in field name with backticks
)



In [8]:
# Define PostgreSQL connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/kafka_project"
jdbc_properties = {
    "user": "devu",
    "password": "111",
    "driver": "org.postgresql.Driver"
}



In [9]:
# Function to write to PostgreSQL
def write_to_postgres(df, epoch_id):
    df.write.jdbc(url=jdbc_url, table="kafka_data", mode="append", properties=jdbc_properties)



In [None]:
# Write the stream to PostgreSQL
query = ex_df.writeStream \
    .foreachBatch(write_to_postgres) \
    .outputMode("append") \
    .start()

query.awaitTermination()


24/06/03 14:27:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b237a2ab-0d9b-4793-8c0e-a0d15398b8c9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/03 14:27:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                