**1. Configurações Iniciais e Importações**

Aqui está um exemplo de um notebook em PySpark para implementar a arquitetura Medallion com as camadas Bronze, Silver e Gold, utilizando Databricks e Delta Lake. Este exemplo segue as boas práticas de desenvolvimento e performance, incluindo a criação de surrogate keys (chaves substitutas) para as dimensões e otimização da tabela de fatos na camada Gold.

**Explicações:**

- Importar bibliotecas e funções necessárias.
- Definir os caminhos de arquivo para as camadas Bronze, Silver e Gold.
- Configurar as definições do Spark para um desempenho ótimo, como partições de shuffle automático.

In [0]:
# Importar as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Iniciar a SparkSession com configurações otimizadas
spark = SparkSession.builder \
    .appName("Load Data Bronze") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Define um número fixo de partições para shuffle, melhorando o paralelismo                 
# Define o tamanho máximo de partições para evitar muitos arquivos pequenos        
# Usa o codec Snappy para compressão rápida, otimizando tempo de leitura e escrita    
# Habilita otimizações adaptativas, ajustando o número de partições dinamicamente com base no tamanho dos dados

# Definir caminhos de armazenamento no Data Lake
lz_path_in = "/mnt/lhdw/landingzone/vendas/processar"
lz_path_out = "/mnt/lhdw/landingzone/vendas/processado"
bronze_path = "/mnt/lhdw/bronze/vendas"



**Justificativa:**

- **spark.sql.shuffle.partitions**: Define o número de partições para operações que envolvem shuffle (como joins e agregações). Escolher um valor fixo, como 200, garante que o cluster trabalhe de forma paralela de maneira eficiente.

Um cálculo comum para o número de partições é o seguinte:

_`número de partições = número de núcleos de CPU * 2 ou 3`_

Isso ajuda a garantir que o Spark use todos os núcleos disponíveis.
- **spark.sql.files.maxPartitionByte**s: Definimos o tamanho máximo dos arquivos particionados para evitar a criação de muitos arquivos pequenos, o que prejudicaria a performance de leitura e escrita.
- **spark.sql.parquet.compression.codec**: Snappy é uma escolha comum para Parquet, pois oferece uma boa combinação de compressão rápida e descompressão eficiente.
- **spark.sql.adaptive.enabled**: A otimização adaptativa ajusta o plano de execução conforme o tamanho dos dados, melhorando o desempenho automaticamente.

%md
**2. Camada Bronze: Ingestão de Dados Brutos**

A camada Bronze armazena dados brutos com formato parquet, sem transformações significativas. Aqui vamos simplesmente gravar os dados brutos como parquet.

### Criando um Schema para dados brutos

In [0]:
# Definir o esquema dos dados brutos
schema_lz = StructType([
    StructField("IDProduto", IntegerType(), True),
    StructField("Data", DateType(), True),
    StructField("IDCliente", IntegerType(), True),
    StructField("IDCampanha", IntegerType(), True),
    StructField("Unidades", IntegerType(), True),
    StructField("Produto", StringType(), True),
    StructField("Categoria", StringType(), True),
    StructField("Segmento", StringType(), True),
    StructField("IDFabricante", IntegerType(), True),
    StructField("Fabricante", StringType(), True),
    StructField("CustoUnitario", DoubleType(), True),
    StructField("PrecoUnitario", DoubleType(), True),
    StructField("CodigoPostal", StringType(), True),
    StructField("EmailNome", StringType(), True),
    StructField("Cidade", StringType(), True),
    StructField("Estado", StringType(), True),
    StructField("Regiao", StringType(), True),
    StructField("Distrito", StringType(), True),
    StructField("Pais", StringType(), True)
])

# Leitura dos dados e adição da coluna nome do arquivo durante a leitura
df_vendas = spark.read.option("header", "true").schema(schema_lz).csv(lz_path_in) \
                      .withColumn("filename", regexp_extract(input_file_name(), "([^/]+)$", 0))

distinct_filenames = df_vendas.select("filename").distinct()

# Exibindo o DataFrame para verificar a leitura correta dos dados
display(df_vendas)

IDProduto,Data,IDCliente,IDCampanha,Unidades,Produto,Categoria,Segmento,IDFabricante,Fabricante,CustoUnitario,PrecoUnitario,CodigoPostal,EmailNome,Cidade,Estado,Regiao,Distrito,Pais,filename
449,2012-07-26,247546,22,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33194,"(Nerea.Barry@xyza.com): Barry, Nerea","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-12-12,99747,14,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33156,"(Chester.Rollins@xyza.com): Rollins, Chester","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-08-25,31106,19,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33157,"(Bertha.Alvarado@xyza.com): Alvarado, Bertha","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-12-15,31114,12,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33157,"(Julie.Winters@xyza.com): Winters, Julie","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-12-30,31115,14,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33157,"(Shelley.Howell@xyza.com): Howell, Shelley","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-08-06,143358,17,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33185,"(Walter.Hurst@xyza.com): Hurst, Walter","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-08-22,80314,18,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33131,"(Amy.Green@xyza.com): Green, Amy","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-07-30,190511,22,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33183,"(Paloma.Simmons@xyza.com): Simmons, Paloma","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-08-30,58109,22,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33186,"(Troy.Solomon@xyza.com): Solomon, Troy","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv
449,2012-11-08,209504,17,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33184,"(Aaron.Cotton@xyza.com): Cotton, Aaron","Miami, FL, USA",FL,East,District #10,USA,dados_2012.csv


### Apresentando os arquivos lidos

In [0]:

display(distinct_filenames)

filename
dados_2012.csv


### Salvar/Persistir dados na camada Bronze Bronze

Os dados serão salvos de forma particionada **Ano e Mês**

In [0]:
# Escrever a tabela no formato Parquet, particionando por DataVenda (ano e mês)
df_vendas.withColumn("Ano", year("Data")) \
             .withColumn("Mes", month("Data")) \
             .write.mode("overwrite").partitionBy("Ano", "Mes").parquet(bronze_path)

# Apresentando o DataFrame
#display(df_vendas)

**Justificativas:**

- Lê os dados brutos a partir de um arquivo CSV na landing zone e escreve esses dados no formato Parquet na camada Bronze.
- O Parquet é escolhido pelo seu suporte a colunas e sua eficiência tanto em termos de espaço quanto em desempenho de leitura e escrita.

### Mover os arquivos processados para pasta processado

In [0]:
from pyspark.sql import functions as F
# Unpersist the DataFrame to ensure it does not hold onto file references
distinct_filenames.unpersist()
# Mover os arquivos processados para o caminho lz_path_out
# Nota: A operação de mover arquivos diretamente não é suportada pelo DataFrame API do Spark.
#       É necessário utilizar o dbutils.fs.mv para mover os arquivos manualmente após o processamento.


# Primeiro, verifique se há arquivos a serem movidos
if distinct_filenames.select("filename").distinct().count() > 0:
    filenames = distinct_filenames.select("filename").distinct().collect()

    for row in filenames:
        src_path = row.filename
        dbutils.fs.mv(lz_path_in + "/" + src_path, lz_path_out)


####Evidências

In [0]:
%fs ls /mnt/lhdw/landingzone/vendas/processar

In [0]:
%fs ls /mnt/lhdw/landingzone/vendas/processado/

path,name,size,modificationTime
dbfs:/mnt/lhdw/landingzone/vendas/processado/dados_2011.csv,dados_2011.csv,21493733,1727725272000
dbfs:/mnt/lhdw/landingzone/vendas/processado/dados_2012.csv,dados_2012.csv,22400712,1727802324000


In [0]:
%fs ls /mnt/lhdw/bronze/vendas/Ano=2012/Mes=10

path,name,size,modificationTime
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/_SUCCESS,_SUCCESS,0,1727802305000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/_committed_6446019627177129860,_committed_6446019627177129860,622,1727802304000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/_started_6446019627177129860,_started_6446019627177129860,0,1727802299000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00000-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-8-10.c000.snappy.parquet,part-00000-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-8-10.c000.snappy.parquet,58296,1727802301000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00001-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-9-10.c000.snappy.parquet,part-00001-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-9-10.c000.snappy.parquet,47364,1727802302000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00002-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-10-10.c000.snappy.parquet,part-00002-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-10-10.c000.snappy.parquet,78265,1727802302000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00003-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-11-10.c000.snappy.parquet,part-00003-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-11-10.c000.snappy.parquet,59592,1727802302000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00004-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-12-10.c000.snappy.parquet,part-00004-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-12-10.c000.snappy.parquet,63143,1727802301000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2012/Mes=10/part-00005-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-13-10.c000.snappy.parquet,part-00005-tid-6446019627177129860-63089e8f-b1d4-41bd-b99d-1e076cdfd01c-13-10.c000.snappy.parquet,33132,1727802299000


### A opção de gravar dados no modo "append" 

Permite adicionar novos dados a um arquivo existente, sem substituir ou excluir os dados já presentes. 

No caso específico do código fornecido, a linha de código comentada `df_vendas.withColumn("Ano", year("Data")) \ .withColumn("Mes", month("Data")) \ .write.mode("append").partitionBy("Ano", "Mes").parquet(bronze_path)` indica que os dados do DataFrame `df_vendas` serão adicionados ao arquivo Parquet existente no caminho `bronze_path`, mantendo a estrutura de particionamento por ano e mês.

Essa opção é útil quando se deseja adicionar novos dados a um conjunto de dados já existente, como por exemplo, quando novas vendas são registradas e precisam ser incorporadas ao conjunto de dados de vendas existente.

In [0]:
#df_vendas.withColumn("Ano", year("Data")) \
#         .withColumn("Mes", month("Data")) \
#         .write.mode("append").partitionBy("Ano", "Mes").parquet(bronze_path)

### Gerenciar o uso de memória 
Em PySpark, é importante gerenciar o uso de memória eficientemente, especialmente quando se trabalha com grandes conjuntos de dados. Para isso, você pode usar alguns comandos específicos que ajudam a liberar memória, remover objetos em cache ou persistidos e forçar a coleta de lixo.

**1. Limpar cache:**
PySpark armazena dados em cache para melhorar o desempenho de operações repetidas. Para liberar esses dados, você pode usar o comando unpersist().

In [0]:
# Exemplo de como liberar o cache de um DataFrame

df_vendas.unpersist()

# O comando unpersist() remove o DataFrame do cache, liberando a memória associada. Ele é especialmente útil quando você já não precisa mais dos dados persistidos.

Out[7]: DataFrame[IDProduto: int, Data: date, IDCliente: int, IDCampanha: int, Unidades: int, Produto: string, Categoria: string, Segmento: string, IDFabricante: int, Fabricante: string, CustoUnitario: double, PrecoUnitario: double, CodigoPostal: string, EmailNome: string, Cidade: string, Estado: string, Regiao: string, Distrito: string, Pais: string, filename: string]

**2. Limpar todos os dados em cache:**

Se houver vários DataFrames em cache, você pode limpá-los todos de uma vez.

In [0]:
# Limpar todos os dados em cache

spark.catalog.clearCache()

# clearCache() limpa o cache de todos os objetos em cache no SparkSession atual, liberando uma quantidade significativa de memória quando múltiplos DataFrames estão sendo reutilizados.

**3. Forçar coleta de lixo:**

O Python possui um coletor de lixo que remove objetos não referenciados da memória. Você pode forçar a coleta de lixo para liberar memória.

In [0]:
import gc
gc.collect()

#Comentário: Esse comando força o coletor de lixo a executar imediatamente, liberando a memória de objetos Python que não estão mais em uso.

Out[9]: 368

**4. Liberar variáveis manualmente:**

Se você criou variáveis grandes que não são mais necessárias, você pode removê-las explicitamente.

In [0]:
del df_vendas

# O comando del remove o objeto da memória. Isso é útil quando você tem grandes DataFrames ou objetos Python que já não são necessários.

%md
**Dicas adicionais:**
- Evite cachear DataFrames desnecessários.

**Resumo**

- **Para uma limpeza rápida e geral**: Use spark.catalog.clearCache().
- **Para liberar memória de DataFrames específicos**: Use df.unpersist().
- **Para remover variáveis específicas**: Use del.
- **Para uma solução completa**: Reinicie o cluster.