# Projeto 06- Bigdata - Pandas

## Contextualização

A PyCoders Ltda., cada vez mais especializada no mundo da Engenharia de Dados, foi procurada por uma fintech para desenvolver um projeto de análise de dados.

A fintech percebeu que muito dos seus processos estão se tornando lentos pelo uso incorreto de ferramentas! Desde que está se trabalhando com Big Data, uso de bibliotecas como pandas e sklearn tornam a Extracção, Tratamento e Carregamento dos dados (ETL) processos muito lentos, inclusive o treinamento de modelos de machine learning (ML) tem se tornado um processo muito demorado.

Para lidar com esse problema, foi sugerido fazer uso da biblioteca pyspark, para implementar todo o fluxo de ETL.


## Objetivo de projeto

Como queremos demostrar que de fato a solução proposta traz uma melhora, foi solicitado implementar uma análise comparativa de resultados usando a antiga abordagem (usando pandas e sklearn) e usando a nova proposta de solução (pyspark). Para isso, tome em consideração o seguinte:

1. Escolha dois conjuntos de dados interessantes, sendo que um deles é pequeno (menos de 10.000 linhas) e o outro bem maior (acima de 1.000.000 linhas). Uma possivel sugestão seria usar um unico dataset (com muitos dados), e extrair uma pequena proporção dos dados desde dataset e considerar essa parte como o dataset menor.

   - **Sugestão:** <a href="https://www.kaggle.com/datasets/computingvictor/transactions-fraud-datasets" target="_blank">Transactions Fraud Dataset</a>.

2. Aplique todas as etapas de ETL nos dois conjuntos de dados usando pandas y pyspark. As etapas incluem: (1) Extração dos dados, por exemplo de um csv, (2) Tratamento dos dados (limpeza, alteração de nomes de colunas, criação de mais tabelas, transformação nas colunas, etc.), e, (3) Carregamento dos dados (salvar a transformação feita sobre os dados). 

3. Lembre que cada etapa tem que ser feita usando unicamente pandas/sklearn ou pyspark.

4. Como o objetivo é fazer uma análise comparativa, tome em consideração o tempo que demora cada etapa, para depois facilitar as comparações. 

Boa sorte e divirta-se!!

## Datasets


Você pode procurar conjuntos de dados aqui:

1. No repositório da <a href="https://archive.ics.uci.edu/ml/datasets.php" target="_blank">UCI</a>.
1. No <a href="https://www.kaggle.com/datasets" target="_blank">Kaggle</a>.


## Organização e entregáveis

1. O projeto pode ser feito em grupo de até 05 participantes.
2. O projeto completo (Notebook, código-fonte, link para fontes, bases e demais artefatos) deve ser enviado por e-mail ``jchambyd@gmail.com`` com nome dos participantes. Colocar no assunto do e-mail: ``Projeto Santander Coders 2024 - Data - Turma 1180``

## Deadline
**Apresentação**: 10/12/2024 <br>


## Exemplo:

[ETL simples usando pandas][1]

[ETL simples usando pyspark][2]

[1]: https://blog.devgenius.io/basic-etl-using-pandas-23729ae4e05e

[2]: https://blog.devgenius.io/basic-etl-using-pyspark-ed08b7e53cf4

### ETL pandas

In [None]:
pip install gdown

In [None]:
import pandas as pd
import time
import gdown
import os

In [None]:
dict_datasets = {
    "transactions_data.csv": "1cZrxq0hTNZkZvH9_LrtG8idximIuZxiy",
    "cards_data.csv": "1YRzsRf5xVLEfq8oSb2KOdFG5E3Zfk03P",
    "users_data.csv": "1iVEIVr_2RgOnR_TjOHKQ1WgbXf5i2nFq"}

for file_output, file_id in dict_datasets.items():
    gdown.download(f"https://drive.google.com/uc?id={file_id}", file_output, quiet=False)
    dbfs_path = f"dbfs:/FileStore/{file_output}"
    os.makedirs(os.path.dirname(dbfs_path), exist_ok=True)
    os.rename(file_output, dbfs_path)
    print(f"Arquivo salvo em {dbfs_path}")

In [2]:
# Controle dos tempos de execução de cada tabela para cada processo
colunas = ['transacoes', 'cartoes', 'usuarios']
linhas = ['extracao', 'transformacao', 'carga']
df_tempo_execucao = pd.DataFrame(index=linhas, columns=colunas)

#### 1. Extração de dados

##### 1.1. Dataset de Transações

In [None]:
start_time_extraction_transactions = time.time()

data_raw = pd.read_csv('dbfs:/FileStore/transactions_data.csv')
data_raw.head()

end_time_extraction_transactions = time.time()
df_tempo_execucao.loc['extracao', 'transacoes'] = end_time_extraction_transactions - start_time_extraction_transactions
print(f"Tempo total da extração do dataset de transações: {df_tempo_execucao.loc['extracao', 'transacoes']:.2f} segundos")

Tempo total da extração do dataset de transações: nan segundos


##### 1.2. Dataset de Cartões

In [None]:
start_time_extraction_cards = time.time()

data_raw_card=pd.read_csv('dbfs:/FileStore/cards_data.csv')
data_raw_card.head()

end_time_extraction_cards = time.time()
df_tempo_execucao.loc['extracao', 'cartoes'] = end_time_extraction_cards - start_time_extraction_cards
print(f"Tempo total da extração do dataset de cartões: {df_tempo_execucao.loc['extracao', 'cartoes']:.2f} segundos")

Tempo total da extração do dataset de cartões: nan segundos


##### 1.3. Dataset de Usuários

In [None]:
start_time_extraction_users = time.time()

data_raw_users=pd.read_csv('dbfs:/FileStore/users_data.csv')
data_raw_users.head()

end_time_extraction_users = time.time()
df_tempo_execucao.loc['extracao', 'usuarios'] = end_time_extraction_users - start_time_extraction_users
print(f"Tempo total da extração do dataset de usuários: {df_tempo_execucao.loc['extracao', 'usuarios']:.2f} segundos")

Tempo total da extração do dataset de usuários: nan segundos


#### 2. Análise inicial dos Dados

In [6]:
data_bronze_trans = data_raw.copy()
data_bronze_users = data_raw_users.copy()
data_bronze_card = data_raw_card.copy()

In [7]:
print(f'Quantidade de registro de dados transacionais: {data_bronze_trans.shape[0]}')
print(f'Quantidade de registro de cartão de crédito: {data_bronze_card.shape[0]}')
print(f'Quantidade de registro de usuários: {data_bronze_users.shape[0]}')

Quantidade de registro de dados transacionais: 13305915
Quantidade de registro de cartão de crédito: 6146
Quantidade de registro de usuários: 2000


In [8]:
print("Tipos de dados da tabela de transações: \n")
print(data_bronze_trans.dtypes)
print("\nTipos de dados da tabela de cartões: \n")
print(data_bronze_card.dtypes)
print("\nTipos de dados da tabela de usuários: \n")
print(data_bronze_users.dtypes)

Tipos de dados da tabela de transações: 

id                  int64
date               object
client_id           int64
card_id             int64
amount             object
use_chip           object
merchant_id         int64
merchant_city      object
merchant_state     object
zip               float64
mcc                 int64
errors             object
dtype: object

Tipos de dados da tabela de cartões: 

id                        int64
client_id                 int64
card_brand               object
card_type                object
card_number               int64
expires                  object
cvv                       int64
has_chip                 object
num_cards_issued          int64
credit_limit             object
acct_open_date           object
year_pin_last_changed     int64
card_on_dark_web         object
dtype: object

Tipos de dados da tabela de usuários: 

id                     int64
current_age            int64
retirement_age         int64
birth_year             int64
birth

#### 3. Trasformação e limpeza de dados

In [9]:
def amount_null_dataframe(data_frame: pd.DataFrame) -> pd.Series:
    """Método utilitário que verifica em porcentagem a quantidade de dados nulas em cada coluna do Dataframe.

    Args:
        data_frame (pd.DataFrame): Dataframe para análise

    Returns:
        pd.Series: Porcetagem de nulos de cada coluna.
    """
    data_missing = data_frame.isna().sum()
    data_missing = (data_missing/len(data_frame))*100
    return data_missing.sort_values(ascending=False)

##### 3.1. Dataset de Transações

In [10]:
def adjust_columns_transactions(data_bronze_trans: pd.DataFrame) -> pd.DataFrame:
    data_bronze_trans['date'] = pd.to_datetime(data_bronze_trans['date'])
    data_bronze_trans['amount'] = data_bronze_trans['amount'].str.replace('$','')
    data_bronze_trans['amount'] = data_bronze_trans['amount'].astype(float)
    data_bronze_trans['merchant_city'] = data_bronze_trans['merchant_city'].astype(str)
    data_bronze_trans['merchant_state'] = data_bronze_trans['merchant_state'].astype(str)
    data_bronze_trans['zip'] = data_bronze_trans['zip'].astype(str)
    data_bronze_trans['zip'] = data_bronze_trans['zip'].str.replace('.0','')
    return data_bronze_trans


start_time_transform_transactions = time.time()

data_bronze_trans = adjust_columns_transactions(data_bronze_trans)
print()
print(amount_null_dataframe(data_bronze_trans))
data_bronze_trans.drop(columns=['errors'], inplace=True)  #Coluna quase toda nula

end_time_transform_transactions = time.time()
df_tempo_execucao.loc['transformacao', 'transacoes'] = end_time_transform_transactions - start_time_transform_transactions
print(f"\nTempo total da transformação do dataset de transações: {df_tempo_execucao.loc['transformacao', 'transacoes']:.2f} segundos")

print()
print(data_bronze_trans.dtypes)


errors            98.411286
id                 0.000000
date               0.000000
client_id          0.000000
card_id            0.000000
amount             0.000000
use_chip           0.000000
merchant_id        0.000000
merchant_city      0.000000
merchant_state     0.000000
zip                0.000000
mcc                0.000000
dtype: float64

Tempo total da transformação do dataset de transações: 62.87 segundos

id                         int64
date              datetime64[ns]
client_id                  int64
card_id                    int64
amount                   float64
use_chip                  object
merchant_id                int64
merchant_city             object
merchant_state            object
zip                       object
mcc                        int64
dtype: object


##### 3.2. Dataset de Cartões

In [11]:
def adjust_columns_card(data_bronze_card: pd.DataFrame) -> pd.DataFrame:
    data_bronze_card['expires'] =  pd.to_datetime(data_bronze_card['expires'], format='%m/%Y').dt.to_period('M')
    data_bronze_card['acct_open_date'] =  pd.to_datetime(data_bronze_card['acct_open_date'], format='%m/%Y').dt.to_period('M')
    data_bronze_card['has_chip'] = data_bronze_card['has_chip'].map({'YES': True, 'NO': False})
    data_bronze_card['card_on_dark_web'] = data_bronze_card['card_on_dark_web'].map({'Yes': True, 'No': False})
    data_bronze_card['credit_limit'] = data_bronze_card['credit_limit'].str.replace('$','')
    data_bronze_card['credit_limit'] = data_bronze_card['credit_limit'].str.replace('$','').astype(float)
    return data_bronze_card

start_time_transform_cards = time.time()

data_bronze_card = adjust_columns_card(data_bronze_card)
print()
print(amount_null_dataframe(data_bronze_card))
#Não há colunas a serem excluídas por motivo de nulos

end_time_transform_cards = time.time()
df_tempo_execucao.loc['transformacao', 'cartoes'] = end_time_transform_cards - start_time_transform_cards
print(f"\nTempo total da transformação do dataset de cartões: {df_tempo_execucao.loc['transformacao', 'cartoes']:.2f} segundos")

print()
print(data_bronze_card.dtypes)


id                       0.0
client_id                0.0
card_brand               0.0
card_type                0.0
card_number              0.0
expires                  0.0
cvv                      0.0
has_chip                 0.0
num_cards_issued         0.0
credit_limit             0.0
acct_open_date           0.0
year_pin_last_changed    0.0
card_on_dark_web         0.0
dtype: float64

Tempo total da transformação do dataset de cartões: 0.10 segundos

id                           int64
client_id                    int64
card_brand                  object
card_type                   object
card_number                  int64
expires                  period[M]
cvv                          int64
has_chip                      bool
num_cards_issued             int64
credit_limit               float64
acct_open_date           period[M]
year_pin_last_changed        int64
card_on_dark_web              bool
dtype: object


##### 3.3. Dataset de Usuários

In [12]:
def adjust_columns_users(data_bronze_users: pd.DataFrame) -> pd.DataFrame:
    data_bronze_users['per_capita_income'] = data_bronze_users['per_capita_income'].str.replace('$','')
    data_bronze_users['per_capita_income'] = data_bronze_users['per_capita_income'].str.replace('$','').astype(float)
    data_bronze_users['yearly_income'] = data_bronze_users['yearly_income'].str.replace('$','')
    data_bronze_users['yearly_income'] = data_bronze_users['yearly_income'].str.replace('$','').astype(float)
    data_bronze_users['total_debt'] = data_bronze_users['total_debt'].str.replace('$','')
    data_bronze_users['total_debt'] = data_bronze_users['total_debt'].str.replace('$','').astype(float)
    return data_bronze_users

start_time_transform_users = time.time()

data_bronze_users = adjust_columns_users(data_bronze_users)

print()
print(amount_null_dataframe(data_bronze_users))
#Não há colunas a serem excluídas por motivo de nulos

end_time_transform_users = time.time()
df_tempo_execucao.loc['transformacao', 'usuarios'] = end_time_transform_users - start_time_transform_users
print(f"\nTempo total da transformação do dataset de usuários: {df_tempo_execucao.loc['transformacao', 'usuarios']:.2f} segundos")

print()
print(data_bronze_users.dtypes)


id                   0.0
current_age          0.0
retirement_age       0.0
birth_year           0.0
birth_month          0.0
gender               0.0
address              0.0
latitude             0.0
longitude            0.0
per_capita_income    0.0
yearly_income        0.0
total_debt           0.0
credit_score         0.0
num_credit_cards     0.0
dtype: float64

Tempo total da transformação do dataset de usuários: 2.40 segundos

id                     int64
current_age            int64
retirement_age         int64
birth_year             int64
birth_month            int64
gender                object
address               object
latitude             float64
longitude            float64
per_capita_income    float64
yearly_income        float64
total_debt           float64
credit_score           int64
num_credit_cards       int64
dtype: object


#### 4. Carregamento banco de dados

##### 4.1. Dataset de Transações

In [None]:
start_time_load_transactions = time.time()

df_trans_silver = data_bronze_trans.copy()
df_trans_silver.to_parquet('dbfs:/FileStore/Tabela_Transacoes_Silver')

end_time_load_transactions = time.time()
df_tempo_execucao.loc['carga', 'transacoes'] = end_time_load_transactions - start_time_load_transactions
print(f"Tempo total de execução da carga no dataset de transações: {df_tempo_execucao.loc['carga', 'transacoes']:.2f} segundos")

Tempo total de execução da carga no dataset de transações: 26.79 segundos


##### 4.2. Dataset de Cartões

In [None]:
start_time_load_cards = time.time()

df_card_silver = data_bronze_card.copy()
df_card_silver.to_parquet('dbfs:/FileStore/Tabela_Cartoes_Credito_Silver')

end_time_load_cards = time.time()
df_tempo_execucao.loc['carga', 'cartoes'] = end_time_load_cards - start_time_load_cards
print(f"Tempo total de execução da carga no dataset de cartões: {df_tempo_execucao.loc['carga', 'cartoes']:.2f} segundos")

Tempo total de execução da carga no dataset de cartões: 0.09 segundos


##### 4.3. Dataset de Usuários

In [None]:
start_time_load_users = time.time()

df_users_silver = data_bronze_users.copy()
df_users_silver.to_parquet('dbfs:/FileStore/Tabela_Usuarios_Silver')

end_time_load_users = time.time()
df_tempo_execucao.loc['carga', 'usuarios'] = end_time_load_users - start_time_load_users
print(f"Tempo total de execução da carga no dataset de usuários: {df_tempo_execucao.loc['carga', 'usuarios']:.2f} segundos")

Tempo total de execução da carga no dataset de usuários: 0.03 segundos


In [16]:
df_tempo_execucao

Unnamed: 0,transacoes,cartoes,usuarios
extracao,33.767671,0.032851,0.013856
transformacao,62.867598,0.103056,2.396764
carga,26.794364,0.090338,0.029251
