In [0]:
# Raw bronze flight-route records: the starting point of the silver build below.
df_bronze = spark.read.table("bronze.flights.us_flights")

In [0]:
# State dimension; its `state` and `name` columns are used to attach full state names.
df_states = spark.read.table("silver_db.dim.states_dim")

In [0]:
# Build the silver frame from bronze, discarding columns that are not carried forward.
df_silver = df_bronze.drop("tbl", "tbl1apk", "Geocoded_City1", "Geocoded_City2")

# Bronze column name -> silver column name. Identity entries are listed so the
# mapping doubles as the full inventory of kept columns.
# NOTE(review): the targets for carrier_lg / carrier_low describe a passenger-based
# ranking; confirm that meaning against the source data dictionary.
rename_map = {
    "Year": "year",
    "quarter": "quarter",
    "airportid_1": "airportid_origin",
    "airportid_2": "airportid_dest",
    "airport_1": "airport_origin",
    "airport_2": "airport_dest",
    "passengers": "passengers",
    "nsmiles": "distance_miles",
    "citymarketid_1": "citymarket_origin",
    "citymarketid_2": "citymarket_dest",
    "city1": "city_origin",
    "city2": "city_dest",
    "fare": "avg_fare",
    "fare_lg": "avg_fare_large_carry",
    "large_ms": "market_share_large_carry",
    "fare_low": "avg_fare_low_cost",
    "lf_ms": "market_share_low_cost",
    "carrier_lg": "largest_carry_by_passengers",
    "carrier_low": "lowest_carry_by_passengers"
}

for source_name, target_name in rename_map.items():
    df_silver = df_silver.withColumnRenamed(source_name, target_name)

In [0]:
%python
from pyspark.sql.functions import split, expr

# Separate the state in the city_origin and city_dest
df_silver = df_silver.withColumn("state_origin", expr("try_element_at(split(city_origin, ', '), 2)"))
df_silver = df_silver.withColumn("city_origin", split(df_silver["city_origin"], ", ")[0])
df_silver = df_silver.withColumn("state_dest", expr("try_element_at(split(city_dest, ', '), 2)"))
df_silver = df_silver.withColumn("city_dest", split(df_silver["city_dest"], ", ")[0])

In [0]:
from pyspark.sql.functions import col, substring

# Join keys: the first two characters of each parsed state value.
# NOTE(review): presumably the text after ", " can contain more than a bare state
# code, hence the truncation. Confirm against the bronze data.
origin_key = substring(col("state_origin"), 1, 2)
dest_key = substring(col("state_dest"), 1, 2)
df_silver_tmp = df_silver.withColumn("state_origin_prefix", origin_key).withColumn("state_dest_prefix", dest_key)

# One renamed copy of the state dimension per side, so that the join key and the
# resulting name column do not collide between the two joins.
df_states_origin = df_states.select(col("state").alias("state_origin_prefix"), col("name").alias("state_origin_name"))
df_states_dest = df_states.select(col("state").alias("state_dest_prefix"), col("name").alias("state_dest_name"))

# Left joins keep routes whose state has no match (their *_name column is null).
# NOTE(review): duplicate `state` values in states_dim would duplicate route rows here.
df_silver = df_silver_tmp
for key_column, lookup in (("state_origin_prefix", df_states_origin), ("state_dest_prefix", df_states_dest)):
    df_silver = df_silver.join(lookup, on=key_column, how="left")

# The helper keys are only needed for the joins.
df_silver = df_silver.drop("state_origin_prefix", "state_dest_prefix")


In [0]:
# Target Spark SQL type for each numeric column, applied in this order.
# NOTE(review): "float" is 32-bit; monetary averages may warrant "double" or a decimal
# type (changing it also changes the written table's schema).
column_casts = {
    "year": "int",
    "quarter": "int",
    "passengers": "int",
    "distance_miles": "int",
    "avg_fare": "float",
    "avg_fare_large_carry": "float",
    "market_share_large_carry": "float",
    "avg_fare_low_cost": "float",
    "market_share_low_cost": "float",
}

for column_name, spark_type in column_casts.items():
    df_silver = df_silver.withColumn(column_name, df_silver[column_name].cast(spark_type))

In [0]:
# Final silver layout: period, origin side, destination side, then route metrics.
desired_order = [
    # period
    "year", "quarter",
    # origin
    "citymarket_origin", "city_origin", "state_origin", "state_origin_name",
    "airportid_origin", "airport_origin",
    # destination
    "citymarket_dest", "city_dest", "state_dest", "state_dest_name",
    "airportid_dest", "airport_dest",
    # route metrics
    "passengers", "distance_miles",
    "avg_fare", "avg_fare_large_carry", "market_share_large_carry",
    "avg_fare_low_cost", "market_share_low_cost",
    "largest_carry_by_passengers", "lowest_carry_by_passengers",
]

# Projecting onto exactly these columns also drops anything not listed.
df_silver = df_silver.select(*desired_order)

display(df_silver)

In [0]:
# Persist the silver table as a full refresh.
# Assumes the target is a Delta table (the Databricks default for saveAsTable).
# overwriteSchema, rather than mergeSchema, makes the table schema match df_silver exactly.
# With mergeSchema, an overwrite unions the new schema with the existing one: columns
# dropped from `desired_order` would linger as all-null columns, and non-widening type
# changes (e.g. altering a cast above) would be rejected.
# NOTE(review): coalesce(1) writes a single file but runs the final stage as one task;
# acceptable for small data, revisit if the source grows.
df_silver.coalesce(1).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver_db.flights.flights")