# ## Carga Incremental com Auto Loader na camada Bronze


O Auto Loader é um recurso do Apache Spark que facilita a ingestão incremental de dados em tempo real. Ele permite a leitura automática de novos arquivos em um diretório monitorado, aplicando um esquema definido e gerenciando checkpoints para garantir a consistência dos dados. 

Com o Auto Loader, os dados são ingeridos de forma atômica, garantindo que não haja perda de informações em caso de falhas no processo de carregamento. Essa abordagem é especialmente útil para cenários de big data, onde grandes volumes de dados são gerados constantemente, proporcionando uma solução eficiente e resiliente para a ingestão de dados.

In [0]:
checkpoint_path = "dbfs:/mnt/checkpoints/bronze"
schema_path = "dbfs:/mnt/checkpoints/bronze_schema"
source_path = "dbfs:/mnt/daily_data/"  # Caminho de origem dos arquivos JSON

def process_bronze():
    query = (spark.readStream
                  .format("cloudFiles")  # Utiliza Auto Loader para ingestão eficiente de dados
                  .option("cloudFiles.format", "json") # Especifica que os dados de origem estão no formato JSON
                  .option("cloudFiles.schemaLocation", schema_path)
                  .load(source_path) # Diretório de origem contendo os arquivos de dados diários
                  .writeStream
                  .option("checkpointLocation", checkpoint_path)  # Localização do checkpoint para processamento com estado
                  .trigger(availableNow=True) # Dispara a consulta para processar todos os dados disponíveis e terminar
                  # pode ser usado diferentes parâmetros, como "processingTime", com o qual você define um intervalo de tempo para os dados serem processados novamente na tabela Bronze.
                  .table("bronze"))

    query.awaitTermination()

In [0]:
process_bronze()

abaixo, a consulta é feita da tabela gerada, você pode repetir a consulta assim que fizer a ingestão de novos dados com o gerador de dados.

In [0]:
# Consultar a tabela bronze para ver os dados processados
bronze_df = spark.table("bronze")
bronze_df.show()  # Mostrar as primeiras linhas


+-----+-----------+--------+--------------------+-------------+
|price|    product|quantity|           sale_date|_rescued_data|
+-----+-----------+--------+--------------------+-------------+
|40.26| Product_49|       5|2024-09-12T20:32:...|         NULL|
|98.56| Product_92|       9|2024-10-02T20:32:...|         NULL|
|53.93| Product_23|       4|2024-09-12T20:32:...|         NULL|
|78.76| Product_90|       9|2024-09-14T20:32:...|         NULL|
|20.55| Product_83|       7|2024-10-11T20:32:...|         NULL|
|47.49| Product_73|       4|2024-09-25T20:32:...|         NULL|
|92.56| Product_65|       7|2024-09-29T20:32:...|         NULL|
|14.79|  Product_6|       3|2024-10-05T20:32:...|         NULL|
|88.42| Product_52|       9|2024-09-24T20:32:...|         NULL|
|15.56| Product_23|       6|2024-09-11T20:32:...|         NULL|
|79.61| Product_68|       5|2024-10-07T20:32:...|         NULL|
|42.42|Product_100|       6|2024-09-26T20:32:...|         NULL|
|98.07| Product_41|       3|2024-10-11T2

In [0]:
bronze_df.count()

6000

## Processamento em batch da camada bronze para silver usando processamento incremental com Structured Streaming

In [0]:
from pyspark.sql import functions as F
    
def update_silver():
    query = (spark.readStream
                  .table("bronze")  # Leitura da tabela Bronze em modo de streaming
                  .withColumn("processed_time", F.current_timestamp())  # Adiciona uma coluna de timestamp
                  .writeStream
                  .option("checkpointLocation", "dbfs:/mnt/checkpoints/silver")  # Localização do checkpoint para a tabela Silver
                  .trigger(availableNow=True)  # Processa todos os dados disponíveis e, em seguida, encerra
                  .table("silver"))  # Grava os dados processados na tabela Silver
    
    query.awaitTermination()  # Aguarda a finalização da consulta antes de encerrar

In [0]:
#chamando a função
update_silver()

In [0]:
# Realizando o count na Silver para verificar a carga incremental:
spark.sql("""
select count(1)
from silver
""").display()

count(1)
6000
