In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType
import pyspark.sql.functions as F
import datetime

In [None]:
def initialize_spark_session():
    """
    Initializes a SparkSession with specific configurations.

    Returns:
        SparkSession: Initialized SparkSession object.
    """

    return SparkSession.builder \
        .appName("KafkaStreamReader") \
        .config("spark.jars.packages", <spark-sql-kafka-jarfile-path>) \
        .config("spark.local.dir", <location_to_store_spark>) \
        .getOrCreate()
    

def define_input_json_schema():
    """
    Build the schema used to parse the JSON carried in each Kafka message.

    The layout is a top-level struct with two nullable members:

    * ``schema``  -- struct of ``type`` (string), ``fields`` (struct of
      ``type`` / ``optional`` / ``field`` strings) and ``optional`` (string).
    * ``payload`` -- struct holding a single string member ``data``.

    Returns:
        StructType: The schema described above; every field is nullable.
    """

    def _nullable_string(name):
        # Every leaf in this schema is a nullable string.
        return StructField(name, StringType(), nullable=True)

    fields_struct = StructType([
        _nullable_string("type"),
        _nullable_string("optional"),
        _nullable_string("field"),
    ])

    schema_struct = StructType([
        _nullable_string("type"),
        StructField("fields", fields_struct, nullable=True),
        _nullable_string("optional"),
    ])

    payload_struct = StructType([
        _nullable_string("data"),
    ])

    return StructType([
        StructField("schema", schema_struct, nullable=True),
        StructField("payload", payload_struct, nullable=True),
    ])

def extract_table_change_info(df):
    """
    Extracts table change information from DataFrame.

    The string column ``value`` is parsed as JSON with the module-level
    ``json_schema`` (which must be defined before this function is called),
    and ``payload.data`` is matched against a pattern of the form
    ``table <schema>.<table>: <operation>: ``. Rows whose schema or table
    capture group is empty (no match) are dropped.

    Args:
        df (DataFrame): Input DataFrame with a string column ``value``
            containing JSON data.

    Returns:
        DataFrame: One row per matching input row with the columns
            ``timestamp``, ``date``, ``year``, ``month``, ``day``, ``hour``,
            ``minute`` (all derived from the current timestamp),
            ``schema_name``, ``table_name`` and the 0/1 flag columns
            ``insert``, ``update``, ``truncate`` and ``delete``.
    """

    # Raw string: the previous plain literal relied on invalid escape
    # sequences (\s, \., \:), which raise a SyntaxWarning on Python 3.12+.
    # The pattern text handed to Spark is unchanged.
    pattern = r"table\s(.*?)\.(.*?)\:\s(.*?)\:\s"

    # Build each column expression once and reuse it below.
    data = col("data")
    schema_name = F.regexp_extract(data, pattern, 1)
    table_name = F.regexp_extract(data, pattern, 2)
    operation = F.regexp_extract(data, pattern, 3)
    now = current_timestamp()

    def _operation_flag(op_name):
        # 1 when the captured operation equals op_name, else 0; the column
        # is named after the lower-cased operation.
        return F.when(operation == op_name, 1).otherwise(0).alias(op_name.lower())

    return (
        df.select(from_json("value", json_schema).alias("json"))
        .select("json.payload.data")
        # regexp_extract yields "" when the pattern does not match.
        .filter(schema_name != "")
        .filter(table_name != "")
        .select(
            now.alias("timestamp"),
            F.current_date().alias("date"),
            F.year(now).alias("year"),
            F.month(now).alias("month"),
            F.dayofmonth(now).alias("day"),
            F.hour(now).alias("hour"),
            F.minute(now).alias("minute"),
            schema_name.alias("schema_name"),
            table_name.alias("table_name"),
            _operation_flag("INSERT"),
            _operation_flag("UPDATE"),
            _operation_flag("TRUNCATE"),
            _operation_flag("DELETE"),
        )
    )
    
def write_to_parquet(df, epoch_id):
    """
    Append a micro-batch to an hourly Parquet directory.

    The target directory name is derived from the local wall-clock time at
    the moment of the call, formatted as ``YYYY-MM-DD-HH``.

    Args:
        df (DataFrame): DataFrame to be written.
        epoch_id (int): Epoch ID for the current batch (not used here).
    """
    now = datetime.datetime.now()
    timestamp = now.strftime("%Y-%m-%d-%H")
    path = f"<path_to_storage>/{timestamp}"

    # Append so that several batches in the same hour share one directory.
    writer = df.write.mode("append")
    writer.parquet(path)





In [None]:
# Set up the Spark session and the schema used to parse incoming messages.
# json_schema is read as a module-level name by extract_table_change_info.
spark = initialize_spark_session()
json_schema = define_input_json_schema()

# Kafka source options: broker address and the topic to subscribe to.
kafka_params = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "<kakfa_topic_name>",
}

# Open a streaming read against Kafka.
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_params)
    .load()
)

# Cast the "value" column to string so it can be parsed as JSON.
df = df.withColumn("value", col("value").cast("string"))

# Derive the per-row table change information.
extracted_df = extract_table_change_info(df)

# Hand every micro-batch to write_to_parquet.
# NOTE(review): no checkpointLocation option is set on this writer --
# confirm whether offsets need to survive a restart.
query = (
    extracted_df.writeStream
    .outputMode("append")
    .foreachBatch(write_to_parquet)
    .start()
)

# Block until the streaming query stops.
query.awaitTermination()
