In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import trim, col
from pyspark.sql.types import StringType

In [0]:
# Load the raw bronze flights table and show a small sample of it.
BRONZE_TABLE = 'airport_project.flights_bronze.raw_flights'
df = spark.read.table(BRONZE_TABLE)

bronze_preview = df.limit(10)
display(bronze_preview)

In [0]:
# Bronze (source feed) column name -> silver column name.
# Trailing comments show a sample raw value; Hebrew samples are glossed in English.
# NOTE(review): for departure rows (arrival_departure = D) the CHLOC1* fields
# presumably describe the destination rather than the origin, and the CHPTOL/CHSTOL
# times the departure rather than the landing — confirm against the source feed.
RENAME_MAP = {
    "CHAORD":   "arrival_departure",         # A / D
    "CHCINT":   "checkin_counter",           # often empty for arrivals
    "CHCKZN":   "checkin_zone",              # B
    "CHFLTN":   "flight_number",             # 235
    "CHLOC1":   "origin_airport_code",       # VIE
    "CHLOC1CH": "origin_country_he",         # אוסטריה (Hebrew: Austria)
    "CHLOC1D":  "origin_city_en",            # VIENNA
    "CHLOC1T":  "origin_city_alt_en",        # VIENNA
    "CHLOC1TH": "origin_city_he",            # וינה (Hebrew: Vienna)
    "CHLOCCT":  "origin_country_en",         # AUSTRIA
    "CHOPER":   "airline_code",              # AC
    "CHOPERD":  "airline_name",              # AIR CANADA
    "CHPTOL":   "actual_landing_time",       # 2026-01-31T14:32:00
    "CHRMINE":  "status_en",                 # LANDED
    "CHRMINH":  "status_he",                 # נחתה (Hebrew: landed)
    "CHSTOL":   "scheduled_landing_time",    # 2026-01-31T14:45:00
    "CHTERM":   "terminal",                  # 3
    "_id":      "record_id",                 # source record id
}

# Apply the renames one by one. withColumnRenamed is a no-op when the old name
# is absent, so a source column missing from bronze is skipped silently here.
for bronze_name, silver_name in RENAME_MAP.items():
    df = df.withColumnRenamed(bronze_name, silver_name)


# Removing unnecessary columns for the gold layer

In [0]:
# Renamed columns that are not carried into the silver table.
# DataFrame.drop ignores names that are not in the schema, so listing a column
# the bronze table does not have (e.g. source_file) is harmless.
COLUMNS_TO_DROP = {
    "checkin_counter",
    "checkin_zone",
    "origin_city_alt_en",
    "origin_country_en",
    "origin_city_he",
    "status_he",
    "record_id",
    "source_file",
}

# Sorted only to make the call deterministic; drop order does not affect the result.
df = df.drop(*sorted(COLUMNS_TO_DROP))

In [0]:
# Preview the renamed/pruned frame. Only a small sample is shown (as in the bronze
# preview) instead of materialising and rendering the whole DataFrame.
display(df.limit(10))

# Transformations

## Standardize arrival_departure values 

In [0]:
# Map the raw A/D flag to readable labels.
# The flag is trimmed here because the generic trimming cell runs only AFTER this
# one; without it a padded flag such as " A" would fall through to "n/a".
# Any other value, including NULL, becomes "n/a" (so this column is never NULL).
arrival_departure_flag = F.upper(F.trim(F.col("arrival_departure")))
df = df.withColumn(
    "arrival_departure",
    F.when(arrival_departure_flag == "A", "arrival")
    .when(arrival_departure_flag == "D", "departure")
    .otherwise("n/a"),
)

## Trimming leading and trailing spaces from all string columns

In [0]:
# Strip surrounding spaces from every string-typed column; other columns are left as-is.
string_column_names = [
    schema_field.name
    for schema_field in df.schema.fields
    if isinstance(schema_field.dataType, StringType)
]
for column_name in string_column_names:
    df = df.withColumn(column_name, trim(col(column_name)))

## Uppercase codes and status for consistency

In [0]:
# Normalise code-like and status columns to upper case so equal values compare equal.
for upper_column in ("airline_code", "origin_airport_code", "status_en"):
    df = df.withColumn(upper_column, F.upper(F.col(upper_column)))

## Converting timestamp strings to actual timestamps

In [0]:
# Parse the time strings into TIMESTAMP columns using Spark's default parsing (no
# explicit format). Sample raw values look like 2026-01-31T14:32:00.
# NOTE(review): unparseable strings become NULL when ANSI mode is off — confirm the
# session setting; NULL scheduled times are caught by the merge-key checks.
for timestamp_column in ("actual_landing_time", "scheduled_landing_time"):
    df = df.withColumn(timestamp_column, F.to_timestamp(F.col(timestamp_column)))

## Adding flight date for the merge key

In [0]:
# Calendar date of the scheduled time; part of the MERGE key.
# Casting to date is what to_date does when no format is given.
df = df.withColumn("flight_date", F.col("scheduled_landing_time").cast("date"))

## Adding an `updated_date` (processing timestamp) column

In [0]:
# Stamp every row with the processing time. current_timestamp() yields one value per
# query, so all rows of this batch share it. It is written to the silver
# table's updated_at column by the MERGE.
processing_timestamp = F.current_timestamp()
df = df.withColumn("updated_date", processing_timestamp)

In [0]:
# Preview the fully transformed frame. Only a small sample is shown rather than
# materialising and rendering the whole DataFrame.
display(df.limit(10))

# Making sure there are no nulls (or duplicates) in the merge key fields

In [0]:
# Sanity checks on the MERGE key (flight_number + airline_code + arrival_departure +
# flight_date) before writing to silver.
MERGE_KEY_COLUMNS = ["flight_number", "airline_code", "arrival_departure", "flight_date"]

# Count NULLs for every checked column in ONE aggregation (a single Spark job)
# instead of one filter().count() job per column. when() without otherwise() yields
# NULL for non-matching rows and count() skips NULLs, so each value is the number
# of NULL rows in that column (0 for an empty frame).
# A NULL key never equals anything in MERGE's ON clause, so it would be re-inserted
# as a new row on every run.
null_counts = df.agg(
    *[
        F.count(F.when(F.col(checked_column).isNull(), 1)).alias(checked_column)
        for checked_column in (
            "flight_number",
            "airline_code",
            "scheduled_landing_time",
            "flight_date",
            "arrival_departure",
        )
    ]
).first()

null_flight_numbers = null_counts["flight_number"]
null_airline_codes = null_counts["airline_code"]
null_scheduled_times = null_counts["scheduled_landing_time"]
null_flight_dates = null_counts["flight_date"]
# With the .otherwise("n/a") standardisation this cannot be NULL; unknown
# directions show up as "n/a" instead.
null_arrival_departure = null_counts["arrival_departure"]

assert null_flight_numbers == 0, f"Found {null_flight_numbers} null flight numbers"
assert null_airline_codes == 0, f"Found {null_airline_codes} null airline codes"
assert null_scheduled_times == 0, f"Found {null_scheduled_times} null scheduled times"
assert null_flight_dates == 0, f"Found {null_flight_dates} null flight dates"
assert null_arrival_departure == 0, f"Found {null_arrival_departure} null arrival_departure values"

# Delta MERGE fails when several source rows match the same target row, and
# source duplicates with no target match are ALL inserted. Either way the batch
# must be unique on the merge key.
duplicate_merge_keys = (
    df.groupBy(*MERGE_KEY_COLUMNS)
    .count()
    .filter(F.col("count") > 1)
    .count()
)
assert duplicate_merge_keys == 0, f"Found {duplicate_merge_keys} duplicated merge keys in the incoming batch"

# Creating silver table

In [0]:
%sql
  -- Silver target table for the flights MERGE.
  -- IF NOT EXISTS: the table is created only on the first run; later edits to this
  -- column list are NOT applied to an already existing table.
  -- The four NOT NULL columns are the merge key
  -- (arrival_departure, flight_number, airline_code, flight_date).
  -- NOTE(review): ingestion_timestamp is not produced by any visible transformation;
  -- presumably it comes straight from the bronze table — confirm.
  -- NOTE(review): terminal is BIGINT here, but the bronze type is not shown; the
  -- MERGE relies on the source value being compatible with BIGINT — confirm.
  CREATE TABLE IF NOT EXISTS airport_project.flights_silver.flights_clean (
    arrival_departure STRING NOT NULL,
    flight_number STRING NOT NULL,
    airline_code STRING NOT NULL,
    flight_date DATE NOT NULL,
    origin_airport_code STRING,
    origin_city_en STRING,
    origin_country_he STRING,
    airline_name STRING,
    actual_landing_time TIMESTAMP,
    scheduled_landing_time TIMESTAMP,
    status_en STRING,
    terminal BIGINT,
    ingestion_timestamp TIMESTAMP,
    updated_at TIMESTAMP
  )
  USING DELTA

# Merge

## Creating a view for incoming new rows

In [0]:
# Expose the cleaned batch to the %sql MERGE cell under the name it reads from.
incoming_view_name = "incoming_flights"
df.createOrReplaceTempView(incoming_view_name)

## Merge query
## Merge key: flight number + airline code + flight date + arrival/departure (the same flight number can operate every day, so the date is part of the key)

In [0]:
%sql
  -- Upsert the cleaned batch (temp view incoming_flights) into the silver table.
  -- Rows match on flight number + airline + direction + flight date. NULL keys never
  -- compare equal, so a source row with a NULL key is always inserted as new.
  -- Delta MERGE fails if more than one source row matches the same target row, and
  -- unmatched source duplicates are all inserted, so the batch must be unique on the key.
  MERGE INTO airport_project.flights_silver.flights_clean AS target
  USING incoming_flights AS source
  ON target.flight_number = source.flight_number
    AND target.airline_code = source.airline_code
    AND target.arrival_departure = source.arrival_departure
    AND target.flight_date = source.flight_date
  -- Existing flight: overwrite every non-key column with the incoming values.
  -- There is no change-detection condition, so updated_at is refreshed for every
  -- matched row on every run, even when nothing else changed.
  WHEN MATCHED THEN
    UPDATE SET
      target.origin_airport_code = source.origin_airport_code,
      target.origin_city_en = source.origin_city_en,
      target.origin_country_he = source.origin_country_he,
      target.airline_name = source.airline_name,
      target.actual_landing_time = source.actual_landing_time,
      target.scheduled_landing_time = source.scheduled_landing_time,
      target.status_en = source.status_en,
      target.terminal = source.terminal,
      target.ingestion_timestamp = source.ingestion_timestamp,
      -- The DataFrame column is named updated_date; the table column is updated_at.
      target.updated_at = source.updated_date
  -- New flight: insert all columns.
  WHEN NOT MATCHED THEN
    INSERT (
      arrival_departure,
      flight_number,
      airline_code,
      flight_date,
      origin_airport_code,
      origin_city_en,
      origin_country_he,
      airline_name,
      actual_landing_time,
      scheduled_landing_time,
      status_en,
      terminal,
      ingestion_timestamp,
      updated_at
    )
    VALUES (
      source.arrival_departure,
      source.flight_number,
      source.airline_code,
      source.flight_date,
      source.origin_airport_code,
      source.origin_city_en,
      source.origin_country_he,
      source.airline_name,
      source.actual_landing_time,
      source.scheduled_landing_time,
      source.status_en,
      source.terminal,
      source.ingestion_timestamp,
      source.updated_date
    )

In [0]:
%sql
-- Total number of rows in the silver table after the merge.
SELECT COUNT(1) AS total_flights
FROM airport_project.flights_silver.flights_clean