In [0]:
# --- 1. Import de bibliotecas ---
from pyspark.sql.functions import current_timestamp, col

# --- 2. Configura√ß√£o dos caminhos ---
# caminho da fonte
landing_path = "/Volumes/people_analytics/default/00_source"

# destino dos dados (tables)
target_table_employees = "people_analytics.default.bronze_employees"
target_table_attendance ="people_analytics.default.bronze_attendance"

# caminho do checkpoint spark
checkpoint_base = "/Volumes/people_analytics/default/00_source/_checkpoints"


# --- 3. Fun√ß√£o Ingest ---
def ingest_bronze(tabela_origem, tabela_destino, formato):
    
    # 1. Aponta para a PASTA (Diret√≥rio do Volume)
    source_directory = landing_path 

    # 2. Define o filtro para pegar s√≥ o arquivo certo (ex: "employees.csv")
    arquivo_filtro = f"{tabela_origem}*.{formato}"

    checkpoint_path = f"{checkpoint_base}/{tabela_origem}"

    print(f"üöÄ Ingerindo {tabela_origem} -> {tabela_destino}...")

    # --- A. LEITURA (READSTREAM) ---
    df = (
        spark.readStream
        .format("cloudFiles")                       # 1. Ativa o Autoloader
        .option("cloudFiles.format", formato)       # 2. Formato da origem (csv/json)
        .option("cloudFiles.schemaLocation", checkpoint_path)   # 3. Onde salvar o schema
        .option("header", "true")                   # (S√≥ afeta CSV)
        .option("multiLine", "true")                # (S√≥ afeta JSON)
        .option("pathGlobFilter", arquivo_filtro)   # 4. Filtra apenas o arquivo espec√≠fico dentro da pasta
        .load(source_directory)                     # 5. Monitoramento
        
        # --- B. TRANSFORMA√á√ÉO (METADADOS) ---
        .withColumn("data_ingestao", current_timestamp())   # 6. Metadado de Auditoria de tempo
        .withColumn("arquivo_origem", col("_metadata.file_path"))    # 7. Metadado de Rastreabilidade    
    )
    (
        df.writeStream
        .format("delta")                # 8. Formato final (Delta Lake)
        .outputMode("append")           # 9. S√≥ adiciona novos dados
        .option("checkpointLocation", checkpoint_path)   # 10. Controle de estado
        .option("mergeSchema", "true")  # 11. Aceita colunas novas
        .trigger(availableNow=True)
        .toTable(tabela_destino)
    )
    print(f"‚úÖ Sucesso! Tabela {tabela_destino} atualizada.")


# --- 3. EXECU√á√ÉO ---
ingest_bronze("employees", target_table_employees, "csv")
ingest_bronze("attendance", target_table_attendance, "json")