In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Streaming from Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]") 
    .getOrCreate()
)

spark

24/12/05 15:34:32 WARN Utils: Your hostname, MacBook-Air-cua-Pham-5.local resolves to a loopback address: 127.0.0.1; using 192.168.6.141 instead (on interface en0)
24/12/05 15:34:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/phamquangtrung/miniconda3/envs/big-data/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/phamquangtrung/.ivy2/cache
The jars for the packages stored in: /Users/phamquangtrung/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18eaa1bd-0c33-44b4-ab56-f4cace5b5845;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 

24/12/05 15:36:48 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "flights")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [7]:
kafka_json_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [21]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
from settings import JSON_SCHEMA
json_schema = JSON_SCHEMA

In [26]:
from pyspark.sql.functions import from_json,col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("key", "values_json.*")

In [27]:
streaming_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- flight_date: string (nullable = true)
 |-- flight_status: string (nullable = true)
 |-- departure: struct (nullable = true)
 |    |-- airport: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |    |-- iata: string (nullable = true)
 |    |-- icao: string (nullable = true)
 |    |-- terminal: string (nullable = true)
 |    |-- gate: string (nullable = true)
 |    |-- delay: string (nullable = true)
 |    |-- scheduled: string (nullable = true)
 |    |-- estimated: string (nullable = true)
 |    |-- actual: string (nullable = true)
 |    |-- estimated_runway: string (nullable = true)
 |    |-- actual_runway: string (nullable = true)
 |-- arrival: struct (nullable = true)
 |    |-- airport: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |    |-- iata: string (nullable = true)
 |    |-- icao: string (nullable = true)
 |    |-- terminal: string (nullable = true)
 |    |-- gate: string (nullable = true

In [28]:
from pyspark.sql.functions import col
from pyspark.sql.types import (
    StructType, StructField, StringType, DateType, IntegerType, TimestampType
)
import pyspark.sql.functions as F

flight_df = streaming_df.select(
    # Flight-level fields
    "key",
    "flight_date",
    "flight_status",
    
    # Flatten 'departure' struct
    F.col("departure.airport").alias("departure_airport"),
    F.col("departure.timezone").alias("departure_timezone"),
    F.col("departure.iata").alias("departure_iata"),
    F.col("departure.icao").alias("departure_icao"),
    F.col("departure.terminal").alias("departure_terminal"),
    F.col("departure.gate").alias("departure_gate"),
    F.col("departure.delay").cast(IntegerType()).alias("departure_delay"),
    F.col("departure.scheduled").cast(TimestampType()).alias("departure_scheduled"),
    F.col("departure.estimated").cast(TimestampType()).alias("departure_estimated"),
    F.col("departure.actual").cast(TimestampType()).alias("departure_actual"),
    F.col("departure.estimated_runway").cast(TimestampType()).alias("departure_estimated_runway"),
    F.col("departure.actual_runway").cast(TimestampType()).alias("departure_actual_runway"),
    
    # Flatten 'arrival' struct
    F.col("arrival.airport").alias("arrival_airport"),
    F.col("arrival.timezone").alias("arrival_timezone"),
    F.col("arrival.iata").alias("arrival_iata"),
    F.col("arrival.icao").alias("arrival_icao"),
    F.col("arrival.terminal").alias("arrival_terminal"),
    F.col("arrival.gate").alias("arrival_gate"),
    F.col("arrival.baggage").alias("arrival_baggage"),
    F.col("arrival.delay").cast(IntegerType()).alias("arrival_delay"),
    F.col("arrival.scheduled").cast(TimestampType()).alias("arrival_scheduled"),
    F.col("arrival.estimated").cast(TimestampType()).alias("arrival_estimated"),
    F.col("arrival.actual").cast(TimestampType()).alias("arrival_actual"),
    F.col("arrival.estimated_runway").cast(TimestampType()).alias("arrival_estimated_runway"),
    F.col("arrival.actual_runway").cast(TimestampType()).alias("arrival_actual_runway"),
    
    # Flatten 'airline' struct
    F.col("airline.iata").alias("airline_id"),
    
    # Flatten 'flight' struct
    F.col("flight.number").alias("flight_number"),
    
    # Flatten 'codeshared' struct
    F.col("flight.codeshared.flight_iata").alias("codeshared_flight_id")
)

In [29]:
flight_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- flight_date: string (nullable = true)
 |-- flight_status: string (nullable = true)
 |-- departure_airport: string (nullable = true)
 |-- departure_timezone: string (nullable = true)
 |-- departure_iata: string (nullable = true)
 |-- departure_icao: string (nullable = true)
 |-- departure_terminal: string (nullable = true)
 |-- departure_gate: string (nullable = true)
 |-- departure_delay: integer (nullable = true)
 |-- departure_scheduled: timestamp (nullable = true)
 |-- departure_estimated: timestamp (nullable = true)
 |-- departure_actual: timestamp (nullable = true)
 |-- departure_estimated_runway: timestamp (nullable = true)
 |-- departure_actual_runway: timestamp (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- arrival_timezone: string (nullable = true)
 |-- arrival_iata: string (nullable = true)
 |-- arrival_icao: string (nullable = true)
 |-- arrival_terminal: string (nullable = true)
 |-- arrival_gate: string (n

In [33]:
transformed_airline_df = streaming_df.select(
    F.col("airline.iata").alias("id"),
    F.col("airline.iata").alias("iata"),
    F.col("airline.icao").alias("icao"),
    F.col("airline.name").alias("name"),
)

In [34]:
transformed_airline_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- name: string (nullable = true)



In [35]:
transformed_codeshared_df = streaming_df.select(
    # Flatten 'codeshared' struct
    F.col("flight.codeshared.flight_iata").alias("id"),
    F.col("flight.codeshared.airline_iata").alias("airline_id"),
    F.col("flight.codeshared.flight_number").alias("flight_number"),
    F.col("flight.codeshared.flight_iata").alias("flight_iata"),
    F.col("flight.codeshared.flight_icao").alias("flight_icao"),
)

In [36]:
(flight_df
 .writeStream
 .format("console")
 .outputMode("append")
 .option("checkpointLocation", "checkpoint_dir_kafka")
 .start()
 .awaitTermination())

24/12/05 17:37:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/12/05 17:37:16 WARN StreamingQueryManager: Stopping existing streaming query [id=5e5400c5-a3e1-4e37-8e6c-a12a9b68eb48, runId=b54cefd6-77c5-44cc-acaa-9936bbdd465b], as a new run is being started.
24/12/05 17:37:16 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/12/05 17:37:16 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/12/05 17:37:16 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/12/05 17:37:16 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/12/05 17:37:16 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
24/12/05 17:47:29 WARN SparkStringUtils: Truncated the string r

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----------+-------------+-----------------+------------------+--------------+--------------+------------------+--------------+---------------+-------------------+-------------------+----------------+--------------------------+-----------------------+---------------+----------------+------------+------------+----------------+------------+---------------+-------------+-------------------+-------------------+--------------+------------------------+---------------------+----------+-------------+--------------------+
| key|flight_date|flight_status|departure_airport|departure_timezone|departure_iata|departure_icao|departure_terminal|departure_gate|departure_delay|departure_scheduled|departure_estimated|departure_actual|departure_estimated_runway|departure_actual_runway|arrival_airport|arrival_timezone|arrival_iata|arrival_icao|arrival_terminal|arrival_gate|arrival_baggage|arrival_delay|  

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----------+-------------+-----------------+------------------+--------------+--------------+------------------+--------------+---------------+-------------------+-------------------+----------------+--------------------------+-----------------------+---------------+----------------+------------+------------+----------------+------------+---------------+-------------+-------------------+-------------------+--------------+------------------------+---------------------+----------+-------------+--------------------+
| key|flight_date|flight_status|departure_airport|departure_timezone|departure_iata|departure_icao|departure_terminal|departure_gate|departure_delay|departure_scheduled|departure_estimated|departure_actual|departure_estimated_runway|departure_actual_runway|arrival_airport|arrival_timezone|arrival_iata|arrival_icao|arrival_terminal|arrival_gate|arrival_baggage|arrival_delay|  

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/phamquangtrung/miniconda3/envs/big-data/lib/python3.13/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/phamquangtrung/miniconda3/envs/big-data/lib/python3.13/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ~~~~~~~~~~~~~~~~~~~~^^
  File "/Users/phamquangtrung/miniconda3/envs/big-data/lib/python3.13/socket.py", line 719, in readinto
    return self._sock.recv_into(b)
           ~~~~~~~~~~~~~~~~~~~~^^^
KeyboardInterrupt


KeyboardInterrupt: 