In [0]:
# Location of the Delta table in the "silver" storage container.
SILVER_PATH = "wasbs://silver@flightdatastorage.blob.core.windows.net/flight_market/"

# Read the flight market table; every carrier column used below comes from here.
silver_reader = spark.read.format("delta")
df_silver = silver_reader.load(SILVER_PATH)

###### Extract Carrier Master Records

Consolidate reporting, ticketing, and operating carriers into a single carrier master dataset.

In [0]:
from pyspark.sql.functions import col

def _carrier_codes(source_column):
    """Project one carrier column of df_silver under the common name carrier_code."""
    return df_silver.select(col(source_column).alias("carrier_code"))


# One single-column frame per carrier role found in the Silver data.
reporting_carriers = _carrier_codes("REPORTING_CARRIER")
ticketing_carriers = _carrier_codes("TICKET_CARRIER")
operating_carriers = _carrier_codes("OPERATING_CARRIER")

# Stack the three roles, then keep each non-null code exactly once.
all_carrier_codes = (
    reporting_carriers
    .unionByName(ticketing_carriers)
    .unionByName(operating_carriers)
)
has_carrier_code = col("carrier_code").isNotNull()

df_carrier_master = (
    all_carrier_codes
    .where(has_carrier_code)
    .dropDuplicates(["carrier_code"])
)


###### Derive Carrier Attributes

Add carrier-level descriptive attributes and standard SCD tracking columns.

In [0]:
from pyspark.sql.functions import current_date, lit

# Descriptive attributes. carrier_name is a placeholder copy of the code
# because no carrier lookup table is available.
descriptive_columns = [
    col("carrier_code").alias("carrier_name"),
    lit("AIRLINE").alias("carrier_type"),
    lit(True).alias("is_active"),
]

# SCD tracking columns: each staged row is an open-ended current version
# starting today.
scd_tracking_columns = [
    current_date().alias("effective_from"),
    lit("9999-12-31").cast("date").alias("effective_to"),
    lit(True).alias("is_current"),
]

# Existing columns first, then the new ones in the order listed above.
df_carrier_scd = df_carrier_master.select(
    "*", *descriptive_columns, *scd_tracking_columns
)


###### Generate Carrier Surrogate Key

Creates a deterministic surrogate key using the carrier natural key and effective start date.

In [0]:
from pyspark.sql.functions import sha2, concat_ws

# Hash input: natural key and version start date joined with "||".
carrier_key_input = concat_ws(
    "||",
    col("carrier_code"),
    col("effective_from").cast("string"),
)

# SHA-256 of the input gives the same key for the same (carrier, start date).
df_dim_carrier_staged = df_carrier_scd.withColumn(
    "carrier_key", sha2(carrier_key_input, 256)
)


###### Load Existing Carrier Dimension

Checks whether the Gold carrier dimension already exists as a Delta table; the result decides whether SCD Type 2 change detection is needed or every staged row is treated as new.

In [0]:
from delta.tables import DeltaTable

# Location of the carrier dimension in the "gold" storage container.
GOLD_DIM_CARRIER_PATH = "wasbs://gold@flightdatastorage.blob.core.windows.net/dim_carrier/"

# True when a Delta table is already present at that path; later cells branch on this.
dim_carrier_exists = DeltaTable.isDeltaTable(spark, GOLD_DIM_CARRIER_PATH)


###### Load Existing Carrier Dimension

Checks whether the Gold carrier dimension already exists as a Delta table; the result decides whether SCD Type 2 change detection is needed or every staged row is treated as new.

In [0]:
from delta.tables import DeltaTable

# NOTE(review): this cell repeats the preceding cell verbatim (same path, same
# existence check). Re-running it is harmless, but it can be deleted.
GOLD_DIM_CARRIER_PATH = "wasbs://gold@flightdatastorage.blob.core.windows.net/dim_carrier/"

# Flag used by later cells to choose between change detection and first load.
dim_carrier_exists = DeltaTable.isDeltaTable(spark, GOLD_DIM_CARRIER_PATH)


###### Detect New and Changed Carrier Records

Identifies new carriers or attribute changes requiring new SCD versions.

In [0]:
from pyspark.sql.functions import col

if dim_carrier_exists:
    # Only the open (current) version of each carrier is compared against staging.
    df_dim_carrier_current = (
        spark.read
             .format("delta")
             .load(GOLD_DIM_CARRIER_PATH)
             .filter(col("is_current") == True)
    )

    # Attributes whose change requires a new SCD Type 2 version.
    tracked_cols = ["carrier_name", "carrier_type", "is_active"]

    # Build "any tracked attribute differs" with a null-safe comparison.
    # A plain `!=` evaluates to NULL when either side is NULL, and `where`
    # would then drop the row, hiding a real change.
    attribute_changed = None
    for tracked_col in tracked_cols:
        differs = ~col(f"stg.{tracked_col}").eqNullSafe(col(f"cur.{tracked_col}"))
        attribute_changed = (
            differs if attribute_changed is None else (attribute_changed | differs)
        )

    # No current row on the right side of the left join means a new carrier.
    is_new_carrier = col("cur.carrier_code").isNull()

    df_changes = (
        df_dim_carrier_staged.alias("stg")
        .join(
            df_dim_carrier_current.alias("cur"),
            # Explicit condition keeps both key columns, so cur.carrier_code
            # stays addressable for the new-carrier test.
            on=col("stg.carrier_code") == col("cur.carrier_code"),
            how="left"
        )
        .where(is_new_carrier | attribute_changed)
        # Keep only the staged columns: the result then has the same schema as
        # the first-load branch and no duplicate column names (is_current,
        # carrier_name, ...) that would be ambiguous in later cells.
        .select("stg.*")
    )
else:
    # First load: no existing dimension, so every staged row is a new version.
    df_changes = df_dim_carrier_staged


###### Validate Carrier Dimension Data Quality
Ensures the carrier records about to be written meet SCD and key integrity requirements: no null natural keys and at most one current row per carrier.

In [0]:
# natural key must not be null
null_key_count = df_changes.filter(col("carrier_code").isNull()).count()
assert null_key_count == 0, (
    f"{null_key_count} pending carrier record(s) have a null carrier_code"
)

# one current row per carrier in changes
duplicate_current_count = (
    df_changes
    .filter(col("is_current") == True)
    .groupBy("carrier_code")
    .count()
    .filter(col("count") > 1)
    .count()
)
assert duplicate_current_count == 0, (
    f"{duplicate_current_count} carrier_code value(s) have more than one "
    "current row in the pending changes"
)


###### Expire Current Carrier Records

Closes existing carrier records where attribute changes were detected.

In [0]:
from pyspark.sql.functions import current_date

if dim_carrier_exists:
    delta_dim_carrier = DeltaTable.forPath(spark, GOLD_DIM_CARRIER_PATH)

    # Pair each pending change with the still-open row of the same carrier.
    expire_condition = "tgt.carrier_code = src.carrier_code AND tgt.is_current = true"

    # Close the matched row: it ends yesterday and is no longer current.
    expire_assignments = {
        "effective_to": "date_sub(current_date(), 1)",
        "is_current": "false"
    }

    # Carriers with no open row in the target match nothing and are left alone.
    expire_merge = (
        delta_dim_carrier.alias("tgt")
        .merge(df_changes.alias("src"), expire_condition)
        .whenMatchedUpdate(set=expire_assignments)
    )
    expire_merge.execute()


###### Persist Carrier Dimension

In [0]:
# Append the new and changed carrier versions to the Gold dimension table.
dim_carrier_writer = df_changes.write.format("delta").mode("append")
dim_carrier_writer.save(GOLD_DIM_CARRIER_PATH)
