In [0]:
# Driver: Orchestrate the execution of the silver layer ---------------------------------------------------
import sys, importlib, pkgutil, pprint

# Put the repository's `src` directory on the import path so the project
# modules imported below (core, config, transforms) can be resolved.
# The membership check keeps re-runs of this cell from adding duplicates.
repo_path = "/Workspace/Repos/soaresbianca_@outlook.com/ifood-case/src"
already_importable = repo_path in sys.path
if not already_importable:
    sys.path.append(repo_path)

from core import process_taxi_data 
from config import *              
from core import process_taxi_data
import transforms

# Column-description dictionaries keyed by silver table suffix.
# Keys are looked up with cfg["silver_suffix"] from TRIP_CONFIG further
# below; a suffix missing here simply gets no column comments.
column_data_dictionaries = dict(
    fhv_silver=FHV_DICT,
    green_silver=GREEN_DICT,
    hv_fhv_silver=HVFHV_DICT,
    yellow_silver=YELLOW_DICT,
)

# -----------------------------------------------------------------
# 1. Create Database
# -----------------------------------------------------------------
# IF NOT EXISTS makes this step safe to re-run on an existing database.
create_db_stmt = f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}"
spark.sql(create_db_stmt)
print(f"Database '{DATABASE_NAME}' pronto.\n")

# -----------------------------------------------------------------
# 2. Data ingestion
# -----------------------------------------------------------------
# Run process_taxi_data once for every (trip type, month) pair of YEAR.
for trip_type in TRIP_CONFIG:
    # Announce each trip type once, before its months are processed,
    # rather than repeating the identical line for every month.
    print("Trip Type: " + trip_type)
    for m in MONTHS:
        process_taxi_data(trip_type, YEAR, m)

print("\n--- Ingestão Silver finalizada ---\n")

# -----------------------------------------------------------------
# 3. Create silver tables at s3
# -----------------------------------------------------------------
def _sql_string_literal(text):
    """Return *text* as a single-quoted Spark SQL string literal.

    Backslashes and single quotes are backslash-escaped so a description such
    as "driver's license" cannot terminate the literal early and break the
    COMMENT statement. (Assumes the default Spark setting
    spark.sql.parser.escapedStringLiterals=false — confirm if it is changed.)
    """
    escaped = str(text).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Register one Delta table per trip type over its silver location, then
# attach table and column comments (best-effort: failures are reported,
# not raised, so one bad comment does not stop the remaining tables).
for cfg in TRIP_CONFIG.values():
    suffix = cfg["silver_suffix"]
    table_name = f"{DATABASE_NAME}.{suffix}"
    table_path = f"{SILVER_S3_BASE_PATH}{suffix}"
    table_description = TRIP_COMMENTS[suffix]
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)
    print(f"Tabela criada: {table_name} -> {table_path}")

    # Add Comment to Table
    try:
        spark.sql(f"""
            COMMENT ON TABLE {table_name} IS {_sql_string_literal(table_description)}
        """)
    except Exception as e:
        print(f"  - ERRO ao adicionar comentário para a tabela '{table_name}': {e}")
    else:
        # Only confirm the table comment when the statement succeeded.
        print(f"\nComentario Tabela: {table_name}: {table_description}")

    # Add columns description
    if suffix in column_data_dictionaries:
        column_dict = column_data_dictionaries[suffix]
        print(f"Adicionando comentários para as colunas da tabela {table_name}...")
        for column_name, column_description in column_dict.items():
            try:
                spark.sql(f"""
                    COMMENT ON COLUMN {table_name}.{column_name} IS {_sql_string_literal(column_description)}
                """)
            except Exception as e:
                # Most likely cause: the dictionary names a column that is
                # not in the table schema.
                print(f"  - ERRO ao adicionar comentário para a coluna '{column_name}': {e}")
                print(f"    Verifique se a coluna '{column_name}' existe no esquema da tabela '{table_name}'.")
    else:
        print(f"Nenhum dicionário de colunas encontrado para o sufixo: {suffix}. Comentários de coluna não adicionados.")

print("\n Pipeline Silver concluído com sucesso.")