In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import from_json, col, when
# from pyspark.sql.types import StructType, StringType, DoubleType

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, expr, broadcast
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

spark = SparkSession.builder.appName("flights_stream").getOrCreate()

# Kafka source
kafka_df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "flights.events")
    .option("startingOffsets", "latest")
    .load())

# define schema for flights event (select columns you need)
schema = StructType() \
    .add("FL_DATE", StringType()) \
    .add("OP_CARRIER", StringType()) \
    .add("ORIGIN", StringType()) \
    .add("DEST", StringType()) \
    .add("CRS_DEP_TIME", StringType()) \
    .add("DEP_DELAY", StringType()) \
    .add("ARR_DELAY", StringType()) \
    .add("event_time", StringType()) \
    # add other fields as needed

parsed = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# cast event_time properly
events = parsed.withColumn("event_time", to_timestamp("event_time"))

# read lookups (small) and broadcast
airlines = spark.read.csv("/data/lookups/airlines.csv", header=True)  # or read from Cassandra
airports = spark.read.csv("/data/lookups/airports.csv", header=True)

# Example: 30-minute tumbling window average arrival delay per airline
agg = (events
       .withWatermark("event_time", "1 hour")
       .groupBy(window(col("event_time"), "30 minutes"), col("OP_CARRIER"))
       .agg(
            expr("avg(cast(ARR_DELAY as double)) as avg_arr_delay"),
            expr("count(*) as num_flights")
       ))

# enrich with airline names using broadcast
agg_enriched = agg.join(broadcast(airlines), agg.OP_CARRIER == airlines.Code, "left") \
                  .select("window", "OP_CARRIER", "Name", "avg_arr_delay", "num_flights")

# write to Cassandra via foreachBatch for idempotent upserts
def write_to_cassandra(batch_df, batch_id):
    # transform batch_df -> upsert friendly schema
    # use batch_df.write.format("org.apache.spark.sql.cassandra") ...
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .options(table="hourly_airline_agg", keyspace="flightsks") \
        .save()

query = (agg_enriched.writeStream
         .foreachBatch(write_to_cassandra)
         .outputMode("update")
         .option("checkpointLocation", "/checkpoints/flights_agg")
         .start())

query.awaitTermination()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/01 18:01:24 WARN Utils: Your hostname, DESKTOP-VS2UPJ4, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/01 18:01:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/01 18:01:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [None]:
schema = StructType() \
    .add("flight_number", StringType()) \
    .add("airline", StringType()) \
    .add("origin", StringType()) \
    .add("destination", StringType()) \
    .add("delay", DoubleType()) \
    .add("status", StringType())

In [None]:
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "topic_name")  \
    .option("startingOffsets", "latest") \
    .load()

In [None]:
df_parsed = df_raw.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

In [None]:
df_transformed = df_parsed.withColumn(
    "delay_category",
    when(col("delay") > 0, "Delayed")
    .otherwise("On Time")
)

In [None]:
hdfs_query = df_transformed.writeStream \
    .format("parquet") \
    .option("path", "hdfs://hadoop-namenode:9000/flights/stream_output") \
    .option("checkpointLocation", "/tmp/hdfs_checkpoint") \
    .outputMode("append") \
    .start()