# Pipeline de Processamento ClickBus - SageMaker

**Descrição:** Pipeline completo para processamento de dados ClickBus utilizando SageMaker Processing Jobs.

**Estrutura:**
- **Script 1:** `transform_dados_01.py` - Transformação e enriquecimento
- **Script 2:** `proc_dados_02.py` - Segmentação e features agregadas

**Região:** sa-east-1 (São Paulo)

**Autor:** Pipeline ClickBus  
**Data:** 2024

## 1. Configuração Inicial e Imports

In [2]:
# Imports necessários
import boto3
import sagemaker
from sagemaker.processing import ScriptProcessor
from datetime import datetime
import pandas as pd

# Configurações do pipeline
BUCKET = 'cbchallenge'
BASE_PATH = 'Gold'
REGION = 'sa-east-1'
CODE_PATH = f'{BASE_PATH}/code'

# Configuração do container Docker personalizado
ACCOUNT_ID = '005102550942'
CONTAINER_URI = f'{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/clickbus-pipeline:latest'

# Inicializar sessão SageMaker
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

print("Configuração do Pipeline ClickBus - Docker Container")
print("=" * 60)
print(f"Bucket S3: {BUCKET}")
print(f"Pasta Base: {BASE_PATH}")
print(f"Região AWS: {REGION}")
print(f"Container URI: {CONTAINER_URI}")
print(f"Role SageMaker: {role}")
print(f"Início da sessão: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)

sagemaker.config INFO - Fetched defaults config from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
Configuração do Pipeline ClickBus - Docker Container
Bucket S3: cbchallenge
Pasta Base: Gold
Região AWS: sa-east-1


## 2. Configuração do Processador SageMaker

In [3]:
# Configurar processador para execução dos scripts
processor = ScriptProcessor(
    command=['python3'],
    image_uri = CONTAINER_URI,
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',  # Ajustar conforme necessidade
    volume_size_in_gb=30,
    max_runtime_in_seconds=3600,  # 1 hora de timeout
    sagemaker_session=sagemaker_session
)

print("Processador SageMaker Configurado")
print("-" * 40)
print(f"Imagem Docker: Python 3.8 (AWS Managed)")
print(f"Tipo de Instância: ml.m5.large")
print(f"Volume de Armazenamento: 30 GB")
print(f"Timeout Máximo: 1 hora")
print(f"Região de Execução: {REGION}")

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds
Processador SageMaker Configurado
----------------------------------------
Imagem Docker: Python 3.8 (AWS Managed)
Tipo de Instância: ml.m5.large
Volume de Armazenamento: 30 GB
Timeout Máximo: 1 hora
Região de Execução: sa-east-1


## 3. Validação Pré-Execução

Verificar se todos os arquivos necessários estão disponíveis no S3.

In [4]:
# Função para verificar existência de arquivos no S3
def verificar_arquivo_s3(bucket, key):
    """Verifica se um arquivo existe no S3."""
    s3_client = boto3.client('s3', region_name=REGION)
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except:
        return False

# Lista de arquivos necessários para execução
arquivos_necessarios = [
    f'{CODE_PATH}/transform_dados_01.py',
    f'{CODE_PATH}/proc_dados_02.py'
]

# Verificar scripts
print("Validação de Arquivos no S3")
print("-" * 30)
for arquivo in arquivos_necessarios:
    existe = verificar_arquivo_s3(BUCKET, arquivo)
    status = "OK" if existe else "FALTANDO"
    print(f"{arquivo}: {status}")

# Verificar pastas de dados de entrada
print("\nValidação de Pastas de Entrada")
print("-" * 30)
pastas_entrada = [
    f'{BASE_PATH}/Database Clickbus/',
    f'{BASE_PATH}/Identificador/',
    f'{BASE_PATH}/Eventos passados/',
    f'{BASE_PATH}/eventos/'
]

s3_client = boto3.client('s3', region_name=REGION)
for pasta in pastas_entrada:
    try:
        response = s3_client.list_objects_v2(Bucket=BUCKET, Prefix=pasta, MaxKeys=1)
        tem_arquivos = 'Contents' in response
        status = "OK" if tem_arquivos else "VAZIA"
        print(f"{pasta}: {status}")
    except Exception as e:
        print(f"{pasta}: ERRO - {str(e)}")

Validação de Arquivos no S3
------------------------------
Gold/code/transform_dados_01.py: OK
Gold/code/proc_dados_02.py: OK

Validação de Pastas de Entrada
------------------------------
Gold/Database Clickbus/: OK
Gold/Identificador/: OK
Gold/Eventos passados/: OK
Gold/eventos/: OK


## 4. Execução do Script 1 - Transformação e Enriquecimento

Executa a primeira etapa do pipeline: limpeza, padronização e enriquecimento dos dados.

In [None]:
print("EXECUTANDO SCRIPT 1: TRANSFORMAÇÃO E ENRIQUECIMENTO")
print("=" * 60)
print(f"Início: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

try:
    # Executar o primeiro script
    processor.run(
        code=f's3://{BUCKET}/{CODE_PATH}/transform_dados_01.py',
        arguments=[
            '--bucket', BUCKET,
            '--base-path', BASE_PATH
        ],
        wait=True,  # Aguarda conclusão antes de prosseguir
        logs=True   # Exibe logs em tempo real
    )
    
    print("\nScript 1 concluído com sucesso!")
    print(f"Fim: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
except Exception as e:
    print(f"\nERRO no Script 1: {str(e)}")
    print("Verifique os logs acima para mais detalhes.")
    raise

## 5. Validação Intermediária

Verificar se os arquivos do Script 1 foram gerados corretamente antes de prosseguir.

In [5]:
# Verificar arquivos gerados pelo Script 1
print("Validação dos Resultados do Script 1")
print("-" * 40)

arquivos_script1 = [
    f'{BASE_PATH}/Feriado/feriados_brasil_2016_2025.csv',
    f'{BASE_PATH}/Resultados/df_clickbus_v2_enriquecido.csv',
    f'{BASE_PATH}/Resultados/eventos_passados_padronizado.csv',
    f'{BASE_PATH}/Resultados/eventos_futuros_padronizados.csv'
]

todos_ok = True
for arquivo in arquivos_script1:
    existe = verificar_arquivo_s3(BUCKET, arquivo)
    status = "OK" if existe else "FALTANDO"
    print(f"{arquivo.split('/')[-1]}: {status}")
    if not existe:
        todos_ok = False

if todos_ok:
    print("\nTodos os arquivos do Script 1 foram gerados com sucesso.")
    print("Pronto para executar o Script 2.")
else:
    print("\nALERTA: Alguns arquivos não foram gerados.")
    print("Verifique os logs do Script 1 antes de prosseguir.")

Validação dos Resultados do Script 1
----------------------------------------
feriados_brasil_2016_2025.csv: OK
df_clickbus_v2_enriquecido.csv: OK
eventos_passados_padronizado.csv: OK
eventos_futuros_padronizados.csv: OK

Todos os arquivos do Script 1 foram gerados com sucesso.
Pronto para executar o Script 2.


## 6. Execução do Script 2 - Segmentação e Agregação

Executa a segunda etapa do pipeline: segmentação de clientes e criação de features agregadas.

In [7]:
print("EXECUTANDO SCRIPT 2: SEGMENTAÇÃO E AGREGAÇÃO")
print("=" * 60)
print(f"Início: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

try:
    # Executar o segundo script
    processor.run(
        code=f's3://{BUCKET}/{CODE_PATH}/proc_dados_02.py',
        arguments=[
            '--bucket', BUCKET,
            '--base-path', BASE_PATH
        ],
        wait=True,  # Aguarda conclusão antes de prosseguir
        logs=True   # Exibe logs em tempo real
    )
    
    print("\nScript 2 concluído com sucesso!")
    print(f"Fim: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("\nPIPELINE COMPLETO EXECUTADO COM SUCESSO!")
    
except Exception as e:
    print(f"\nERRO no Script 2: {str(e)}")
    print("Verifique os logs acima para mais detalhes.")
    raise

EXECUTANDO SCRIPT 2: SEGMENTAÇÃO E AGREGAÇÃO
Início: 2025-09-23 01:36:37
[34mSCRIPT 2: PROCESSAMENTO AVANÇADO E ANÁLISE DE DADOS CLICKBUS[0m
[34mData de execução: 23/09/2025 01:37:41[0m
[34mBucket S3: cbchallenge[0m
[34mPasta Base: Gold[0m
[34mETAPA 1: CARREGAMENTO DOS DADOS PROCESSADOS[0m
[34mClickBus v2 carregado: 1,741,344 registros[0m
[34mEventos carregados: 5,000 passados, 4,689 futuros[0m
[34m   Tratando e validando valores de GMV...
   Valores de GMV inválidos marcados como NaN (sem remover linhas): 185
   Calculando frequência real entre compras...[0m
[34mETAPA 2: ANÁLISE BÁSICA POR CLIENTE[0m
[34m   Clientes analisados: 581,817
   Distribuição por recorrência (NOVA LÓGICA):
     • Pontual: 311,039 (53.5%)
     • Sazonal: 208,522 (35.8%)
     • Recorrente: 62,256 (10.7%)[0m
[34mETAPA 3: ANÁLISE DE MOMENTUM E TENDÊNCIA[0m
   Clientes elegíveis para momentum: 159,821
   Calculando momentum...
   Progresso: 0.0% - 0/159,821[0m
[34m   Progresso: 6.3% - 10,00

## 7. Validação Final e Resumo dos Resultados

Verificar todos os arquivos gerados e fornecer um resumo completo da execução.

In [9]:
print("VALIDAÇÃO FINAL DOS RESULTADOS")
print("=" * 50)

# Função para listar arquivos em uma pasta S3
def listar_arquivos_s3(bucket, prefix):
    """Lista todos os arquivos em um prefixo S3."""
    s3_client = boto3.client('s3', region_name=REGION)
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' in response:
            return [obj['Key'] for obj in response['Contents'] if not obj['Key'].endswith('/')]
        return []
    except Exception as e:
        print(f"Erro ao listar {prefix}: {e}")
        return []

# Verificar pasta Resultados
resultados = listar_arquivos_s3(BUCKET, f'{BASE_PATH}/Resultados/')
print(f"Pasta Resultados ({len(resultados)} arquivos):")
for arquivo in sorted(resultados):
    nome = arquivo.split('/')[-1]
    print(f"  - {nome}")

# Verificar pasta Feriado
feriados = listar_arquivos_s3(BUCKET, f'{BASE_PATH}/Feriado/')
print(f"\nPasta Feriado ({len(feriados)} arquivos):")
for arquivo in sorted(feriados):
    nome = arquivo.split('/')[-1]
    print(f"  - {nome}")

print(f"\nARQUIVO PRINCIPAL PARA MACHINE LEARNING:")
print(f"  Arquivo: df_clientes_final_completo.csv")
print(f"  Localização: s3://{BUCKET}/{BASE_PATH}/Resultados/df_clientes_final_completo.csv")

print(f"\nTotal de arquivos gerados: {len(resultados) + len(feriados)}")

VALIDAÇÃO FINAL DOS RESULTADOS
Pasta Resultados (16 arquivos):
  - analise_detalhada_eventos.csv
  - analise_origem_destino.csv
  - analise_recorrencia_destinos.csv
  - analise_recorrencia_temporal.csv
  - analise_trechos_detalhada.csv
  - df_clickbus_v2_enriquecido.csv
  - df_clickbus_v2_final.csv
  - df_clientes_final.csv
  - df_clientes_final_completo.csv
  - df_sazonalidade.csv
  - eventos_futuros_padronizados.csv
  - eventos_passados_padronizado.csv
  - municipios_padronizados.csv
  - resumo_impacto_eventos_municipios.csv
  - top_trechos_por_gmv.csv
  - top_trechos_por_volume.csv

Pasta Feriado (1 arquivos):
  - feriados_brasil_2016_2025.csv

ARQUIVO PRINCIPAL PARA MACHINE LEARNING:
  Arquivo: df_clientes_final_completo.csv
  Localização: s3://cbchallenge/Gold/Resultados/df_clientes_final_completo.csv

Total de arquivos gerados: 17


## 8. Análise Rápida dos Resultados

Carregar e examinar rapidamente o arquivo principal gerado.

In [10]:
# Carregar e analisar o arquivo principal
try:
    df_principal = pd.read_csv(f's3://{BUCKET}/{BASE_PATH}/Resultados/df_clientes_final_completo.csv')
    
    print("ANÁLISE DO ARQUIVO PRINCIPAL")
    print("=" * 40)
    print(f"Shape do dataset: {df_principal.shape}")
    print(f"Clientes únicos: {df_principal['client_id_seq'].nunique():,}")
    print(f"\nColunas disponíveis ({len(df_principal.columns)}):")
    for i, col in enumerate(df_principal.columns, 1):
        print(f"  {i:2d}. {col}")
    
    print(f"\nDistribuição de Segmentação por Recorrência:")
    print(df_principal['seg_recorrencia'].value_counts())
    
    print(f"\nDistribuição de Segmentação por Velocidade:")
    print(df_principal['seg_velocidade'].value_counts())
    
    print(f"\nEstatísticas de GMV:")
    print(f"  GMV Total Médio: R$ {df_principal['gmv_total'].mean():,.2f}")
    print(f"  GMV Total Mediano: R$ {df_principal['gmv_total'].median():,.2f}")
    print(f"  Compras Médias por Cliente: {df_principal['qtd_compras'].mean():.1f}")
    
except Exception as e:
    print(f"Erro ao carregar arquivo principal: {e}")
    print("Verifique se o pipeline foi executado com sucesso.")

ANÁLISE DO ARQUIVO PRINCIPAL
Shape do dataset: (581817, 29)
Erro ao carregar arquivo principal: 'client_id_seq'
Verifique se o pipeline foi executado com sucesso.


## 9. Machine Learning - Previsibilidade de compra em 7 dias ou 30 dias + Previsão de próximo techo


In [26]:
print("EXECUTANDO SCRIPT ML: PREDIÇÃO DE COMPRAS 7/30 DIAS")
print("=" * 60)
print(f"Início: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

try:
    # Executar o script de Machine Learning
    processor.run(
        code=f's3://{BUCKET}/{CODE_PATH}/ml_7_30.py',
        arguments=[
            '--bucket', BUCKET,
            '--base-path', BASE_PATH
        ],
        wait=True,  # Aguarda conclusão antes de prosseguir
        logs=True   # Exibe logs em tempo real
    )
    
    print("\\nScript ML concluído com sucesso!")
    print(f"Fim: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
except Exception as e:
    print(f"\\nERRO no Script ML: {str(e)}")
    print("Verifique os logs acima para mais detalhes.")
    raise

EXECUTANDO SCRIPT ML: PREDIÇÃO DE COMPRAS 7/30 DIAS
Início: 2025-09-22 03:13:15
[34mML 7/30 SAGEMAKER PROCESSING - BASEADO NO MELHOR CÓDIGO[0m
[34mData de execução: 22/09/2025 03:14:21[0m
[34mTargets: 10% base elegível (7d) | 22% base elegível (30d)[0m
[34mMétricas esperadas: P_7d=36.8%, P_30d=60.2%, AUC>90%[0m
[34mETAPA 1: CARREGAMENTO DOS DADOS[0m
[34mDataset carregado: 581,817 clientes[0m
[34mETAPA 2: CRIAÇÃO DE FEATURES SEM DATA LEAKAGE[0m
   Criando features sem data leakage...[0m
[34m   ✓ Features sem leakage criadas[0m
[34mETAPA 3: ANÁLISE DE PADRÕES HISTÓRICOS[0m
   Analisando padrões históricos reais...
   Base elegível: 270,778 clientes
   PADRÕES HISTÓRICOS IDENTIFICADOS:
     • Clientes com freq <= 7d: 16.6%
     • Clientes com freq <= 30d: 30.0%
     • Clientes ativos (30d): 6.2%
     • Clientes ativos (60d): 10.6%
     • Em alta temporada: 38.4%[0m
[34mETAPA 4: TARGETS BASEADOS EM PADRÕES REAIS[0m
   Criando targets baseados em padrões históricos...


In [25]:
print(" EXECUTANDO ML: PREVISÃO DO PRÓXIMO TRECHO")
print("=" * 60)
print(f"Início: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

try:
    # Executar o script de ML
    processor.run(
        code=f's3://{BUCKET}/{CODE_PATH}/ml_previsao_trecho.py',
        arguments=[
            '--bucket', BUCKET,
            '--base-path', BASE_PATH,
            '--top-trechos', '20'  # OTIMIZADO: Menos trechos = mais velocidade
        ],
        wait=True,  # Aguarda conclusão
        logs=True   # Exibe logs em tempo real
    )
    
    print("\\n ML de Previsão de Trecho concluído com sucesso!")
    print(f"Fim: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
except Exception as e:
    print(f"\\n ERRO no ML: {str(e)}")
    print("Verifique os logs acima para mais detalhes.")
    raise

 EXECUTANDO ML: PREVISÃO DO PRÓXIMO TRECHO
Início: 2025-09-22 02:37:49
[34mSCRIPT ML: PREVISÃO DO PRÓXIMO TRECHO CLICKBUS[0m
[34mDesafio 3: A Estrada à Frente[0m
[34mData de execução: 22/09/2025 02:38:53[0m
[34mBucket S3: cbchallenge[0m
[34mPasta Base: Gold[0m
[34mTop Trechos: 20[0m
[34mETAPA 1: CARREGAMENTO DOS DADOS[0m
   Carregando dados processados (MODO OTIMIZADO)...[0m
[34m   Aplicando sample de 500,000 transações para otimização...[0m
[34m   Dados carregados (otimizado):
     • Transações: 500,000
     • Clientes: 581,817
     • Trechos: 200
     • Feriados: 80[0m
[34mETAPA 2: FILTRO DE DADOS VÁLIDOS[0m
   Filtrando dados válidos...
   Registros válidos: 291,617 de 500,000 (58.3%)
   Trechos únicos: 2,848
   Clientes únicos: 167,563[0m
[34mETAPA 3: CRIAÇÃO DE FEATURES[0m
   Criando features de histórico do cliente...[0m
[34m   Coluna status_ativo não encontrada, mantendo todos os clientes: 167,563
   Clientes elegíveis (≥2 compras): 48,080 (28.7%)
   Fe

## 10. Monitoramento e Logs

Ferramentas para monitorar execuções e acessar logs históricos.

In [None]:
# Função para listar jobs recentes
def listar_jobs_recentes(max_results=10):
    """Lista os jobs de processamento mais recentes."""
    sagemaker_client = boto3.client('sagemaker', region_name=REGION)
    
    try:
        jobs = sagemaker_client.list_processing_jobs(
            SortBy='CreationTime',
            SortOrder='Descending',
            MaxResults=max_results
        )
        
        print(f"Jobs de Processamento Recentes ({len(jobs['ProcessingJobSummaries'])}):")
        print("-" * 60)
        
        for job in jobs['ProcessingJobSummaries']:
            print(f"Nome: {job['ProcessingJobName']}")
            print(f"Status: {job['ProcessingJobStatus']}")
            print(f"Criado: {job['CreationTime']}")
            if 'EndTime' in job:
                print(f"Finalizado: {job['EndTime']}")
            print("-" * 30)
            
    except Exception as e:
        print(f"Erro ao listar jobs: {e}")

# Executar listagem de jobs
listar_jobs_recentes()

## Conclusão

Pipeline de processamento ClickBus executado com sucesso no SageMaker.

**Próximos passos sugeridos:**
1. Validar a qualidade dos dados gerados
2. Configurar monitoramento automático
3. Implementar agendamento periódico
4. Desenvolver modelos de Machine Learning com os dados processados

**Arquivos principais gerados:**
- `df_clientes_final_completo.csv` - Base principal para ML
- Múltiplas análises agregadas para insights de negócio

**Localização dos resultados:** `s3://cbchallenge/Gold/Resultados/`