Bloco 1: Instalação de Dependências

In [1]:
import sys
!{sys.executable} -m pip install pyarrow



Bloco 2: Importação das Bibliotecas

In [2]:
# Importando todas as ferramentas necessárias
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime


# Importa as bibliotecas específicas para a conversão (do seu script)
import pyarrow as pa
import pyarrow.parquet as pq
import os

print(os.getcwd())


print("Bibliotecas de trabalho importadas.")

/home/ninja/Documentos/clickbuproto/notebooks
Bibliotecas de trabalho importadas.


Bloco 3: [SETUP ÚNICO] Leitura do CSV Original e Sample ( quando aplicável )



In [3]:
def sample_dataframe_by_group(df: pd.DataFrame, group_column: str, sample_percentage: float) -> pd.DataFrame:
    """
    Reduz o tamanho de um DataFrame amostrando um percentual de grupos únicos.

    Garante que, se um grupo for selecionado, todos os seus registros
    sejam incluídos no DataFrame final.

    Args:
        df: O DataFrame original.
        group_column: A coluna usada para agrupar os dados (ex: 'fk_contact').
        sample_percentage: O percentual de grupos a serem selecionados (ex: 0.12 para 12%).

    Returns:
        Um novo DataFrame contendo aproximadamente o percentual desejado
        de dados, com a garantia de que os grupos estão completos.
    """
    print(f"DataFrame original com {len(df)} registros e {df[group_column].nunique()} clientes únicos.")

    # 1. Obter a lista de todos os clientes únicos
    unique_customers = df[group_column].unique()

    # 2. Amostrar o percentual desejado de clientes únicos
    # O método sample() do pandas é altamente otimizado para isso.
    # A semente (random_state) garante que a amostragem seja reprodutível.
    sampled_customers = pd.Series(unique_customers).sample(
        frac=sample_percentage,
        random_state=42
    )

    # 3. Filtrar o DataFrame original para manter apenas os registros dos clientes selecionados
    # O método isin() é muito eficiente para este tipo de filtro.
    df_sampled = df[df[group_column].isin(sampled_customers)]

    print(f"DataFrame final com {len(df_sampled)} registros ({len(df_sampled)/len(df):.2%})")
    print(f"e {df_sampled[group_column].nunique()} clientes únicos ({df_sampled[group_column].nunique()/df[group_column].nunique():.2%}).")

    return df_sampled


# Define o caminho do CSV original e faz a leitura
print("Iniciando leitura do CSV original (pode demorar)...")

caminho_csv_original = '../data/df_t.csv'
dados = pd.read_csv(caminho_csv_original)

print("CSV original carregado com sucesso.")

print("CSV original carregado com sucesso.")
dados = sample_dataframe_by_group(
            df=dados,
            group_column='fk_contact',
            sample_percentage=0.12
        )


display(dados.head(2))

Iniciando leitura do CSV original (pode demorar)...
CSV original carregado com sucesso.
CSV original carregado com sucesso.
DataFrame original com 1741344 registros e 581817 clientes únicos.
DataFrame final com 205794 registros (11.82%)
e 69818 clientes únicos (12.00%).


Unnamed: 0,nk_ota_localizer_id,fk_contact,date_purchase,time_purchase,place_origin_departure,place_destination_departure,place_origin_return,place_destination_return,fk_departure_ota_bus_company,fk_return_ota_bus_company,gmv_success,total_tickets_quantity_success
0,bc02d5245bec63b30ff1102fa273fc03f58bc9cc3f674e...,a7218ff4ee7d37d48d2b4391b955627cb089870b934912...,2018-12-26,15:33:35,6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d...,50e9a8665b62c8d68bccc77c7c92431a1aa26ccbd38ed4...,0,0,8527a891e224136950ff32ca212b45bc93f69fbb801c3b...,1,89.09,1
2,fb3caed9b2f1b6016d45ccddb19095476e61a2c85faa8e...,3467ec081e2421e72c96e7203b929d21927fd00b6b5f28...,2018-12-21,18:41:54,7688b6ef52555962d008fff894223582c484517cea7da4...,8c1f1046219ddd216a023f792356ddf127fce372a72ec9...,0,0,ec2e990b934dde55cb87300629cedfc21b15cd28bbcf77...,1,121.99,1


Bloco 4: [SETUP ÚNICO] Conversão para Parquet

In [4]:
# Define o caminho de saída e salva em Parquet usando PyArrow
nome_do_arquivo_parquet = '../data/raw/dados.parquet'

print(f"Iniciando conversão para Parquet...")
dados.to_parquet(nome_do_arquivo_parquet, engine='pyarrow', index=False)

print(f"DataFrame salvo com sucesso em '{nome_do_arquivo_parquet}'")

# Verificação opcional: lemos o arquivo que acabamos de salvar para garantir que funcionou
df_lido = pd.read_parquet(nome_do_arquivo_parquet, engine='pyarrow')
print("\\nVerificação: DataFrame lido do Parquet:")
display(df_lido.head(2))

Iniciando conversão para Parquet...
DataFrame salvo com sucesso em '../data/raw/dados.parquet'
\nVerificação: DataFrame lido do Parquet:


Unnamed: 0,nk_ota_localizer_id,fk_contact,date_purchase,time_purchase,place_origin_departure,place_destination_departure,place_origin_return,place_destination_return,fk_departure_ota_bus_company,fk_return_ota_bus_company,gmv_success,total_tickets_quantity_success
0,bc02d5245bec63b30ff1102fa273fc03f58bc9cc3f674e...,a7218ff4ee7d37d48d2b4391b955627cb089870b934912...,2018-12-26,15:33:35,6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d...,50e9a8665b62c8d68bccc77c7c92431a1aa26ccbd38ed4...,0,0,8527a891e224136950ff32ca212b45bc93f69fbb801c3b...,1,89.09,1
1,fb3caed9b2f1b6016d45ccddb19095476e61a2c85faa8e...,3467ec081e2421e72c96e7203b929d21927fd00b6b5f28...,2018-12-21,18:41:54,7688b6ef52555962d008fff894223582c484517cea7da4...,8c1f1046219ddd216a023f792356ddf127fce372a72ec9...,0,0,ec2e990b934dde55cb87300629cedfc21b15cd28bbcf77...,1,121.99,1


Bloco 5: Carregamento dos Dados (Início do Pipeline Principal)


In [5]:
# Definindo o caminho e carregar o dataset (agora lendo o parquet)
caminho_dados = '../data/raw/dados.parquet'
nrows = 50000 # Definimos o tamanho da nossa amostra

# Lemos o arquivo parquet completo
df = pd.read_parquet(caminho_dados)

# Criamos nossa amostra de trabalho pegando as primeiras 'nrows' linhas
#df = df.head(nrows)

# Usado para visualizar as primeiras linhas para confirmar o carregamento
print(f"Amostra de dados com {len(df)} linhas carregada com sucesso:")
display(df.head())

Amostra de dados com 205794 linhas carregada com sucesso:


Unnamed: 0,nk_ota_localizer_id,fk_contact,date_purchase,time_purchase,place_origin_departure,place_destination_departure,place_origin_return,place_destination_return,fk_departure_ota_bus_company,fk_return_ota_bus_company,gmv_success,total_tickets_quantity_success
0,bc02d5245bec63b30ff1102fa273fc03f58bc9cc3f674e...,a7218ff4ee7d37d48d2b4391b955627cb089870b934912...,2018-12-26,15:33:35,6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d...,50e9a8665b62c8d68bccc77c7c92431a1aa26ccbd38ed4...,0,0,8527a891e224136950ff32ca212b45bc93f69fbb801c3b...,1,89.09,1
1,fb3caed9b2f1b6016d45ccddb19095476e61a2c85faa8e...,3467ec081e2421e72c96e7203b929d21927fd00b6b5f28...,2018-12-21,18:41:54,7688b6ef52555962d008fff894223582c484517cea7da4...,8c1f1046219ddd216a023f792356ddf127fce372a72ec9...,0,0,ec2e990b934dde55cb87300629cedfc21b15cd28bbcf77...,1,121.99,1
2,2ee9d0978acb5e113d0b3f846ab3f88c5a426321da8f87...,e15109b0b8e9f6f1554e560837eb55543f035f91d8be4f...,2021-02-19,19:11:40,2fca346db656187102ce806ac732e06a62df0dbb2829e5...,be47addbcb8f60566a3d7fd5a36f8195798e2848b36819...,0,0,1d0ebea552eb43d0b1e1561f6de8ae92e3de7f1abec523...,1,188.99,1
3,92c004fb7013d0b6cc363bf860b63ebcb4ab8827f4c2b2...,ec66b05934a8d00f661257ea91874241fccb4fb128028d...,2020-12-03,09:16:29,7688b6ef52555962d008fff894223582c484517cea7da4...,a3af7b3808c4cf72478d05c9bab9c0d47e31c1d2cb3a29...,0,0,3068430da9e4b7a674184035643d9e19af3dc7483e31cc...,1,232.96,1
4,7fdcd3e03f08ecea34edbe57ac88e0c8c2ef2645eceb40...,e15109b0b8e9f6f1554e560837eb55543f035f91d8be4f...,2021-04-09,09:25:08,2fca346db656187102ce806ac732e06a62df0dbb2829e5...,be47addbcb8f60566a3d7fd5a36f8195798e2848b36819...,0,0,1d0ebea552eb43d0b1e1561f6de8ae92e3de7f1abec523...,1,187.57,1


Bloco 6: Limpeza e Transformação dos Dados

In [6]:
# Diagnóstico técnico e conversão de tipos de dados
print("Estrutura inicial do DataFrame:")
df.info()

# Concatenando as colunas de data e hora e convertendo para o tipo datetime
print("\nConvertendo colunas de data e hora...")
df['datetime_purchase'] = pd.to_datetime(df['date_purchase'] + ' ' + df['time_purchase'])

# Apagando as colunas de texto originais para manter o DataFrame limpo
df = df.drop(columns=['date_purchase', 'time_purchase'])

print("\nEstrutura do DataFrame após conversão:")
df.info()

Estrutura inicial do DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 205794 entries, 0 to 205793
Data columns (total 12 columns):
 #   Column                          Non-Null Count   Dtype  
---  ------                          --------------   -----  
 0   nk_ota_localizer_id             205794 non-null  object 
 1   fk_contact                      205794 non-null  object 
 2   date_purchase                   205794 non-null  object 
 3   time_purchase                   205794 non-null  object 
 4   place_origin_departure          205794 non-null  object 
 5   place_destination_departure     205794 non-null  object 
 6   place_origin_return             205794 non-null  object 
 7   place_destination_return        205794 non-null  object 
 8   fk_departure_ota_bus_company    205794 non-null  object 
 9   fk_return_ota_bus_company       205794 non-null  object 
 10  gmv_success                     205794 non-null  float64
 11  total_tickets_quantity_success  205794 non-nul

Bloco 7: Persistência (Salvar o Resultado da Limpeza)

In [7]:
# Salvando o DataFrame limpo (A AMOSTRA de 50k linhas) para a próxima etapa
caminho_saida_limpo = '../data/processed/dados_limpos.parquet'
df.to_parquet(caminho_saida_limpo, index=False)

print(f"Dados limpos salvos com sucesso em: '{caminho_saida_limpo}'")

Dados limpos salvos com sucesso em: '../data/processed/dados_limpos.parquet'


Bloco 8: Extrair IDs Únicos para Criação de Aliases


In [8]:
print("Extraindo IDs únicos (da nossa amostra) para a criação de aliases...")

# Extrai todos os fk_contact únicos da nossa amostra
clientes_unicos = df['fk_contact'].unique()
df_clientes_unicos = pd.DataFrame(clientes_unicos, columns=['fk_contact'])
df_clientes_unicos.to_csv('../data/raw/clientes_unicos.csv', index=False)
print(f"{len(df_clientes_unicos)} IDs de clientes únicos salvos em 'clientes_unicos.csv'")

# Extrai todos os locais únicos (origem e destino) da nossa amostra
locais_origem = df['place_origin_departure'].unique()
locais_destino = df['place_destination_departure'].unique()
todos_locais = np.union1d(locais_origem, locais_destino) # Pega a união para não ter duplicados

df_locais_unicos = pd.DataFrame(todos_locais, columns=['place_hash'])
df_locais_unicos.to_csv('../data/raw/locais_unicos.csv', index=False)
print(f"{len(df_locais_unicos)} hashes de locais únicos salvos em 'locais_unicos.csv'")

Extraindo IDs únicos (da nossa amostra) para a criação de aliases...
69818 IDs de clientes únicos salvos em 'clientes_unicos.csv'
2252 hashes de locais únicos salvos em 'locais_unicos.csv'
