In [0]:
from pyspark.sql import functions as F
from functools import reduce
import re

# -----------------------------
# Diretórios
# -----------------------------
DIRS = {
    "yellow": "/Volumes/workspace/nyc_taxi/raw/yellow",
    "green":  "/Volumes/workspace/nyc_taxi/raw/green",
    "fhv":    "/Volumes/workspace/nyc_taxi/raw/fhv",
    "fhvhv":  "/Volumes/workspace/nyc_taxi/raw/fhvhv",
}

# --------------------------------------
# Destino
# --------------------------------------
TABLES = {
    "yellow": "workspace.nyc_taxi.yellow_trips_bronze",
    "green":  "workspace.nyc_taxi.green_trips_bronze",
    "fhv":    "workspace.nyc_taxi.fhv_trips_bronze",
    "fhvhv":  "workspace.nyc_taxi.fhvhv_trips_bronze",
}

# ---------------------------------------------------------
# Coluna de pickup utilizada para derivar 'anomes' (YYYYMM)
# ---------------------------------------------------------
PICKUP_COL = {
    "yellow": "tpep_pickup_datetime",
    "green":  "lpep_pickup_datetime",
    "fhv":    "pickup_datetime",
    "fhvhv":  "pickup_datetime",
}

# --------------------------------------
# Schemas
# --------------------------------------
TARGET_YELLOW = {
    "vendorid": "long",
    "tpep_pickup_datetime": "timestamp",
    "tpep_dropoff_datetime": "timestamp",
    "passenger_count": "integer",
    "trip_distance": "double",
    "ratecodeid": "long",
    "store_and_fwd_flag": "string",
    "pulocationid": "long",
    "dolocationid": "long",
    "payment_type": "long",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",
}

TARGET_GREEN = {
    "vendorid": "long",
    "lpep_pickup_datetime": "timestamp",
    "lpep_dropoff_datetime": "timestamp",
    "passenger_count": "integer",
    "trip_distance": "double",
    "ratecodeid": "long",
    "store_and_fwd_flag": "string",
    "pulocationid": "long",
    "dolocationid": "long",
    "payment_type": "long",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",
}

TARGET_FHV = {
    "dispatching_base_num": "string",
    "pickup_datetime": "timestamp",
    "dropoff_datetime": "timestamp",
    "pulocationid": "long",
    "dolocationid": "long",
    "sr_flag": "integer",
    "affiliated_base_number": "string",
}

TARGET_FHVHV = {
    "hvfhs_license_num": "string",
    "dispatching_base_num": "string",
    "pickup_datetime": "timestamp",
    "dropoff_datetime": "timestamp",
    "pulocationid": "long",
    "dolocationid": "long",
    "originating_base_num": "string",
    "sr_flag": "integer",
}

# -----------------------------------
# Funções
# -----------------------------------
def list_parquets(base_dir: str):
    """
    Lista todos os arquivos .parquet no diretório.
    """
    return [f.path for f in dbutils.fs.ls(base_dir) if f.path.endswith(".parquet")]

def _sanitize(name: str) -> str:
    """
    Normaliza o nome de uma coluna:
      - trim + lower()
      - espaços/traços → underscore
      - múltiplos underscores → underscore único
    Garante consistência entre meses/anos com capitalizações diferentes.
    """
    n = name.strip().lower()
    n = re.sub(r"[ \t\-]+", "_", n)
    n = re.sub(r"__+", "_", n)
    return n

def to_lower_columns(df):
    """
    Renomeia TODAS as colunas do DataFrame para nomes normalizados (lower + underscore).
    Resolve potenciais colisões geradas pela normalização adicionando sufixo __dupN.
    Ex.: 'DropOff_datetime' e 'dropoff_datetime' → 'dropoff_datetime' / 'dropoff_datetime__dup1'
    """
    current = df.columns
    used = set()
    for c in current:
        new = _sanitize(c)
        if new in used:
            # Evita colisões criando um sufixo incremental
            i, cand = 1, f"{new}__dup1"
            while cand in used:
                i += 1
                cand = f"{new}__dup{i}"
            new = cand
        if new != c:
            df = df.withColumnRenamed(c, new)
        used.add(new)
    return df

def ensure_schema(df, target_schema: dict):
    """
    Garante o schema alvo:
      - cria colunas ausentes como NULL (cast para o tipo alvo)
      - faz cast das colunas existentes para o tipo desejado
    Isso neutraliza divergências entre anos (ex.: DOUBLE vs INT64).
    """
    for col, dtype in target_schema.items():
        if col not in df.columns:
            df = df.withColumn(col, F.lit(None).cast(dtype))
        else:
            df = df.withColumn(col, F.col(col).cast(dtype))
    return df

def derive_anomes_from(df, pickup_col_name: str):
    """
    Cria 'anomes' (YYYYMM) a partir da **coluna de pickup informada**.
    """
    col_norm = _sanitize(pickup_col_name)            # normaliza o nome esperado
    if col_norm not in df.columns:
        # Dica: logue df.columns para inspecionar o arquivo problemático
        raise ValueError(
            f"Coluna de pickup '{pickup_col_name}' (normalizada: '{col_norm}') não encontrada. "
            f"Colunas disponíveis: {df.columns}"
        )
    return df.withColumn("anomes", F.date_format(F.col(col_norm), "yyyyMM"))

def normalize_one_file(path: str, target_schema: dict, pickup_col_name: str):
    """
      1) Leitura
      2) Normalização de nomes
      3) Aplicação do schema alvo (casts + colunas ausentes)
      4) Derivação da partição 'anomes' (YYYYMM) a partir da coluna de pickup informada
      5) Seleção apenas do schema alvo + 'anomes'
    """
    # 1) Leitura
    df = spark.read.parquet(path)

    # 2) Nomes normalizados
    df = to_lower_columns(df)

    # 3) Tipagem/colunas segundo o schema alvo
    df = ensure_schema(df, target_schema)

    # 4) Partição derivada a partir do pickup
    df = derive_anomes_from(df, pickup_col_name)

    # 5) Seleciona schema + partição
    select_cols = list(target_schema.keys()) + ["anomes"]
    return df.select(*select_cols).filter(F.col("anomes").isNotNull())

def build_bronze_for_category(category: str, base_dir: str, target_schema: dict,
                              pickup_col_name: str, table_name: str):
    """
    Constrói a tabela Bronze de UMA categoria:
      - Lista e normaliza todos os arquivos do diretório
      - Une com unionByName (tolerante à ordem/colunas)
      - Loga a volumetria por partição (anomes -> linhas)
      - Sobrescreve a tabela Delta particionada por 'anomes'
    Retorna o DataFrame final consolidado.
    """
    # Lista arquivos Parquet da categoria
    files = list_parquets(base_dir)
    assert files, f"Nenhum parquet encontrado em {base_dir}"
    print(f"🔎 {category}: {len(files)} arquivo(s) em {base_dir}")

    # Normaliza todos os arquivos individualmente
    dfs, skipped = [], 0
    for p in files:
        try:
            df_one = normalize_one_file(
                path=p,
                target_schema=target_schema,
                pickup_col_name=pickup_col_name,
            )
            dfs.append(df_one)
        except ValueError as e:
            print(f"⚠️ Ignorando arquivo sem coluna '{pickup_col_name}': {p} | {e}")
            skipped += 1

    # União segura de todos os pedaços normalizados
    df_all = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)

    # ---------------------------
    # Log por partição
    # ---------------------------

    counts_df = df_all.groupBy("anomes").count().orderBy("anomes")
    print(f"📆 Partições detectadas para {category} (anomes -> linhas):")
    for row in counts_df.collect():
        print(f"  - {row['anomes']}: {row['count']:,}")

    # ---------------------------
    # Escrita
    # ---------------------------
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    (df_all.repartition("anomes")          # melhora distribuição por partição na escrita
          .write
          .format("delta")
          .mode("overwrite")
          .partitionBy("anomes")
          .saveAsTable(table_name))

    total = df_all.count()
    print(f"✅ Bronze '{category}' criada: {table_name} | Linhas: {total:,}")
    return df_all

# -------------------------
#         Execução 
# -------------------------

df_yellow = build_bronze_for_category(
    category="yellow",
    base_dir=DIRS["yellow"],
    target_schema=TARGET_YELLOW,
    pickup_col_name=PICKUP_COL["yellow"],
    table_name=TABLES["yellow"]
)

df_green = build_bronze_for_category(
    category="green",
    base_dir=DIRS["green"],
    target_schema=TARGET_GREEN,
    pickup_col_name=PICKUP_COL["green"],
    table_name=TABLES["green"]
)

df_fhv = build_bronze_for_category(
    category="fhv",
    base_dir=DIRS["fhv"],
    target_schema=TARGET_FHV,
    pickup_col_name=PICKUP_COL["fhv"],
    table_name=TABLES["fhv"]
)

df_fhvhv = build_bronze_for_category(
    category="fhvhv",
    base_dir=DIRS["fhvhv"],
    target_schema=TARGET_FHVHV,
    pickup_col_name=PICKUP_COL["fhvhv"],
    table_name=TABLES["fhvhv"]
)

# -------------------------
# Amostras
# -------------------------
print("🔎 Amostras das tabelas Bronze:")
display(spark.table(TABLES["yellow"]).limit(5))
display(spark.table(TABLES["green"]).limit(5))
display(spark.table(TABLES["fhv"]).limit(5))
display(spark.table(TABLES["fhvhv"]).limit(5))
