### Carga Incremental na Camada Gold (Delta)

O código apresentado é parte do processo de carga incremental de dados na Camada Gold da arquitetura Medallion. Essa camada é responsável por armazenar dados otimizados para consumo em análises, com tabelas de fatos e dimensões organizadas e enriquecidas.

No exemplo, a configuração da SparkSession e os caminhos para as camadas Silver e Gold no Data Lake são definidos, preparando o ambiente para leitura e gravação de dados no formato Delta.

In [0]:
# Importação da biblioteca necessária para criar a SparkSession
from pyspark.sql import SparkSession

# Configuração da SparkSession para suporte ao Delta Lake
spark = SparkSession.builder \
    .appName("Carga Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Definindo os caminhos para as camadas no Data Lake
silver_path = "/mnt/lhdw/silver/vendas"  # Localização dos dados refinados da camada Silver
gold_path = "/mnt/lhdw/gold/vendas_delta"  # Localização para salvar as dimensões na camada Gold
gold_fato_path = "/mnt/lhdw/gold/vendas_delta/fato_vendas"  # Localização específica para tabela fato


### Leitura Incremental de Dados na Camada Silver

O código realiza a leitura incremental da Camada Silver com base na maior data já registrada na tabela Fato Vendas na Camada Gold. Essa abordagem é essencial para garantir eficiência e evitar reprocessamento de dados já consumidos.

In [0]:
from pyspark.sql.functions import date_sub, lit  # Importação de funções auxiliares

# Ler a maior data de venda da tabela Fato Vendas na Camada Gold
# selectExpr("max(DataVenda) as MaxDataVenda"): Obtém o valor máximo da coluna DataVenda
# collect()[0]["MaxDataVenda"]: Recupera o valor do registro coletado para uso no filtro
max_data_venda = spark.read.format("delta").load(gold_fato_path) \
    .selectExpr("max(DataVenda) as MaxDataVenda") \
    .collect()[0]["MaxDataVenda"]

# Exibir a maior data obtida
display(max_data_venda)

# Carregar os dados da Camada Silver filtrando apenas os registros mais recentes que a DataVenda obtida
df_silver = spark.read.format("parquet").load(silver_path) \
    .filter(f"Data > '{max_data_venda}'")  # Filtra os dados cuja Data é maior que a maior DataVenda

# Contar o número de registros no DataFrame carregado
df_silver.count()
                        

datetime.date(2012, 12, 31)Out[6]: 0

### Criação da Dimensão Produto com Atualização Temporal

O código cria a tabela de dimensão Produto na Camada Gold, atribuindo chaves substitutas (surrogate keys) e adicionando uma coluna para rastrear a data de atualização dos registros. Isso é essencial para garantir a integridade das dimensões e habilitar análises temporais e incrementais.

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp  # Importação de funções necessárias

# Nome da tabela de destino
tb_destino = "dim_produto"

# Passo 1 - Extrair produtos únicos para a dimensão Produto
dim_produto_df = df_silver.select(
    "IDProduto", "Produto", "Categoria"  # Seleção das colunas relevantes
).dropDuplicates()  # Remover duplicados para garantir unicidade

# Passo 2 - Adicionar chave substituta (surrogate keys) e data de atualização
dim_produto_df = dim_produto_df.withColumn("sk_produto", monotonically_increasing_id() + 1) \
                               .withColumn("data_atualizacao", current_timestamp())  # Adicionar timestamp atual

# Passo 3 - Escrever a Dimensão Produto no formato Delta
# option("mergeSchema", "true"): Garantir compatibilidade de esquema
dim_produto_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{gold_path}/{tb_destino}")

# Passo 4 - Visualizar a tabela gerada
display(dim_produto_df)  # Exibe os dados carregados


IDProduto,Produto,Categoria,sk_produto,data_atualizacao


### Criação da Dimensão Categoria com Atualização Temporal

O código constrói a tabela de dimensão Categoria na Camada Gold, utilizando chaves substitutas (surrogate keys) e registrando a data de atualização para garantir a rastreabilidade dos dados. Essa dimensão é fundamental em modelos dimensionais para organizar categorias únicas associadas a outros dados, como vendas ou produtos.

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp  # Importação de funções necessárias

# Nome da tabela de destino no Delta Lake
tb_destino = "dim_categoria"

# Passo 1 - Extrair categorias únicas para a dimensão
dim_categoria_df = df_silver.select(
    "Categoria"  # Seleciona apenas a coluna relevante
).dropDuplicates()  # Remove duplicatas para garantir unicidade das categorias

# Passo 2 - Adicionar chave substituta (surrogate keys) e data de atualização
dim_categoria_df = dim_categoria_df.withColumn("sk_categoria", monotonically_increasing_id() + 1) \
                                   .withColumn("data_atualizacao", current_timestamp())  # Adiciona o timestamp atual

# Passo 3 - Escrever a Dimensão Categoria no formato Delta
dim_categoria_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{gold_path}/{tb_destino}")


### Criação da Dimensão Segmento com Atualização Temporal

Este código constrói a tabela de dimensão Segmento na Camada Gold, utilizando chaves substitutas (surrogate keys) e adicionando uma coluna de controle com a data de atualização. A dimensão Segmento é usada em modelos analíticos para categorizar ou agrupar dados transacionais, permitindo análises mais detalhadas e organizadas.



In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp  # Importação das funções necessárias

# Nome da tabela de destino no Delta Lake
tb_destino = "dim_segmento"

# Passo 1 - Extrair segmentos únicos para a dimensão
dim_segmento_df = df_silver.select(
    "Segmento"  # Seleciona apenas a coluna Segmento
).dropDuplicates()  # Remove duplicatas, garantindo unicidade

# Passo 2 - Adicionar chave substituta (surrogate key) e data de atualização
dim_segmento_df = dim_segmento_df.withColumn("sk_segmento", monotonically_increasing_id() + 1) \
                                 .withColumn("data_atualizacao", current_timestamp())  # Adiciona timestamp atual

# Passo 3 - Escrever a Dimensão Segmento no formato Delta
dim_segmento_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{gold_path}/{tb_destino}")


### Criação da Dimensão Fabricante com Atualização Temporal

Explicação: Criação da Dimensão Fabricante com Atualização Temporal
Este código cria a tabela de dimensão Fabricante na Camada Gold do Data Lake. Ele utiliza chaves substitutas (surrogate keys) para garantir um identificador único por fabricante e adiciona uma coluna de controle para rastrear a data de atualização. A dimensão Fabricante é essencial para identificar e categorizar produtos por seus fabricantes.

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp  # Importação de funções para manipulação de dados

# Nome da tabela de destino no Delta Lake
tb_destino = "dim_fabricante"

# Passo 1 - Extrair fabricantes únicos para a dimensão Fabricante
dim_fabricante_df = df_silver.select(
    "IDFabricante",  # Identificador único do fabricante
    "Fabricante"  # Nome do fabricante
).dropDuplicates()  # Remove duplicatas para garantir unicidade

# Passo 2 - Adicionar chave substituta e coluna de controle
dim_fabricante_df = dim_fabricante_df.withColumn("sk_fabricante", monotonically_increasing_id() + 1) \
                                      .withColumn("data_atualizacao", current_timestamp())  # Adiciona a data de atualização atual

# Passo 3 - Escrever a tabela DimFabricante no formato Delta
dim_fabricante_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{gold_path}/{tb_destino}")


### Criação da Dimensão Geografia

Este código cria a dimensão Geografia na camada Gold, consolidando informações relacionadas a localidades únicas, como cidade, estado e país. A dimensão Geografia é fundamental para análises geográficas e relatórios que envolvem distribuição regional de vendas, clientes ou produtos.



In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp  # Importação de funções necessárias

# Nome da tabela de destino no Delta Lake
tb_destino = "dim_geografia"

# Passo 1 - Extrair informações geográficas únicas para a dimensão Geografia
dim_geografia_df = df_silver.select(
    "Cidade",       # Nome da cidade
    "Estado",       # Nome do estado
    "Regiao",       # Região associada
    "Distrito",     # Distrito administrativo
    "Pais",         # Nome do país
    "CodigoPostal"  # Código postal
).dropDuplicates()  # Remove duplicatas para garantir unicidade

# Passo 2 - Adicionar chave substituta (SK) e coluna de controle
dim_geografia_df = dim_geografia_df.withColumn("sk_geografia", monotonically_increasing_id() + 1) \
                                   .withColumn("data_atualizacao", current_timestamp())  # Data de atualização atual

# Passo 3 - Escrever a tabela DimGeografia no formato Delta
dim_geografia_df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{gold_path}/{tb_destino}")



### Criação da Dimensão Cliente

O código cria a tabela de dimensão Cliente na Camada Gold do Data Lake. Essa tabela relaciona os clientes com a geografia associada por meio de chaves substitutas (surrogate keys), facilitando análises futuras.

In [0]:
# Nome da tabela de destino no Delta Lake
tb_destino = "dim_cliente"

from pyspark.sql.functions import col, monotonically_increasing_id, current_timestamp

# Passo 1: Selecionar dados únicos para os clientes
dim_cliente_df = df_silver.select(
    "IDCliente",     # Identificador único do cliente
    "Nome",          # Nome do cliente
    "Email",         # E-mail do cliente
    "Cidade",        # Cidade associada ao cliente
    "Estado",        # Estado do cliente
    "Regiao",        # Região (ex.: Norte, Sul)
    "Distrito",      # Distrito do cliente
    "Pais",          # País do cliente
    "CodigoPostal"   # CEP do cliente
).dropDuplicates()   # Remove duplicatas para garantir unicidade

# Passo 2: Associar SK_Geografia com base nos dados geográficos
dim_cliente_com_sk_df = dim_cliente_df.alias("cliente") \
    .join(dim_geografia_df.alias("geografia"), 
          (col("cliente.Cidade") == col("geografia.Cidade")) &           # Combina cidade
          (col("cliente.Estado") == col("geografia.Estado")) &           # Combina estado
          (col("cliente.Regiao") == col("geografia.Regiao")) &           # Combina região
          (col("cliente.Distrito") == col("geografia.Distrito")) &       # Combina distrito
          (col("cliente.Pais") == col("geografia.Pais")) &               # Combina país
          (col("cliente.CodigoPostal") == col("geografia.CodigoPostal")), # Combina CEP
          "left") \
    .select("cliente.IDCliente", "cliente.Nome", "cliente.Email", "geografia.sk_geografia")  # Seleciona colunas necessárias

# Passo 3: Adicionar chave substituta e coluna de auditoria
dim_cliente_com_sk_df = dim_cliente_com_sk_df.withColumn("sk_cliente", monotonically_increasing_id()+1) \
                                             .withColumn("data_atualizacao", current_timestamp())  # Data e hora da atualização

# Passo 4: Selecionar colunas finais
dim_cliente_com_sk_df = dim_cliente_com_sk_df.select(
    "IDCliente",       # Identificador original do cliente
    "Nome",            # Nome do cliente
    "Email",           # E-mail do cliente
    "sk_geografia",    # Chave substituta da geografia
    "sk_cliente",      # Chave substituta do cliente
    "data_atualizacao" # Data e hora da última atualização
)

# Passo 5: Escrever a tabela no formato Delta
dim_cliente_com_sk_df.write.format("delta") \
                           .mode("overwrite") \
                           .option("mergeSchema", "true") \
                           .save(f"{gold_path}/{tb_destino}")  # Caminho da tabela na Camada Gold


### Criação de Tabela Fato

Este código cria a tabela fato_vendas na Camada Gold do Data Lake. A tabela fato consolida os dados transacionais, associando-os às tabelas de dimensões por meio de chaves substitutas (SKs) e particiona os dados para otimizar consultas.

In [0]:
# Nome da tabela de destino no Delta Lake
tb_destino = "fato_vendas"

from pyspark.sql.functions import broadcast, year, month, current_timestamp

# Passo 1: Juntar dados da Silver com as tabelas de dimensões
fato_vendas_df = df_silver.alias("s") \
    .join(broadcast(dim_produto_df.select("IDProduto", "sk_produto").alias("dprod")), "IDProduto") \
    .join(broadcast(dim_categoria_df.select("Categoria", "sk_categoria").alias("dcat")), "Categoria") \
    .join(broadcast(dim_segmento_df.select("Segmento", "sk_segmento").alias("dseg")), "Segmento") \
    .join(broadcast(dim_fabricante_df.select("Fabricante", "sk_fabricante").alias("dfab")), "Fabricante") \
    .join(broadcast(dim_cliente_com_sk_df.select("IDCliente", "sk_cliente").alias("dcli")), "IDCliente") \
    .select(
        col("s.Data").alias("DataVenda"),  # Data da venda
        "sk_produto",                     # Chave substituta do produto
        "sk_categoria",                   # Chave substituta da categoria
        "sk_segmento",                    # Chave substituta do segmento
        "sk_fabricante",                  # Chave substituta do fabricante
        "sk_cliente",                     # Chave substituta do cliente
        "Unidades",                       # Quantidade de unidades vendidas
        col("s.PrecoUnitario"),           # Preço unitário do produto
        col("s.CustoUnitario"),           # Custo unitário do produto
        col("s.TotalVendas"),             # Total da venda
        current_timestamp().alias("data_atualizacao")  # Data e hora da atualização
    )

# Passo 2: Particionar dados por ano e mês da DataVenda
fato_vendas_df = fato_vendas_df.withColumn("Ano", year("DataVenda")) \
                               .withColumn("Mes", month("DataVenda"))

# Passo 3: Escrever a tabela Fato no formato Delta, particionada por Ano e Mês
# mode("append"): Modo incremental para adicionar novos dados
fato_vendas_df.write.format("delta") \
             .mode("append") \
             .option("mergeSchema", "true") \
             .option("MaxRecordsPerFile", 1000000) \
             .partitionBy("Ano", "Mes") \
             .save(f"{gold_path}/{tb_destino}")


### Demonstração de informação total vendas por ano

O código fornecido realiza uma consulta na tabela fato_vendas, agrupando os dados por ano e somando o valor total das vendas para cada ano. Além disso, a consulta ordena os resultados de forma decrescente com base no valor total de vendas.

In [0]:
from pyspark.sql.functions import sum, col

# Define o caminho onde os dados da camada Gold estão armazenados
gold_path = "/mnt/lhdw/gold/vendas_delta/"

# Lê a tabela 'fato_vendas' no formato Delta
# A função 'read.format("delta")' permite que o Spark leia os dados do formato Delta
# Agrupa os dados pela coluna 'Ano' para calcular a soma das vendas por ano
# A função 'agg' permite realizar agregações, neste caso somar os valores da coluna 'TotalVendas'
# Ordena os resultados pela coluna 'Ano' e pela soma total das vendas em ordem decrescente
resultado = spark.read.format("delta").load(f"{gold_path}/fato_vendas") \
    .groupBy("Ano") \
    .agg(sum("TotalVendas").alias("SomaTotalVendas")) \
    .orderBy(col("Ano"), col("SomaTotalVendas").desc())

# Exibe o resultado da consulta (somatório das vendas por ano)
display(resultado)


Ano,SomaTotalVendas
2012,11399112.610003725


### Limpeza de Memória

In [0]:
import gc

# Coletar lixo após operações pesadas para liberar memória
gc.collect()

Out[18]: 668