In [None]:
import sqlite3
import os

# Path to the SQLite database that the streaming job appends predictions to.
db_path = "D:/flight_db/flight_predictions.db"
if os.path.exists(db_path):
    print(f"Database exists at {db_path}")
else:
    print(f"Database not found at {db_path}, creating new")
    # sqlite3.connect creates the database file but not missing parent
    # directories; without this the connect below fails with
    # "unable to open database file".
    db_dir = os.path.dirname(db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    # Deliberately start from an empty table on every run: rows from any
    # previous streaming session are discarded.
    cursor.execute("DROP TABLE IF EXISTS predictions")
    # Columns mirror the projection the streaming job writes; predicted_delayed
    # is REAL to hold the model's `prediction` value.
    cursor.execute("""
        CREATE TABLE predictions (
            flight_number TEXT,
            scheduled_departure TEXT,
            delay_minutes INTEGER,
            is_delayed INTEGER,
            predicted_delayed REAL,
            temperature REAL,
            departure_airport TEXT
        )
    """)
    conn.commit()

    # Verify schema. The loop variable is deliberately not named `col`, which
    # would shadow pyspark's `col` used by the streaming sinks.
    cursor.execute("PRAGMA table_info(predictions)")
    table_columns = cursor.fetchall()
    print("Table schema:")
    for column_info in table_columns:
        print(column_info)
finally:
    # Always release the database handle, even if a statement above failed.
    conn.close()
print("Table recreated")

In [None]:
import os
# Run the PySpark driver and its Python workers on the same venv interpreter,
# and point Spark's Hadoop client at the local Hadoop install.
os.environ.update({
    "PYSPARK_PYTHON": "D:\\python39venv\\Scripts\\python.exe",
    "PYSPARK_DRIVER_PYTHON": "D:\\python39venv\\Scripts\\python.exe",
    "HADOOP_HOME": "D:\\hadoop-3.3.6",
})

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.ml import PipelineModel
import plotly.express as px
import pandas as pd
import sqlite3

MODEL_PATH = "D:/flight_delay_model"
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "flight-data"


def _build_spark_session(extra_conf):
    """Create (or reuse) the local SparkSession used by this notebook.

    Every session here shares the app name, local master, Kafka connector
    package, local default filesystem and memory settings; only the Hadoop
    filesystem options differ between the model-loading session and the
    streaming session, so those are passed in.

    Args:
        extra_conf: mapping of additional Spark config keys to values.

    Returns:
        The active ``SparkSession`` from ``getOrCreate()``.
    """
    builder = (
        SparkSession.builder
        .appName("FlightDelayStreaming")
        .master("local[*]")
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5")
        .config("spark.hadoop.fs.defaultFS", "file:///")
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "4g")
    )
    for key, value in extra_conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# SparkSession for model loading (checksummed LocalFileSystem).
spark = _build_spark_session({
    "spark.hadoop.fs.file.impl": "org.apache.hadoop.fs.LocalFileSystem",
})

print("Spark version:", spark.version)
print("Hadoop home:", os.environ.get("HADOOP_HOME"))

try:
    model = PipelineModel.load(MODEL_PATH)
    print("Model loaded successfully")
except Exception as e:
    print("Error loading model:", str(e))
    spark.stop()
    raise

# Restart Spark with the raw local filesystem for streaming.
# NOTE(review): `model` was loaded under the session stopped here and is reused
# with the new session below; confirm the model really must be loaded with the
# checksummed filesystem, since loading it under the streaming session would
# avoid this restart entirely.
spark.stop()
spark = _build_spark_session({
    "spark.hadoop.fs.file.impl": "org.apache.hadoop.fs.RawLocalFileSystem",
    "spark.hadoop.fs.file.impl.disable.cache": "true",
    "spark.hadoop.hadoop.io.native.lib.available": "false",
})

try:
    # For streaming sources load() only defines the source; broker
    # connectivity problems typically surface when a query is started.
    kafka_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .load()
    )
    print("Connected to Kafka")
except Exception as e:
    print("Kafka connection error:", str(e))
    spark.stop()
    raise

# Fields of the JSON messages on the Kafka topic, as (name, Spark type) pairs.
_FLIGHT_FIELDS = [
    ("flight_number", StringType()),
    ("departure_airport", StringType()),
    ("arrival_airport", StringType()),
    ("scheduled_departure", StringType()),
    ("temperature", DoubleType()),
    ("wind_speed", DoubleType()),
    ("precipitation", DoubleType()),
    ("delay_minutes", IntegerType()),
]
schema = StructType([StructField(field_name, field_type) for field_name, field_type in _FLIGHT_FIELDS])

# Kafka value bytes -> JSON string -> struct -> top-level columns, plus the
# ground-truth label: a flight counts as delayed when more than 15 minutes late.
flight_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("is_delayed", when(col("delay_minutes") > 15, 1).otherwise(0))
)

pred_df = model.transform(flight_df)

# Columns persisted/plotted downstream; the model's `prediction` is renamed.
_leading_columns = ["flight_number", "scheduled_departure", "delay_minutes", "is_delayed"]
output_df = pred_df.select(
    *[col(name) for name in _leading_columns],
    col("prediction").alias("predicted_delayed"),
    col("temperature"),
    col("departure_airport"),
)

output_df.printSchema()

# Debug sink: echo every micro-batch to the driver console.
console_query = (
    output_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

def write_to_sqlite(batch_df, batch_id, db_path="D:/flight_db/flight_predictions.db"):
    """foreachBatch sink: append one micro-batch to the SQLite `predictions` table.

    Best-effort: any error is printed rather than raised, so a failing batch
    does not stop the streaming query (and is therefore presumably not retried
    by Spark).

    Args:
        batch_df: the micro-batch DataFrame; only ``toPandas()`` is used.
        batch_id: micro-batch id supplied by Spark, used in log lines.
        db_path: SQLite database file to append to.
    """
    try:
        pandas_df = batch_df.toPandas()
        print(f"Batch {batch_id}: {len(pandas_df)} rows to save")
        if pandas_df.empty:
            print(f"Batch {batch_id}: No data to save")
            return
        conn = sqlite3.connect(db_path)
        try:
            pandas_df.to_sql("predictions", conn, if_exists="append", index=False)
            conn.commit()
        finally:
            # Close even when to_sql/commit fails; previously the handle leaked.
            conn.close()
        print(f"Batch {batch_id}: Saved {len(pandas_df)} rows to SQLite")
    except Exception as e:
        print(f"Batch {batch_id}: SQLite error: {str(e)}")

# Persist every 10-second micro-batch to SQLite via write_to_sqlite.
sqlite_query = (
    output_df.writeStream
    .foreachBatch(write_to_sqlite)
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .start()
)

def plot_batch(batch_df, batch_id):
    pandas_df = batch_df.toPandas()
    if not pandas_df.empty:
        fig = px.scatter(
            pandas_df,
            x="scheduled_departure",
            y="delay_minutes",
            color="predicted_delayed",
            size="temperature",
            title="Flight Delays: Predicted vs Actual",
            labels={
                "scheduled_departure": "Time",
                "delay_minutes": "Delay (min)",
                "predicted_delayed": "Predicted Delayed",
                "temperature": "Temp (°C)"
            },
            hover_data=["flight_number", "departure_airport"]
        )
        fig.update_traces(marker=dict(sizemode='area', sizemin=5))
        fig.show()

# Plotting sink. No .format(...) is set: foreachBatch determines the sink, so
# a "memory" format would be overridden and no queryable in-memory table would
# exist. The queryName only labels the query (e.g. in spark.streams / the UI).
plot_query = (
    output_df.writeStream
    .outputMode("append")
    .queryName("flight_predictions")
    .trigger(processingTime="10 seconds")
    .foreachBatch(plot_batch)
    .start()
)

def alert_delayed(batch_df, batch_id):
    """foreachBatch sink: print the flights in a micro-batch predicted as delayed.

    Args:
        batch_df: the micro-batch DataFrame with a ``predicted_delayed`` column.
        batch_id: micro-batch id supplied by Spark (unused).
    """
    predicted_late = batch_df.filter(col("predicted_delayed") == 1.0).toPandas()
    if predicted_late.empty:
        return
    report = predicted_late[["flight_number", "delay_minutes", "scheduled_departure"]]
    print("Delayed flights:\n", report.to_string(index=False))

# Print predicted delays every 10 seconds.
alert_query = (
    output_df.writeStream
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .foreachBatch(alert_delayed)
    .start()
)

# Run the streams for a bounded time, then tear everything down. Cleanup lives
# in `finally` so it also happens after the timeout; otherwise all queries keep
# consuming Kafka in the background after this cell returns (and their output
# may interleave into later cells).
STREAM_RUN_SECONDS = 180
streaming_queries = [console_query, sqlite_query, plot_query, alert_query]
try:
    alert_query.awaitTermination(timeout=STREAM_RUN_SECONDS)
except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    for query in streaming_queries:
        # Surface queries that died earlier (e.g. an exception raised inside a
        # foreachBatch function) instead of stopping them silently.
        failure = query.exception()
        if failure is not None:
            print(f"Query {query.name or query.id} failed: {failure}")
        query.stop()
    spark.stop()
    print("Streaming stopped")

In [1]:
# Per-airport delay rate (actual label) and mean delay of delayed flights,
# read back from the predictions table written by the streaming job.
import sqlite3

conn = sqlite3.connect("D:/flight_db/flight_predictions.db")
cursor = conn.cursor()

airport_delay_sql = """
    SELECT departure_airport, 
           COUNT(*) as total, 
           SUM(CASE WHEN is_delayed = 1 THEN 1 ELSE 0 END) as delayed
    FROM predictions 
    GROUP BY departure_airport
"""
rows_by_airport = cursor.execute(airport_delay_sql).fetchall()
print("Delays by Airport:")
for airport, total, delayed in rows_by_airport:
    print(f"{airport}: {delayed}/{total} ({delayed/total:.2%})")

# AVG over zero rows is NULL, hence the fallback to 0.
avg_row = cursor.execute("SELECT AVG(delay_minutes) FROM predictions WHERE is_delayed = 1").fetchone()
avg_delay = avg_row[0] or 0
print(f"Average delay (delayed flights): {avg_delay:.2f} minutes")

conn.close()

Delays by Airport:
AYD: 31/129 (24.03%)
HYD: 32/116 (27.59%)
IXD: 21/118 (17.80%)
JFK: 2/2 (100.00%)
VNS: 33/141 (23.40%)
Average delay (delayed flights): 38.11 minutes


In [2]:
# Share of stored rows where the model's prediction matches the actual label.
# NOTE(review): compare against the majority-class baseline (share of on-time
# flights) — plain accuracy is misleading when most flights are on time.
import sqlite3


def _fetch_scalar(cur, sql):
    """Run a single-value query on ``cur`` and return that value."""
    return cur.execute(sql).fetchone()[0]


conn = sqlite3.connect("D:/flight_db/flight_predictions.db")
cursor = conn.cursor()

correct = _fetch_scalar(cursor, "SELECT COUNT(*) FROM predictions WHERE is_delayed = predicted_delayed")
total = _fetch_scalar(cursor, "SELECT COUNT(*) FROM predictions")

accuracy = 0 if total <= 0 else correct / total
print(f"Accuracy: {accuracy:.2%} ({correct}/{total} correct)")

conn.close()

Accuracy: 74.70% (378/506 correct)
