# **Camada Silver (Cleaned / Refined Layer)**

Dados já limpos, validados e com estruturação melhorada.

Normalmente inclui tratamento de erros, remoção de duplicatas e padronização.

Serve para análises intermediárias e alimentação de modelos.

In [0]:
%sql
SELECT current_catalog();

In [0]:
spark.sql("USE CATALOG pipe1")

# Tratamento de dados da LCB

✅ O que é particionamento?
Particionamento é a técnica de organizar fisicamente os dados em pastas separadas com base nos valores de uma ou mais colunas (ex: ano, mês, região etc).
📂 Exemplo:
Se você particionar uma tabela pela coluna ano, o Delta Lake vai salvar os dados assim:
ano=2023/part-0001.parquet  
ano=2024/part-0002.parquet  
Se particionar por ano e mes:
/ano=2024/mes=01/part-001.snappy.parquet  
/ano=2024/mes=02/part-002.snappy.parquet  
🚀 Por que o particionamento é importante no Delta Lake?
O Delta Lake usa o particionamento para acelerar as consultas, pois ele:

1. 🔍 Reduz o volume de dados lido
Se você filtrar WHERE ano = 2024, o Spark lê só a pasta ano=2024, ignorando as demais.

Isso acelera muito a leitura e economiza recursos de CPU/memória.

2. 🧹 Organiza melhor os dados
Ideal para dados temporais (ano, mês, dia), regionais (estado, cidade) ou categorias.

Ajuda na manutenção e gerenciamento do data lake.

3. ⚙️ Facilita atualização incremental
Se cada partição representa um mês, por exemplo, é possível atualizar apenas aquele mês com overwrite sem afetar os demais.

4. 📊 Melhora performance de ferramentas BI
Power BI, Metabase, Superset... todas se beneficiam de datasets bem particionados (menos tempo de carregamento e resposta).
⚠️ Cuidado: particionar demais também atrapalha
Particionar por colunas de alta cardinalidade (como CPF, ID de cliente, ou timestamps completos) cria milhares de pastas, e isso piora a performance.

Sempre particione por colunas com baixo número de valores únicos e que você costuma usar em filtros.

✅ Resumo visual:
Sem particionamento	Com particionamento por ano, mes
Spark lê todos os arquivos	Spark lê só os arquivos relevantes
Consulta lenta	Consulta rápida
Uso excessivo de recursos	Economia de CPU e memória
Mais difícil de manter	Organização mais clara e modular

🧠 O que é o Z-Ordering?
É um tipo especial de ordenação multidimensional dos dados dentro de cada partição Delta. Ele reorganiza os arquivos internamente, colocando juntos os registros que compartilham valores semelhantes nas colunas mais consultadas.

É inspirado no Z-order curve, uma técnica de indexação espacial que melhora a localidade dos dados em disco.

 Exemplo prático (sem Z-ordering):
Você tem uma tabela com milhões de linhas e faz consultas com filtros assim:

SELECT * FROM vendas WHERE cidade = 'São Paulo'

Se os dados estão desordenados, o Spark precisa ler muitos arquivos, mesmo que apenas alguns contenham 'São Paulo'.

✅ Exemplo com Z-Ordering:
Se você aplica:
OPTIMIZE minha_tabela ZORDER BY (cidade)

O Delta Lake reorganiza os dados internamente, agrupando os registros de 'São Paulo' nos mesmos arquivos ou blocos.

A próxima vez que você rodar o SELECT, o Spark lerá menos arquivos, reduzindo o tempo e custo da consulta.

🧪 Quando usar Z-Ordering?
A tabela já tem particionamento por colunas como ano, mes, e você quer melhorar o desempenho de filtros por colunas não particionadas (ex: cidade, produto_id, cliente_id).

Você faz muitas leituras com filtros em uma ou mais colunas específicas.

A tabela tem grande volume de dados e você quer otimizar leitura sem alterar a partição física.

📌 Sintaxe no Databricks:

OPTIMIZE minha_tabela
ZORDER BY (coluna1, coluna2)

Ou em PySpark:
spark.sql("""
  OPTIMIZE pipe1.silver.slv_despesas_lcb_contemporaneo
  ZORDER BY (condominio, tipo_despesa)
""")

⚠️ Observações:
Item	Z-Ordering
Nível de atuação	Dentro de arquivos/parquet
Altera a partição física	❌ Não
Aumenta o custo de escrita	✅ Sim (reordena arquivos)
Melhora leitura	✅ Muito, quando bem usado

✅ Conclusão:
Z-Ordering = reorganização interna dos dados em Delta Lake para tornar consultas filtradas mais rápidas.

Use quando:

Você tem filtros repetitivos por coluna(s).

Sua tabela já está estabilizada e você quer otimizar leitura.

In [0]:
from pyspark.sql.functions import col, date_format, round, year, month, to_timestamp, to_date, substring

# 1. Ler tabela Bronze
df_bronze = spark.table("pipe1.bronze.brz_despesas_lcb_contemporaneo")

# 2. Remover a coluna _line
df = df_bronze.drop("_line")

# 3. Transformar _fivetran_synced para string com timestamp formatado
df = df.withColumn("_fivetran_synced", date_format(col("_fivetran_synced"), "yyyy-MM-dd'T'HH:mm:ss"))

# 4. Garantir que 'periodo' esteja no formato string "yyyy-MM"
# (caso venha como date ou timestamp, usamos date_format)
df = df.withColumn("periodo", date_format(col("periodo"), "yyyy-MM"))

# 5. Arredondar colunas numéricas
df = df.withColumn("kwh_dia", round(col("kwh_dia"), 2))
df = df.withColumn("total_agua", round(col("total_agua"), 2))

# 6. Substituir valores nulos por 0
df = df.fillna(0)

# 7. Criar colunas de partição ano e mês extraídas da string 'periodo'
df = df.withColumn("ano", substring(col("periodo"), 1, 4).cast("int")) \
       .withColumn("mes", substring(col("periodo"), 6, 2).cast("int"))

# 8. Salvar como Delta particionado por ano e mês
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("ano", "mes") \
    .saveAsTable("pipe1.silver.slv_despesas_lcb_contemporaneo")


In [0]:
# dropar a tabela
#spark.sql("DROP TABLE IF EXISTS pipe1.silver.slv_despesas_lcb_contemporaneo")



In [0]:
df_silver = spark.table("pipe1.silver.slv_despesas_lcb_contemporaneo")
display(df_silver)

# Tratamento de dados da Timbauvas

In [0]:
%sql
select * from workspace.google_drive.dados_despesas_home_despesa_timbauvas

In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp, date_format, round, substring
from pyspark.sql.types import DoubleType

# 1. Carregar dados da camada Bronze
df = spark.table("pipe1.bronze.brz_despesas_timbauvas")

# 2. Remover a coluna _line
df = df.drop("_line")

# 3. Transformar _fivetran_synced para yyyy-MM-dd e timestamp formatado
df = df.withColumn("_fivetran_synced", date_format(col("_fivetran_synced"), "yyyy-MM-dd'T'HH:mm:ss"))

# 4. Alterar a coluna 'periodo' para string no formato ano-mês ("yyyy-MM")
df = df.withColumn("periodo", date_format(to_timestamp(col("periodo")), "yyyy-MM"))

# 5. Arredondar kwh_dia para 2 casas decimais
df = df.withColumn("kwh_dia", round(col("kwh_dia"), 2))

# 6. Converter despesa_de_luz para número com 2 casas decimais
df = df.withColumn("despesa_de_luz", round(col("despesa_de_luz").cast(DoubleType()), 2))

# 7. Substituir todos os valores nulos por 0
df = df.fillna(0)

# 8. Arredondar consumo_kg para 2 casas decimais
df = df.withColumn("consumo_de_agua", round(col("consumo_de_agua"), 2))

# 9. Arredondar total_agua para 2 casas decimais
df = df.withColumn("total_agua", round(col("total_agua"), 2))

# 10. Criar colunas 'ano' e 'mes' extraídas da string 'periodo'
df = df.withColumn("ano", substring(col("periodo"), 1, 4).cast("int")) \
       .withColumn("mes", substring(col("periodo"), 6, 2).cast("int"))

# 11. Gravar na camada Silver particionado por ano e mês
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("ano", "mes") \
    .saveAsTable("pipe1.silver.slv_despesas_timbauvas")


In [0]:
df_silver = spark.table("pipe1.silver.slv_despesas_timbauvas")
display(df_silver)

In [0]:
# dropar a tabela
#spark.sql("DROP TABLE IF EXISTS pipe1.silver.slv_despesas_timbauvas")