In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
df_bronze = spark.table("grao.bronze.grain_shipping")

duplicados = df_bronze.groupBy("id_envio").count().filter(F.col("count") > 1 )

window_spec = Window.partitionBy("id_envio")
df_bronze = df_bronze.withColumn("valid", F.when(F.count("id_envio").over(window_spec) == 1, F.lit(1)).otherwise(F.lit(0)))
if duplicados.count() > 0:
    duplicados.display()
    print(f"Existem id_envio duplicados no dataframe {duplicados.count()}")
else:
    print("Não existem id_duplicados")

In [0]:
def clean_col_names(df):
    """
    Renomeia as colunas de um DataFrame:
    - Remove acentos e caracteres especiais.
    - Converte para minúsculas.
    """
    new_column_names = []
    
    # Itera sobre a lista de nomes de colunas (que são strings)
    for name in df.columns:
        
        # Converte para minúsculas
        clean_name = name.lower()
        
        # Remove acentos e caracteres especiais comuns no seu schema
        clean_name = clean_name.replace('ã', 'a').replace('ç', 'c').replace('é', 'e').replace('á', 'a').replace('õ', 'o')
        
        # Remove qualquer caractere que não seja letra, número ou underscore (_)
        import re
        clean_name = re.sub(r'[^a-z0-9_]', '', clean_name)
        
        new_column_names.append(clean_name)

    
    return df.toDF(*new_column_names)

In [0]:
# Aplica a função para obter o DataFrame com as colunas limpas
df_bronze = clean_col_names(df_bronze)

In [0]:

# Adicionado um ID único à tabela calendário para usar como chave estrangeira na fato
df_calendario = spark.table("grao.bronze.calendario").withColumn(
    "id_data", F.row_number().over(Window.orderBy("data"))
)

In [0]:
# Filtrando somente dados validos da tabela bronze
df_bronze = df_bronze.filter(F.col("valid") == 1).drop(F.col("valid"))

In [0]:
print("\n1. Criando Dim_Metodo_Envio...")
dim_metodo_envio = (
    df_bronze.select("metodo_de_envio", "corredor_de_armazenagem", "importancia")
    .distinct()
    .withColumn("id_metodo_envio", F.row_number().over(Window.orderBy("metodo_de_envio")))
)
dim_metodo_envio.write.mode("overwrite").format("delta").saveAsTable("grao.silver.dim_metodo_envio")

In [0]:
print("\n2. Criando Dim_Localizacao...")
dim_localizacao = (
    df_bronze.select("Destino")
    .distinct()
    .withColumn("id_localizacao", F.row_number().over(Window.orderBy("Destino")))
)
dim_localizacao.write.mode("overwrite").format("delta").saveAsTable("grao.silver.dim_localizacao")

In [0]:
print("\n3. Criando Dim_Cliente...")
dim_cliente = (
    df_bronze.select("genero")
    .distinct()
    .withColumn("id_cliente", F.row_number().over(Window.orderBy("genero")))
)
dim_cliente.write.mode("overwrite").format("delta").saveAsTable("grao.silver.dim_cliente")

In [0]:
print("\n--- Criando Tabela Fato usando a Tabela Calendário ---")


cal_envio = df_calendario.alias("cal_envio")
cal_entrega = df_calendario.alias("cal_entrega")


fato_envios = (
    df_bronze
    # JOIN para buscar o ID da data de ENVIO pela coluna de texto completo
    .join(
        cal_envio,
        df_bronze["DataEnvio"] == F.col("cal_envio.data_completa"),
        "left"
    )
    # JOIN para buscar o ID da data de ENTREGA pela coluna de texto completo
    .join(
        cal_entrega,
        df_bronze["dataEntrega"] == F.col("cal_entrega.data_completa"),
        "left"
    )
    # JOIN para buscar o ID do método de envio
    .join(
        dim_metodo_envio,
        ["metodo_de_envio", "corredor_de_armazenagem", "importancia"], # Join em múltiplas colunas
        "left"
    )
    # JOIN para buscar o ID da localização
    .join(dim_localizacao, "Destino", "left")
    # JOIN para buscar o ID do cliente
    .join(dim_cliente, "genero", "left")
)

# Selecionamos as colunas da tabela fato final
fato_envios_final = fato_envios.select(
    F.col("id_envio"),
    F.col("id_metodo_envio"),
    F.col("id_localizacao"),
    F.col("id_cliente"),
    F.col("cal_envio.id_data").alias("id_data_envio"),
    F.col("cal_entrega.id_data").alias("id_data_entrega"),
    F.col("ligacoes_do_cliente"),
    F.col("avaliacao_do_cliente"),
    F.col("preco"),
    F.col("qtd_itens"),
    F.col("desconto"),
    F.col("peso_g"),
    F.col("chegou_no_tempo"),
    F.col("avaliacaoentrega")
)


fato_envios_final.write.mode("overwrite").format("delta").saveAsTable("grao.silver.fato_envios")
print("Processo finalizado com sucesso !!")