# 🚀 Projeto: Construção da Tabela Gold - Fato Vendas

## 📌 Visão Geral

Este módulo representa a construção da **tabela de fato `fato_vendas`** na camada **Gold** do projeto de Data Lakehouse. A tabela é derivada da camada Silver e consolidada com foco em **eficiência analítica**, **integridade dos dados**, **otimização de performance** e **auditoria completa**.

---

## 🎯 Objetivo

Criar uma tabela de fatos robusta que consolide informações de vendas, garantindo:

- 🧾 Unicidade por combinação de `OrderID` + `ItemID`
- 💰 Registro de métricas de vendas como `TotalAmount`, `Quantity` e `Price`
- 🕒 Auditoria temporal com `Created_at` e versionamento Delta
- 🔄 Atualizações seguras com **MERGE Delta**
- ⚡ Performance otimizada com **Z-ORDER** e **particionamento eficiente**

---

## 📐 Detalhes Técnicos

- **Fonte**: Camada Silver (`tabela_prata_desnormalizadas`)
- **Destino**: Camada Gold (`gold.fato_vendas`)
- **Particionamento**: `Status` (categoria do pedido)
- **Chave de Negócio**: Composição de `OrderID` e `ItemID`
- **Colunas da Tabela Fato Vendas**:
  - `OrderID` (identificador do pedido)
  - `ItemID` (identificador do item)
  - `CustomerID` (identificador do cliente)
  - `OrderDate` (data do pedido)
  - `Status` (status do pedido)
  - `Quantity` (quantidade)
  - `Price` (preço unitário)
  - `TotalAmount` (valor total)
  - `Created_at` (timestamp de inserção na camada Gold)

---

## ⚙️ Etapas do Pipeline

1. **Criação do Banco de Dados `gold`**, se não existir.
2. **Leitura da Tabela Silver** contendo dados desnormalizados.
3. **Cache do DataFrame** para otimizar múltiplas operações sequenciais.
4. **Criação da Coluna `Created_at`** para auditoria e rastreabilidade.
5. **Geração da Coluna `hash_value`** para identificação de registros únicos.
6. **Remoção de Duplicatas** usando o hash como referência de unicidade.
7. **Uso de Window Function** para manter apenas o registro mais recente de cada combinação `OrderID` + `ItemID`.
8. **Auditoria Inicial**: contagem de registros únicos preparados para a Gold.
9. **Aplicação do MERGE Delta**: inserção e atualização eficiente baseada nas chaves de negócio.
10. **Auditoria Final**: contagem pós-processamento.
11. **Otimização com Z-ORDER** por `OrderID` para aceleração de leitura.
12. **Limpeza com VACUUM** para liberação de espaço de arquivos antigos.
13. **Registro no Catálogo Hive/Unity Catalog** para permitir consultas SQL e governança.

---

## ✅ Benefícios Técnicos Aplicados

| Técnica                      | Finalidade                                                                 |
|-----------------------------|----------------------------------------------------------------------------|
| `MERGE` Delta               | Atualizações incrementais seguras e sem duplicidade                        |
| `sha2(hash)`                | Identificação rápida e leve de duplicatas completas                        |
| `dropDuplicates` + `Window` | Remoção de linhas redundantes com base nas chaves e timestamp              |
| `cache()`                   | Otimização de múltiplas operações no mesmo DataFrame                       |
| `Z-ORDER`                   | Melhoria significativa no tempo de resposta para filtros por `OrderID`     |
| `VACUUM`                    | Redução de uso de armazenamento e remoção de arquivos obsoletos            |
| `current_timestamp()`       | Registro confiável de quando os dados foram inseridos                      |
| Registro no catálogo        | Acesso via consultas SQL, dashboards BI e governança centralizada          |

---

## 📊 Métricas Utilizáveis

A tabela permite geração de KPIs e análises como:

- 🛍️ Total de vendas por pedido ou cliente
- 📈 Evolução temporal de faturamento
- 📦 Itens mais vendidos
- 🧾 Ticket médio por cliente
- 🧭 Performance de vendas por status (`Status`) ou data (`OrderDate`)

---

## 🧱 Modelo Estrela (Star Schema)

Esta tabela representa a **Fato Vendas** de um **modelo dimensional**, conectada a dimensões como:

- `dim_cliente` → via `CustomerID`
- `dim_tempo` → via `OrderDate`
- `dim_produto` (futura) → via `ItemID`

Permite análises OLAP em dashboards e modelos de Machine Learning supervisionado com base em comportamento de compra.

---





In [0]:
!pip install great_expectations
from pyspark.sql import functions as F
from pyspark.sql.functions import col, current_timestamp, sha2, concat_ws
from pyspark.sql.window import Window
from delta.tables import DeltaTable


# 1. Definição dos Caminhos das Tabelas Delta
# ------------------------------------------
# Definimos os caminhos da camada Silver e Gold onde os dados são lidos e gravados.
# O caminho da camada Silver é onde os dados brutos e desnormalizados estão armazenados.
# O caminho da camada Gold será o destino da tabela Fato de Vendas tratada e otimizada.
SILVER_PATH = "abfss://silver@dlsprojetofixo.dfs.core.windows.net/tabela_prata_desnormalizadas"
GOLD_FACT_PATH = "abfss://gold@dlsprojetofixo.dfs.core.windows.net/gold_fato_vendas"
GOLD_FACT_TABLE = "gold.fato_vendas"

# ------------------------------------------
# 2. Criação do Banco de Dados Gold
# ------------------------------------------
# Criamos o banco de dados "gold" se ele não existir, garantindo que as tabelas
# sejam registradas dentro de um contexto de banco estruturado e organizado.
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# ------------------------------------------
# 3. Leitura dos Dados da Camada Silver
# ------------------------------------------
# Carregamos os dados da camada Silver (dados desnormalizados) em um DataFrame.
# O Delta Lake permite a leitura eficiente de dados armazenados em formato Delta.
silver_df = spark.read.format("delta").load(SILVER_PATH)

# ------------------------------------------
# 4. Adição de Timestamp de Criação
# ------------------------------------------
# Adicionamos uma coluna "Created_at" com o timestamp atual, para auditar a data e hora
# da inserção dos dados na camada Gold. Isso ajuda a garantir rastreabilidade.
silver_df = silver_df.withColumn("Created_at", current_timestamp())

# ------------------------------------------
# 5. Habilita a atualização automática de schema no Delta Lake
# ------------------------------------------
# Permite que o Delta aceite novos campos durante a escrita sem erro de schema incompatível.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# ------------------------------------------
#6. Geração do Hash para Identificação de Duplicatas
# ------------------------------------------
# Criamos uma coluna de hash para cada linha do DataFrame. Essa coluna combina várias
# colunas relevantes da tabela e gera uma "assinatura" única para cada linha.
# A vantagem de usar um hash é que facilita a verificação de duplicatas e a comparação
# entre os dados sem precisar de comparações de várias colunas.
silver_df = silver_df.withColumn(
    "hash_value",
    sha2(concat_ws("||",
        col("OrderID").cast("string"),
        col("CustomerID").cast("string"),
        col("ItemID").cast("string"),
        col("OrderDate").cast("string"),
        col("TotalAmount").cast("string"),
        col("Quantity").cast("string"),
        col("Price").cast("string"),
        col("Status").cast("string")
    ), 256)  # Gera um código hash de 256 bits dessa string concatenada
)

# ------------------------------------------
# 7. Limpeza de Dados Duplicados com Base no Hash
# ------------------------------------------
# Removemos as duplicatas, considerando o hash gerado como referência.
# Isso garante que apenas uma instância única de cada linha seja mantida, melhorando a qualidade dos dados.
# A coluna "hash_value" é removida após o processo de limpeza, pois não é necessária para a tabela final.
fact_df_duplicado = (
    silver_df
    .dropDuplicates(["hash_value"])  # Remove linhas duplicadas baseadas no hash
    .drop("hash_value")  # Remove a coluna de hash após a limpeza
    .select(  
        "OrderDate", "OrderID", "Status", "Price", "Quantity", 
        "TotalAmount", "CustomerID", "ItemID", "Created_at"
    )
)

# ------------------------------------------
# 8. Contagem de Registros Únicos
# ------------------------------------------
# Verificamos quantos registros únicos serão processados e inseridos na tabela Gold.
# Isso ajuda a monitorar a eficiência do processo de deduplicação e entender o volume de dados.
unique_count = fact_df_duplicado.select("OrderID", "ItemID").distinct().count()
print(f"🔎 Registros únicos prontos para escrita na Gold: {unique_count}")

# ------------------------------------------
# 9. Cache do DataFrame para Melhorar a Performance
# ------------------------------------------
# O cache do DataFrame ajuda a evitar a recomputação durante a execução de operações subsequentes.
# Isso melhora a performance especialmente quando o DataFrame será utilizado várias vezes em operações subsequentes.
fact_df_duplicado.cache()

# ------------------------------------------
# 10. Escrita na Tabela Delta Gold
# ------------------------------------------
# Escrevemos os dados tratados e deduplicados na tabela Delta Gold. O modo "append" é utilizado para adicionar os dados.
# A opção "mergeSchema" permite que o esquema seja atualizado se necessário.
fact_df_duplicado.write.format("delta")\
    .mode("append")\
    .option("mergeSchema", "true")\
    .save(GOLD_FACT_PATH)

print("✅ Nova tabela Gold Fato escrita com dados únicos.")

# ------------------------------------------
# 11. Otimização da Tabela Delta Gold
# ------------------------------------------
# Realizamos a otimização da tabela Delta Gold com o comando "OPTIMIZE". Isso organiza fisicamente os arquivos
# para melhorar a performance de leitura, especialmente para consultas que envolvem filtros.
# A otimização é feita com base no particionamento e organização dos arquivos no disco.
spark.sql(f"OPTIMIZE delta.`{GOLD_FACT_PATH}`")

print("🚀 Tabela otimizada com sucesso.")

# ------------------------------------------
# 12. Registro no Catálogo Hive/Unity Catalog
# ------------------------------------------
# Registramos a tabela Gold no catálogo para que ela esteja disponível para consultas SQL e outros processos.
# Isso permite a governança dos dados e facilita a gestão e o uso dos dados na plataforma.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {GOLD_FACT_TABLE}
USING DELTA
LOCATION '{GOLD_FACT_PATH}'
""")
print("📚 Tabela Gold Fato registrada no catálogo.")

# ------------------------------------------
# 13. Aplicação de Z-ORDER para Otimizar Consultas
# ------------------------------------------
# O Z-ORDER é aplicado para otimizar consultas que envolvem filtros em colunas específicas.
# Essa técnica melhora o tempo de resposta ao garantir que os dados relacionados sejam fisicamente armazenados
# próximos uns dos outros no armazenamento, reduzindo o custo de I/O.
spark.sql(f"OPTIMIZE delta.`{GOLD_FACT_PATH}` ZORDER BY (OrderID)")

print("🚀 Z-ORDER aplicado com sucesso.")

# ------------------------------------------
# 14. Limpeza com VACUUM
# ------------------------------------------
# O comando VACUUM é usado para remover arquivos obsoletos e antigos, liberando espaço de armazenamento.
# Ele elimina arquivos que não são mais necessários após as operações de atualização e exclusão.
# Isso ajuda a otimizar o uso de armazenamento no Delta Lake.
spark.sql(f"VACUUM delta.`{GOLD_FACT_PATH}` RETAIN 168 HOURS")

print("🧹 Espaço de armazenamento liberado com VACUUM.")

# ------------------------------------------
# 15. Resumo Final
# ------------------------------------------
# Exibimos um resumo do processo, com a contagem final de registros inseridos na tabela Gold.
# Isso ajuda na monitoração e verificação do sucesso do pipeline.
print("\n📌 Resumo final:")
print(f"- Total de registros únicos inseridos na Gold: {unique_count}")


Collecting great_expectations
  Obtaining dependency information for great_expectations from https://files.pythonhosted.org/packages/62/cd/71b371a7b219e583e96fb3dde787f785538a705a7678aae0e7a592d43a87/great_expectations-1.4.1-py3-none-any.whl.metadata
  Downloading great_expectations-1.4.1-py3-none-any.whl.metadata (8.8 kB)
Collecting altair<5.0.0,>=4.2.1 (from great_expectations)
  Obtaining dependency information for altair<5.0.0,>=4.2.1 from https://files.pythonhosted.org/packages/18/62/47452306e84d4d2e67f9c559380aeb230f5e6ca84fafb428dd36b96a99ba/altair-4.2.2-py3-none-any.whl.metadata
  Downloading altair-4.2.2-py3-none-any.whl.metadata (13 kB)
Collecting jinja2>=3 (from great_expectations)
  Obtaining dependency information for jinja2>=3 from https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl.metadata
  Downloading jinja2-3.1.6-py3-none-any.whl.metadata (2.9 kB)
Collecting jsonschema>=2.5.1 (from g

In [0]:
# 📊 Monitoramento e Governança - Log de Execução do Pipeline Gold (Fato Vendas)

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime
import time

# ------------------------------------------
# 1. Início da contagem do tempo de execução
# ------------------------------------------
start_time = time.time()

# ------------------------------------------
# 2. Parâmetros do log
# ------------------------------------------
job_name = "gold_fato_vendas"
status = "SUCESSO"
erro = None

try:
    # Quantidade de registros únicos inseridos, já definida anteriormente no seu pipeline
    qtd_linhas = unique_count

except Exception as e:
    status = "ERRO"
    erro = str(e)
    qtd_linhas = 0

# ------------------------------------------
# 3. Cálculo do tempo total da execução
# ------------------------------------------
tempo_total = round(time.time() - start_time, 2)

# ------------------------------------------
# 4. Definição do schema do log para evitar erro de inferência
# ------------------------------------------
schema_log = StructType([
    StructField("job_name", StringType(), True),
    StructField("data_execucao", StringType(), True),
    StructField("qtd_linhas", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("erro", StringType(), True),
    StructField("tempo_total_segundos", DoubleType(), True)
])

# ------------------------------------------
# 5. Criação do DataFrame de log
# ------------------------------------------
log_execucao_df = spark.createDataFrame([(
    job_name,
    datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    qtd_linhas,
    status,
    erro,
    tempo_total
)], schema=schema_log)

# ------------------------------------------
# 6. Escrita do log no Delta Lake (pasta de log específica da Fato Vendas)
# ------------------------------------------
log_execucao_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("abfss://gold@dlsprojetofixo.dfs.core.windows.net/log_execucoes_gold_fato_vendas")

print(f"📌 Log da execução do job '{job_name}' registrado com sucesso.")


📌 Log da execução do job 'gold_fato_vendas' registrado com sucesso.


In [0]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.log_execucoes_gold_fato_vendas
    USING DELTA
    LOCATION 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/log_execucoes_gold_fato_vendas'
""")

DataFrame[]

In [0]:
%sql
SELECT * FROM gold.log_execucoes_gold_fato_vendas

job_name,data_execucao,qtd_linhas,status,erro,tempo_total_segundos
gold_fato_vendas,2025-04-22 02:48:15,29945,SUCESSO,,0.0
gold_fato_vendas,2025-04-23 00:29:34,29945,SUCESSO,,0.0
