## Desafio DWE 2025 - Transformação Camada Gold
**Esta é a camada que ficará disponível para a análise de dados. Nela serão aplicados:**
- Gravação SCD do tipo 2 das tabelas de dimensão
- Inserção das SKs nas tabelas dimensão e na relação com a fato

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Transformação Camada Gold") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()


# Diretórios de trabalho 
sv_dim_path = '/mnt/desafio-dwe-25/lhdw/silver/dim'
sv_fato_path = '/mnt/desafio-dwe-25/lhdw/silver/fato'

gd_dim_path = '/mnt/desafio-dwe-25/lhdw/gold/dim'
gd_fato_path = '/mnt/desafio-dwe-25/lhdw/gold/fato'


In [0]:
import gc

# Função para auxiliar a limpeza dos dataframes
def limpeza_cache(*args):
    for df in args:
        if hasattr(df, "unpersist"):
            df.unpersist()
            gc.collect()

### Dimensão Categoria
- Aplicar SCD2
- Aplicar SK

In [0]:
sv_file = f'{sv_dim_path}/CATEGORIA'
gd_file = f'{gd_dim_path}/CATEGORIA'

# Carregamento de Categoria na Camada Silver
df_categoria = spark.read.format("delta").load(sv_file)

# Procura de dados na camada Gold
try:
    df_categoria_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_categoria_gold.agg(max_("CategoriaSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    schema = df_categoria.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("ATIVO", "boolean") \
                      .add("CategoriaSK", "long")
    df_categoria_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0

# Ajustando a tipagem da coluna de SK nos Dataframes
df_categoria = df_categoria.withColumn("CategoriaSK", lit(None).cast("long"))
df_categoria_gold = df_categoria_gold.withColumn("CategoriaSK", col("CategoriaSK").cast("long"))

# Filtra registros ativos e inativos no caso de já existirem registros
df_gold_ativos = df_categoria_gold.filter(col("Ativo") == True)
df_gold_inativos = df_categoria_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na gold com já existentes vindos da silver
df_silver_gold = df_categoria.alias("silver").join(df_categoria_gold.alias("gold"), "CategoriaID", "left")

# Checando se algum registro já existente na gold foi modificado
df_updated = df_silver_gold.filter(col("silver.NomeCategoria") != col("gold.NomeCategoria"))

# Marca os registros que foram modificados como INATIVO na gold (False)
df_updated_inativos = df_updated.select(
    col("gold.CategoriaID"),
    col("gold.NomeCategoria"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo"),
    col("gold.CategoriaSK")
)

# Inclusão das SKs nos registros novos e ATIVAÇÃO dos registros como vigentes (True)
df_updated_novos = df_updated.select(
    col("silver.CategoriaID"),
    col("silver.NomeCategoria"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("CategoriaSK")
)

# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_updated_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo SKs de acordo com a contagem
df_novos_categoria = df_categoria.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "CategoriaID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("CategoriaSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))

# Pegando a lista de CategoriaIDs dos dados modificados
list_ids_updated = df_updated.select("CategoriaID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos) na Gold e novos registros da Silver
df_categoria_gold = df_gold_inativos \
    .unionByName(df_updated_inativos) \
    .unionByName(df_updated_novos) \
    .unionByName(df_novos_categoria) \
    .unionByName(df_gold_ativos.filter(~col("CategoriaID").isin(list_ids_updated)))

df_categoria_gold.show(5)
df_categoria_gold.printSchema()

+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
|CategoriaID|NomeCategoria|    DATA_ATUALIZACAO|             DataINI|DataFIM|ATIVO|CategoriaSK|
+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
|          1|  Confections|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          1|
|          6|      Seafood|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          2|
|          3|      Cereals|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          3|
|          5|    Beverages|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          4|
|          9|      Poultry|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          5|
+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
only showing top 5 rows

root
 |-- CategoriaID: integer (nullable = true)
 |-- NomeCategoria: string (nullable = true)
 |-- DATA_ATUALIZ

In [0]:
gold_path = f'{gd_dim_path}/CATEGORIA'

df_categoria_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/CATEGORIA:
+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
|CategoriaID|NomeCategoria|    DATA_ATUALIZACAO|             DataINI|DataFIM|ATIVO|CategoriaSK|
+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
|          1|  Confections|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          1|
|          6|      Seafood|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          2|
|          3|      Cereals|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          3|
|          5|    Beverages|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          4|
|          9|      Poultry|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|          5|
+-----------+-------------+--------------------+--------------------+-------+-----+-----------+
only showing top 5 rows



In [0]:
# Fazendo a limpeza dos Dataframes
limpeza_cache(
    df_categoria, df_categoria_gold, df_gold_ativos, 
    df_gold_inativos, df_novos_categoria, df_silver_gold, 
    df_updated, df_updated_inativos, df_updated_novos
)

### Dimensão Produto
- Aplicar SCD2
- Aplicar SK
- Aplicar relação com Categoria

In [0]:
sv_file = f'{sv_dim_path}/PRODUTO'
gd_file = f'{gd_dim_path}/PRODUTO'
gd_categoria = f'{gd_dim_path}/CATEGORIA'

# Carregamento de Produto na Camada Silver
df_produto = spark.read.format("delta").load(sv_file)

# Procura de dados na camada Gold
try:
    df_produto_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_produto_gold.agg(max_("ProdutoSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    schema = df_produto.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("Ativo", "boolean") \
                      .add("ProdutoSK", "long") \
                      .add("CategoriaSK", "long")
    df_produto_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0

# Ajustando a tipagem da coluna de SK nos Dataframes
df_produto = df_produto.withColumn("ProdutoSK", lit(None).cast("long"))
df_produto_gold = df_produto_gold.withColumn("ProdutoSK", col("ProdutoSK").cast("long"))

# Trazendo SKs de Categoria na Gold
df_categoria_gold = spark.read.format("delta") \
    .load(gd_categoria).filter(col("Ativo") == True) \
    .select("CategoriaID", "CategoriaSK")

# Fazendo a junção entre os produtos da Silver e as categorias da Gold
df_produto = df_produto.join(df_categoria_gold, "CategoriaID", "left")

# Certifica a existência da coluna
if "CategoriaID" not in df_produto_gold.columns:
    df_produto_gold.withColumn("CategoriaID", lit(None).cast(dtype))
    
# Filtra registros ativos e inativos no caso de já existirem registros em Gold
df_gold_ativos = df_produto_gold.filter(col("Ativo") == True)
df_gold_inativos = df_produto_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na gold com já existentes vindos da silver
df_silver_gold = df_produto.alias("silver").join(df_gold_ativos.alias("gold"), "ProdutoID", "left")

# Checando se algum registro já existente na Gold foi modificado
df_modificados = df_silver_gold.filter(
    (col("silver.ProdutoNome") != col("gold.ProdutoNome")) |
    (col("silver.Preco") != col("gold.Preco")) |
    (col("silver.CategoriaSK") != col("gold.CategoriaSK")) |
    (col("Silver.Classe") != col("gold.Classe")) |
    (col("silver.Resistencia") != col("gold.Resistencia")) |
    (col("silver.EAlergico") != col("gold.EAlergico")) |
    (col("silver.ValidadeDias") != col("gold.ValidadeDias"))
)

# Marca os registros antigos da Gold que foram modificados como INATIVO (False)
df_modificados_inativos = df_modificados.select(
    col("gold.ProdutoID"),
    col("gold.ProdutoNome"),
    col("gold.Preco"),
    col("gold.Classe"),
    col("gold.DataCadastro"),
    col("gold.Resistencia"),
    col("gold.EAlergico"),
    col("gold.ValidadeDias"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    col("gold.CategoriaSK"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo"),
    col("gold.ProdutoSK"),
    col("gold.CategoriaID")
)

# Inclusão das SKs nos registros novos que vem da Silver e ATIVAÇÃO dos registros como vigentes (True)
df_modificados_novos = df_modificados.select(
    col("silver.ProdutoID"),
    col("silver.ProdutoNome"),
    col("silver.Preco"),
    col("silver.Classe"),
    col("silver.DataCadastro"),
    col("silver.Resistencia"),
    col("silver.EAlergico"),
    col("silver.ValidadeDias"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    col("silver.CategoriaSK"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("ProdutoSK"),
    col("silver.CategoriaID")
)

# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_modificados_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo SKs de acordo com a contagem
df_novos_produtos = df_produto.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "ProdutoID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("ProdutoSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))

# Pegando a lista de ProdutoIDs dos dados modificados
list_ids_modificados = df_modificados.select("ProdutoID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos) na Gold e novos registros da Silver
df_produto_gold = df_gold_inativos \
    .unionByName(df_modificados_inativos) \
    .unionByName(df_modificados_novos) \
    .unionByName(df_novos_produtos) \
    .unionByName(df_gold_ativos.filter(~col("ProdutoID").isin(list_ids_modificados)))

# Dropando a coluna CategoriaID da tabela final
df_produto_gold = df_produto_gold.drop("CategoriaID")

# Verificando schema antes de salvar
df_produto_gold.show(5)
df_produto_gold.printSchema()

+---------+--------------------+-------+------+--------------------+-----------+---------+------------+--------------------+--------------------+-------+-----+---------+-----------+
|ProdutoID|         ProdutoNome|  Preco|Classe|        DataCadastro|Resistencia|EAlergico|ValidadeDias|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|ProdutoSK|CategoriaSK|
+---------+--------------------+-------+------+--------------------+-----------+---------+------------+--------------------+--------------------+-------+-----+---------+-----------+
|      148|Beer - Sleemans C...|28.5553|Medium|2018-04-19 00:16:...|    Durable|     True|        91.0|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|        1|         10|
|      243|Sun - Dried Tomatoes|45.1883|Medium|2017-11-02 21:24:...|       Weak|    False|       104.0|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|        2|          2|
|      392|Puree - Passion F...|98.8263|  High|2018-02-25 13:36:...|    Unknown|     True|

In [0]:
gold_path = f'{gd_dim_path}/PRODUTO'

# Salvando os arquivos na Gold
df_produto_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/PRODUTO:
+---------+--------------------+-------+------+--------------------+-----------+---------+------------+--------------------+--------------------+-------+-----+---------+-----------+
|ProdutoID|         ProdutoNome|  Preco|Classe|        DataCadastro|Resistencia|EAlergico|ValidadeDias|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|ProdutoSK|CategoriaSK|
+---------+--------------------+-------+------+--------------------+-----------+---------+------------+--------------------+--------------------+-------+-----+---------+-----------+
|        1| Flour - Whole Wheat|74.2988|Medium|2018-02-16 08:21:...|    Durable|  Unknown|         0.0|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|        1|          3|
|        2|Cookie Chocolate ...|91.2329|Medium|2017-02-12 11:39:...|    Unknown|  Unknown|         0.0|2025-02-20 20:55:...|2025-02-20 20:55:...|   null| true|        2|          3|
|        3|  Onions - Cippolini

In [0]:
# Limpando cache do cluster
limpeza_cache(
    df_categoria_gold, df_gold_ativos, df_gold_inativos,
    df_modificados, df_modificados_inativos, df_modificados_novos,
    df_novos_produtos, df_produto, df_produto_gold, df_silver_gold
)

### Dimensão - Vendedores
- Aplicar SCD2
- Aplicar SK

In [0]:
sv_file = f'{sv_dim_path}/VENDEDOR'
gd_file = f'{gd_dim_path}/VENDEDOR'

# Carregamento de Vendedor na Camada Silver
df_vendedor = spark.read.format("delta").load(sv_file)

# Procura de dados existentes na camada Gold
try:
    df_vendedor_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_vendedor_gold.agg(max_("VendedorSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    schema = df_vendedor.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("Ativo", "boolean") \
                      .add("VendedorSK", "long")
    df_vendedor_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0


# Ajuste da tipagem da coluna de SK nos Dataframes
df_vendedor = df_vendedor.withColumn("VendedorSK", lit(None).cast("long"))
df_vendedor_gold = df_vendedor_gold.withColumn("VendedorSK", col("VendedorSK").cast("long"))

# Filtra registros ativos e inativos no caso de já existirem registros na Gold
df_gold_ativos = df_vendedor_gold.filter(col("Ativo") == True)
df_gold_inativos = df_vendedor_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na Gold os vindos da Silver para achar mudanças
df_silver_gold = df_vendedor.alias("silver").join(df_vendedor_gold.alias("gold"), "VendedorID", "left")

# Checando se algum registro existente na gold foi modificado por comparação Silver - Gold
df_modificados = df_silver_gold.filter(
    (col("silver.NomeVendedor") != col("gold.NomeVendedor")) |
    (col("silver.DataNascimento") != col("gold.DataNascimento")) |
    (col("silver.Genero") != col("gold.Genero")) |
    (col("silver.CidadeID") != col("gold.CidadeID")) |
    (col("silver.DataAdmissao") != col("gold.DataAdmissao"))
)

# Marca os registros que foram modificados como INATIVO na Gold (False)
df_modificados_inativos = df_modificados.select(
    col("gold.VendedorID"),
    col("gold.NomeVendedor"),
    col("gold.DataNascimento"),
    col("gold.Genero"),
    col("gold.CidadeID"),
    col("gold.DataAdmissao"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo"),
    col("gold.VendedorSK")
)

# Inclusão das SKs nos registros novos e ATIVAÇÃO dos registros como vigentes (True)
df_modificados_novos = df_modificados.select(
    col("silver.VendedorID"),
    col("silver.NomeVendedor"),
    col("silver.DataNascimento"),
    col("silver.Genero"),
    col("silver.CidadeID"),
    col("silver.DataAdmissao"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("VendedorSK")
)


# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_modificados_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo data e SKs de acordo com a contagem
df_novos_vendedores = df_vendedor.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "VendedorID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("VendedorSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))


# Pegando a lista de VendedorIDs dos dados modificados
vendedores_modificados = df_modificados.select("VendedorID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos ativos) da Gold e novos registros vindos da Silver
df_vendedor_gold = df_gold_inativos \
    .unionByName(df_modificados_inativos) \
    .unionByName(df_modificados_novos) \
    .unionByName(df_novos_vendedores) \
    .unionByName(df_gold_ativos.filter(~col("VendedorID").isin(vendedores_modificados)))

df_vendedor_gold.show(5)
df_vendedor_gold.printSchema()

+----------+-----------------+--------------+------+--------+------------+--------------------+--------------------+-------+-----+----------+
|VendedorID|     NomeVendedor|DataNascimento|Genero|CidadeID|DataAdmissao|    DATA_ATUALIZACAO|             DataINI|DataFIM|ATIVO|VendedorSK|
+----------+-----------------+--------------+------+--------+------------+--------------------+--------------------+-------+-----+----------+
|        12|   Lindsay M Chen|    1951-09-03|     F|      58|  2011-11-03|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         1|
|        22|Tonia O Mc Millan|    1952-03-02|     F|      53|  2015-11-25|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         2|
|         1|  Nicole T Fuller|    1981-03-07|     F|      80|  2011-06-20|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         3|
|        13|   Katina Y Marks|    1963-04-18|     M|      68|  2011-12-12|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         4|
|     

In [0]:
gold_path = f'{gd_dim_path}/VENDEDOR'

df_vendedor_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/VENDEDOR:
+----------+-----------------+--------------+------+--------+------------+--------------------+--------------------+-------+-----+----------+
|VendedorID|     NomeVendedor|DataNascimento|Genero|CidadeID|DataAdmissao|    DATA_ATUALIZACAO|             DataINI|DataFIM|ATIVO|VendedorSK|
+----------+-----------------+--------------+------+--------+------------+--------------------+--------------------+-------+-----+----------+
|        12|   Lindsay M Chen|    1951-09-03|     F|      58|  2011-11-03|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         1|
|        22|Tonia O Mc Millan|    1952-03-02|     F|      53|  2015-11-25|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         2|
|         1|  Nicole T Fuller|    1981-03-07|     F|      80|  2011-06-20|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|         3|
|        13|   Katina Y Marks|    1963-04-18|     M|      68|  2011-12-12|2025-02-21 18:

In [0]:
# Limpando o cache
limpeza_cache(
    df_gold_ativos, df_gold_inativos, df_modificados,
    df_modificados_inativos, df_modificados_novos, df_novos_vendedores,
    df_silver_gold, df_vendedor, df_vendedor_gold
)

### Dimensão - País
- Aplicar SCD2
- Aplicar SK

In [0]:
sv_file = f'{sv_dim_path}/PAIS'
gd_file = f'{gd_dim_path}/PAIS'

# Carregamento de País na Camada Silver
df_pais = spark.read.format("delta").load(sv_file)

# Procura de dados existentes na camada Gold
try:
    df_pais_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_pais_gold.agg(max_("PaisSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    schema = df_pais.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("Ativo", "boolean") \
                      .add("PaisSK", "long")
    df_pais_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0


# Ajuste da tipagem da coluna de SK nos Dataframes
df_pais = df_pais.withColumn("PaisSK", lit(None).cast("long"))
df_pais_gold = df_pais_gold.withColumn("PaisSK", col("PaisSK").cast("long"))

# Filtra registros ativos e inativos no caso de já existirem registros na Gold
df_gold_ativos = df_pais_gold.filter(col("Ativo") == True)
df_gold_inativos = df_pais_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na Gold os vindos da Silver para achar mudanças
df_silver_gold = df_pais.alias("silver").join(df_pais_gold.alias("gold"), "PaisID", "left")

# Checando se algum registro existente na gold foi modificado por comparação Silver - Gold
df_modificados = df_silver_gold.filter(
    (col("silver.PaisNome") != col("gold.PaisNome")) |
    (col("silver.SiglaPais") != col("gold.SiglaPais")) 
)

# Marca os registros que foram modificados como INATIVO na Gold (False)
df_modificados_inativos = df_modificados.select(
    col("gold.PaisID"),
    col("gold.PaisNome"),
    col("gold.SiglaPais"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo"),
    col("gold.PaisSK")
)

# Inclusão das SKs nos registros novos e ATIVAÇÃO dos registros como vigentes (True)
df_modificados_novos = df_modificados.select(
    col("silver.PaisID"),
    col("silver.PaisNome"),
    col("silver.SiglaPais"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("PaisSK")
)


# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_modificados_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo data e SKs de acordo com a contagem
df_novos_paises = df_pais.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "PaisID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("PaisSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))


# Pegando a lista de PaisIDs dos dados modificados
paises_modificados = df_modificados.select("PaisID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos ativos) da Gold e novos registros vindos da Silver
df_pais_gold = df_gold_inativos \
    .unionByName(df_modificados_inativos) \
    .unionByName(df_modificados_novos) \
    .unionByName(df_novos_paises) \
    .unionByName(df_gold_ativos.filter(~col("PaisID").isin(paises_modificados)))

df_pais_gold.show(5)
df_pais_gold.printSchema()


+------+--------+---------+--------------------+--------------------+-------+-----+------+
|PaisID|PaisNome|SiglaPais|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|PaisSK|
+------+--------+---------+--------------------+--------------------+-------+-----+------+
|   148|    Mali|       BF|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     1|
|    31|   Congo|       TR|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     2|
|    85|   Yemen|       NO|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     3|
|   137|    Fiji|       MQ|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     4|
|    65| Lebanon|       LA|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     5|
+------+--------+---------+--------------------+--------------------+-------+-----+------+
only showing top 5 rows

root
 |-- PaisID: integer (nullable = true)
 |-- PaisNome: string (nullable = true)
 |-- SiglaPais: string (nullable = true)
 |-- DATA_ATUALIZACAO: timestam

In [0]:
gold_path = f'{gd_dim_path}/PAIS'

df_pais_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/PAIS:
+------+--------+---------+--------------------+--------------------+-------+-----+------+
|PaisID|PaisNome|SiglaPais|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|PaisSK|
+------+--------+---------+--------------------+--------------------+-------+-----+------+
|   148|    Mali|       BF|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     1|
|    31|   Congo|       TR|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     2|
|    85|   Yemen|       NO|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     3|
|   137|    Fiji|       MQ|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     4|
|    65| Lebanon|       LA|2025-02-21 18:43:...|2025-02-21 18:43:...|   null| true|     5|
+------+--------+---------+--------------------+--------------------+-------+-----+------+
only showing top 5 rows



### Dimensão - Cidade
- Aplicar SCD2
- Aplicar SK
- Aplicar relação com País

In [0]:
sv_file = f'{sv_dim_path}/CIDADE'
gd_file = f'{gd_dim_path}/CIDADE'
gd_pais = f'{gd_dim_path}/PAIS'

# Carregamento de Cidade na Camada Silver
df_cidade = spark.read.format("delta").load(sv_file)

# Procura de dados na camada Gold
try:
    df_cidade_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_cidade_gold.agg(max_("CidadeSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    # já inserindo a SK de Cidade e a SK estrangeira de País
    schema = df_cidade.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("Ativo", "boolean") \
                      .add("CidadeSK", "long") \
                      .add("PaisSK", "long")
    df_cidade_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0


# Ajustando a tipagem da coluna de SK nos Dataframes
df_cidade = df_cidade.withColumn("CidadeSK", lit(None).cast("long"))
df_cidade_gold = df_cidade_gold.withColumn("CidadeSK", col("CidadeSK").cast("long"))

# Trazendo SKs e IDs de País da Gold
df_pais_gold = spark.read.format("delta") \
    .load(gd_pais).filter(col("Ativo") == True) \
    .select("PaisID", "PaisSK")

# Fazendo a junção entre as cidades da Silver e os países da Gold
df_cidade = df_cidade.join(df_pais_gold, "PaisID", "left")

# Certifica a existência da coluna PaisID nos dados da Gold
if "PaisID" not in df_cidade_gold.columns:
    df_cidade_gold.withColumn("PaisID", lit(None).cast(dtype))
    
# Filtra registros ativos e inativos no caso de já existirem registros em Gold
df_gold_ativos = df_cidade_gold.filter(col("Ativo") == True)
df_gold_inativos = df_cidade_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na Gold com os vindos da Silver para achar mudanças
df_silver_gold = df_cidade.alias("silver").join(df_gold_ativos.alias("gold"), "CidadeID", "left")

# Checando se algum registro existente na gold foi modificado por comparação Silver - Gold
df_modificados = df_silver_gold.filter(
    (col("silver.NomeCidade") != col("gold.NomeCidade")) |
    (col("silver.Cep") != col("gold.Cep")) |
    (col("silver.PaisSK") != col("gold.PaisSK"))
)

# Marca os registros antigos da Gold que foram modificados como INATIVO (False)
df_modificados_inativos = df_modificados.select(
    col("gold.CidadeID"),
    col("gold.NomeCidade"),
    col("gold.Cep"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    col("gold.PaisSK"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo"),
    col("gold.CidadeSK"),
    col("gold.PaisID")
)

# Inclusão das SKs nos registros novos que vem da Silver e ATIVAÇÃO dos registros como vigentes (True)
df_modificados_novos = df_modificados.select(
    col("silver.CidadeID"),
    col("silver.NomeCidade"),
    col("silver.Cep"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    col("silver.PaisSK"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("CidadeSK"),
    col("silver.PaisID")
)

# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_modificados_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo data e SKs de acordo com a contagem
df_novas_cidades = df_cidade.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "CidadeID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("CidadeSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))

# Pegando a lista de CidadeIDs dos dados modificados
ids_modificados = df_modificados.select("CidadeID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos ativos) da Gold e novos registros vindos da Silver
df_cidade_gold = df_gold_inativos \
    .unionByName(df_modificados_inativos) \
    .unionByName(df_modificados_novos) \
    .unionByName(df_novas_cidades) \
    .unionByName(df_gold_ativos.filter(~col("CidadeID").isin(ids_modificados)))

# Dropando a coluna PaisID da tabela final
df_cidade_gold = df_cidade_gold.drop("PaisID")

# Verificando schema antes de salvar
df_cidade_gold.show(5)
df_cidade_gold.printSchema()

+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
|CidadeID|    NomeCidade|  Cep|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|CidadeSK|PaisSK|
+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
|       1|        Dayton|80563|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       1|   146|
|       2|       Buffalo|17420|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       2|   146|
|       3|       Chicago|44751|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       3|   146|
|       4|       Fremont|20641|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       4|   146|
|       5|Virginia Beach|62389|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       5|   146|
+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
only showing top 5 rows

root
 |-- CidadeID: integer (nullable =

In [0]:
gold_path = f'{gd_dim_path}/CIDADE'

# Salvando os arquivos na Gold
df_cidade_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/CIDADE:
+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
|CidadeID|    NomeCidade|  Cep|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|CidadeSK|PaisSK|
+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
|      31|       Wichita|93028|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       1|   146|
|      85|St. Petersburg|88713|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       2|   146|
|      65|     Baltimore|89197|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       3|   146|
|      53|     Las Vegas|90989|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       4|   146|
|      78|     San Diego|83701|2025-02-21 18:45:...|2025-02-21 18:45:...|   null| true|       5|   146|
+--------+--------------+-----+--------------------+--------------------+-------+-----+--------+------+
only s

In [0]:
# Limpando o cache
limpeza_cache(
    df_cidade, df_cidade_gold, df_gold_ativos, df_gold_inativos,
    df_modificados, df_modificados_inativos, df_pais_gold, 
    df_modificados_novos, df_novas_cidades, df_silver_gold
)

### Dimensão - Clientes
- Aplicar SCD2
- Aplicar SK
- Aplicar relação com Cidade

In [0]:
sv_file = f'{sv_dim_path}/CLIENTE'
gd_file = f'{gd_dim_path}/CLIENTE'
gd_cidade = f'{gd_dim_path}/CIDADE'

# Carregamento de Cliente na Camada Silver
df_cliente = spark.read.format("delta").load(sv_file)

# Procura de dados na camada Gold
try:
    df_cliente_gold = spark.read.format("delta").load(gd_file)
    ultimo_sk = df_cliente_gold.agg(max_("CidadeSK")).collect()[0][0]
    if ultimo_sk is None:
        ultimo_sk = 0
except:
    # Caso ainda não haja dados na Gold, cria-se o formato de tabela vazia
    # já inserindo a SK de Cidade e a SK estrangeira de País
    schema = df_cliente.schema \
                      .add("DataINI", "timestamp") \
                      .add("DataFIM", "timestamp") \
                      .add("Ativo", "boolean") \
                      .add("ClienteSK", "long") \
                      .add("CidadeSK", "long")
    df_cliente_gold = spark.createDataFrame([], schema)
    ultimo_sk = 0


# Ajustando a tipagem da coluna de SK nos Dataframes
df_cliente = df_cliente.withColumn("ClienteSK", lit(None).cast("long"))
df_cliente_gold = df_cliente_gold.withColumn("ClienteSK", col("ClienteSK").cast("long"))

# Trazendo SKs e IDs de Cidade da Gold
df_cidade_gold = spark.read.format("delta") \
    .load(gd_cidade).filter(col("Ativo") == True) \
    .select("CidadeID", "CidadeSK")

# Fazendo a junção entre os clientes da Silver e as cidades da Gold
df_cliente = df_cliente.join(df_cidade_gold, "CidadeID", "left")

# Certifica a existência da coluna CidadeID nos dados da Gold
if "CidadeID" not in df_cliente_gold.columns:
    df_cliente_gold.withColumn("CidadeID", lit(None).cast(dtype))

# Filtra registros ativos e inativos no caso de já existirem registros em Gold
df_gold_ativos = df_cliente_gold.filter(col("Ativo") == True)
df_gold_inativos = df_cliente_gold.filter(col("Ativo") == False)

# Juntando registros ativos já existentes na Gold com os vindos da Silver para achar mudanças
df_silver_gold = df_cliente.alias("silver").join(df_gold_ativos.alias("gold"), "ClienteID", "left")

# Checando se algum registro existente na gold foi modificado por comparação Silver - Gold
df_modificados = df_silver_gold.filter(
    (col("silver.NomeCliente") != col("gold.NomeCliente")) |
    (col("silver.Endereco") != col("gold.Endereco")) |
    (col("silver.CidadeSK") != col("gold.CidadeSK"))
)

# Marca os registros antigos da Gold que foram modificados como INATIVO (False)
df_modificados_inativos = df_modificados.select(
    col("gold.ClienteID"),
    col("gold.ClienteSK"),
    col("gold.NomeCliente"),
    col("gold.Endereco"),
    col("gold.CidadeSK"),
    col("gold.CidadeID"),
    col("gold.DATA_ATUALIZACAO"),
    col("gold.DataINI"),
    current_timestamp().alias("DataFIM"),
    lit(False).alias("Ativo")
)

# Inclusão das SKs nos registros novos que vem da Silver e ATIVAÇÃO dos registros como vigentes (True)
df_modificados_novos = df_modificados.select(
    col("silver.ClienteID"),
    (monotonically_increasing_id() + ultimo_sk + 1).cast("long").alias("ClienteSK"),
    col("silver.NomeCliente"),
    col("silver.Endereco"),
    col("silver.CidadeSK"),
    col("silver.CidadeID"),
    col("silver.DATA_ATUALIZACAO"),
    col("silver.DATA_ATUALIZACAO").alias("DataINI"),
    lit(None).cast("timestamp").alias("DataFIM"),
    lit(True).alias("Ativo")
)

# Contando quantos SKs já temos até o momento (juntando os ativos e os das modificações)
total_sks = ultimo_sk + df_gold_ativos.count() + df_modificados_novos.count()

# Coletando os registros novos que não estavam em Gold ainda e inserindo data e SKs de acordo com a contagem
df_novos_clientes = df_cliente.alias("silver") \
    .join(df_gold_ativos.alias("gold"), "ClienteID", "left_anti") \
    .withColumn("DataINI", col("silver.DATA_ATUALIZACAO")) \
    .withColumn("DataFIM", lit(None).cast("timestamp")) \
    .withColumn("Ativo", lit(True)) \
    .withColumn("ClienteSK", (monotonically_increasing_id() + total_sks + 1).cast("long"))

# Pegando a lista de ClienteIDs dos dados modificados
ids_modificados = df_modificados.select("ClienteID").rdd.flatMap(lambda x: x).collect()

# União dos registros atualizados (inativos e novos ativos) da Gold e novos registros vindos da Silver
df_cliente_gold = df_gold_inativos \
    .unionByName(df_modificados_inativos) \
    .unionByName(df_modificados_novos) \
    .unionByName(df_novos_clientes) \
    .unionByName(df_gold_ativos.filter(~col("ClienteID").isin(ids_modificados)))

# Dropando a coluna PaisID da tabela final
df_cliente_gold = df_cliente_gold.drop("CidadeID")

# Verificando schema antes de salvar
df_cliente_gold.show(5)
df_cliente_gold.printSchema()

+---------+------------------+------------------+--------------------+--------------------+-------+-----+---------+--------+
|ClienteID|       NomeCliente|          Endereco|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|ClienteSK|CidadeSK|
+---------+------------------+------------------+--------------------+--------------------+-------+-----+---------+--------+
|      148|   Darrick H Kirby|     480 Oak Drive|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        1|      68|
|      463|Bryon Q Montgomery|   914 Old Freeway|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        2|      86|
|      471|  Preston Y Vargas|  76 Nobel Parkway|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        3|      11|
|      496|      Leo S Hardin|77 Green Hague Way|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        4|      63|
|      833|   Manuel W Rhodes|    18 Cowley Road|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        5|       1|


In [0]:
gold_path = f'{gd_dim_path}/CLIENTE'

# Salvando os arquivos na Gold
df_cliente_gold.write.format("delta").mode("overwrite").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
spark.read.format("delta").load(gold_path).show(5)

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/dim/CLIENTE:
+---------+---------------+--------------------+--------------------+--------------------+-------+-----+---------+--------+
|ClienteID|    NomeCliente|            Endereco|    DATA_ATUALIZACAO|             DataINI|DataFIM|Ativo|ClienteSK|CidadeSK|
+---------+---------------+--------------------+--------------------+--------------------+-------+-----+---------+--------+
|        1|Stefanie Y Frye|       97 Oak Avenue|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        1|      88|
|        2|  Sandy T Kirby|52 White First Fr...|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        2|      30|
|        3|    Lee T Zhang|921 White Fabien ...|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        3|      47|
|        4| Regina S Avery|       75 Old Avenue|2025-02-21 18:46:...|2025-02-21 18:46:...|   null| true|        4|      26|
|        5|Daniel S Mccann|283 South Green H...|2025-02-21 18:46:...|2025

In [0]:
# Limpando o cache
limpeza_cache(
    df_cidade_gold, df_cliente, df_cliente_gold,
    df_gold_ativos, df_gold_inativos, df_modificados,
    df_modificados_inativos, df_modificados_novos,
    df_novos_clientes, df_silver_gold
)

### Fato - Vendas


In [0]:
sv_file = f'{sv_fato_path}/VENDAS'
gd_file = f'{sv_fato_path}/VENDAS'

gd_cliente = f'{gd_dim_path}/CLIENTE'
gd_vendedor = f'{gd_dim_path}/VENDEDOR'
gd_produto = f'{gd_dim_path}/PRODUTO'

# Carregamento da Fato Vendas na Camada Silver
df_vendas = spark.read.format("delta").load(sv_file)

# Carregando as dimensões Cliente, Vendedor e Produto
df_cliente_gold = spark.read.format("delta").load(gd_cliente).filter(col("Ativo") == True)
df_vendedor_gold = spark.read.format("delta").load(gd_vendedor).filter(col("Ativo") == True)
df_produto_gold = spark.read.format("delta").load(gd_produto).filter(col("Ativo") == True)

# Carregando a fato silver com as referências de Cliente, Vendedor e Produto
df_silver_fato = df_vendas \
    .join(df_cliente_gold.select("ClienteID", "ClienteSK"), "ClienteID", "left") \
    .join(df_vendedor_gold.select("VendedorID", "VendedorSK"), "VendedorID", "left") \
    .join(df_produto_gold.select("ProdutoID", "ProdutoSK"), "ProdutoID", "left")

# Remover as colunas de IDs e manter as SKs
df_silver_fato = df_silver_fato.drop("ClienteID", "VendedorID", "ProdutoID") \
    .withColumnRenamed("ClienteSK", "ClienteSK") \
    .withColumnRenamed("VendedorSK", "VendedorSK") \
    .withColumnRenamed("ProdutoSK", "ProdutoSK")

# Buscando a ultima venda armazenada na Gold
try:
    df_vendas_gold = spark.read.format("delta").load(gd_file)
    ultima_venda = df_vendas_gold.agg(max_("VendasID")).collect()[0][0]
    if ultima_venda is None:
        ultima_venda = 0
except:
    ultima_venda = 0

# Separando os registros novos de venda da Silver que irão pra Gold
df_novas_vendas = df_silver_fato.filter(col("VendasID") > ultima_venda)

df_novas_vendas.show(5)
df_novas_vendas.printSchema()


+--------+----------+--------+------------------+----------+--------------------+----+---+---------+----------+---------+
|VendasID|Quantidade|Desconto|        PrecoTotal| DataVenda|    DATA_ATUALIZACAO| ANO|MES|ClienteSK|VendedorSK|ProdutoSK|
+--------+----------+--------+------------------+----------+--------------------+----+---+---------+----------+---------+
| 1447191|        13|     0.0|          826.1812|2018-02-07|2025-02-21 18:48:...|2018|  2|    48640|        21|      155|
|  233900|         1|     0.2|49.759440000000005|2018-02-07|2025-02-21 18:48:...|2018|  2|     2782|        19|       96|
|  930189|         4|     0.0|          275.5156|2018-02-07|2025-02-21 18:48:...|2018|  2|    15286|        23|      105|
| 4792802|         7|     0.0|          132.8299|2018-02-07|2025-02-21 18:48:...|2018|  2|    26268|        10|      205|
| 6576409|        17|     0.0|492.82829999999996|2018-02-07|2025-02-21 18:48:...|2018|  2|    65099|        17|      338|
+--------+----------+---

In [0]:
gold_path = f'{gd_fato_path}/VENDAS'

# Salvando os arquivos na Gold
df_novas_vendas.write.partitionBy("ANO", "MES").format("delta").mode("append").save(gold_path)

# Verificação dos resultados
print(f'Tabela salva em {gold_path}:')
gold_fato_vendas = spark.read.format("delta").load(gold_path)
gold_fato_vendas.show(5)
print(f"Número de registros na tabela Delta: {gold_fato_vendas.count()}")

Tabela salva em /mnt/desafio-dwe-25/lhdw/gold/fato/VENDAS:
+--------+----------+--------+------------------+----------+--------------------+----+---+---------+----------+---------+
|VendasID|Quantidade|Desconto|        PrecoTotal| DataVenda|    DATA_ATUALIZACAO| ANO|MES|ClienteSK|VendedorSK|ProdutoSK|
+--------+----------+--------+------------------+----------+--------------------+----+---+---------+----------+---------+
| 1447191|        13|     0.0|          826.1812|2018-02-07|2025-02-21 18:48:...|2018|  2|    48640|        21|      155|
|  233900|         1|     0.2|49.759440000000005|2018-02-07|2025-02-21 18:48:...|2018|  2|     2782|        19|       96|
|  930189|         4|     0.0|          275.5156|2018-02-07|2025-02-21 18:48:...|2018|  2|    15286|        23|      105|
| 4792802|         7|     0.0|          132.8299|2018-02-07|2025-02-21 18:48:...|2018|  2|    26268|        10|      205|
| 6576409|        17|     0.0|492.82829999999996|2018-02-07|2025-02-21 18:48:...|2018| 

In [0]:
# Limpeza de cache
limpeza_cache(
    df_cliente_gold, df_novas_vendas,
    df_produto_gold, df_silver_fato,
    df_vendas, df_vendas_gold, df_vendedor_gold
)