In [0]:
from pyspark.sql import functions as F

# Caminhos
processar_file   = "dbfs:/Volumes/workspace/pessoas/proj_pessoas/landing/processar/PESSOAS.csv"
processados_file = "dbfs:/Volumes/workspace/pessoas/proj_pessoas/landing/processados/PESSOAS.csv"
url = "https://raw.githubusercontent.com/JDLNas/FORMACAO_MICROSOFT_POWERBI_PROFESSIONAL/refs/heads/main/fontes/PESSOAS.csv"

def path_exists(p):
    try:
        dbutils.fs.ls(p); return True
    except:
        return False

# 0) Garantir que o arquivo exista em processar/ (ou baixar)
if not path_exists(processar_file) and not path_exists(processados_file):
    import urllib.request
    tmp = "/tmp/PESSOAS.csv"
    urllib.request.urlretrieve(url, tmp)
    dbutils.fs.mv(f"file:{tmp}", processar_file, True)

# 1) Escolher fonte efetiva
landing_csv = processar_file if path_exists(processar_file) else processados_file

# 2) Ler CSV (bruto)
df_bronze = (spark.read
  .option("header","true")
  .option("encoding","utf-8")
  .option("inferSchema","true")
  .option("multiLine","true")
  .option("mode","PERMISSIVE")
  .csv(landing_csv)
)

# 3) Tabela Bronze (Delta gerenciada)
spark.sql("DROP TABLE IF EXISTS workspace.pessoas.bronze_pessoas")
df_bronze.write.mode("overwrite").format("delta").saveAsTable("workspace.pessoas.bronze_pessoas")

# 4) Conferir
spark.sql("SELECT * FROM workspace.pessoas.bronze_pessoas LIMIT 10").show()

# 5) Arquivar (mover p/ processados se estava em processar/)
if path_exists(processar_file):
    dbutils.fs.mv(processar_file, processados_file, True)

# 6) Onde está armazenado
spark.sql("DESCRIBE DETAIL workspace.pessoas.bronze_pessoas").show(truncate=False)
