### Camada Gold (Delta): Criação de Fatos e Dimensões

In [0]:
from pyspark.sql import SparkSession

# Create a SparkSession with the required configurations for Delta Lake
spark = SparkSession.builder \
  .appName("Carga Delta") \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()

**Importando o SparkSession**
- **SparkSession**: O ponto de entrada para interagir com o Spark, utilizado para criar DataFrames e realizar operações de consulta, escrita e leitura.

**Criando a Sessão do Spark**
- **SparkSession.builder**: Inicia a construção de uma sessão do Spark.
- **.appName("Carga Delta")**: Define o nome do aplicativo Spark como **"Carga Delta"**, o que é útil para identificar o job em logs e no UI do Spark.

**Habilitando o Suporte ao Delta Lake**
- **"spark.sql.extensions"**: Adiciona a extensão **DeltaSparkSessionExtension** que habilita o suporte ao Delta Lake no Spark. O Delta Lake é uma camada de armazenamento transacional que adiciona ACID (Atomicidade, Consistência, Isolamento e Durabilidade) a dados armazenados em formato Parquet no Spark.
- **"spark.catalog.spark_catalog"**: Configura o DeltaCatalog, que permite ao Spark lidar com tabelas Delta por meio do catálogo de metadados. Isso possibilita que as operações de leitura/escrita em Delta sejam tratadas da mesma forma que em tabelas Spark SQL tradicionais.

In [0]:
# Define os caminhos de armazenamento no Data Lake
silver_path = "/mnt/lhdw/silver/vendas"
gold_path = "/mnt/lhdw/gold/vendas_delta"

### Ler dados Camada Silver

In [0]:
# Lendo os dados da camada Silver no formato Parquet
df_silver = spark.read.format("parquet").load(silver_path)

**Lendo os Dados da Camada Silver**
- Lê os dados da camada Silver, que estão armazenados no formato Parquet, criando o DataFrame **df_silver**. O caminho **silver_path** é o local onde os dados Parquet da camada Silver estão armazenados.

### Criação da Dimensão Produto

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Definindo a tabela de destino na camada Gold
tb_destino = "dim_produto"

# Selecionando as colunas necessárias para a tabela de dimensões de produto e removendo duplicatas
dim_produto_df = df_silver.select(
  "IDProduto", "Produto", "Categoria").dropDuplicates()

#  Adiciona uma nova coluna chamada sk_produto, que será a chave surrogate (chave substituta)que gera IDs únicos, começando em 1
dim_produto_df = dim_produto_df.withColumn("sk_produto", monotonically_increasing_id()+1)

# Gravando os dados no formato Delta na camada Gold
dim_produto_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_produto_df)
dim_produto_df.count()

IDProduto,Produto,Categoria,sk_produto
585,Maximus UC-50,Urban,1
423,Maximus UM-28,Urban,2
562,Maximus UC-27,Youth,3
405,Maximus UM-10,Accessory,4
540,Maximus UC-05,Mix,5
681,Maximus UC-46,Urban,6
628,Maximus UC-93,Urban,7
415,Maximus UM-20,Urban,8
406,Maximus UM-11,Accessory,9
512,Maximus UR-01,Urban,10


Out[4]: 137

**Importando a Função monotonically_increasing_id**
- **monotonically_increasing_id()**: Esta função gera um ID único (número inteiro crescente) para cada linha, usado aqui como uma chave substituta (surrogate key). É útil quando você precisa adicionar um identificador único para cada registro.

**Definindo a Tabela de Destino**
- O nome da tabela de destino é definido como **dim_produto**, o que sugere que os dados são destinados para uma tabela de dimensões que contém informações sobre produtos. Dimensões são usadas em modelagem dimensional para apoiar análises OLAP.

**Selecionando Colunas Necessárias e Removendo Duplicatas**
- select("IDProduto", "Produto", "Categoria")**: Seleciona apenas as colunas relacionadas ao produto, que serão armazenadas na tabela de dimensões.
- **.dropDuplicates()**: Remove linhas duplicadas com base nas colunas selecionadas. Isso garante que cada produto seja único na tabela de dimensões.

**Adicionando uma Chave Surrogate (sk_produto)**
- **.withColumn("sk_produto", ...)**: Adiciona uma nova coluna chamada **sk_produto**, que será a chave surrogate (chave substituta).
- **monotonically_increasing_id() + 1**: Gera IDs únicos, começando em 1. As surrogate keys são usadas em tabelas de dimensões para garantir que cada registro tenha um identificador exclusivo, que não depende da chave natural (neste caso, **IDProduto**).

**Gravando os Dados no Formato Delta**
- **.write**: Inicia a operação de escrita no DataFrame **dim_produto_df**.
- **.format("delta")**: Especifica que os dados serão gravados no formato Delta, que suporta transações ACID e versionamento de dados.
- **.mode("overwrite")**: Usa o modo de sobrescrita. Se a tabela já existir no local **gold_path/dim_produto**, ela será sobrescrita.
- **.save(f"{gold_path}/{tb_destino}")**: Salva os dados no caminho especificado em **gold_path**, criando a tabela de dimensões **dim_produto** na camada Gold.

### Criação da Dimensão Categoria

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Definindo a tabela de destino na camada Gold
tb_destino = "dim_categoria"

# Selecionando as colunas necessárias para a tabela de dimensões de categoria e removendo duplicatas
dim_categoria_df = df_silver.select(
  "Categoria").dropDuplicates()

#  Adiciona uma nova coluna chamada sk_categoria, que será a chave surrogate (chave substituta)que gera IDs únicos, começando em 1
dim_categoria_df = dim_categoria_df.withColumn("sk_categoria", monotonically_increasing_id()+1)

# Gravando os dados no formato Delta na camada Gold
dim_categoria_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_categoria_df)
dim_categoria_df.count()

Categoria,sk_categoria
Mix,1
Urban,2
Youth,3
Accessory,4
Rural,5


Out[5]: 5

### Criação da Dimensão Segmento

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Definindo a tabela de destino na camada Gold
tb_destino = "dim_segmento"

# Selecionando as colunas necessárias para a tabela de dimensões de segmento e removendo duplicatas
dim_segmento_df = df_silver.select(
  "Segmento").dropDuplicates()

#  Adiciona uma nova coluna chamada sk_segmento, que será a chave surrogate (chave substituta)que gera IDs únicos, começando em 1
dim_segmento_df = dim_segmento_df.withColumn("sk_segmento", monotonically_increasing_id()+1)

# Gravando os dados no formato Delta na camada Gold
dim_segmento_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_segmento_df)
dim_segmento_df.count()

Segmento,sk_segmento
All Season,1
Extreme,2
Youth,3
Accessory,4
Select,5
Productivity,6
Regular,7
Convenience,8
Moderation,9


Out[6]: 9

### Criação da Dimensão Fabricante

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Definindo a tabela de destino na camada Gold
tb_destino = "dim_fabricante"

# Selecionando as colunas necessárias para a tabela de dimensões de fabricante e removendo duplicatas
dim_fabricante_df = df_silver.select(
  "IDFabricante", "Fabricante").dropDuplicates()

#  Adiciona uma nova coluna chamada sk_fabricante, que será a chave surrogate (chave substituta)que gera IDs únicos, começando em 1
dim_fabricante_df = dim_fabricante_df.withColumn("sk_fabricante", monotonically_increasing_id()+1)

# Gravando os dados no formato Delta na camada Gold
dim_fabricante_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_fabricante_df)
dim_fabricante_df.count()

IDFabricante,Fabricante,sk_fabricante
7,VanArsdel,1


Out[7]: 1

### Criação da Dimensão Geografia

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Definindo a tabela de destino na camada Gold
tb_destino = "dim_geografia"

# Selecionando as colunas necessárias para a tabela de dimensões de geografia e removendo duplicatas
dim_geografia_df = df_silver.select(
  "Cidade", "Estado", "Regiao", "Distrito", "Pais", "CodigoPostal").dropDuplicates()

#  Adiciona uma nova coluna chamada sk_geografia, que será a chave surrogate (chave substituta)que gera IDs únicos, começando em 1
dim_geografia_df = dim_geografia_df.withColumn("sk_geografia", monotonically_increasing_id()+1)

# Gravando os dados no formato Delta na camada Gold
dim_geografia_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_geografia_df)
dim_geografia_df.count()

Cidade,Estado,Regiao,Distrito,Pais,CodigoPostal,sk_geografia
Batavia,IL,Central,District #31,USA,60510,1
Denver,CO,Central,District #20,USA,80232,2
Nampa,ID,West,District #33,USA,83651,3
Belmar,NJ,East,District #04,USA,7719,4
Mount Dora,FL,East,District #12,USA,32757,5
Moses Lake,WA,West,District #33,USA,98837,6
Indio,CA,West,District #38,USA,92203,7
Jacksonville,FL,East,District #12,USA,32225,8
Warrenton,MO,Central,District #29,USA,63383,9
Crosby,TX,Central,District #23,USA,77532,10


Out[8]: 20467

### Criação da Dimensão Cliente

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Definindo a tabela de destino como "dim_cliente"
tb_destino = "dim_cliente"

# Selecionando colunas relevantes de cliente e removendo duplicatas
dim_cliente_df = df_silver.select(
  "IDCliente", "Nome", "Email", "Cidade", "Estado", "Regiao", "Distrito", "Pais", "CodigoPostal"
).dropDuplicates()

# Realizando um join para associar geografia com base nos atributos geográficos
dim_cliente_com_sk_df = dim_cliente_df.alias("cliente") \
    .join(dim_geografia_df.alias("geografia"), 
          (col("cliente.Cidade") == col("geografia.Cidade")) &
          (col("cliente.Estado") == col("geografia.Estado")) &
          (col("cliente.Regiao") == col("geografia.Regiao")) &
          (col("cliente.Distrito") == col("geografia.Distrito")) &
          (col("cliente.Pais") == col("geografia.Pais")) &
          (col("cliente.CodigoPostal") == col("geografia.CodigoPostal")), 
          "left") \
    .select("cliente.IDCliente", "cliente.Nome", "cliente.Email", "geografia.sk_geografia")

# Adicionando uma chave surrogate para o cliente
dim_cliente_com_sk_df = dim_cliente_com_sk_df.withColumn("sk_cliente", monotonically_increasing_id()+1)

# Selecionando as colunas necessárias com as chaves surrogate
dim_cliente_com_sk_df = dim_cliente_com_sk_df.select("IDCliente", "Nome", "Email", "sk_geografia", "sk_cliente")

# Salvando a tabela de dimensão de cliente no formato Delta na camada Gold
dim_cliente_com_sk_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{tb_destino}")

display(dim_cliente_com_sk_df)
dim_cliente_com_sk_df.count()

IDCliente,Nome,Email,sk_geografia,sk_cliente
255010,John Trevino,john.trevino@xyza.com,3468,1
272755,Kameko Langley,kameko.langley@xyza.com,8589941780,2
111898,Rogan Saunders,rogan.saunders@xyza.com,153,3
8933,Jackson Levy,jackson.levy@xyza.com,4822,4
88529,Ignacia Jacobson,ignacia.jacobson@xyza.com,6282,5
141432,Tanya Whitley,tanya.whitley@xyza.com,4,6
151370,Kaden Washington,kaden.washington@xyza.com,8589938460,7
45239,Tara Sloan,tara.sloan@xyza.com,8589942604,8
5562,Moses Carney,moses.carney@xyza.com,8589934743,9
106710,Mechelle Watson,mechelle.watson@xyza.com,3296,10


Out[9]: 112081

**Importando Funções Necessárias**
- **col**: Função usada para acessar colunas no Spark DataFrame.
- **monotonically_increasing_id**: Função usada para gerar IDs únicos crescentes, que serão usados como chaves surrogate (substitutas).

**Definindo a Tabela de Destino**
- Definindo o nome da tabela de destino na camada Gold como **dim_cliente**, o que indica que este DataFrame representa uma dimensão de clientes.

**Selecionando Colunas Relacionadas a Cliente e Removendo Duplicatas**
- Seleciona as colunas que contêm informações de cliente e também atributos geográficos como Cidade, Estado, Região, etc.
- **dropDuplicates()**: Remove registros duplicados para garantir que cada cliente seja único na tabela.

**Realizando o Join para Associar Geografia**
- Realiza um join entre o cliente e a geografia, associando as chaves geográficas (como **Cidade**, **Estado**, **Regiao**, etc.) usando uma combinação de colunas.
- **alias("cliente")** e **alias("geografia")**: Usados para diferenciar as tabelas durante o join.
- **left join**: Um join à esquerda garante que todos os clientes sejam incluídos, mesmo que não tenham correspondência exata em atributos geográficos.

**Adicionando a Chave Surrogate (sk_cliente)**
- Adiciona uma nova coluna chamada sk_cliente que contém uma chave surrogate para cada cliente.
- **monotonically_increasing_id() + 1**: Gera valores crescentes exclusivos para cada registro, começando em 1.

**Selecionando as Colunas Finais**
- Aqui, apenas as colunas necessárias são mantidas: **IDCliente**, **Nome**, **Email**, **sk_geografia** (chave surrogate de geografia) e **sk_cliente** (chave surrogate de cliente).

**Gravando os Dados no Formato Delta**
- Escreve o DataFrame resultante no formato Delta na camada Gold.
- **.format("delta")**: Especifica o formato Delta, que oferece suporte a transações ACID e versionamento.
- **.mode("overwrite")**: Usa o modo overwrite, sobrescrevendo quaisquer dados anteriores que possam existir no caminho especificado.
- **save(f"{gold_path}/{tb_destino}")**: Salva a tabela **dim_cliente** no local da camada Gold (especificado pelo caminho **gold_path**).

### Criação de Tabela Fato

In [0]:
tb_destino = "fato_vendas"

from pyspark.sql.functions import broadcast, year, month

# Realizando os joins com as tabelas de dimensão para criar a tabela fato de vendas
fato_vendas_df = df_silver.alias("s") \
    .join(broadcast(dim_produto_df.select("IDProduto", "sk_produto").alias("dprod")), "IDProduto") \
    .join(broadcast(dim_categoria_df.select("Categoria", "sk_categoria").alias("dcat")), "Categoria") \
    .join(broadcast(dim_segmento_df.select("Segmento", "sk_segmento").alias("dseg")), "Segmento") \
    .join(broadcast(dim_fabricante_df.select("Fabricante", "sk_fabricante").alias("dfab")), "Fabricante") \
    .join(broadcast(dim_cliente_com_sk_df.select("IDCliente", "sk_cliente").alias("dcli")), "IDCliente") \
    .select(
        col("s.Data").alias("DataVenda"),
        "sk_produto",
        "sk_categoria",
        "sk_segmento",
        "sk_fabricante",
        "sk_cliente",
        "Unidades",
        col("s.PrecoUnitario"),
        col("s.CustoUnitario"),
        col("s.TotalVendas")
    )

# Adicionando as colunas de Ano e Mês para particionamento
fato_vendas_df.withColumn("Ano", year("DataVenda")) \
             .withColumn("Mes", month("DataVenda")) \
             .write.format("delta") \
             .mode("overwrite") \
             .option("MaxRecordsPerFile", 1000000) \
             .partitionBy("Ano", "Mes") \
             .save(f"{gold_path}/{tb_destino}")

# Exibindo o DataFrame resultante
display(fato_vendas_df)

# Contando o número total de registros na tabela fato
fato_vendas_df.count()

DataVenda,sk_produto,sk_categoria,sk_segmento,sk_fabricante,sk_cliente,Unidades,PrecoUnitario,CustoUnitario,TotalVendas
2011-04-27,51,2,8,1,25769804455,1,68.24,49.82,68.24
2011-04-02,51,2,8,1,25769805583,1,68.24,49.82,68.24
2011-04-08,51,2,8,1,42949676743,1,68.24,49.82,68.24
2011-04-14,51,2,8,1,42949673877,1,68.24,49.82,68.24
2011-04-24,51,2,8,1,42949673434,1,68.24,49.82,68.24
2011-04-05,51,2,8,1,8589934593,1,68.24,49.82,68.24
2011-04-05,51,2,8,1,42949676956,1,68.24,49.82,68.24
2011-04-22,51,2,8,1,51539609218,1,68.24,49.82,68.24
2011-04-23,51,2,8,1,8589934795,1,68.24,49.82,68.24
2011-04-23,51,2,8,1,51539611716,1,68.24,49.82,68.24


Out[10]: 112202

**Importando Funções Necessárias**
- **broadcast**: Usada para transmitir (broadcast) uma tabela de dimensão menor para otimizar a junção com uma tabela maior (no caso, a tabela de fato).
- **year** e **month**: Funções que extraem o ano e o mês de uma data, usadas para criar colunas adicionais no DataFrame.

**Definindo a Tabela de Destino**
- Aqui, o nome da tabela de destino é definido como **fato_vendas**, que representa a tabela de fatos onde as vendas e as chaves surrogate de várias dimensões serão armazenadas.

**Realizando o Join com as Tabelas de Dimensão**
- **alias("s")**: Apelido dado à tabela de fatos **df_silver** para facilitar o acesso às suas colunas.
- O join é feito entre **df_silver** (fatos de vendas) e as várias dimensões (produto, categoria, segmento, fabricante e cliente) com o uso de **broadcast**. Isso ajuda a otimizar o processo de junção quando as tabelas de dimensão são menores.
- Após o join, o **select** é usado para selecionar as colunas relevantes, como as chaves surrogate (**sk_produto**, **sk_categoria**, etc.) e outras informações de vendas como **Unidades**, **PrecoUnitario**, **CustoUnitario** e **TotalVendas**.

**Adicionando Colunas de Ano e Mês**
- **withColumn("Ano", year("DataVenda"))** e **withColumn("Mes", month("DataVenda"))**: São adicionadas duas novas colunas, **Ano** e **Mês**, extraídas da coluna **DataVenda**, para facilitar a partição dos dados.
- **.write.format("delta")**: Especifica que os dados serão gravados no formato Delta, que suporta transações ACID e versionamento.
- **.mode("overwrite")**: O modo overwrite sobrescreve quaisquer dados existentes no caminho de destino.
- **.option("MaxRecordsPerFile", 1000000)**: Define o máximo de 1 milhão de registros por arquivo.
- **.partitionBy("Ano", "Mes")**: Os dados são particionados por ano e mês, o que melhora a performance de consultas quando filtradas por essas colunas.

### Limpeza de Memória

In [0]:
import gc

# Coletar lixo após operações pesadas para liberar memória
gc.collect()

# Limpar todos os dados em cache
spark.catalog.clearCache()

###Evidências de Carga na Camada Gold (Delta)

In [0]:
%fs ls /mnt/lhdw/gold/vendas_delta/

path,name,size,modificationTime
dbfs:/mnt/lhdw/gold/vendas_delta/dim_categoria/,dim_categoria/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_cliente/,dim_cliente/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_fabricante/,dim_fabricante/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_geografia/,dim_geografia/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_produto/,dim_produto/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_segmento/,dim_segmento/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/,fato_vendas/,0,0


In [0]:
%fs ls /mnt/lhdw/gold/vendas_delta/dim_categoria/

path,name,size,modificationTime
dbfs:/mnt/lhdw/gold/vendas_delta/dim_categoria/_delta_log/,_delta_log/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/dim_categoria/part-00000-cd87c664-4216-4121-9c07-2dd0c697e43a-c000.snappy.parquet,part-00000-cd87c664-4216-4121-9c07-2dd0c697e43a-c000.snappy.parquet,948,1741958676000


In [0]:
%fs ls /mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/

path,name,size,modificationTime
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=1/,Mes=1/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=10/,Mes=10/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=11/,Mes=11/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=12/,Mes=12/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=2/,Mes=2/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=3/,Mes=3/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=4/,Mes=4/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=5/,Mes=5/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=6/,Mes=6/,0,0
dbfs:/mnt/lhdw/gold/vendas_delta/fato_vendas/Ano=2011/Mes=7/,Mes=7/,0,0
