In [0]:
# ----------------------------- 
# IMPORTS (TOPO)
# -----------------------------
import re
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, trim, lower, coalesce, expr, regexp_replace, when, isnan, 
    to_timestamp, current_timestamp, lit, to_date
)
from pyspark.sql.types import DoubleType, IntegerType, FloatType, LongType, DecimalType

# -----------------------------
# 1 Initialize Spark
# -----------------------------
# Obtain a SparkSession through the builder's getOrCreate().
spark = SparkSession.builder.getOrCreate()

# -----------------------------
# 2 Read Silver 1
# -----------------------------
# Fully qualified name of the input table; `df` is the working DataFrame that
# every later section of this script rebinds as it transforms the data.
silver_1_table = "saas_project.core.silver_data"
df = spark.table(silver_1_table)

# -----------------------------
# 3 Normaliza nomes das colunas
# -----------------------------
def normalizar_coluna(nome):
    """
    Return a normalized version of a column name.

    The name is accent-folded, lower-cased, and every character outside
    ``[a-z0-9_]`` is replaced by an underscore. Runs of underscores are
    collapsed into one and leading/trailing underscores are removed.

    Accent folding happens first so that accented letters map to their base
    letter instead of an underscore (e.g. "Contratação" -> "contratacao"
    rather than "contrata_o").

    Args:
        nome: Original column name.

    Returns:
        The normalized name. May be an empty string if the input contains no
        ASCII letter, digit, or accent-foldable character.
    """
    # Local import keeps this function self-contained.
    import unicodedata

    # NFKD splits an accented letter into base letter + combining mark;
    # dropping the combining marks leaves only the base letters.
    decomposto = unicodedata.normalize("NFKD", nome)
    sem_acentos = "".join(ch for ch in decomposto if not unicodedata.combining(ch))

    nome = sem_acentos.lower()
    nome = re.sub(r'[^a-z0-9_]', '_', nome)
    nome = re.sub(r'_+', '_', nome)
    return nome.strip('_')

# Rename every column, in order, to its normalized form.
df = df.toDF(*map(normalizar_coluna, df.columns))

# -----------------------------
# 4 Identify column types
# -----------------------------
# df.dtypes yields (column_name, type_string) pairs.
string_cols = [c for c, t in df.dtypes if t == "string"]

# Decimal columns are reported with precision and scale (e.g. "decimal(10,2)"),
# so they are matched by prefix rather than by an exact "decimal" literal.
numeric_cols = [
    c for c, t in df.dtypes
    if t in ("int", "bigint", "double", "float") or t.startswith("decimal")
]

# A column is treated as date-like when its type is a timestamp OR its name
# contains "date", whatever its type.
# NOTE(review): the name test is a substring match, so names such as
# "update_count" are also selected — confirm this is intended.
timestamp_cols = [c for c, t in df.dtypes if t.startswith("timestamp") or "date" in c.lower()]

# -----------------------------
# 5 String cleanup
# -----------------------------
# Placeholder tokens that are treated as "no value". Matching elsewhere in the
# script is done against the lower-cased, stripped form built below.
# NOTE(review): "ABC" (compared as "abc") looks like leftover test data —
# confirm it is meant to be treated as invalid.
valores_invalidos_ext = [
    # generic null markers
    "null", "na", "n/a", "nan", "-", "none", "undefined",
    # error markers
    "erro", "error", "fail", "failed", "invalid", "invalido", "inválido",
    # "not applicable" variants
    "not applicable", "n.a", "nao aplicavel", "não aplicavel",
    # missing / unknown markers
    "missing", "missing_value", "unknown", "unk", "nullstr",
    # symbol noise and miscellaneous
    "??", "???", "@", "@@", "?", "nan", "ABC", "NaN",
]

# Lower-cased and stripped copy of the list above, in the same order.
invalidos_lower = [valor.lower().strip() for valor in valores_invalidos_ext]

# -----------------------------
# 5.0 Null out invalid tokens in ALL columns (any type)
# -----------------------------
for column_name in df.columns:
    current_value = col(column_name)
    # Compare on a trimmed, lower-cased string view of the value so that
    # non-string columns are checked too; the stored value keeps its type.
    as_clean_text = lower(trim(current_value.cast("string")))
    df = df.withColumn(
        column_name,
        when(as_clean_text.isin(invalidos_lower), None).otherwise(current_value),
    )

# -----------------------------
# 5.1 Null out invalid numbers (NaN)
# -----------------------------
# Columns whose schema type is one of the numeric Spark types listed here.
numeric_cols_all = [
    f.name
    for f in df.schema.fields
    if isinstance(f.dataType, (IntegerType, DoubleType, FloatType, LongType, DecimalType))
]

for numeric_name in numeric_cols_all:
    numeric_value = col(numeric_name)
    # Replace NaN with NULL; every other value is kept unchanged.
    df = df.withColumn(numeric_name, when(isnan(numeric_value), None).otherwise(numeric_value))

# -----------------------------
# 5.2 Advanced string cleanup
# -----------------------------
for c in string_cols:
    # Delete tab, newline and carriage-return characters, then trim both ends.
    df = df.withColumn(c, trim(regexp_replace(col(c), r"[\t\n\r]", "")))

    # Null out placeholder tokens (checked again because the step above may
    # have turned a value into one of them).
    df = df.withColumn(c, when(lower(trim(col(c))).isin(invalidos_lower), None).otherwise(col(c)))

    # Null out single-token values made only of ASCII letters and digits that
    # contain at least one of each (e.g. "abc123"). Values containing a space
    # or punctuation, such as "Pessoa 1" or an e-mail address, do not match
    # the first pattern and are kept.
    df = df.withColumn(
        c,
        when(
            col(c).rlike("^[0-9a-zA-Z]+$") &
            col(c).rlike(".*[0-9].*") &
            col(c).rlike(".*[a-zA-Z].*"),
            None
        ).otherwise(col(c))
    )

    # Null out values that contain no letter and no digit at all.
    # The previous check stripped only [a-z0-9], so an all-uppercase value
    # such as "RIO" was wrongly treated as "no alphanumerics" and erased.
    # \p{L} / \p{N} match any Unicode letter / number, covering uppercase
    # and accented text as well.
    df = df.withColumn(c, when(~col(c).rlike(r"[\p{L}\p{N}]"), None).otherwise(col(c)))

    # Null out empty strings.
    df = df.withColumn(c, when(trim(col(c)) == "", None).otherwise(col(c)))

# -----------------------------
# 5.3 Observação importante:
# Não alteramos maiúsculas/minúsculas do conteúdo original.
# O texto será mantido conforme veio no arquivo:
# - Primeira letra maiúscula se estiver maiúscula
# - Minúsculas permanecem minúsculas (ex.: e-mails)
# -----------------------------

# -----------------------------
# 6 Numeric conversion
# -----------------------------
# Every column whose schema type is Integer/Long/Float/Double/Decimal is cast
# to DoubleType. No negative-value handling is performed in this section.
numeric_cols = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType, LongType, DecimalType))
]

for c in numeric_cols:
    # withColumn on an existing name replaces that column where it stands.
    # The earlier temp-column + drop + rename sequence moved every numeric
    # column to the end of the schema and would overwrite a pre-existing
    # "<name>_temp" column.
    df = df.withColumn(c, col(c).cast(DoubleType()))

# -----------------------------
# 6.1 Adjust the ID column (if present)
# -----------------------------
if "id" in df.columns:
    # Cast to integer and sort ascending by id.
    # NOTE(review): IntegerType is 32-bit — confirm ids never exceed that range.
    id_as_int = col("id").cast(IntegerType())
    df = df.withColumn("id", id_as_int).orderBy("id")

    # Put id first, keeping the remaining columns in their current order.
    other_cols = [name for name in df.columns if name != "id"]
    df = df.select("id", *other_cols)

# -----------------------------
# 7 Dates
# -----------------------------
# (regex the text must fully match, Spark datetime pattern used to parse it).
# Patterns are tried in order and the first non-null parse wins.
date_formats = [
    (r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$", "yyyy-MM-dd HH:mm:ss"),
    # ISO-8601 with milliseconds and zone offset. Without this entry such
    # values were nulled here, before any later ISO-specific parsing could
    # see them.
    (r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}(Z|[+-]\d{2}:\d{2})$", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
    # The regex requires a trailing "Z", so the pattern needs a zone token
    # ("X"); the previous pattern had none and these values never parsed.
    (r"^\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}Z$", "dd/MM/yyyy HH:mm:ssX"),
    (r"^\d{2}-\d{2}-\d{4} \d{2}:\d{2}:\d{2}$", "dd-MM-yyyy HH:mm:ss"),
    (r"^\d{4}-\d{2}-\d{2}$", "yyyy-MM-dd"),
    (r"^\d{2}/\d{2}/\d{4}$", "dd/MM/yyyy"),
    (r"^\d{2}-\d{2}-\d{4}$", "dd-MM-yyyy"),
]

# Current type of every column, to tell already-typed timestamps from text.
column_types = dict(df.dtypes)

for c in timestamp_cols:
    if column_types.get(c, "").startswith("timestamp"):
        # Already a timestamp: keep the value as is. Routing it through the
        # string patterns would null any value whose text form does not match
        # one of them (e.g. values with fractional seconds).
        parsed = col(c)
    else:
        raw = col(c).cast("string")
        # A `when` without `otherwise` yields NULL when its regex does not
        # match, so coalesce picks the first format that applies; values
        # matching no format become NULL.
        parsed = coalesce(*[
            when(raw.rlike(pattern), to_timestamp(raw, fmt))
            for pattern, fmt in date_formats
        ])

    # Replace the column in place (keeps its position in the schema) and
    # null out timestamps that lie in the future.
    df = df.withColumn(c, when(parsed > current_timestamp(), None).otherwise(parsed))

# -----------------------------
# 7.1 Fix data_contratacao_date
# -----------------------------
if "data_contratacao_date" in df.columns:
    # Apply to_timestamp with the ISO-8601 pattern, then keep only the date
    # part of the result.
    hire_ts = to_timestamp(col("data_contratacao_date"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
    df = df.withColumn("data_contratacao_date", hire_ts)
    df = df.withColumn("data_contratacao_date", to_date(col("data_contratacao_date")))

# -----------------------------
# 7.2 Keep ingestion_time as the last column
# -----------------------------
if "ingestion_time" in df.columns:
    # All other columns in their current order, then ingestion_time.
    cols = [name for name in df.columns if name != "ingestion_time"]
    cols.append("ingestion_time")
    df = df.select(*cols)

# -----------------------------
# 8 Remove duplicates (ignoring ingestion_time), then order
# -----------------------------
# Deduplication runs BEFORE sorting: dropDuplicates redistributes rows, so an
# ordering applied before it is not guaranteed to survive.
if "ingestion_time" in df.columns:
    # Rows that differ only in ingestion_time count as duplicates.
    cols_dedup = [c for c in df.columns if c != "ingestion_time"]
    df = df.dropDuplicates(cols_dedup)
else:
    df = df.dropDuplicates()

# Final ordering: by idx when present, otherwise by id when present. This is
# the ordering the sorts in this script produced before deduplication.
if "idx" in df.columns:
    df = df.orderBy("idx")
elif "id" in df.columns:
    df = df.orderBy("id")

# Notebook preview of the first rows.
display(df.limit(10))

# -----------------------------
# 9 Save Silver 2 (Delta table only)
# -----------------------------
silver_2_table = "saas_project.core.silver_2_data"

# Drop the existing table first so it is recreated with the current schema.
# NOTE(review): if the write below fails, the table stays dropped — confirm
# that is acceptable.
spark.sql(f"DROP TABLE IF EXISTS {silver_2_table}")

delta_writer = df.write.format("delta").mode("overwrite")
delta_writer.saveAsTable(silver_2_table)
print("Silver 2 recriada com schema do arquivo atual")

# -----------------------------
# 10 Save Gold CSV (only when running as Gold)
# -----------------------------
is_gold = False  # <- set to True only when the Gold CSV should be generated

if is_gold:
    output_dir = "/Volumes/saas_project/core/download/gold_data_csv/"
    final_csv_path = os.path.join(output_dir, "gold_data.csv")
    temp_path = os.path.join(output_dir, "temp_csv")

    df_gold_csv = df  # the Gold export is the treated Silver 2 data, unchanged

    # coalesce(1) makes Spark write one part file into the temp directory.
    csv_writer = df_gold_csv.coalesce(1).write.mode("overwrite").option("header", "true")
    csv_writer.csv(temp_path)

    # Move the first ".csv" file found in the temp directory to the final path.
    part_file = next(
        (entry for entry in os.listdir(temp_path) if entry.endswith(".csv")),
        None,
    )
    if part_file is not None:
        shutil.move(os.path.join(temp_path, part_file), final_csv_path)

    # Remove the temp directory and whatever else Spark wrote into it.
    shutil.rmtree(temp_path)
    print(f"CSV Gold salvo em: {final_csv_path}")


id,nome,idade,email,cidade,salario,ingestion_time
1,Pessoa 0,26.0,user0@exemplo.com,,2036.32,2026/02/19-18:05:36
2,Pessoa 1,68.0,user1@exemplo.com,Rio,3705.09,2026/02/19-18:05:36
3,Pessoa 2,67.0,user2@exemplo.com,São Paulo,5693.16,2026/02/19-18:05:36
4,Pessoa 3,38.0,user3@exemplo.com,Belo Horizonte,9376.51,2026/02/19-18:05:36
5,Pessoa 4,53.0,user4@exemplo.com,,6392.52,2026/02/19-18:05:36
6,Pessoa 5,49.0,user5@exemplo.com,Rio,7172.05,2026/02/19-18:05:36
7,Pessoa 6,,user6@exemplo.com,,3802.53,2026/02/19-18:05:36
8,Pessoa 7,47.0,user7@exemplo.com,,4748.2,2026/02/19-18:05:36
9,Pessoa 8,27.0,user8@exemplo.com,Belo Horizonte,4476.1,2026/02/19-18:05:36
10,Pessoa 9,61.0,user9@exemplo.com,,9242.02,2026/02/19-18:05:36


Silver 2 recriada com schema do arquivo atual
