In [None]:
import os

# Report which JVM location (if any) this kernel's environment points at.
java_home = os.environ.get("JAVA_HOME")
print(java_home)


In [2]:
import os

# Override JAVA_HOME for this kernel process before the Spark session is built.
JAVA_HOME_PATH = "/usr/lib/jvm/java-21-openjdk-amd64"
os.environ["JAVA_HOME"] = JAVA_HOME_PATH
print("Updated JAVA_HOME:", os.environ["JAVA_HOME"])


Updated JAVA_HOME: /usr/lib/jvm/java-21-openjdk-amd64


In [3]:
# data = [("John", 29), ("Alice", 25), ("Bob", 31)]

# Define schema
# columns = ["Name", "Age"]

# Create DataFrame
# df = spark.createDataFrame(data, columns)

# Show DataFrame
# df.show()

In [4]:
from pyspark.sql import SparkSession
import logging

# Build (or reuse) the local Spark session used by the rest of the notebook.
# The Kafka source connector and the PostgreSQL JDBC driver are requested via
# spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("FlightDataPipeline")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,"
        "org.postgresql:postgresql:42.5.0",
    )
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

# Keep driver logging at WARN and above.
spark.sparkContext.setLogLevel("WARN")

# Runtime SQL option; a single call is sufficient.
spark.conf.set("spark.sql.streaming.schemaInference", True)

print("🔥 Spark session initialized successfully!")



:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2.5.2/cache
The jars for the packages stored in: /home/jovyan/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-19244db0-7aa0-4379-ac30-efac51d25bcb;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.0.16 in central
	found org.apache.hadoop#hadoop-client-runtime;3.4.1 in central
	found org.apache.hadoop#hadoop-client-api;3.4.1 in central
	found com.google.code.f

🔥 Spark session initialized successfully!


In [5]:
# Streaming source: read the "airline" Kafka topic from its earliest offsets.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "airline")
    .option("startingOffsets", "earliest")
)
df = kafka_reader.load()




In [6]:
# df.show()

In [7]:
# Print the schema of the streaming DataFrame loaded from the Kafka source.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
from pyspark.sql.functions import expr,col,from_json

In [9]:
# Keep only the Kafka message payload, decoded from binary to a string column.
payload_as_text = col("value").cast("string")
parsed_df = df.select(payload_as_text.alias("json_value"))
# parsed_df.show()

In [10]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType

# Field name / Spark type pairs describing one JSON flight record.
# Every field is declared nullable when the schema is assembled below.
_AIRLINE_FIELDS = [
    ("flight_number", StringType()),
    ("airline", StringType()),
    ("route_type", StringType()),
    ("expected_departure", StringType()),
    ("actual_departure", StringType()),
    ("capacity", IntegerType()),
    ("passengers", IntegerType()),
    ("ticket_price_range", StringType()),
    ("departure_airport", StringType()),
    ("arrival_airport", StringType()),
    ("fuel_consumption", FloatType()),
    ("carbon_emission", FloatType()),
    ("flight_status", StringType()),
    ("luggage_weight", FloatType()),
]

airline_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _AIRLINE_FIELDS]
)


In [11]:
# Parse each JSON payload into a single struct column named "data".
flight_struct = from_json(col("json_value"), airline_schema)
json_df = parsed_df.select(flight_struct.alias("data"))
# json_df.show()

In [12]:
# Print the schema of the parsed DataFrame (one struct column named "data").
json_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- flight_number: string (nullable = true)
 |    |-- airline: string (nullable = true)
 |    |-- route_type: string (nullable = true)
 |    |-- expected_departure: string (nullable = true)
 |    |-- actual_departure: string (nullable = true)
 |    |-- capacity: integer (nullable = true)
 |    |-- passengers: integer (nullable = true)
 |    |-- ticket_price_range: string (nullable = true)
 |    |-- departure_airport: string (nullable = true)
 |    |-- arrival_airport: string (nullable = true)
 |    |-- fuel_consumption: float (nullable = true)
 |    |-- carbon_emission: float (nullable = true)
 |    |-- flight_status: string (nullable = true)
 |    |-- luggage_weight: float (nullable = true)



In [13]:


# Promote every field of the "data" struct to a top-level column,
# keeping the field order defined by the schema.
flat_df = json_df.select("data.*")

# flat_df.show(truncate=False)


In [14]:
# flat_df.show()

In [15]:
# from_unixtime / to_timestamp are no longer used here; they stay imported in
# case other cells rely on these names.
from pyspark.sql.functions import col, from_unixtime, regexp_replace, timestamp_seconds, to_timestamp


def _epoch_string_to_timestamp(column_name):
    """Build a timestamp expression from a string column of epoch seconds.

    The value is cast to double first so strings written in scientific
    notation parse, then to bigint to keep whole seconds only. The seconds are
    converted straight to a timestamp instead of being formatted to a string
    and parsed back.
    """
    epoch_seconds = col(column_name).cast("double").cast("bigint")
    return timestamp_seconds(epoch_seconds)


flat_df = (
    flat_df
    .withColumn("expected_departure", _epoch_string_to_timestamp("expected_departure"))
    .withColumn("actual_departure", _epoch_string_to_timestamp("actual_departure"))
    # Drop "$" characters, then cast the remaining text to float.
    # NOTE(review): assumes the field holds a single number, not a range - confirm.
    .withColumn("ticket_price_range", regexp_replace(col("ticket_price_range"), "\\$", "").cast("float"))
)





In [16]:
# Print the flattened schema to confirm the column types.
flat_df.printSchema()

root
 |-- flight_number: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- route_type: string (nullable = true)
 |-- expected_departure: timestamp (nullable = true)
 |-- actual_departure: timestamp (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- passengers: integer (nullable = true)
 |-- ticket_price_range: float (nullable = true)
 |-- departure_airport: string (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- fuel_consumption: float (nullable = true)
 |-- carbon_emission: float (nullable = true)
 |-- flight_status: string (nullable = true)
 |-- luggage_weight: float (nullable = true)



In [None]:
from pyspark.sql.functions import *

# Derived per-flight metrics computed from the flattened stream.
flat_f = (
    flat_df
    # Departure delay in minutes (negative when the flight left early).
    .withColumn(
        "delay_minutes",
        (unix_timestamp("actual_departure") - unix_timestamp("expected_departure")) / 60,
    )
    # Seats filled, as a percentage of capacity.
    .withColumn("occupancy_rate", (col("passengers") / col("capacity")) * 100)
)

# The flight records carry no "arrival_time" field, so the duration can only be
# derived when an upstream step provides that column; referencing it
# unconditionally fails analysis of the whole query.
if "arrival_time" in flat_f.columns:
    flat_f = flat_f.withColumn(
        "flight_duration_minutes",
        (unix_timestamp("arrival_time") - unix_timestamp("actual_departure")) / 60,
    )

flat_f = (
    flat_f
    # dayofweek() numbers Sunday as 1 and Saturday as 7. The week-based "u"
    # pattern letter of date_format is rejected by Spark 3.0 and later.
    .withColumn("is_weekend", dayofweek("expected_departure").isin(1, 7))
    # Bucket the actual departure hour into a coarse part of the day.
    .withColumn(
        "day_part",
        when(hour("actual_departure").between(5, 11), "Morning")
        .when(hour("actual_departure").between(12, 16), "Afternoon")
        .when(hour("actual_departure").between(17, 20), "Evening")
        .otherwise("Night"),
    )
)


In [17]:
# rm -rf /tmp/spark-checkpoint


In [18]:
# def print_batch(df, batch_id):
#     df.show(truncate=False)  # Ensures data prints to terminal only

# Modify your streaming query
# flat_df.writeStream \
#     .outputMode("append") \
#     .option("startingOffsets", "latest") \
#     .format("console") \
#     .option("checkpointLocation", "/tmp/spark-checkpoint") \
#     .option("failOnDataLoss", "false")\
#     .start() \
#     .awaitTermination()



In [19]:
# for query in spark.streams.active:
#     query.stop()


In [None]:
def write_to_postgres(df, epoch_id):
    """Append one micro-batch to the PostgreSQL ``flights`` table over JDBC.

    Intended as a ``foreachBatch`` sink callback.

    Args:
        df: The batch DataFrame to persist.
        epoch_id: Identifier of the micro-batch; used only in the log line.

    The database user and password are read from the ``POSTGRES_USER`` and
    ``POSTGRES_PASSWORD`` environment variables so credentials need not live in
    the notebook. The previous literal values remain as defaults when the
    variables are unset.
    """
    import os  # local import keeps the callback self-contained

    db_user = os.environ.get("POSTGRES_USER", "admin")
    db_password = os.environ.get("POSTGRES_PASSWORD", "admin")

    (
        df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://postgres:5432/flight_data")
        .option("dbtable", "flights")
        .option("user", db_user)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

    print(f"✅ Batch {epoch_id} successfully written to PostgreSQL")

# Configure the streaming sink: each micro-batch of flat_df is handed to
# write_to_postgres, with progress tracked under the checkpoint directory.
stream_writer = (
    flat_df.writeStream
    .outputMode("append")
    .foreachBatch(write_to_postgres)
    .option("checkpointLocation", "/tmp/spark-checkpoint")
)
query = stream_writer.start()

# Block this cell until the streaming query terminates.
query.awaitTermination()


25/06/09 18:13:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/09 18:13:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

✅ Batch 12 successfully written to PostgreSQL
✅ Batch 13 successfully written to PostgreSQL


                                                                                

✅ Batch 14 successfully written to PostgreSQL


                                                                                

✅ Batch 15 successfully written to PostgreSQL


                                                                                

✅ Batch 16 successfully written to PostgreSQL
