In [None]:
from datetime import datetime, date
from pathlib import PurePosixPath
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import FloatType, IntegerType

# Configuração
CATALOG = "workspace"
SCHEMA  = "default"
VOLUME  = "elt_volume"

VOL_ROOT = PurePosixPath("/Volumes") / CATALOG / SCHEMA / VOLUME
BRONZE_ROOT = VOL_ROOT / "bronze"
today_str = date.today().isoformat()
time_str  = datetime.now().strftime("%H%M") 
BRONZE_PART = BRONZE_ROOT / today_str.replace("-", "/")
BRONZE_BASE = BRONZE_PART / f"vendas_bronze_cleaned_{time_str}"

spark = SparkSession.builder.getOrCreate()

# Recupera caminho do raw_csv da task anterior
task_key = "Execute_process_file_to_parquet"
try:
    RAW_CSV = dbutils.jobs.taskValues.get(taskKey=task_key, key="raw_csv", debugValue="/Volumes/workspace/default/elt_volume/raw/2025/07/30/vendas_mock_1452/vendas_mock.csv")
    if not dbutils.fs.ls(str(RAW_CSV)):
        raise FileNotFoundError(f"Arquivo não encontrado em {RAW_CSV}")
except Exception as e:
    RAW_ROOT = VOL_ROOT / "raw"
    raw_base = str(RAW_ROOT / today_str.replace("-", "/"))
    raw_files = dbutils.fs.ls(raw_base)
    RAW_CSV = None
    for f in raw_files:
        if f.isDir():
            sub_dir = f.path.replace("dbfs:", "")
            sub_files = dbutils.fs.ls(sub_dir)
            for sf in sub_files:
                if sf.name == "vendas_mock.csv":
                    RAW_CSV = sf.path.replace("dbfs:", "")
                    break
            if RAW_CSV:
                break
    if not RAW_CSV or not dbutils.fs.ls(str(RAW_CSV)):
        raise FileNotFoundError(f"Arquivo não encontrado em subdiretórios de {raw_base}")

# Cria diretórios necessários
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME}")
dbutils.fs.mkdirs(str(BRONZE_PART))

# Leitura do CSV da camada raw
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(f"dbfs:{RAW_CSV}"))

# Total de linhas recebidas
total_rows_received = df.count()

# Padronização de nomes de colunas para snake_case
column_mapping = {
    'venda_id': 'venda_id',
    'data': 'data',
    'vendedor_id': 'vendedor_id',
    'vendedor_nome': 'vendedor_nome',
    'produto_id': 'produto_id',
    'produto_nome': 'produto_nome',
    'categoria': 'categoria',
    'preco_unit': 'preco_unit',
    'qtd': 'qtd',
    'desconto': 'desconto',
    'canal': 'canal',
    'cidade': 'cidade',
    'estado': 'estado',
    'comentarios': 'comentarios'
}

df = df.select([col(old_name).alias(new_name) for old_name, new_name in column_mapping.items()])

# Identifica colunas com nulos antes da limpeza
null_columns = [col_name for col_name in df.columns if df.filter(col(col_name).isNull()).count() > 0]

# Exclusão de linhas com null em campos críticos
critical_columns = ['venda_id', 'data', 'produto_id', 'vendedor_id', 'preco_unit', 'qtd']
df_clean = df.dropna(subset=critical_columns)

# Conversão de tipos
df_clean = (df_clean
    .withColumn('data', to_date(col('data')))
    .withColumn('preco_unit', col('preco_unit').cast(FloatType()))
    .withColumn('qtd', col('qtd').cast(IntegerType())))

# Contagem de linhas válidas e inválidas
total_rows_valid = df_clean.count()
total_rows_invalid = total_rows_received - total_rows_valid

# Salva o arquivo limpo
df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"dbfs:{BRONZE_BASE}")

# Encontra o primeiro arquivo particionado para task value
bronze_files = dbutils.fs.ls(f"dbfs:{BRONZE_BASE}")
if not bronze_files:
    raise FileNotFoundError(f"Nenhum arquivo encontrado em {BRONZE_BASE}")
bronze_csv_path = [f.path.replace("dbfs:", "") for f in bronze_files if f.path.endswith(".csv")][0]

# Registro de metadados
metadata = {
    "total_rows_received": total_rows_received,
    "total_rows_valid": total_rows_valid,
    "total_rows_invalid": total_rows_invalid,
    "null_columns_removed": null_columns,
    "output_path": bronze_csv_path
}

# Salva metadados como JSON
metadata_path = BRONZE_PART / f"metadata_vendas_bronze_{time_str}.json"
dbutils.fs.put(f"dbfs:{metadata_path}", str(metadata), overwrite=True)

# Salva contextos para próximas tasks
try:
    dbutils.jobs.taskValues.set(key="bronze_csv", value=bronze_csv_path)
    dbutils.jobs.taskValues.set(key="bronze_metadata", value=str(metadata_path))
except Exception as e:
    raise
