# Modelagem Data Vault com PySpark e Delta Lake

Este notebook demonstra como implementar uma arquitetura Data Vault usando PySpark e Delta Lake, com o conjunto de dados de E-commerce brasileiro da Olist.

## 1. Configuração do Ambiente

Primeiro, vamos configurar nossa sessão Spark com suporte ao Delta Lake.

In [1]:
import os
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Configurar a sessão Spark com Delta Lake
spark = SparkSession.builder \
    .appName("DataVaultModeling") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.warehouse.dir", "/home/jovyan/data/delta") \
    .getOrCreate()

# Verificar a versão do Spark
print(f"Versão do Apache Spark: {spark.version}")

# Configurações para melhor desempenho
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.default.parallelism", "10")

Versão do Apache Spark: 3.5.0


## 2. Carregar os Dados Brutos do Minio

Vamos carregar os dados do conjunto de dados da Olist que estão armazenados no Minio.

In [None]:
# Definir os caminhos para os arquivos no Minio
minio_bucket = "datalakeprd"
base_path = f"s3a://{minio_bucket}"

# Carregar os datasets
customers_df = spark.read.csv(f"{base_path}/olist_customers_dataset.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(f"{base_path}/olist_orders_dataset.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv(f"{base_path}/olist_order_items_dataset.csv", header=True, inferSchema=True)
products_df = spark.read.csv(f"{base_path}/olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv(f"{base_path}/olist_sellers_dataset.csv", header=True, inferSchema=True)

# Visualizar um pouco dos dados
print("Amostra da tabela de Clientes:")
customers_df.show(5)

print("\nAmostra da tabela de Pedidos:")
orders_df.show(5)

## 3. Entendendo o Modelo Data Vault

O Data Vault consiste em três tipos principais de tabelas:

1. **Hubs**: Armazenam as chaves de negócio e são identificadores únicos das entidades
2. **Links**: Armazenam as relações entre os hubs
3. **Satellites**: Armazenam os atributos descritivos dos hubs e links

Vamos implementar um modelo Data Vault para este cenário de e-commerce.

### 3.1 Funções Auxiliares para Data Vault

Vamos criar algumas funções para ajudar na criação das tabelas do Data Vault.

In [None]:
# Função para gerar hash keys para as entidades
def generate_hash_key(df, columns, key_name):
    """Gera uma hash key a partir de uma ou mais colunas."""
    columns_concat = F.concat_ws("|", *[F.col(c) for c in columns])
    return df.withColumn(key_name, F.sha2(columns_concat, 256))

# Função para adicionar metadados padrão do Data Vault
def add_dv_metadata(df):
    """Adiciona colunas de metadados padrão do Data Vault."""
    return df.withColumn("load_date", F.current_timestamp()) \
             .withColumn("record_source", F.lit("OLIST_DATASET"))

# Definir o caminho base para as tabelas Delta
delta_base_path = "/home/jovyan/data/delta"
os.makedirs(delta_base_path, exist_ok=True)

## 4. Criação das Tabelas Hub

Vamos criar as seguintes tabelas Hub:
- Hub_Customer
- Hub_Order
- Hub_Product
- Hub_Seller

In [None]:
# Hub_Customer
hub_customer = customers_df.select("customer_id").distinct()
hub_customer = generate_hash_key(hub_customer, ["customer_id"], "hub_customer_key")
hub_customer = add_dv_metadata(hub_customer)

# Salvar Hub_Customer como Delta
hub_customer_path = f"{delta_base_path}/hub_customer"
hub_customer.write.format("delta").mode("overwrite").save(hub_customer_path)

# Hub_Order
hub_order = orders_df.select("order_id").distinct()
hub_order = generate_hash_key(hub_order, ["order_id"], "hub_order_key")
hub_order = add_dv_metadata(hub_order)

# Salvar Hub_Order como Delta
hub_order_path = f"{delta_base_path}/hub_order"
hub_order.write.format("delta").mode("overwrite").save(hub_order_path)

# Hub_Product
hub_product = products_df.select("product_id").distinct()
hub_product = generate_hash_key(hub_product, ["product_id"], "hub_product_key")
hub_product = add_dv_metadata(hub_product)

# Salvar Hub_Product como Delta
hub_product_path = f"{delta_base_path}/hub_product"
hub_product.write.format("delta").mode("overwrite").save(hub_product_path)

# Hub_Seller
hub_seller = sellers_df.select("seller_id").distinct()
hub_seller = generate_hash_key(hub_seller, ["seller_id"], "hub_seller_key")
hub_seller = add_dv_metadata(hub_seller)

# Salvar Hub_Seller como Delta
hub_seller_path = f"{delta_base_path}/hub_seller"
hub_seller.write.format("delta").mode("overwrite").save(hub_seller_path)

# Visualizar as tabelas Hub criadas
print("Hub_Customer:")
spark.read.format("delta").load(hub_customer_path).show(5)

print("\nHub_Order:")
spark.read.format("delta").load(hub_order_path).show(5)

## 5. Criação das Tabelas Link

Vamos criar as seguintes tabelas Link:
- Link_Customer_Order
- Link_Order_Product
- Link_Product_Seller

In [None]:
# Link_Customer_Order
# Juntar os dados necessários
customer_order_df = orders_df.select("order_id", "customer_id").distinct()

# Juntar com os Hubs para obter as chaves
customer_order_link = customer_order_df.join(
    spark.read.format("delta").load(hub_customer_path),
    on="customer_id"
).join(
    spark.read.format("delta").load(hub_order_path),
    on="order_id"
)

# Gerar a chave composta do link
customer_order_link = generate_hash_key(
    customer_order_link,
    ["hub_customer_key", "hub_order_key"],
    "link_customer_order_key"
)

# Selecionar as colunas necessárias e adicionar metadados
customer_order_link = customer_order_link.select(
    "link_customer_order_key", "hub_customer_key", "hub_order_key", "customer_id", "order_id"
)
customer_order_link = add_dv_metadata(customer_order_link)

# Salvar Link_Customer_Order como Delta
link_customer_order_path = f"{delta_base_path}/link_customer_order"
customer_order_link.write.format("delta").mode("overwrite").save(link_customer_order_path)

# Link_Order_Product (via order_items)
# Obter dados distintos
order_product_df = order_items_df.select("order_id", "product_id").distinct()

# Juntar com os Hubs para obter as chaves
order_product_link = order_product_df.join(
    spark.read.format("delta").load(hub_order_path),
    on="order_id"
).join(
    spark.read.format("delta").load(hub_product_path),
    on="product_id"
)

# Gerar a chave composta do link
order_product_link = generate_hash_key(
    order_product_link,
    ["hub_order_key", "hub_product_key"],
    "link_order_product_key"
)

# Selecionar as colunas necessárias e adicionar metadados
order_product_link = order_product_link.select(
    "link_order_product_key", "hub_order_key", "hub_product_key", "order_id", "product_id"
)
order_product_link = add_dv_metadata(order_product_link)

# Salvar Link_Order_Product como Delta
link_order_product_path = f"{delta_base_path}/link_order_product"
order_product_link.write.format("delta").mode("overwrite").save(link_order_product_path)

# Link_Product_Seller (via order_items)
# Obter dados distintos
product_seller_df = order_items_df.select("product_id", "seller_id").distinct()

# Juntar com os Hubs para obter as chaves
product_seller_link = product_seller_df.join(
    spark.read.format("delta").load(hub_product_path),
    on="product_id"
).join(
    spark.read.format("delta").load(hub_seller_path),
    on="seller_id"
)

# Gerar a chave composta do link
product_seller_link = generate_hash_key(
    product_seller_link,
    ["hub_product_key", "hub_seller_key"],
    "link_product_seller_key"
)

# Selecionar as colunas necessárias e adicionar metadados
product_seller_link = product_seller_link.select(
    "link_product_seller_key", "hub_product_key", "hub_seller_key", "product_id", "seller_id"
)
product_seller_link = add_dv_metadata(product_seller_link)

# Salvar Link_Product_Seller como Delta
link_product_seller_path = f"{delta_base_path}/link_product_seller"
product_seller_link.write.format("delta").mode("overwrite").save(link_product_seller_path)

# Visualizar as tabelas Link criadas
print("Link_Customer_Order:")
spark.read.format("delta").load(link_customer_order_path).show(5)

print("\nLink_Order_Product:")
spark.read.format("delta").load(link_order_product_path).show(5)

## 6. Criação das Tabelas Satellite

Vamos criar as seguintes tabelas Satellite:
- Sat_Customer_Details
- Sat_Order_Details
- Sat_Product_Details
- Sat_Seller_Details
- Sat_Order_Item_Details

In [None]:
# Sat_Customer_Details
# Obter os dados de clientes
customer_details = customers_df
customer_details = customer_details.join(
    spark.read.format("delta").load(hub_customer_path),
    on="customer_id"
)

# Gerar hashkey para os atributos descritivos (para detectar mudanças)
attribute_columns = [
    "customer_unique_id", "customer_zip_code_prefix", 
    "customer_city", "customer_state"
]
customer_details = generate_hash_key(
    customer_details, 
    attribute_columns, 
    "hashdiff"
)

# Selecionar as colunas necessárias e adicionar metadados
customer_details = customer_details.select(
    "hub_customer_key", "hashdiff", "customer_id", *attribute_columns
)
customer_details = add_dv_metadata(customer_details)

# Salvar Sat_Customer_Details como Delta
sat_customer_details_path = f"{delta_base_path}/sat_customer_details"
customer_details.write.format("delta").mode("overwrite").save(sat_customer_details_path)

# Sat_Order_Details
# Obter os dados de pedidos
order_details = orders_df
order_details = order_details.join(
    spark.read.format("delta").load(hub_order_path),
    on="order_id"
)

# Gerar hashkey para os atributos descritivos
attribute_columns = [
    "order_status", "order_purchase_timestamp", "order_approved_at",
    "order_delivered_carrier_date", "order_delivered_customer_date", 
    "order_estimated_delivery_date"
]
order_details = generate_hash_key(
    order_details, 
    attribute_columns, 
    "hashdiff"
)

# Selecionar as colunas necessárias e adicionar metadados
order_details = order_details.select(
    "hub_order_key", "hashdiff", "order_id", *attribute_columns
)
order_details = add_dv_metadata(order_details)

# Salvar Sat_Order_Details como Delta
sat_order_details_path = f"{delta_base_path}/sat_order_details"
order_details.write.format("delta").mode("overwrite").save(sat_order_details_path)

# Sat_Product_Details
# Obter os dados de produtos
product_details = products_df
product_details = product_details.join(
    spark.read.format("delta").load(hub_product_path),
    on="product_id"
)

# Gerar hashkey para os atributos descritivos
attribute_columns = [
    "product_category_name", "product_name_length", "product_description_length",
    "product_photos_qty", "product_weight_g", "product_length_cm", 
    "product_height_cm", "product_width_cm"
]
product_details = generate_hash_key(
    product_details, 
    attribute_columns, 
    "hashdiff"
)

# Selecionar as colunas necessárias e adicionar metadados
product_details = product_details.select(
    "hub_product_key", "hashdiff", "product_id", *attribute_columns
)
product_details = add_dv_metadata(product_details)

# Salvar Sat_Product_Details como Delta
sat_product_details_path = f"{delta_base_path}/sat_product_details"
product_details.write.format("delta").mode("overwrite").save(sat_product_details_path)

# Sat_Seller_Details
# Obter os dados de vendedores
seller_details = sellers_df
seller_details = seller_details.join(
    spark.read.format("delta").load(hub_seller_path),
    on="seller_id"
)

# Gerar hashkey para os atributos descritivos
attribute_columns = [
    "seller_zip_code_prefix", "seller_city", "seller_state"
]
seller_details = generate_hash_key(
    seller_details, 
    attribute_columns, 
    "hashdiff"
)

# Selecionar as colunas necessárias e adicionar metadados
seller_details = seller_details.select(
    "hub_seller_key", "hashdiff", "seller_id", *attribute_columns
)
seller_details = add_dv_metadata(seller_details)

# Salvar Sat_Seller_Details como Delta
sat_seller_details_path = f"{delta_base_path}/sat_seller_details"
seller_details.write.format("delta").mode("overwrite").save(sat_seller_details_path)

# Sat_Order_Item_Details (para o Link_Order_Product)
# Obter os dados de itens de pedido
order_item_details = order_items_df
order_item_details = order_item_details.join(
    order_product_link,
    on=["order_id", "product_id"]
)

# Gerar hashkey para os atributos descritivos
attribute_columns = [
    "order_item_id", "price", "freight_value", "shipping_limit_date"
]
order_item_details = generate_hash_key(
    order_item_details, 
    attribute_columns, 
    "hashdiff"
)

# Selecionar as colunas necessárias e adicionar metadados
order_item_details = order_item_details.select(
    "link_order_product_key", "hashdiff", "order_id", "product_id", *attribute_columns
)
order_item_details = add_dv_metadata(order_item_details)

# Salvar Sat_Order_Item_Details como Delta
sat_order_item_details_path = f"{delta_base_path}/sat_order_item_details"
order_item_details.write.format("delta").mode("overwrite").save(sat_order_item_details_path)

# Visualizar as tabelas Satellite criadas
print("Sat_Customer_Details:")
spark.read.format("delta").load(sat_customer_details_path).show(5)

print("\nSat_Order_Details:")
spark.read.format("delta").load(sat_order_details_path).show(5)

## 7. Consultando o Modelo Data Vault

Agora que temos nosso modelo Data Vault implementado, vamos realizar algumas consultas para demonstrar como ele pode ser usado para responder a perguntas de negócio.

In [None]:
# Exemplo 1: Contagem de pedidos por status
print("Contagem de pedidos por status:")
spark.read.format("delta").load(sat_order_details_path) \
    .groupBy("order_status") \
    .count() \
    .orderBy(F.desc("count")) \
    .show()

# Exemplo 2: Top 10 cidades com mais clientes
print("Top 10 cidades com mais clientes:")
spark.read.format("delta").load(sat_customer_details_path) \
    .groupBy("customer_state", "customer_city") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10)

# Exemplo 3: Consulta mais complexa para obter detalhes de pedidos com clientes e produtos
print("Exemplo de Business Vault - Detalhes completos de pedidos:")

# Carregar os dados
hub_order = spark.read.format("delta").load(hub_order_path)
sat_order = spark.read.format("delta").load(sat_order_details_path)
link_customer_order = spark.read.format("delta").load(link_customer_order_path)
hub_customer = spark.read.format("delta").load(hub_customer_path)
sat_customer = spark.read.format("delta").load(sat_customer_details_path)

# Construir a consulta Business Vault
business_vault_query = hub_order.join(
    sat_order,
    on="hub_order_key"
).join(
    link_customer_order,
    on="hub_order_key"
).join(
    hub_customer,
    on="hub_customer_key"
).join(
    sat_customer,
    on="hub_customer_key"
)

# Selecionar colunas interessantes
result = business_vault_query.select(
    "order_id", 
    "order_status", 
    "order_purchase_timestamp", 
    "order_delivered_customer_date",
    "customer_id",
    "customer_city",
    "customer_state"
)

# Mostrar os resultados
result.show(10)

## 8. Demonstração de Histórico (Historização)

Uma das grandes vantagens do Data Vault é a capacidade de rastrear mudanças ao longo do tempo. Vamos simular uma atualização em alguns dados para demonstrar como isso funciona.

In [None]:
# Simular uma atualização no status de alguns pedidos
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Carregar pedidos atuais
current_orders = spark.read.format("delta").load(sat_order_details_path)

# Selecionar alguns pedidos para atualizar (por exemplo, os 10 primeiros)
orders_to_update = current_orders.limit(10)

# Mudar o status para 'delivered'
updated_orders = orders_to_update.withColumn("order_status", F.lit("delivered"))

# Recalcular o hashdiff para detectar a mudança
attribute_columns = [
    "order_status", "order_purchase_timestamp", "order_approved_at",
    "order_delivered_carrier_date", "order_delivered_customer_date", 
    "order_estimated_delivery_date"
]
updated_orders = generate_hash_key(
    updated_orders, 
    attribute_columns, 
    "hashdiff"
)

# Adicionar nova data de carregamento para indicar que este é um novo registro
updated_orders = updated_orders.withColumn("load_date", F.current_timestamp())

# Usar a operação MERGE do Delta Lake para adicionar os novos registros
from delta.tables import DeltaTable

# Carregar a tabela Delta existente
deltaTable = DeltaTable.forPath(spark, sat_order_details_path)

# Realizar a operação MERGE para adicionar os novos registros
deltaTable.alias("target").merge(
    updated_orders.alias("updates"),
    "target.hub_order_key = updates.hub_order_key AND target.hashdiff != updates.hashdiff"
).whenNotMatchedInsertAll().execute()

# Verificar o histórico agora (deve conter registros duplicados com datas diferentes)
updated_orders_history = spark.read.format("delta").load(sat_order_details_path)
print("Total de registros após atualização:", updated_orders_history.count())

# Verificar os pedidos atualizados (ordenados por hub_order_key e load_date)
updated_orders_history.filter(
    F.col("hub_order_key").isin(updated_orders.select("hub_order_key").collect()[0][0])
).orderBy("hub_order_key", "load_date").show()

## 9. Demonstração de Linhagem de Dados

Outra vantagem importante do Data Vault é a linhagem de dados (rastreabilidade). Vamos demonstrar como podemos rastrear a origem dos dados.

In [None]:
# Exemplo de rastreabilidade - seguindo um pedido específico desde a fonte até seus relacionamentos
# Escolher um order_id de exemplo
sample_order_id = spark.read.format("delta").load(hub_order_path).limit(1).select("order_id").collect()[0][0]
print(f"Rastreando o pedido: {sample_order_id}")

# 1. Encontrar o Hub_Order
order_hub = spark.read.format("delta").load(hub_order_path).filter(F.col("order_id") == sample_order_id)
print("\n1. Hub_Order:")
order_hub.show()

hub_order_key = order_hub.select("hub_order_key").collect()[0][0]

# 2. Encontrar os detalhes no Satellite
order_details = spark.read.format("delta").load(sat_order_details_path).filter(F.col("hub_order_key") == hub_order_key)
print("\n2. Sat_Order_Details:")
order_details.show()

# 3. Encontrar o cliente relacionado via Link
customer_link = spark.read.format("delta").load(link_customer_order_path).filter(F.col("hub_order_key") == hub_order_key)
print("\n3. Link_Customer_Order:")
customer_link.show()

hub_customer_key = customer_link.select("hub_customer_key").collect()[0][0]

# 4. Encontrar os detalhes do cliente
customer_details = spark.read.format("delta").load(sat_customer_details_path).filter(F.col("hub_customer_key") == hub_customer_key)
print("\n4. Sat_Customer_Details:")
customer_details.show()

# 5. Encontrar os produtos relacionados via Link
product_link = spark.read.format("delta").load(link_order_product_path).filter(F.col("hub_order_key") == hub_order_key)
print("\n5. Link_Order_Product:")
product_link.show()

# Demonstrar que a linhagem completa do pedido está preservada e pode ser facilmente rastreada

## 10. Conclusão

Neste notebook, demonstramos:

1. Como configurar um ambiente de modelagem Data Vault com PySpark e Delta Lake
2. Como criar as diferentes tabelas do Data Vault (Hubs, Links e Satellites)
3. Como consultar o modelo Data Vault para responder perguntas de negócio
4. Como implementar historização de dados no Data Vault
5. Como rastrear a linhagem dos dados através do modelo

Isso fornece uma base sólida para implementar uma arquitetura Data Vault em seu próprio projeto, garantindo:
- Flexibilidade para mudanças de requisitos
- Escalabilidade para grandes volumes de dados
- Rastreabilidade completa da linhagem de dados
- Histórico auditável de todas as alterações

O Data Vault é particularmente útil para Data Warehouses empresariais que precisam suportar requisitos em constante mudança e múltiplas fontes de dados.