In [None]:
# Databricks notebook source
# ============================================================
#  Landing ➜ Raw/dia=YYYY‑MM‑DD/hora=HHMMSS ➜ 1 Parquet
# ============================================================

from datetime import datetime, date
from pathlib import PurePosixPath
import pandas as pd
from pyspark.sql import SparkSession

# ───────────── 1. Configuração fixa ─────────────
catalog = "workspace"
schema  = "default"
volume  = "elt_volume"

vol_root = PurePosixPath("/Volumes") / catalog / schema / volume
landing  = vol_root / "landing"
raw_root = vol_root / "raw"

today_str = date.today().isoformat()                # 2025‑07‑29
time_str  = datetime.now().strftime("%H%M%S")       # e.g., 172637
raw_part  = raw_root / f"dia={today_str}" / f"hora={time_str}"

landing_csv = landing / "vendas_mock.csv"
raw_csv     = raw_part / "vendas_mock.csv"
parquet_file = raw_part / f"vendas_mock_{time_str}.parquet"

spark = SparkSession.builder.getOrCreate()

# ───────────── 2. Garante Volume e diretórios ─────────────
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume}")
for p in [landing, raw_part]:
    dbutils.fs.mkdirs(str(p))

# ───────────── 3. Valida arquivo em landing ─────────────
if not dbutils.fs.ls(str(landing_csv)):
    raise FileNotFoundError(f"Arquivo não encontrado em {landing_csv}")

# ───────────── 4. Move (sobrescreve) para Raw ────────────
try:
    dbutils.fs.rm(str(raw_csv))
except Exception:
    pass
dbutils.fs.mv(str(landing_csv), str(raw_csv))
print(f"✅ Movido para {raw_csv}")

# ───────────── 5. Lê CSV e grava Parquet único ──────────
try:
    # Verifica se o arquivo existe
    if not dbutils.fs.ls(str(raw_csv)):
        raise FileNotFoundError(f"Arquivo não encontrado em {raw_csv}")

    # Tenta ler o CSV com Spark (prioridade, já que Pandas falhou)
    try:
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"dbfs:{raw_csv}")
        print("Schema do DataFrame Spark:")
        df.printSchema()
        print("Conteúdo do CSV:")
        df.show(5)
        
        # Salva como Parquet único
        tmp_dir = str(raw_part / "_tmp_parquet")
        df.coalesce(1).write.mode("overwrite").parquet(f"dbfs:{tmp_dir}")
        
        # Move part-file para nome final
        part_file = [f.path for f in dbutils.fs.ls(f"dbfs:{tmp_dir}") if f.path.endswith(".parquet")][0]
        # Remove destino se existir
        try:
            dbutils.fs.rm(f"dbfs:{parquet_file}")
        except Exception:
            pass
        dbutils.fs.mv(part_file, f"dbfs:{parquet_file}")
        dbutils.fs.rm(f"dbfs:{tmp_dir}", recurse=True)
        
        print(f"✅ Parquet único salvo em {parquet_file}")
        
        # Verifica o Parquet
        parquet_df = spark.read.parquet(f"dbfs:{parquet_file}")
        print("Conteúdo do Parquet:")
        parquet_df.show(5)
        
    except Exception as e:
        print(f"❌ Erro ao processar com Spark: {str(e)}")
        print("Tentando ler com Pandas como última tentativa...")
        
        # Fallback: Lê CSV com Pandas
        fuse_csv = f"/dbfs{raw_csv}"
        pdf = pd.read_csv(fuse_csv, sep=",", encoding="utf-8")
        print("Tipos de dados do DataFrame Pandas:", pdf.dtypes)
        print("Primeiras linhas do CSV:")
        print(pdf.head())
        
        # Tenta salvar como Parquet com Pandas
        try:
            temp_parquet = str(raw_part / f"temp_vendas_mock_{time_str}.parquet")
            pdf.to_parquet(temp_parquet, engine="pyarrow", index=False)
            
            # Move o arquivo Parquet para o destino final
            try:
                dbutils.fs.rm(f"dbfs:{parquet_file}")
            except Exception:
                pass
            dbutils.fs.mv(f"file:{temp_parquet}", f"dbfs:{parquet_file}")
            print(f"✅ Parquet único salvo em {parquet_file}")
            
            # Verifica o Parquet
            parquet_df = spark.read.parquet(f"dbfs:{parquet_file}")
            print("Conteúdo do Parquet:")
            parquet_df.show(5)
            
        except Exception as e2:
            print(f"❌ Erro ao salvar Parquet com Pandas: {str(e2)}")
            print("🚨 Não foi possível processar o arquivo. Verifique o formato do CSV e as permissões no Volume.")

except Exception as e:
    print(f"❌ Erro ao processar o arquivo: {str(e)}")
    print("🚨 Não foi possível processar o arquivo. Verifique o formato do CSV e as permissões no Volume.")

print("🏁 Processo concluído.")