In [0]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp, lit, trim
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from datetime import datetime

# Spark session: getOrCreate() returns the already-active session if there is one,
# otherwise it builds a new one from this builder's options.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [0]:
# Unity Catalog catalog and the schemas for each medallion layer.
catalog = "workspace"
schema_bronze, schema_silver, schema_gold = "bronze", "silver", "gold"

# Root of the Unity Catalog volume (under the catalog's "default" schema).
volume = "formacaomicrosoftpowerbiprofessional"
base = f"/Volumes/{catalog}/default/{volume}"

# Landing-zone folders: files waiting to be ingested / files already archived.
landing_root = f"{base}/landingzone"
landing_processar = f"{landing_root}/processar"
landing_processados = f"{landing_root}/processados"

# Ensure both folders exist before any read or move touches them.
for _pasta in (landing_processar, landing_processados):
    dbutils.fs.mkdirs(_pasta)


In [0]:
# Read the landing-zone CSV with an explicit schema (no inference pass, so column
# types are fixed by this definition rather than by the file contents).
schema_pessoas = StructType([
    StructField("Codigo", IntegerType(), True)
    , StructField("PrimeiroNome", StringType(), True)
    , StructField("UltimoNome", StringType(), True)
    , StructField("email", StringType(), True)
    , StructField("Genero", StringType(), True)
    , StructField("Avatar", StringType(), True)
    , StructField("Pais", StringType(), True)
    , StructField("Nascimento", StringType(), True)  # dd/MM/yyyy, still kept as a string in bronze
])

# Single source of truth for the file name: used both for the read path and for the
# `_source_file` lineage column, so the two can never drift apart.
nome_arquivo = "PESSOAS.csv"
caminho_csv = f"{landing_processar}/{nome_arquivo}"

df_raw = (spark
          .read
          .schema(schema_pessoas)
          .options(header="true", encoding="UTF-8")
          .csv(caminho_csv)
          # Audit / lineage columns
          .withColumn("_ingestion_ts", current_timestamp())
          .withColumn("_source_file", lit(nome_arquivo))
          )

# Write bronze as a managed Delta table (full refresh on every run).
# overwriteSchema lets the refresh succeed when schema_pessoas changes; without it
# Delta schema enforcement rejects an overwrite whose schema differs from the table's.
(df_raw
 .write
 .mode("overwrite")
 .format("delta")
 .option("overwriteSchema", "true")
 .saveAsTable(f"{catalog}.{schema_bronze}.pessoas"))

In [0]:
# Archive the processed file: MOVE it from "processar" to "processados" with a
# timestamp suffix. A copy (the previous behaviour) left the file in "processar",
# so the next run would ingest it again and archive yet another duplicate.
ts = datetime.now().strftime("%Y%m%d%H%M%S")
destino_arquivado = f"{landing_processados}/PESSOAS_{ts}.csv"
dbutils.fs.mv(caminho_csv, destino_arquivado)