# Sessão Spark

In [1]:
import findspark
import os

findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Spark csv schema inference") \
    .config("spark.sql.warehouse.dir") \
    .enableHiveSupport() \
    .getOrCreate()

## Imports

In [128]:
from pyspark.sql.functions import *
import pandas as pd
import sqlite3
import time
import pytest
from unittest.mock import MagicMock

## Parâmetros do notebook

In [3]:
# Caminho dos CSVs
sales_data_path = 'data/sales_data.csv'

## Carregando dados

In [4]:
sales = spark.read.csv(sales_data_path, sep = ',', header = True)

# Verificando se todos os IDs de usuários são únicos

In [6]:
sales.select(count('transaction_id'), countDistinct('transaction_id')).show()

+---------------------+------------------------------+
|count(transaction_id)|count(DISTINCT transaction_id)|
+---------------------+------------------------------+
|                   40|                            40|
+---------------------+------------------------------+



### transaction_id único, verificado que há 40 IDs na tabela sales e todos são distintos

## Confirmando se os valores de vendas não são negativos

In [11]:
sales.select('sale_value').where(col('sale_value') <= 0).count()

0

### Não há nenhum valor de venda negativo

## Garantindo que todas as entradas tenham timestamps válidos

In [18]:
sales.select(col('date').cast('date')).distinct().printSchema()

root
 |-- date: date (nullable = true)



In [19]:
sales.select(col('date').cast('date')).distinct().show()

+----------+
|      date|
+----------+
|2023-07-29|
|2023-08-03|
|2023-07-31|
|2023-07-28|
|2023-08-01|
|2023-07-25|
|2023-07-30|
|2023-07-26|
|2023-07-27|
|2023-08-02|
+----------+



In [20]:
sales.select(col('date').cast('date')).distinct().where(isnull(col('date'))).show()

+----+
|date|
+----+
+----+



### Não há datas nulas ou inválidas

## Quantidade de linhas ingeridas no banco de dados de sua escolha é igual a quantidade de linhas originais

In [38]:
conexao = sqlite3.connect('itau.db')

In [23]:
rows = conexao.execute('select count(*) from tb_sales_data')
columns = [col[0] for col in rows.description]
info = pd.DataFrame(rows.fetchall(), columns=columns)
display(info)

Unnamed: 0,count(*)
0,40


In [24]:
sales.count()

40

In [25]:
conexao.close()

### Como é possível verificar, há 40 linhas no itau.db e 40 no CSV carregado, ambas com as mesmas informações, sales_data

## Monitoramento do tempo que leva para os dados serem extraídos, transformados e carregados

In [36]:
# Extração
inicio1 = time.time()
sales = spark.read.csv(sales_data_path, sep = ',', header = True)
fim1 = time.time()
print("Extraction time: ", fim1 - inicio1)

# Limpando
inicio2 = time.time()
sales_clean = sales.distinct()
fim2 = time.time()
print("Cleaning time: ", fim2 - inicio2)

# Transformando
inicio3 = time.time()
sales_converted = sales_clean\
    .withColumn('sale_value', when(col('currency') == 'FICT', col('sale_value') * 0.75))\
    .withColumn('currency', when(col('currency') == 'FICT', lit('USD')))
fim3 = time.time()
print("Transforming time: ", fim3 - inicio3)

# Carregando
inicio4 = time.time()
sales_pd = sales_converted.toPandas()
conexao = sqlite3.connect('itau.db')
sales_pd.to_sql('tb_sales_data', conexao, if_exists='replace')
conexao.close()
fim4 = time.time()
print("Loading time: ", fim4 - inicio4)

# Tempo total
delta_time = (fim1 - inicio1) + (fim2 - inicio2) + (fim3 - inicio3) + (fim4 - inicio4)
print("Total time: ", delta_time)

Extraction time:  0.08404183387756348
Cleaning time:  0.0020034313201904297
Transforming time:  0.02193617820739746
Loading time:  2.1966471672058105
Total time:  2.304628610610962


## Implementando alertas para qualquer falha ou anomalia durante o processo ETL.

#### Para o primeiro alerta, a verificação é em cima da contagem de linhas na extração e no carregamento
#### O segundo ponto a se verificar é a quantidade de nulos
#### Por fim, verificar o tempo de execução pode ser importante para detectar gargalos de processamento

In [114]:
def verifica_tb_extract(df_extracted, df_loaded):
    aproved = 0
    # Primeiro, contagens
    if len(df_extracted.index) == len(df_loaded.index):
        print('Contagens iguais: ', len(df_extracted.index))
        aproved += 1
    else:
        print('Alerta! Contagens não batem, extracted: ', len(df_extracted.index), 'loaded: ', len(df_loaded.index))
    
    # Verificando se a quantidade de nulos, em cada coluna, bate
    if (df_extracted.isna().sum() == df_loaded.isna().sum()).sum()/len(df_extracted.columns) == 1.0:
        print("Quantidades de nulo iguais: \n", df_extracted.isna().sum())
        aproved += 1
    else:
        print("Quantidades de nulo desiguais! Verifique as colunas: \n", 
              (df_extracted.isna().sum() == df_loaded.isna().sum()))
    
    return True if aproved == 2 else False

In [111]:
# Loaded
conexao = sqlite3.connect('itau.db')
rows = conexao.execute('select * from tb_sales_data')
columns = [col[0] for col in rows.description]
df_loaded = pd.DataFrame(rows.fetchall(), columns=columns).drop('index', axis=1)
conexao.close()

# Extracted
df_extracted = pd.read_csv(sales_data_path, sep = ',')

In [115]:
verifica_tb_extract(df_extracted, df_loaded)

Contagens iguais:  40
Quantidades de nulo iguais: 
 transaction_id    0
date              0
product_id        0
seller_id         0
sale_value        0
currency          0
dtype: int64


True

### A função construída verifica se a contagem de linhas e nulos coincide e retorna verdadeiro ou falso para aprovar o processo

## Descreva como você rastrearia um problema no pipeline, desde o alerta até a fonte do problema:

#### Meu primeiro passo seria verificar todas as contagens, em seguida os tipos de dados e a possibilidade disso intervir em alguma etapa do processo.
#### Verificado o passo anterior, caso haja uma inconsistência nos dados e/ou tipo de dados, eu verificaria um teste unitário até encontrar alguma inconsistência
#### Dependendo da natureza do problema, caso seja problema com processamento de dados que estão excedendo a capacidade do sistema, seria necessário averiguar cada etapa da pipeline a fim de encontrar o gargalo, a parte do processo onde está consumindo mais recursos, e dividí-la em mais etapas, utilizando a ideia de dividir para conquistar.

# Teste Unitário

### O teste unitário será realizado de forma sucinta utilizando a função 'verifica_tb_extract', aproveitando a função escrita anteriormente

## O arquivo do teste encontra-se na pasta e é denominado 'test_func.py'

### A seguir, serão realizadas uma sequência de testes, mudando os data frames de testes a serem testados diretamente no código, em seguida, os resultados serão comentados!

#### Happy path

In [144]:
!pytest

platform win32 -- Python 3.11.3, pytest-7.3.1, pluggy-1.0.0
rootdir: D:\arquivos\Documentos-HD\Scripts\Jupyter\teste_eng_dados_itau
plugins: anyio-3.5.0
collected 1 item

test_func.py [32m.[0m[32m                                                           [100%][0m



#### Com Nulos, porém mesmas quantidades de linhas e nulos, esperado que dê certo

In [148]:
!pytest 

platform win32 -- Python 3.11.3, pytest-7.3.1, pluggy-1.0.0
rootdir: D:\arquivos\Documentos-HD\Scripts\Jupyter\teste_eng_dados_itau
plugins: anyio-3.5.0
collected 1 item

test_func.py [32m.[0m[32m                                                           [100%][0m



#### Diferente quantidade de linhas (Erro!)

In [149]:
!pytest 

platform win32 -- Python 3.11.3, pytest-7.3.1, pluggy-1.0.0
rootdir: D:\arquivos\Documentos-HD\Scripts\Jupyter\teste_eng_dados_itau
plugins: anyio-3.5.0
collected 1 item

test_func.py [31mF[0m[31m                                                           [100%][0m

[31m[1m__________________________ test_verifica_tb_extract ___________________________[0m

    [94mdef[39;49;00m [92mtest_verifica_tb_extract[39;49;00m():[90m[39;49;00m
        actual_result = verifica_tb_extract(df5, df6)[90m[39;49;00m
>       [94massert[39;49;00m actual_result == [94mTrue[39;49;00m[90m[39;49;00m
[1m[31mE       assert False == True[0m

[1m[31mtest_func.py[0m:37: AssertionError
---------------------------- Captured stdout call -----------------------------
Alerta! Contagens não batem, extracted:  2 loaded:  3
Quantidades de nulo iguais: 
 A    0
B    0
C    0
dtype: int64
[31mFAILED[0m test_func.py::[1mtest_verifica_tb_extract[0m - assert False == True


#### Diferente quantidade de nulo, mesmo número de linhas (Erro!)

In [150]:
!pytest 

platform win32 -- Python 3.11.3, pytest-7.3.1, pluggy-1.0.0
rootdir: D:\arquivos\Documentos-HD\Scripts\Jupyter\teste_eng_dados_itau
plugins: anyio-3.5.0
collected 1 item

test_func.py [31mF[0m[31m                                                           [100%][0m

[31m[1m__________________________ test_verifica_tb_extract ___________________________[0m

    [94mdef[39;49;00m [92mtest_verifica_tb_extract[39;49;00m():[90m[39;49;00m
        actual_result = verifica_tb_extract(df7, df8)[90m[39;49;00m
>       [94massert[39;49;00m actual_result == [94mTrue[39;49;00m[90m[39;49;00m
[1m[31mE       assert False == True[0m

[1m[31mtest_func.py[0m:37: AssertionError
---------------------------- Captured stdout call -----------------------------
Contagens iguais:  3
Quantidades de nulo desiguais! Verifique as colunas: 
 A    False
B    False
C    False
dtype: bool
[31mFAILED[0m test_func.py::[1mtest_verifica_tb_extract[0m - assert False == True


# Conclusões

### Realizado o teste unitário, utilizando o pytest e um script simples dentro da pasta, podemos observar que:

#### -> O Teste foi bem sucedido em encontrar as falhas previstas e aprovar os resultados corretos
#### -> A função é simples, ela apenas verifica quantidades de nulos e linhas entre os DFs, porém, ela indica de maneira satisfatória se houve algum problema na extração, transformação e carregamento dos dados ao BD itau.db simulado