In [10]:
# Full setup: extend the import path so modules located in /app/src can be imported.
import sys

# Guard the append so re-running this cell does not stack duplicate
# '/app/src' entries on sys.path (the cell stays idempotent).
if '/app/src' not in sys.path:
    sys.path.append('/app/src')
import pandas as pd
from pathlib import Path

# Importar funciones
from load import load_csv
from transform import (
    standardize_dates,
    deduplicate_logic,
    normalize_clients_metadata,
    normalize_events_metadata, 
    normalize_retry_logs_metadata
)

# Data locations: raw inputs and processed outputs.
DATA_RAW, DATA_PROCESSED = Path('/app/data/raw'), Path('/app/data/processed')

# A single print call with a newline separator emits the same three lines.
print(
    "🚀 Setup completo!",
    f"📁 Raw: {DATA_RAW}",
    f"📁 Processed: {DATA_PROCESSED}",
    sep="\n",
)


🚀 Setup completo!
📁 Raw: /app/data/raw
📁 Processed: /app/data/processed


In [11]:
# LOAD - read the datasets from the raw data directory.
print("📥 STEP 1: LOADING DATASETS", "=" * 40, sep="\n")

# Load the 3 datasets in a fixed order: clients, events, retry logs.
df_clients = load_csv('clients.csv', DATA_RAW)
df_events = load_csv('events.csv', DATA_RAW)
df_retries = load_csv('retry_logs.csv', DATA_RAW)

# Report the shape of each loaded frame.
print(
    f"\n✅ Datasets cargados:",
    f"   Clients: {df_clients.shape}",
    f"   Events: {df_events.shape}",
    f"   Retries: {df_retries.shape}",
    sep="\n",
)

📥 STEP 1: LOADING DATASETS
[Load CSV] Cargado clients.csv: 60 rows, 6 columns.
[Load CSV] Cargado events.csv: 15000 rows, 11 columns.
[Load CSV] Cargado retry_logs.csv: 1500 rows, 5 columns.

✅ Datasets cargados:
   Clients: (60, 6)
   Events: (15000, 11)
   Retries: (1500, 5)


In [12]:
# STEP 2 - standardize the date columns of every dataset.
print("\n📅 STEP 2: STANDARDIZING DATES", "=" * 40, sep="\n")

# Date columns to standardize, keyed by dataset name.
date_columns = {
    'clients': ['sign_up_date'],
    'events': ['created_at', 'completed_at'],
    'retry_logs': ['retry_time'],
}

# Each frame is rebound to whatever standardize_dates returns for it.
print("\n📅 Estandarizando fechas en clients...")
df_clients = standardize_dates(
    df_clients, date_columns['clients'], 'clients'
)

print("\n📅 Estandarizando fechas en events...")
df_events = standardize_dates(
    df_events, date_columns['events'], 'events'
)

print("\n📅 Estandarizando fechas en retry_logs...")
df_retries = standardize_dates(
    df_retries, date_columns['retry_logs'], 'retry_logs'
)

print(f"\n✅ Estandarización de fechas completada")


📅 STEP 2: STANDARDIZING DATES

📅 Estandarizando fechas en clients...
[clients] 'sign_up_date': 0 valores no parseados

📅 Estandarizando fechas en events...
[events] 'created_at': 0 valores no parseados
[events] 'completed_at': 3814 nulls (0 en eventos con status ≠ processing/created)

📅 Estandarizando fechas en retry_logs...
[retry_logs] 'retry_time': 0 valores no parseados

✅ Estandarización de fechas completada


In [9]:
# STEP 3 - deduplicate each dataset on its id column.
print("\n🔄 STEP 3: DEDUPLICATING DATA", "=" * 40, sep="\n")

# Clients: subset is client_id, sorted by sign_up_date.
print("\n🔧 Deduplicando clients...")
df_clients = deduplicate_logic(df_clients, subset=['client_id'], sort_by='sign_up_date', name='clients')

# Events: subset is event_id, sorted by created_at.
print("\n🔧 Deduplicando events...")
df_events = deduplicate_logic(df_events, subset=['event_id'], sort_by='created_at', name='events')

# Retry logs: subset is retry_id, sorted by retry_time.
print("\n🔧 Deduplicando retry_logs...")
df_retries = deduplicate_logic(df_retries, subset=['retry_id'], sort_by='retry_time', name='retry_logs')

print(f"\n✅ Deduplicación completada")


🔄 STEP 3: DEDUPLICATING DATA

🔧 Deduplicando clients...
[Deduplicate] clients: 60 → 60 rows (kept earliest by sign_up_date)

🔧 Deduplicando events...
[Deduplicate] events: 15000 → 15000 rows (kept earliest by created_at)

🔧 Deduplicando retry_logs...
[Deduplicate] retry_logs: 1500 → 1500 rows (kept earliest by retry_time)

✅ Deduplicación completada


In [6]:
# TRANSFORM - apply the metadata normalization to each dataset.
print("\n🔄 STEP 4: TRANSFORMING & NORMALIZING", "=" * 40, sep="\n")

# Results go into new *_clean names, so the input frames stay available
# for the before/after shape comparison printed at the end of the cell.

# Normalize clients
print("\n🔧 Normalizando clients...")
df_clients_clean = normalize_clients_metadata(df_clients)

# Normalize events
print("\n🔧 Normalizando events...")
df_events_clean = normalize_events_metadata(df_events)

# Normalize retry_logs
print("\n🔧 Normalizando retry_logs...")
df_retries_clean = normalize_retry_logs_metadata(df_retries)

# Before → after shapes for every dataset.
print(
    f"\n✅ Transformaciones completadas:",
    f"   Clients: {df_clients.shape} → {df_clients_clean.shape}",
    f"   Events: {df_events.shape} → {df_events_clean.shape}",
    f"   Retries: {df_retries.shape} → {df_retries_clean.shape}",
    sep="\n",
)


🔄 STEP 4: TRANSFORMING & NORMALIZING

🔧 Normalizando clients...
[Metadata] clients: 17/60 sectors set to 'unknown'
[Metadata] clients: 9/60 tiers set to 'unknown'

🔧 Normalizando events...
[Metadata] events: 15000 → 15000 after dropping missing keys
[Metadata] events: 0/15000 invalid type set to 'unknown'
[Metadata] events: 0/15000 invalid currency codes set to 'XXX'
[Metadata] events: 0/15000 invalid status code set to 'unknown'
[Metadata] events: 0/15000 invalid error code set to 'unknown'
[Metadata] events: 0/15000 invalid origin_country codes set to 'XX'
[Metadata] events: 0/15000 invalid destination_country codes set to 'XX'

🔧 Normalizando retry_logs...
[Metadata] retry_logs: 1500 → 1500 after dropping missing keys
[Metadata] retry_logs: 0/1500 invalid status set to 'unknown'

✅ Transformaciones completadas:
   Clients: (60, 6) → (60, 6)
   Events: (15000, 11) → (15000, 11)
   Retries: (1500, 5) → (1500, 5)


In [7]:
# EXPORT - save the processed data (step banner).
print("\n💾 STEP 5: EXPORTING PROCESSED DATA", "=" * 40, sep="\n")

# Helper used to export one processed dataset
def export_to_csv(df, filename, description):
    """Write ``df`` as CSV under ``DATA_PROCESSED`` and print a short summary.

    Args:
        df: Frame to persist; it must provide ``to_csv`` and ``shape``
            (the callers in this notebook pass the cleaned datasets).
            The index is not written to the file.
        filename: Name of the output file, joined onto ``DATA_PROCESSED``.
        description: Label shown in the confirmation message.

    Raises:
        OSError: If the output directory cannot be created or the file
            cannot be written.
    """
    output_path = DATA_PROCESSED / filename
    # to_csv does not create missing directories; make sure the target
    # directory exists so the export also works on a fresh environment.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"✅ {description}: {output_path}")
    print(f"   Guardados: {df.shape[0]:,} filas × {df.shape[1]} columnas")

# Export the cleaned datasets, one CSV per dataset.
export_to_csv(df=df_clients_clean, filename='clients.csv', description='Clients')
export_to_csv(df=df_events_clean, filename='events.csv', description='Events')
export_to_csv(df=df_retries_clean, filename='retry_logs.csv', description='Retry Logs')

# Final confirmation with the output directory.
print(f"\n🎯 ¡Todos los datos procesados guardados en {DATA_PROCESSED}!")


💾 STEP 5: EXPORTING PROCESSED DATA
✅ Clients: /app/data/processed/clients.csv
   Guardados: 60 filas × 6 columnas
✅ Events: /app/data/processed/events.csv
   Guardados: 15,000 filas × 11 columnas
✅ Retry Logs: /app/data/processed/retry_logs.csv
   Guardados: 1,500 filas × 5 columnas

🎯 ¡Todos los datos procesados guardados en /app/data/processed!
