<div style="text-align: left;">

## Módulo: ED-NA-001 - Extração de Dados I
<br>

## Aula 3 - Parte 1

<br>

### Programação da Aula 2:

> #### 1. **O que é uma carga em batch ou processamento em lotes e a diferença com um streaming de dados**;
> #### 2. **Exemplos de carga em batch**;
> #### 3. **O que é um Webhook**;
> #### 4. **Implementando um Webhook**;
> #### 5. **Exercício 1: Desenvolver um pipeline de dados com processamento em lotes;**
> #### 6. **Exercício 2: Implementar um Webhook com uma API.**


##O que é uma carga em batch ou processamento em lotes? Qual é a diferença com um streaming de dados?

#### O processamento em lotes envolve a análise de dados em intervalos fixos de tempo, o que resulta em uma latência maior, sendo adequado para análises retrospectivas, enquanto o processamento de streaming processa dados em tempo real à medida que são gerados, oferecendo baixa latência e sendo ideal para análises em tempo real e tomada de decisões imediatas com base nos dados recebidos.
<br>
<div style="text-align: center;">

## Exemplo de carga em batch
<img src="./processamento_batch.png"  width="70%" height="40%">
<br>
<br>

## Exemplo de streaming
<div style="text-align: center;">
<img src="./processamento_streaming.png"  width="70%" height="40%">
<br>
<br>

## Exemplo de carga em batch
### Implementar um processamento a lote que a cada 5 segundos percorre um diretório no DBFS buscando novos arquivos e carrega os novos resultados em um arquivo final de formato parquet. Após a carga, efetua uma transformação dos dados e armazena os resultados em outro arquivo parquet. 
### Os arquivos brutos de formato parquet são da seguinte base de dados do Kaggle relacionada a voos atrasados e cancelados: 
https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022

### Lista os arquivos brutos

In [None]:
dbutils.fs.mkdirs("/FileStore/tables/dados_brutos/")

In [0]:
dbutils.fs.ls("/FileStore/tables/dados_brutos/")

### A parit da listagem, é possível pegar os nomes dos arquivos brutos

In [0]:
dbutils.fs.ls("/FileStore/tables/dados_brutos/")[0][1]

In [0]:
#imprime o nome dos arquivos brutos
for arquivo in dbutils.fs.ls("/FileStore/tables/dados_brutos/"):
    print("nome_do_arquivo:", arquivo[1])

#### Chamada da biblioteca do Pandas API

In [0]:
import pyspark.pandas as ps

### Cria uma função para extração

In [0]:
def extract_data(dir_leitura, columns_name, index_col_name):

    df_res = ps.DataFrame() #inicializa um dataframe em branco
    #percorre todos os arquivos no diretorio de arquivos brutos
    for arquivo in dbutils.fs.ls(dir_leitura):
        print("nome_do_arquivo:", arquivo[1])
        dir_leitura_arquivo = dir_leitura + arquivo[1]
        #faz a leitura do arquivo corrente
        df = ps.read_parquet(dir_leitura_arquivo, columns=columns_name, index_col=index_col_name)
        #concatena o resultado corrente com o dataframe final
        df_res = ps.concat([df_res, df], ignore_index=True)
    
    print("resultado extraido com sucesso")

    # retorna o resultado final extraido
    return df_res

### Criar uma forma de identificar se existe arquivos em diretório do DBFS

In [0]:
#Quando não existe nenhum arquivo com o diretório passado, é gerado o seguinte erro: "java.io.FileNotFoundException"
dbutils.fs.ls("/FileStore/tables/dado_carregado/dado_consolidado.parquet")

In [0]:
dir_escrita = "/FileStore/tables/dado_carregado/dado_consolidado.parquet"
#Com uma exceção, é possível efetuar o processamento correto quando um diretório ainda não existe
try:
    dbutils.fs.ls(dir_escrita)
except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
        print("Arquivo não encontrado, primeiro processamento")

### Cria uma função para carregar os dados extraídos

In [0]:
def load_data(df_new, dir_escrita_bruto):
    try: #caso já exista o arquivo transformado, segue direto com a concatenação e com a carga do resultado final
        arquivo = dbutils.fs.ls(dir_escrita_bruto)
        df_res = ps.read_parquet(dir_escrita_bruto)
        df_res = ps.concat([df_res, df_new])
        df_res.to_parquet(dir_escrita_bruto)

    except Exception as e: #caso o arquivo transformando ainda não exista, quer dizer que é o primeiro processo do pipeline e é preciso criar o arquivo destino
        if 'java.io.FileNotFoundException' in str(e):
            print("Arquivo não encontrado, primeiro processamento")
            df_new.to_parquet(dir_escrita_bruto)

    print("resultado carregado com sucesso")

    #realiza a movimentação da pasta de dados brutos para a pasta de dados brutos já carregados
    dbutils.fs.mv("/FileStore/tables/", "/FileStore/tables/dados_brutos_carregados/", True)

### Cria uma função para a transformação dos dados

In [0]:
def transform_data(dir_escrita_bruto, dir_escrita_transformado):

    df = ps.read_parquet(dir_escrita_bruto) 
    #Coluna de total de atraso que é a soma do atraso na chegada no local de origem e destino
    df["Total_Delay"] = df["ArrDelayMinutes"] + df["DepDelayMinutes"]

    #Filtra o estado de origem igual a Texas e o estado destino igual a California
    df = df[(df["OriginStateName"] == "Texas") | (df["DestStateName"] == "California")]

    #Agrupa os resultados pelo nome da cidade de origem e calcula a média do total de atrasos e o total da distância percorrida.
    df = df.groupby(["Year","Month"]).agg(Total_Delay_Average=('Total_Delay', 'mean'), Total_Distance=('Distance', 'sum'))

    #reseta o index
    df = df.reset_index()

    #Armazena o resultado transformado
    df.to_parquet(dir_escrita_transformado)

    print("transformação realizada com sucesso")

### cria uma função para o ELT

In [0]:
def elt(columns_name, index_col_name, dir_leitura, dir_escrita_bruto, dir_escrita_transformado):
    print("inicializa o ELT")
    try: # verifica se existe novos dados para serem processados
        arquivo = dbutils.fs.ls(dir_leitura)
        print("Arquivos novos encontrados")
        #faz a chamada da extração, da carga e da transformação dos dados
        df_new = extract_data(dir_leitura, columns_name, index_col_name)
        load_data(df_new, dir_escrita_bruto)
        transform_data(dir_escrita_bruto, dir_escrita_transformado)
    except Exception as e: #caso não exista nenhum dado novo, retorna com a mensagem e encerra o processo
        if 'java.io.FileNotFoundException' in str(e):
            print("Nenhum dado novo")
        else:
            print("erro no ELT:", str(e))

### faz a chamada do ELT para um teste

In [0]:
#define o nome dados colunas do arquivo brutos que serão carregadas
columns_name = ["FlightDate",
"Year",
"Month",
"Airline", 
"Cancelled", 
"Diverted", 
"CRSDepTime", 
"DepTime", 
"DepDelayMinutes",
"DepDelay", 
"ArrTime", 
"ArrDelayMinutes",
"AirTime", 
"CRSElapsedTime",
"ActualElapsedTime",
"Distance", 
"OriginCityName",
"OriginStateName",
"DestCityName", 
"DestStateName"]

#define o indice do arquivo bruto que será carregado
index_col_name = ["Flight_Number_Operating_Airline"]

#define o diretório dos arquivos brutos
dir_leitura = "/FileStore/tables/"
#define o diretório de qual será o arquivo parquet final que irá armazenar os dados carregados
dir_escrita_bruto = "/FileStore/tables/dado_carregado/dado_consolidado.parquet"
#define o diretório de qual será o arquivo parquet final que irá armazenar os dados transformados
dir_escrita_transformado = "/FileStore/tables/dados_transformados/dado_transformado.parquet"

#faz a chmada do ELT
elt(columns_name, index_col_name, dir_leitura, dir_escrita_bruto, dir_escrita_transformado)

### Imprime o resultado do primeiro processamento de dados com os arquivos dos anos de 2020 e 2021

In [0]:
df = ps.read_parquet(dir_escrita_transformado)
df = df.sort_values(by=["Year", "Month"])
df.head(50)

### verifica se os dados processados foram movimentados para a pasta de dados brutos carregados

In [0]:
dbutils.fs.ls("/FileStore/tables/dados_brutos_carregados/")

### Cria uma função para contar um tempo e permitir um processamento a cada periodo de tempo definido

In [0]:
import time #utiliza a função time do python

tempo_proc = 5 #segundos

print("inicializa contagem")
for t in range(1, tempo_proc+1):
    print(t)
    time.sleep(1) #utiliza a função time sleep para forçar aguardar 1 segundo a cada iteração no loop
print("finaliza contagem")

In [0]:
# função para chamar a contagem de "tempo_proc" segundos
def timer(tempo_proc):
    print("inicializa contagem")
    for t in range(1, tempo_proc+1):
        print(t)
        time.sleep(1)
    print("finaliza contagem")

### Definição dos parametros para a chamada definitiva do processamento em lotes

In [0]:
#define o nome dados colunas do arquivo brutos que serão carregadas
columns_name = ["FlightDate",
"Year",
"Month",
"Airline", 
"Cancelled", 
"Diverted", 
"CRSDepTime", 
"DepTime", 
"DepDelayMinutes",
"DepDelay", 
"ArrTime", 
"ArrDelayMinutes",
"AirTime", 
"CRSElapsedTime",
"ActualElapsedTime",
"Distance", 
"OriginCityName",
"OriginStateName",
"DestCityName", 
"DestStateName"]

#define o indice do arquivo bruto que será carregado
index_col_name = ["Flight_Number_Operating_Airline"]

#define o diretório dos arquivos brutos
dir_leitura = "/FileStore/tables/"
#define o diretório de qual será o arquivo parquet final que irá armazenar os dados carregados
dir_escrita_bruto = "/FileStore/tables/dado_carregado/dado_consolidado.parquet"
#define o diretório de qual será o arquivo parquet final que irá armazenar os dados transformados
dir_escrita_transformado = "/FileStore/tables/dados_transformados/dado_transformado.parquet"

#definição do parametro do contador de 5 segundos
tempo_proc = 5

### Chamada definitiva do processamento em lotes a cada 5 segundos

In [0]:
#loop infinitivo chamando o contador e o ELT 
while 1==1:
    timer(tempo_proc)
    elt(columns_name, index_col_name, dir_leitura, dir_escrita_bruto, dir_escrita_transformado)