### 0. Spark Setup

In [1]:
import os
# Spark-submit arguments used when PySpark launches its JVM, so this cell has to
# run before the SparkSession cell. `--packages` pulls the Kafka source and the
# DataStax Cassandra connector (Scala 2.12 builds); `--conf` registers the
# connector's SQL extensions.
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    '--packages "' + ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
        "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0",
    ]) + '"',
    '--conf "spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions"',
    "pyspark-shell",
])

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import regexp_replace, col, from_json

# Build (or reuse) the Spark session for this notebook.
# The Cassandra endpoint defaults to the original hard-coded container address;
# set CASSANDRA_HOST / CASSANDRA_PORT to target another cluster without editing code.
# LEGACY timeParserPolicy keeps Spark 2.x-style date/time parsing, which affects the
# TO_DATE conversion applied to the parsed train records.
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.cassandra.connection.host", os.environ.get("CASSANDRA_HOST", "172.20.0.4"))
    .config("spark.cassandra.connection.port", os.environ.get("CASSANDRA_PORT", "9042"))
    # Consistency level ONE: a write is acknowledged after a single replica accepts it.
    .config("spark.cassandra.output.consistency.level", "ONE")
    .getOrCreate()
)

# Let Spark delete the temporary checkpoint directories it creates for queries
# started without an explicit checkpointLocation.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

### 2. Reading From Kafka Stream

Stream records from the Kafka `train_data` topic using `readStream`.

#### 2.1 Raw Kafka Stream

In [None]:
# Read the `train_data` topic as an unbounded stream.
# For streaming queries the Kafka source already defaults startingOffsets to
# "latest"; it is set explicitly here for readability.
# Bootstrap servers can be overridden with KAFKA_BOOTSTRAP_SERVERS (the default lists
# both a host-side and a docker-network listener).
# NOTE: checkpointLocation is a query (writeStream) option, not a source option, so
# it is deliberately not set here; pass it when starting a query instead.
df_kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092,broker:29092"))
    .option("subscribe", "train_data")
    .option("startingOffsets", "latest")
    .load()
)

In [None]:
# Inspect the schema exposed by the Kafka source; key/value are cast to STRING in the next step.
df_kafka_raw.printSchema()

#### 2.2 String-Cast Kafka Stream (key and value cast to STRING)

In [None]:
# Cast the Kafka key and value columns to strings so the JSON payload can be parsed.
df_kafka_encoded = df_kafka_raw.selectExpr(
    *(f"CAST({column} AS STRING)" for column in ("key", "value"))
)

In [None]:
# Confirm that only the key and value columns remain, now as strings.
df_kafka_encoded.printSchema()

#### 2.3 Structured Streaming DataFrame


In [None]:
# Schema of the JSON document carried in each Kafka message value.
# Every field is declared as a string; parse_train_from_kafka_message applies the
# numeric and date casts after parsing.
train_schema = T.StructType([
    T.StructField(field_name, T.StringType())
    for field_name in (
        "sequence_number",
        "schedule_id",
        "unique_id",
        "service_start_date",
        "location_code",
        "scheduled_arrival",
        "scheduled_departure",
        "actual_arrival",
        "actual_departure",
        "platforms",
        "estimated_time",
        "source",
    )
])

In [None]:
def parse_train_from_kafka_message(df_raw, schema):
    """Parse the JSON `value` column of a Kafka stream into flat, typed train columns.

    Args:
        df_raw: Streaming DataFrame with a string `value` column holding JSON
            (e.g. the Kafka stream after casting `value` to STRING).
        schema: StructType describing the JSON payload. It must contain every
            field selected below (sequence_number, schedule_id, unique_id, ...).

    Returns:
        Streaming DataFrame with one column per payload field. `sequence_number`
        and `schedule_id` are cast to LONG and `service_start_date` is converted
        with TO_DATE; all other fields stay strings.

    Raises:
        ValueError: if `df_raw` is not a streaming DataFrame.
    """
    # Explicit check rather than `assert`, which is stripped when Python runs with -O.
    if not df_raw.isStreaming:
        raise ValueError("DataFrame doesn't receive streaming data")

    # Parse the JSON string into a single struct column named `train`.
    df_json = df_raw.select(from_json(col("value"), schema).alias("train"))

    # Flatten the struct into top-level columns, applying type casts where needed.
    # TO_DATE is called without a format, so parsing follows the session's
    # spark.sql.legacy.timeParserPolicy setting.
    df_flattened = df_json.selectExpr(
        "CAST(train.sequence_number AS LONG) AS sequence_number",
        "CAST(train.schedule_id AS LONG) AS schedule_id",
        "train.unique_id",
        "TO_DATE(train.service_start_date) AS service_start_date",
        "train.location_code",
        "train.scheduled_arrival",
        "train.scheduled_departure",
        "train.actual_arrival",
        "train.actual_departure",
        "train.platforms",
        "train.estimated_time",
        "train.source"
    )

    return df_flattened

In [None]:
# Parse the string-cast Kafka stream into one typed column per train-record field.
df_trains = parse_train_from_kafka_message(df_kafka_encoded, train_schema)

In [None]:
# Verify the flattened columns and their casts (LONG ids, DATE service_start_date).
df_trains.printSchema()

### 3. Writing to Cassandra and the Console Sink

Streaming queries are started with `writeStream`.


#### 3.1 Write to Cassandra

In [None]:
def write_cassandra(df, table="service_performance", keyspace="train_service",
                    checkpoint_location=None):
    """Start a streaming query that appends every micro-batch of `df` to Cassandra.

    Args:
        df: DataFrame to write. Only streaming DataFrames are handled.
        table: Target Cassandra table (defaults to the original target).
        keyspace: Target Cassandra keyspace (defaults to the original target).
        checkpoint_location: Optional checkpoint directory for the query. When
            omitted, Spark falls back to a temporary checkpoint, so the query's
            progress cannot be resumed after a restart.

    Returns:
        The started StreamingQuery, or None (after printing a message) when
        `df` is not streaming.
    """
    if not df.isStreaming:
        print("Data is not streaming")
        return None

    def _write_batch(batch_df, epoch_id):
        # Each micro-batch is a regular DataFrame, so the batch Cassandra writer is used.
        batch_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .mode("append") \
            .options(table=table, keyspace=keyspace) \
            .save()

    writer = df.writeStream.foreachBatch(_write_batch)
    if checkpoint_location is not None:
        writer = writer.option("checkpointLocation", checkpoint_location)
    return writer.start()

In [None]:
# Start the Cassandra streaming query. `cassandra_query` keeps a dedicated handle so the
# query can still be monitored or stopped after `write_query` is rebound by the
# console-sink cell; `write_query` is still assigned for compatibility.
cassandra_query = write_query = write_cassandra(df_trains)

#### 3.2 Sink to Console

In [None]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    """Start a streaming query that prints each micro-batch of `df` to the console sink.

    Args:
        df: Streaming DataFrame to display.
        output_mode: Structured Streaming output mode. NOTE: the default
            'complete' is only valid for aggregated streams; pass 'append' for
            row-level streams such as the parsed train records.
        processing_time: Micro-batch trigger interval, e.g. '5 seconds'.

    Returns:
        pyspark.sql.streaming.StreamingQuery: the started query.
    """
    writer = df.writeStream.outputMode(output_mode)
    writer = writer.trigger(processingTime=processing_time)
    # truncate=False prints full column values instead of shortening long ones.
    writer = writer.format("console").option("truncate", False)
    return writer.start()

In [None]:
# Print each parsed micro-batch using the console sink ('append', because df_trains
# has no aggregation). The console sink writes to the driver's stdout, which may
# show up in the Jupyter server log rather than in the notebook. Note that this
# rebinds `write_query`.
write_query = sink_console(df=df_trains, output_mode="append")