PULLING RAW DATA FROM KAFKA AS A BATCH AND STORING IT IN DELTA FORMAT

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta import *
from datetime import datetime

In [2]:
# Session and configuration (disabled variant that loads the connector JARs from local file paths)

# spark = SparkSession.builder \
#     .appName("raw_train_live_status") \
#     .config("spark.jars", "C:/Program Files/Spark/spark-jars/spark-sql-kafka-0-10_2.13-4.0.0.jar,"
#                       "C:/Program Files/Spark/spark-jars/kafka-clients-3.4.0.jar,"
#                       "C:/Program Files/Spark/spark-jars/commons-pool2-2.11.1.jar,"
#                       "C:/Program Files/Spark/spark-jars/delta-spark_2.13-4.0.0.jar") \
#     .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .master("local[*]") \
#     .getOrCreate()

# print(spark)

In [3]:
# Create (or reuse) the Spark session used by every cell below.
# The Kafka source and Delta Lake are not bundled with Spark, so they are
# requested as Maven coordinates through `spark.jars.packages`.
spark = (
    SparkSession.builder
    .master("local[*]")  # run locally, using all available cores
    .appName("raw_train_live_status")
    # Connector / storage packages, given as one comma-separated string.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,"
        "org.apache.kafka:kafka-clients:3.4.0,"
        "io.delta:delta-spark_2.13:4.0.0",
    )
    # Register Delta's SQL extension and catalog so format("delta") works.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Session-level default for the Delta deletion-vectors table property.
    .config(
        "spark.databricks.delta.properties.defaults.enableDeletionVectors",
        "true",
    )
    .getOrCreate()
)

In [None]:

# Envelope schema of each Kafka message value: a type tag plus the payload.
# `data` stays a string here because it is parsed again later with the
# schema that matches `schema_type`.
kafka_schem = (
    StructType()
    .add("schema_type", StringType(), True)
    .add("data", StringType(), True)
)

# TrainMaster payload: an array of train records.
train_master_schema = ArrayType(
    StructType()
    .add("trainId", IntegerType(), True)
    .add("trainNo", IntegerType(), True)
    .add("category", StringType(), True)
    .add("operator", StringType(), True)
)

# StationInfo payload: an array of station records.
station_info_schema = ArrayType(
    StructType()
    .add("stationId", IntegerType(), True)
    .add("stationName", StringType(), True)
    .add("stationCode", StringType(), True)
    .add("latitude", StringType(), True)   # or DoubleType() if values are numeric
    .add("longitude", StringType(), True)  # or DoubleType() if values are numeric
)

# TrainScheduleTime payload: an array of schedule records.
# All date/time fields are read as plain strings; `stationCodes` is also read
# as a string at this stage.
train_schedule_time_schema = ArrayType(
    StructType()
    .add("scheduleId", StringType(), True)
    .add("trainId", StringType(), True)
    .add("trainNumber", IntegerType(), True)
    .add("trainName", StringType(), True)
    .add("scheduledArrivalTime", StringType(), True)
    .add("scheduledDepartureTime", StringType(), True)
    .add("stationCodes", StringType(), True)
    .add("scheduleDate", StringType(), True)
    .add("createdDate", StringType(), True)
)

In [5]:
# Batch read of the Kafka topic: `spark.read` (not `readStream`) loads the
# records once and returns a static DataFrame.
kafka_df = (
    spark.read.format("kafka")
    .option("subscribe", "train_station_info_batch_data")   # topic to read
    .option("kafka.bootstrap.servers", "localhost:9092")    # broker address
    .option("startingOffsets", "earliest")                  # start from the oldest offset
    .load()
)


In [6]:
# Cast the Kafka `value` column to a string, parse it with the envelope
# schema into a struct column named `data`, and discard the raw JSON text.
decoded_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING) as json_string")
    .withColumn("data", from_json(col("json_string"), kafka_schem))
    .drop("json_string")
)
decoded_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- schema_type: string (nullable = true)
 |    |-- data: string (nullable = true)



In [7]:
# Lift the two envelope fields out of the `data` struct into top-level
# columns (`schema_type`, `record`), then drop the struct itself.
flattened_df = (
    decoded_df
    .withColumns(
        {
            "schema_type": col("data.schema_type"),
            "record": col("data.data"),
        }
    )
    .drop("data")
)

flattened_df.printSchema()

root
 |-- schema_type: string (nullable = true)
 |-- record: string (nullable = true)



In [8]:
# Row-wise data extraction for each schema type


def _explode_records(envelope_df, schema_type, record_schema):
    """Return one row per record for the messages tagged with *schema_type*.

    Args:
        envelope_df: DataFrame with a string ``schema_type`` column and a
            string ``record`` column holding a JSON array.
        schema_type: Tag value to keep (compared against ``schema_type``).
        record_schema: ``ArrayType(StructType(...))`` used to parse ``record``.

    Returns:
        A DataFrame whose columns are the fields of the struct inside
        *record_schema*, one row per array element.
    """
    return (
        envelope_df
        .filter(col("schema_type") == schema_type)
        # Parse the JSON text into an array of structs.
        .withColumn("record", from_json(col("record"), record_schema))
        # explode_outer (unlike explode) still emits a row when the parsed
        # array is null or empty, so such messages are not silently dropped.
        .withColumn("record", explode_outer(col("record")))
        # Promote the struct's fields to top-level columns.
        .select("record.*")
    )


stationinfo_df = _explode_records(flattened_df, "stationInfo", station_info_schema)

trainmaster_df = _explode_records(flattened_df, "trainMaster", train_master_schema)

# `stationCodes` arrives as a JSON-encoded string inside each schedule record,
# so it needs a second parse to become an array of strings.
trainscheduledtime_df = _explode_records(
    flattened_df, "trainScheduleTime", train_schedule_time_schema
).withColumn("stationCodes", from_json(col("stationCodes"), ArrayType(StringType())))


In [9]:
# Persist each extracted DataFrame as a Delta table under a folder named
# after today's date (local clock), one sub-folder per dataset.

current_date = datetime.now().date()

file_names = ["stationinfo", "trainmaster", "trainscheduledtime"]

# Pair every target folder name with its DataFrame; "overwrite" replaces any
# data already written to the same path.
for name, df in zip(file_names, [stationinfo_df, trainmaster_df, trainscheduledtime_df]):
    target_path = f"raw_data/raw_batch_data/{current_date}/{name}"
    df.write.format("delta").mode("overwrite").save(target_path)
        