In [1]:
import pandas as pd
import numpy as np

# Schema that every table read or written by this notebook lives in.
DB = "Itransition_project.dbo"


def _select_all(table_name):
    """Return a Spark DataFrame holding every row and column of ``DB.table_name``."""
    return spark.sql(f"SELECT * FROM {DB}.{table_name}")


# Spark handles for the source tables.
silver_taxi_daily_zone_sp = _select_all("silver_taxi_daily_zone")
dim_taxi_zone_sp = _select_all("dim_taxi_zone")
bronze_fx_sp = _select_all("bronze_ecb_fx_rates")
bronze_gdp_sp = _select_all("bronze_worldbank_gdp")

gold_openaq_air_daily_sp = _select_all("gold_openaq_air_daily")

# Collect every source into pandas; all later transformations are done in pandas.
taxi_pd, zone_pd, fx_pd, gdp_pd, air_pd = (
    spark_df.toPandas()
    for spark_df in (
        silver_taxi_daily_zone_sp,
        dim_taxi_zone_sp,
        bronze_fx_sp,
        bronze_gdp_sp,
        gold_openaq_air_daily_sp,
    )
)

# taxi daily: normalise both key columns first, then drop rows missing either one.
# Unparseable dates become NaT and are removed by the dropna below.
taxi_pd["date"] = pd.to_datetime(taxi_pd["date"], errors="coerce").dt.date
# Coerce the zone id BEFORE the dropna: errors="coerce" turns unparseable ids
# into <NA>, and those rows must be dropped as well. Dropping first would let
# them survive as null keys in everything derived from taxi_pd.
taxi_pd["PULocationID"] = pd.to_numeric(taxi_pd["PULocationID"], errors="coerce").astype("Int64")
taxi_pd = taxi_pd.dropna(subset=["date", "PULocationID"])

# numeric cols: values that cannot be parsed become NaN; absent columns are skipped.
for c in ["trips_count", "avg_total_amount_usd", "avg_trip_distance", "avg_duration_min"]:
    if c in taxi_pd.columns:
        taxi_pd[c] = pd.to_numeric(taxi_pd[c], errors="coerce")

# zone dim
# The id may arrive as "LocationID" or already as "PULocationID"; either way it
# ends up in "PULocationID" as a nullable integer.
_zone_id_source = "LocationID" if "LocationID" in zone_pd.columns else "PULocationID"
zone_pd["PULocationID"] = pd.to_numeric(zone_pd[_zone_id_source], errors="coerce").astype("Int64")

# Keep only the useful columns (those that are actually present).
keep_zone_cols = [
    col
    for col in ("PULocationID", "Borough", "Zone", "service_zone")
    if col in zone_pd.columns
]
dim_zone_pd = zone_pd[keep_zone_cols]
dim_zone_pd = dim_zone_pd.dropna(subset=["PULocationID"])
dim_zone_pd = dim_zone_pd.drop_duplicates()

# fx dim
# Expected columns: date, usd_eur.
# Unparseable dates/rates are coerced to missing; rows without a date are
# dropped and only the first row per date is kept.
fx_pd = (
    fx_pd.assign(
        date=pd.to_datetime(fx_pd["date"], errors="coerce").dt.date,
        usd_eur=pd.to_numeric(fx_pd["usd_eur"], errors="coerce"),
    )
    .dropna(subset=["date"])
    .drop_duplicates(subset=["date"])
)

# gdp dim
# Expected columns: year, gdp_usd.
# Unparseable values are coerced to missing; rows without a year are dropped
# and only the first row per year is kept.
gdp_pd = (
    gdp_pd.assign(
        year=pd.to_numeric(gdp_pd["year"], errors="coerce").astype("Int64"),
        gdp_usd=pd.to_numeric(gdp_pd["gdp_usd"], errors="coerce"),
    )
    .dropna(subset=["year"])
    .drop_duplicates(subset=["year"])
)

# air daily
# Rows whose date cannot be parsed are discarded.
air_pd = air_pd.assign(
    date=pd.to_datetime(air_pd["date"], errors="coerce").dt.date
).dropna(subset=["date"])

# Guarantee a nullable-integer "location_id" column even when the source has none.
if "location_id" not in air_pd.columns:
    air_pd["location_id"] = pd.Series([pd.NA] * len(air_pd), dtype="Int64")
else:
    air_pd["location_id"] = pd.to_numeric(air_pd["location_id"], errors="coerce").astype("Int64")

# Measurement columns become numeric; absent ones are skipped.
for measure in ("pm25", "temperature"):
    if measure not in air_pd.columns:
        continue
    air_pd[measure] = pd.to_numeric(air_pd[measure], errors="coerce")

# dim date
# Union of the distinct dates seen in the taxi, air-quality and FX data.
all_dates = pd.Series(
    pd.concat(
        [pd.Series(frame["date"].dropna().unique()) for frame in (taxi_pd, air_pd, fx_pd)]
    ).unique()
)

dim_date_pd = (
    pd.DataFrame({"date": pd.to_datetime(all_dates)})
    .dropna()
    .drop_duplicates()
    .sort_values("date")
)

# Calendar attributes are derived while "date" is still a datetime column.
dim_date_pd = dim_date_pd.assign(
    date_key=lambda d: d["date"].dt.strftime("%Y%m%d").astype(int),
    year=lambda d: d["date"].dt.year,
    month=lambda d: d["date"].dt.month,
    day=lambda d: d["date"].dt.day,
    week_of_year=lambda d: d["date"].dt.isocalendar().week.astype(int),
    day_of_week=lambda d: d["date"].dt.day_name(),
    # weekday 5 and 6 are Saturday and Sunday
    is_weekend=lambda d: (d["date"].dt.weekday >= 5).astype(int),
)

# Finally store plain date objects, the same type the other tables use for "date".
dim_date_pd["date"] = dim_date_pd["date"].dt.date

# dim fx
# One row per date with an integer YYYYMMDD key, ordered by date.
_fx_date_keys = pd.to_datetime(fx_pd["date"]).dt.strftime("%Y%m%d").astype(int)
dim_fx_pd = (
    fx_pd.assign(date_key=_fx_date_keys)
    [["date_key", "date", "usd_eur"]]
    .sort_values("date")
)

# dim GDP
# "year" becomes the plain-int key "year_key"; rows are ordered by that key.
dim_gdp_pd = (
    gdp_pd.rename(columns={"year": "year_key"})
    .astype({"year_key": int})
    .loc[:, ["year_key", "gdp_usd"]]
    .sort_values("year_key")
)

# FactTaxiDaily
fact_taxi_daily_pd = taxi_pd.copy()

# Parse the date column once and derive both keys from the same timestamps:
# date_key is the integer YYYYMMDD, year_key is the calendar year.
_taxi_ts = pd.to_datetime(fact_taxi_daily_pd["date"])
fact_taxi_daily_pd["date_key"] = _taxi_ts.dt.strftime("%Y%m%d").astype(int)
fact_taxi_daily_pd["year_key"] = _taxi_ts.dt.year.astype(int)

# Keep only the needed fields. The measure columns already carry their final
# names, so no rename step is required (the previous identity rename was a no-op).
keep_taxi_fact = [c for c in [
    "date_key", "date", "year_key", "PULocationID",
    "trips_count", "avg_total_amount_usd", "avg_trip_distance", "avg_duration_min",
] if c in fact_taxi_daily_pd.columns]

fact_taxi_daily_pd = fact_taxi_daily_pd[keep_taxi_fact]

# FactAirQualityDaily
# Every air-quality row is assigned this constant as its PULocationID.
AIR_AS_ZONE_ID = 90

fact_air_daily_pd = air_pd.assign(
    date_key=pd.to_datetime(air_pd["date"]).dt.strftime("%Y%m%d").astype(int),
    PULocationID=AIR_AS_ZONE_ID,
)

# Keep only the fact columns that are actually present.
keep_air_fact = [
    col
    for col in ("date_key", "date", "PULocationID", "location_id", "pm25", "temperature")
    if col in fact_air_daily_pd.columns
]
fact_air_daily_pd = fact_air_daily_pd[keep_air_fact]

# (optional) FactTaxiDaily + FX (for euro amounts)
# Left join: taxi rows on dates without an FX row keep a missing usd_eur.
fact_taxi_daily_fx_pd = pd.merge(
    fact_taxi_daily_pd,
    dim_fx_pd,
    how="left",
    on=["date_key", "date"],
)
if {"usd_eur", "avg_total_amount_usd"}.issubset(fact_taxi_daily_fx_pd.columns):
    _usd_amount = fact_taxi_daily_fx_pd["avg_total_amount_usd"]
    _fx_rate = fact_taxi_daily_fx_pd["usd_eur"]
    # NOTE(review): dividing assumes usd_eur is quoted as USD per 1 EUR - confirm.
    fact_taxi_daily_fx_pd["avg_total_amount_eur"] = _usd_amount.div(_fx_rate)

# (optional) FactTaxiDaily + GDP
fact_taxi_daily_gdp_pd = pd.merge(
    fact_taxi_daily_pd,
    dim_gdp_pd,
    how="left",
    on="year_key",
)


def save_pd_to_table(df_pd: pd.DataFrame, table_name: str, mode: str = "overwrite"):
    """Write a pandas DataFrame to the table ``{DB}.{table_name}`` via Spark.

    Args:
        df_pd: Data to persist; it is converted with ``spark.createDataFrame``.
        table_name: Table name without the schema prefix; ``DB`` is prepended.
        mode: Spark save mode handed to ``DataFrameWriter.mode``.
    """
    writer = spark.createDataFrame(df_pd).write.mode(mode)
    target = f"{DB}.{table_name}"
    writer.saveAsTable(target)


# Every output table, in write order: dimensions first, then facts, then the
# optional enriched facts.
_tables_to_save = (
    # Dimensions
    (dim_date_pd, "dim_date"),
    (dim_zone_pd, "dim_zone"),
    (dim_fx_pd, "dim_fx"),
    (dim_gdp_pd, "dim_gdp"),
    # Facts
    (fact_taxi_daily_pd, "fact_taxi_daily"),
    (fact_air_daily_pd, "fact_airquality_daily"),
    # Optional facts
    (fact_taxi_daily_fx_pd, "fact_taxi_daily_fx"),
    (fact_taxi_daily_gdp_pd, "fact_taxi_daily_gdp"),
)

for _frame, _table_name in _tables_to_save:
    save_pd_to_table(_frame, _table_name, mode="overwrite")

print("WORK DOOOOOOONE")

StatementMeta(, a07ccba6-401f-4c1b-bea2-82ef0e3e2bad, 3, Finished, Available, Finished)

WORK DOOOOOOONE
