In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_spark_session():
    return (SparkSession.builder
            .appName("FlightDataProcessor")
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
            .config("spark.sql.shuffle.partitions", "2")
            .getOrCreate())

def process_stream(df, epoch_id):
    """
    Xử lý mỗi batch của streaming data
    """
    # Định nghĩa schema cho dữ liệu flight
    schema = StructType([
        StructField("date", StringType()),
        StructField("Scheduled_Time", StringType()),
        StructField("Updated_Time", StringType()),
        StructField("Route", StringType()),
        StructField("Flight", StringType()),
        StructField("Counter", StringType()),
        StructField("Gate", StringType()),
        StructField("Status", StringType())
    ])
    
    # Parse JSON và xử lý dữ liệu
    parsed_df = df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col("value"), schema).alias("data")) \
        .select("data.*")
    
    # Thêm các cột phân tích
    processed_df = parsed_df \
        .withColumn("processing_timestamp", current_timestamp()) \
        .withColumn("departure_airport", split(col("Route"), "-")[0]) \
        .withColumn("arrival_airport", split(col("Route"), "-")[1]) \
        .withColumn("is_delayed", 
                   when(col("Updated_Time") != col("Scheduled_Time"), "Yes")
                   .otherwise("No"))
    
    # Ghi vào MySQL
    processed_df.write \
        .format("jdbc") \
        .mode("append") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/mydatabase") \
        .option("dbtable", "flight_info") \
        .option("user", "user") \
        .option("password", "password") \
        .save()
    
    # Hiển thị kết quả
    processed_df.show(truncate=False)

def main():
    # Tạo Spark Session
    spark = create_spark_session()
    
    # Đọc từ Kafka
    kafka_df = (spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "flights")  # Topic name từ producer
                .option("startingOffsets", "latest")
                .load())
    
    # Xử lý stream
    query = kafka_df.writeStream \
        .foreachBatch(process_stream) \
        .outputMode("update") \
        .trigger(processingTime="5 seconds") \
        .start()
    
    # Đợi đến khi streaming job kết thúc
    query.awaitTermination()

if __name__ == "__main__":
    main()