# üöÄ Projeto: Constru√ß√£o da Tabela Gold - Dimens√£o Produto

## üìå Vis√£o Geral

Este m√≥dulo representa a constru√ß√£o da **tabela de dimens√£o dim_produto** na camada **Gold** do projeto de Data Lakehouse. A tabela √© derivada da Silver e foi desenhada para armazenar as informa√ß√µes relevantes dos produtos vendidos, com **altos padr√µes de qualidade de dados**, **otimiza√ß√£o de leitura** e **governan√ßa**.

---

## üéØ Objetivo

Criar uma tabela dimensional que represente os produtos, garantindo:

- üì¶ Dados √∫nicos e limpos de cada produto (ItemID)
- ‚öôÔ∏è Estrutura otimizada para consultas anal√≠ticas
- üìà Rastreabilidade com controle de cria√ß√£o (Created_at)
- üîÑ Atualiza√ß√µes eficientes via **MERGE Delta**
- üöÄ Alto desempenho com **Z-ORDER** e **reparticionamento estrat√©gico**

---

## üìê Detalhes T√©cnicos

- **Fonte**: Camada Silver (tabela_prata_desnormalizadas)
- **Destino**: Camada Gold (gold.dim_produto)
- **Particionamento**: ItemID (100 parti√ß√µes)
- **Chave de Neg√≥cio**: ItemID
- **Colunas da Dimens√£o Produto**:
  - ItemID (identificador do produto)
  - ProductName (nome do produto)
  - Created_at (timestamp de inser√ß√£o na tabela Gold)

---

## ‚öôÔ∏è Etapas do Pipeline

1. **Leitura da Tabela Silver** com controle de parti√ß√µes.
2. **Sele√ß√£o das Colunas Relevantes**: apenas ItemID e ProductName.
3. **Limpeza de Dados**: remo√ß√£o de duplicatas e valores nulos.
4. **Adi√ß√£o de Auditoria Temporal** com a coluna Created_at.
5. **Cache dos Dados** para melhorar performance nas opera√ß√µes subsequentes.
6. **Auditoria Inicial**: contagem dos produtos √∫nicos existentes.
7. **Aplica√ß√£o do MERGE Delta**: inser√ß√£o e atualiza√ß√£o baseada em ItemID.
8. **Auditoria Final**: nova contagem dos produtos √∫nicos p√≥s-processamento.
9. **Otimiza√ß√£o com Z-ORDER** por ItemID.
10. **Limpeza com VACUUM** para libera√ß√£o de espa√ßo.
11. **Registro no Cat√°logo Hive/Unity Catalog** para consultas SQL.

---

## ‚úÖ Benef√≠cios T√©cnicos Aplicados

| T√©cnica                      | Finalidade                                                                |
|----------------------------  | --------------------------------------------------------------------------|
| MERGE Delta                | Evita duplicidade e garante atualiza√ß√µes incrementais                     |
| Reparticionamento            | Aumenta performance de escrita/leitura por chave de neg√≥cio (ItemID)    |
| dropDuplicates + na.drop | Garante consist√™ncia e integridade dos dados                              |
| Z-ORDER                    | Melhora tempo de resposta nas consultas filtradas                         |
| VACUUM                     | Reduz uso de armazenamento eliminando arquivos obsoletos                  |
| Registro no cat√°logo         | Permite uso via SQL e dashboards com governan√ßa centralizada              |

---

## üß± Modelo Estrela (Star Schema)

Esta tabela √© utilizada como uma **Dimens√£o** em um **Esquema Estrela**, onde ser√° relacionada √† Tabela Fato de Vendas atrav√©s da chave ItemID, permitindo an√°lises como:

- Total de vendas por produto
- Produtos mais vendidos por per√≠odo
- Compara√ß√µes de performance entre categorias

---


In [0]:
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable


# 1. Defini√ß√£o dos caminhos das tabelas Delta na camada Silver e Gold
# ------------------------------------------
# Estes caminhos apontam para as localiza√ß√µes f√≠sicas no Data Lake onde os dados Delta s√£o armazenados.
# S√£o usados para leitura, escrita e opera√ß√µes de manuten√ß√£o da tabela de dimens√£o produto.
SILVER_PATH = "abfss://silver@dlsprojetofixo.dfs.core.windows.net/tabela_prata_desnormalizadas"
GOLD_DIM_PRODUCT_PATH = "abfss://gold@dlsprojetofixo.dfs.core.windows.net/gold_dim_produto"
GOLD_DIM_PRODUCT_TABLE = "gold.dim_produto"

# ------------------------------------------
# 2. Verifica se o banco de dados 'gold' existe e cria caso n√£o exista
# ------------------------------------------
# Criar o banco de dados 'gold' se n√£o existir para garantir que a tabela seja registrada corretamente.
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# ------------------------------------------
# 3. Leitura dos dados da camada Silver (prata) no formato Delta
# ------------------------------------------
# Carrega os dados desnormalizados da camada intermedi√°ria Silver, que j√° passaram por tratamento inicial.
silver_df = spark.read.format("delta").load(SILVER_PATH)

# ------------------------------------------
# 4. Reparticionamento dos dados por 'ItemID' para balanceamento
# ------------------------------------------
# Reparticiona os dados para melhorar performance de escrita e leitura, especialmente em joins ou filtros por 'ItemID'.
silver_df = silver_df.repartition(100, "ItemID")  # Ajustar a quantidade conforme o volume de dados

# ------------------------------------------
# 5. Sele√ß√£o das colunas relevantes para a dimens√£o Produto
# ------------------------------------------
# Aqui extra√≠mos apenas os campos necess√°rios para a constru√ß√£o da dimens√£o: identificador e nome do produto.
dim_product_df = silver_df.select("ItemID", "ProductName")

# ------------------------------------------
# 6. Remo√ß√£o de duplicatas e valores nulos
# ------------------------------------------
# Elimina duplicidades por 'ItemID' e descarta linhas com nomes de produtos nulos.
dim_product_df = dim_product_df.dropDuplicates(["ItemID"])
dim_product_df = dim_product_df.na.drop(subset=["ProductName"])

# ------------------------------------------
# 7. Inclus√£o de coluna de auditoria 'Created_at'
# ------------------------------------------
# A coluna 'Created_at' registra a data/hora da carga, √∫til para rastreabilidade dos dados.
dim_product_df = dim_product_df.withColumn("Created_at", current_timestamp())

# ------------------------------------------
# 8. Cache do DataFrame para otimizar performance em m√∫ltiplas a√ß√µes subsequentes
# ------------------------------------------
# Evita recomputa√ß√µes desnecess√°rias durante a execu√ß√£o do pipeline.
dim_product_df.cache()

# ------------------------------------------
# 9. Auditoria: contagem de registros distintos antes do MERGE
# ------------------------------------------
# Carrega a vers√£o atual da dimens√£o, caso exista, e conta o n√∫mero de produtos distintos.
dim_product_count_before = 0
if DeltaTable.isDeltaTable(spark, GOLD_DIM_PRODUCT_PATH):
    existing_dim_product_df = spark.read.format("delta").load(GOLD_DIM_PRODUCT_PATH)
    dim_product_count_before = existing_dim_product_df.select("ItemID").distinct().count()

print(f"Total de registros distintos antes do processamento (Gold Dim Produto): {dim_product_count_before}")

# ------------------------------------------
# 10. Habilita a atualiza√ß√£o autom√°tica de schema no Delta Lake
# ------------------------------------------
# Permite que o Delta aceite novos campos durante a escrita sem erro de schema incompat√≠vel.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# ------------------------------------------
# 11. MERGE ou cria√ß√£o da tabela Gold Dim Produto
# ------------------------------------------
# Se a tabela j√° existir, faz um MERGE para inserir/atualizar registros com base no 'ItemID'.
# Caso contr√°rio, cria a tabela do zero com os dados atuais.
if DeltaTable.isDeltaTable(spark, GOLD_DIM_PRODUCT_PATH):
    dim_product_table = DeltaTable.forPath(spark, GOLD_DIM_PRODUCT_PATH)

    dim_product_table.alias("target").merge(
        dim_product_df.alias("source"), "target.ItemID = source.ItemID"
    ).whenMatchedUpdate(set={
        "ProductName": col("source.ProductName"),
        "Created_at": col("source.Created_at")
    }).whenNotMatchedInsert(values={
        "ItemID": col("source.ItemID"),
        "ProductName": col("source.ProductName"),
        "Created_at": col("source.Created_at")
    }).execute()

    print("Dados adicionados com sucesso na tabela Gold Dim Produto.")
else:
    dim_product_df.write.format("delta").mode("append").option("mergeSchema", "true").save(GOLD_DIM_PRODUCT_PATH)
    print("Tabela Gold Dim Produto criada com sucesso.")

# ------------------------------------------
# 12. Auditoria: contagem de registros distintos ap√≥s o MERGE
# ------------------------------------------
# Compara o total de registros distintos depois da carga para garantir a integridade e rastreabilidade.
dim_product_count_after = spark.read.format("delta").load(GOLD_DIM_PRODUCT_PATH).select("ItemID").distinct().count()
print(f"Total de registros distintos ap√≥s o processamento (Gold Dim Produto): {dim_product_count_after}")

# ------------------------------------------
# 13. Otimiza√ß√£o com Z-ORDER para consultas eficientes
# ------------------------------------------
# O Z-ORDER organiza fisicamente os dados para melhorar a performance de leitura por colunas consultadas frequentemente.
spark.sql(f"OPTIMIZE delta.`{GOLD_DIM_PRODUCT_PATH}` ZORDER BY (ItemID)")
print("Tabela Gold Dim Produto otimizada com Z-ORDER para melhorar a performance.")

# ------------------------------------------
# 14. VACUUM para remo√ß√£o de arquivos obsoletos
# ------------------------------------------
# Executa a limpeza de arquivos antigos n√£o referenciados pela tabela nos √∫ltimos 7 dias (168 horas).
# Libera espa√ßo e mant√©m a efici√™ncia do armazenamento.
spark.sql(f"VACUUM delta.`{GOLD_DIM_PRODUCT_PATH}` RETAIN 168 HOURS")
print("VACUUM executado para liberar espa√ßo de armazenamento.")

# ------------------------------------------
# 15. Registro da tabela no cat√°logo (Metastore) do Spark
# ------------------------------------------
# Permite que a tabela seja consultada via SQL, conectores de BI e outros sistemas que usam o cat√°logo.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {GOLD_DIM_PRODUCT_TABLE}
USING DELTA
LOCATION '{GOLD_DIM_PRODUCT_PATH}'
""")
print("Tabela Gold Dim Produto registrada no cat√°logo com sucesso.")

# ------------------------------------------
# 16. Resumo final das contagens para auditoria
# ------------------------------------------
# Apresenta de forma clara o antes e depois da quantidade de produtos √∫nicos processados.
print("\nResumo das contagens distintas:")
print(f"Total distinto antes do processamento (Gold Dim Produto): {dim_product_count_before}")
print(f"Total distinto ap√≥s o processamento (Gold Dim Produto): {dim_product_count_after}")


Total de registros distintos antes do processamento (Gold Dim Produto): 29945
Dados adicionados com sucesso na tabela Gold Dim Produto.
Total de registros distintos ap√≥s o processamento (Gold Dim Produto): 29945
Tabela Gold Dim Produto otimizada com Z-ORDER para melhorar a performance.
VACUUM executado para liberar espa√ßo de armazenamento.
Tabela Gold Dim Produto registrada no cat√°logo com sucesso.

Resumo das contagens distintas:
Total distinto antes do processamento (Gold Dim Produto): 29945
Total distinto ap√≥s o processamento (Gold Dim Produto): 29945


In [0]:
%sql
SELECT * FROM gold.dim_produto LIMIT 20

ItemID,ProductName,Created_at
0191a621-1663-465a-826c-872de1b756f1,Headphones,2025-04-22T02:36:09.232Z
01d9c3e2-6f2f-433e-8df0-90c3f3cb0851,Smartwatch,2025-04-22T02:36:09.232Z
020a1034-09ca-473b-a0f7-db3804659057,Tablet,2025-04-22T02:36:09.232Z
020c3d9a-5162-463d-a65f-7b3d61b88df9,External Hard Drive,2025-04-22T02:36:09.232Z
026b32c6-fbac-4a22-85de-ee4e6ac3fc8c,USB Cable,2025-04-22T02:36:09.232Z
02ed91fb-6890-4840-873d-ff6f7d31795a,Power Supply,2025-04-22T02:36:09.232Z
034d27d4-857e-4a0e-83b1-609a7e06c788,Scanner,2025-04-22T02:36:09.232Z
042f561d-78b4-4ab9-a40e-2e9bb25e8173,USB Cable,2025-04-22T02:36:09.232Z
04dff3d1-4b83-4631-897a-0ad73005107d,Webcam,2025-04-22T02:36:09.232Z
067395c3-5bfa-42fa-aeaf-2cafc9654b75,Mouse,2025-04-22T02:36:09.232Z


In [0]:
# üìä Monitoramento e Governan√ßa - Log de Execu√ß√£o do Pipeline Gold (Dimens√£o Produto)

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime
import time

# ------------------------------------------
# 1. In√≠cio da contagem do tempo de execu√ß√£o
# ------------------------------------------
start_time = time.time()

# ------------------------------------------
# 2. Par√¢metros do log
# ------------------------------------------
job_name = "gold_dim_produto"
status = "SUCESSO"
erro = None

try:
    # Contagem de registros distintos no DataFrame final da Gold
    qtd_linhas = dim_product_df.select("ItemID").distinct().count()

except Exception as e:
    status = "ERRO"
    erro = str(e)
    qtd_linhas = 0

# ------------------------------------------
# 3. C√°lculo do tempo total de execu√ß√£o (em segundos)
# ------------------------------------------
tempo_total = round(time.time() - start_time, 2)

# ------------------------------------------
# 4. Esquema expl√≠cito do log para evitar erro de infer√™ncia
# ------------------------------------------
schema_log = StructType([
    StructField("job_name", StringType(), True),
    StructField("data_execucao", StringType(), True),
    StructField("qtd_linhas", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("erro", StringType(), True),
    StructField("tempo_total_segundos", DoubleType(), True)
])

# ------------------------------------------
# 5. Cria√ß√£o do DataFrame com os dados do log
# ------------------------------------------
log_execucao_df = spark.createDataFrame([(
    job_name,
    datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    qtd_linhas,
    status,
    erro,
    tempo_total
)], schema=schema_log)

# ------------------------------------------
# 6. Escrita do log no Delta Lake no container Gold
# ------------------------------------------
log_execucao_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("abfss://gold@dlsprojetofixo.dfs.core.windows.net/log_execucoes_gold_dim_produto")

print(f"üìå Log da execu√ß√£o do job '{job_name}' registrado com sucesso.")


üìå Log da execu√ß√£o do job 'gold_dim_produto' registrado com sucesso.


In [0]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.log_execucoes_gold_dim_produto
    USING DELTA
    LOCATION 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/log_execucoes_gold_dim_produto'
""")

DataFrame[]

In [0]:
%sql
SELECT * FROM gold.log_execucoes_gold_dim_produto

job_name,data_execucao,qtd_linhas,status,erro,tempo_total_segundos
gold_dim_produto,2025-04-22 02:33:51,0,ERRO,name 'dim_product_df' is not defined,0.0
gold_dim_produto,2025-04-22 02:38:07,29945,SUCESSO,,6.46
