# Carregar dados do Lakehouse Silver para Gold


# Transformações de Silver para Gold

A transformação para a camada Gold envolve:

**Agregações:** Realizar agregações como somas, médias, contagens, etc., para gerar insights de alto nível.

**Cálculos de Negócio:** Aplicar regras de negócios que extraem métricas e KPIs importantes para a análise.

**Desnormalização:** Pode ser necessário juntar ou combinar dados de diferentes fontes para facilitar a análise.

**Criação de Modelos Analíticos:** Aplicação de métricas de negócios complexas, como cálculos de tendências, segmentação de clientes, etc.

## Transformações: Nesse exemplo, estamos agregando os dados para calcular:

O total de quantidade vendida (SUM).
O preço médio unitário (AVG).
O número total de clientes distintos (COUNT DISTINCT).
A última data de venda (MAX).
Criação da Tabela Gold: A tabela Gold será criada na camada Gold e armazenará esses dados agregados e transformados.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Inicializando o SparkSession (se ainda não estiver ativo)
spark = SparkSession.builder.getOrCreate()

# Carregando a tabela Silver
try:
    df_silver = spark.table("Silver_Lakehouse_GL.produto")
except Exception as e:
    print(f"Erro ao carregar a tabela Silver: {e}")
    raise

# Verificar o esquema da tabela Silver
df_silver.printSchema()

# Realizando a transformação para a camada Gold
df_gold = (
    df_silver
    .groupBy("IDPRODUTO", "NOME")
    .agg(
        F.sum("VALOR_UNITARIO").alias("valor_total_vendido"),       # Soma dos valores unitários
        F.avg("CUSTO_MEDIO").alias("custo_medio_produto"),          # Média do custo médio
        F.countDistinct("ID_CATEGORIA").alias("total_categorias"),  # Contagem de categorias distintas
        F.max("MARGEM_BRUTA").alias("maior_margem_bruta")           # Maior margem bruta
    )
)

# Exibir o schema da tabela Gold para conferir se as agregações estão corretas
df_gold.printSchema()

# Criar o esquema Gold_Lakehouse_GL se não existir
try:
    spark.sql("CREATE SCHEMA IF NOT EXISTS Gold_Lakehouse_GL")
except Exception as e:
    print(f"Erro ao criar o esquema Gold_Lakehouse_GL: {e}")
    raise

# Alterar para o esquema correto
spark.sql("USE Gold_Lakehouse_GL")

# Salvando o resultado na camada Gold
try:
    df_gold.write.format("delta").mode("overwrite").saveAsTable("Gold_Lakehouse_GL.produto_gold")
except Exception as e:
    print(f"Erro ao salvar a tabela Gold: {e}")
    raise


StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 4, Finished, Available, Finished)

root
 |-- idsk: string (nullable = true)
 |-- idproduto: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- id_categoria: string (nullable = true)
 |-- custo_medio: double (nullable = true)
 |-- valor_unitario: double (nullable = true)
 |-- margem_bruta: double (nullable = true)

root
 |-- IDPRODUTO: string (nullable = true)
 |-- NOME: string (nullable = true)
 |-- valor_total_vendido: double (nullable = true)
 |-- custo_medio_produto: double (nullable = true)
 |-- total_categorias: long (nullable = false)
 |-- maior_margem_bruta: double (nullable = true)



In [7]:
# Renomeando as colunas para minúsculas (se tiver)
df_gold = df_gold.withColumnRenamed("IDPRODUTO", "idproduto") \
                 .withColumnRenamed("NOME", "nome")

StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 9, Finished, Available, Finished)

# Validação


In [3]:
# Listar todas as tabelas no catálogo atual
spark.sql("SHOW TABLES").show()

# Confirmar como a tabela está sendo salva
df_gold.write.format("delta").mode("overwrite").saveAsTable("produto_gold")

df_gold.write.format("delta").mode("overwrite").saveAsTable("Gold_Lakehouse_GL.produto_gold")

# Verificar o esquema atual
spark.sql("SELECT current_database()").show()

# Alterar para o esquema correto, se necessário
spark.sql("USE Gold_Lakehouse_GL")

spark.sql("SELECT * FROM produto_gold").show(5)

# Reexecutar transformação e salvar
df_gold.write.format("delta").mode("overwrite").saveAsTable("Gold_Lakehouse_GL.produto_gold")



StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 5, Finished, Available, Finished)

+-----------------+------------+-----------+
|        namespace|   tableName|isTemporary|
+-----------------+------------+-----------+
|gold_lakehouse_gl|produto_gold|      false|
+-----------------+------------+-----------+

+------------------+
|current_database()|
+------------------+
| gold_lakehouse_gl|
+------------------+

+---------+--------------------+-------------------+-------------------+----------------+------------------+
|IDPRODUTO|                NOME|valor_total_vendido|custo_medio_produto|total_categorias|maior_margem_bruta|
+---------+--------------------+-------------------+-------------------+----------------+------------------+
|       70|          lg optimus|              890.0|              500.0|               1|             390.0|
|      216|sao paulo / belo ...|              250.0|              200.0|               1|              50.0|
|      207|armario cozinha 6...|             2500.0|             2030.0|               1|             470.0|
|      118|   

## Melhorias na Tabela Gold
É possivel realizar outras melhorias para garantir que a camada Gold contenha dados prontos para consumo, como:

Limpeza de dados (remoção de registros nulos ou inconsistentes).
Transformações de tipo de dados.
Criação de índices ou outras otimizações.

In [4]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Carregar a tabela Delta
delta_table = DeltaTable.forName(spark, "Gold_Lakehouse_GL.produto_gold")

# Deletar as linhas onde 'IDPRODUTO' é NULL
delta_table.delete(condition = F.col("IDPRODUTO").isNull())

StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 6, Finished, Available, Finished)

# Performance

## Particionamento de Dados

Particionar sua tabela pode melhorar significativamente a performance de leitura, especialmente quando você tem grandes volumes de dados. O particionamento divide os dados em subpastas com base em uma ou mais colunas. Por exemplo, se você tem dados por ano ou mês, pode particionar os dados com base na coluna de data.

In [None]:
## Exemplo: df_gold.write.partitionBy("ano_venda").format("delta").mode("overwrite").saveAsTable("Gold_Lakehouse_GL.produto_gold")

Neste cenario não é valido, pois não tem colunas adequadas

In [8]:
#Z-ordering
##O Z-ordering pode ajudar a melhorar a leitura de dados ao organizar fisicamente os dados de uma maneira mais eficiente.

# Renomeando as colunas para minúsculas
df_gold = df_gold.select([F.col(c).alias(c.lower()) for c in df_gold.columns])

# Agora escreva os dados novamente com particionamento e o esquema renomeado
df_gold.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("idproduto") \
    .saveAsTable("Gold_Lakehouse_GL.produto_gold")


StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 10, Finished, Available, Finished)

In [6]:
# VACUUM
## O comando VACUUM é importante para garantir que o Delta Lake limpe arquivos antigos de versões anteriores após várias operações de overwrite.

spark.sql("VACUUM Gold_Lakehouse_GL.produto_gold RETAIN 168 HOURS")


StatementMeta(, 2280c21e-c6a9-4f8b-a442-26c929a854b2, 8, Finished, Available, Finished)

DataFrame[path: string]