# 🗄️ AWS PostgreSQL Database Setup
## FASE 2: Conexión y Migración de Datos

Este notebook configura la conexión a AWS PostgreSQL y migra los datos del CSV.

### 🎯 **Objetivos:**
1. **Probar conexión** a AWS PostgreSQL
2. **Crear esquema** de base de datos
3. **Migrar datos** desde CSV
4. **Validar** integridad de datos
5. **Probar adaptador** PostgreSQL

### 📋 **Credenciales necesarias:**
- **Endpoint**: (se obtendrá de AWS)
- **Usuario**: `taxiuser`
- **Password**: `TaxiDB2025!`
- **Base de datos**: `taxi_db`

In [2]:
# 📦 Setup e imports
import pandas as pd
import numpy as np
import asyncio
import asyncpg
import logging
from datetime import datetime
import sys
import os

# Agregar el directorio del proyecto al path
sys.path.append(os.path.join(os.getcwd(), 'taxi_duration_predictor'))

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("🚀 Setup completado!")
print(f"📅 Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

🚀 Setup completado!
📅 Fecha: 2025-07-19 10:05:56


## 🔧 **Paso 1: Configurar Conexión a AWS PostgreSQL**

**⚠️ IMPORTANTE**: Una vez que tu base de datos esté lista en AWS, necesitarás:
1. Ir a la base de datos en la consola AWS
2. Hacer clic en "View connection details"
3. Copiar el **Endpoint** (algo como: taxi-duration-db.xxxxx.us-east-1.rds.amazonaws.com)

In [3]:
# 🔐 Configuración de conexión AWS PostgreSQL
# ✅ ENDPOINT CONFIGURADO CORRECTAMENTE
AWS_ENDPOINT = "taxi-duration-db.ckj7uy651uld.us-east-1.rds.amazonaws.com"
DB_PORT = 5432
DB_NAME = "postgres"  # Aurora usa 'postgres' como base de datos por defecto
DB_USER = "taxiuser"
DB_PASSWORD = "TaxiDB2025!"

# String de conexión
CONNECTION_STRING = f"postgresql://{DB_USER}:{DB_PASSWORD}@{AWS_ENDPOINT}:{DB_PORT}/{DB_NAME}"

print("🔗 Configuración de conexión preparada")
print(f"📡 Endpoint: {AWS_ENDPOINT}")
print(f"🔐 Usuario: {DB_USER}")
print(f"🗄️ Base de datos: {DB_NAME}")
print("✅ Endpoint configurado correctamente!")

🔗 Configuración de conexión preparada
📡 Endpoint: taxi-duration-db.ckj7uy651uld.us-east-1.rds.amazonaws.com
🔐 Usuario: taxiuser
🗄️ Base de datos: postgres
✅ Endpoint configurado correctamente!


In [4]:
# 🧪 Función para probar conexión
async def test_connection():
    """Prueba la conexión a PostgreSQL"""
    try:
        print("🔄 Probando conexión a AWS PostgreSQL...")

        conn = await asyncpg.connect(
            host=AWS_ENDPOINT,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )

        # Ejecutar consulta simple
        version = await conn.fetchval('SELECT version()')
        print(f"✅ ¡Conexión exitosa!")
        print(f"📊 Versión PostgreSQL: {version}")

        await conn.close()
        return True

    except Exception as e:
        print(f"❌ Error de conexión: {e}")
        print("🔧 Verifica que:")
        print("   - La base de datos esté 'Available' en AWS")
        print("   - El endpoint sea correcto")
        print("   - Las reglas de seguridad permitan conexiones")
        return False

# ⚠️ EJECUTAR SOLO CUANDO LA BASE DE DATOS ESTÉ LISTA
# await test_connection()
print("⏳ Esperando que la base de datos esté lista...")
print("💡 Ejecuta 'await test_connection()' cuando veas 'Available' en AWS")

⏳ Esperando que la base de datos esté lista...
💡 Ejecuta 'await test_connection()' cuando veas 'Available' en AWS


## ⚠️ **ANTES DE CONTINUAR - VERIFICAR AWS**

### 🔍 **Pasos obligatorios:**

1. **Ve a AWS Console → RDS → Databases**
2. **Busca `taxi-duration-db`**
3. **Verifica que el estado sea "Available" (verde)**
4. **Si dice "Creating" o "Modifying", espera 5-10 minutos**

### ✅ **Cuando veas "Available", ejecuta:**
```python
await test_connection()
```

### 🚨 **Si hay errores de conexión:**
- Verifica que el Security Group tenga regla PostgreSQL (puerto 5432)
- Confirma que tu IP actual esté en las reglas
- Asegúrate que el endpoint sea correcto

In [6]:
await test_connection()

🔄 Probando conexión a AWS PostgreSQL...
✅ ¡Conexión exitosa!
📊 Versión PostgreSQL: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit
✅ ¡Conexión exitosa!
📊 Versión PostgreSQL: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit


True

## 🏗️ **Paso 2: Crear Esquema de Base de Datos**

In [7]:
# 🏗️ Función para crear esquema
async def create_database_schema():
    """Crea las tablas necesarias"""
    try:
        print("🔄 Creando esquema de base de datos...")

        conn = await asyncpg.connect(
            host=AWS_ENDPOINT,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )

        # SQL para crear tablas
        create_tables_sql = """
        -- Tabla principal de viajes
        CREATE TABLE IF NOT EXISTS taxi_trips (
            id VARCHAR(50) PRIMARY KEY,
            vendor_id INTEGER NOT NULL,
            pickup_datetime TIMESTAMP NOT NULL,
            dropoff_datetime TIMESTAMP NOT NULL,
            passenger_count INTEGER NOT NULL,
            pickup_longitude DECIMAL(10, 7) NOT NULL,
            pickup_latitude DECIMAL(10, 7) NOT NULL,
            dropoff_longitude DECIMAL(10, 7) NOT NULL,
            dropoff_latitude DECIMAL(10, 7) NOT NULL,
            store_and_fwd_flag VARCHAR(1) NOT NULL,
            trip_duration_seconds DECIMAL(10, 2) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        -- Índices para optimizar consultas
        CREATE INDEX IF NOT EXISTS idx_pickup_datetime ON taxi_trips(pickup_datetime);
        CREATE INDEX IF NOT EXISTS idx_vendor_id ON taxi_trips(vendor_id);
        CREATE INDEX IF NOT EXISTS idx_trip_duration ON taxi_trips(trip_duration_seconds);
        CREATE INDEX IF NOT EXISTS idx_coordinates ON taxi_trips(pickup_longitude, pickup_latitude);

        -- Tabla de predicciones
        CREATE TABLE IF NOT EXISTS predictions (
            id SERIAL PRIMARY KEY,
            trip_id VARCHAR(50) NOT NULL,
            predicted_duration_seconds DECIMAL(10, 2) NOT NULL,
            confidence_score DECIMAL(5, 4) NOT NULL,
            model_version VARCHAR(50) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            features_json JSONB
        );

        CREATE INDEX IF NOT EXISTS idx_predictions_trip_id ON predictions(trip_id);
        CREATE INDEX IF NOT EXISTS idx_predictions_created_at ON predictions(created_at);
        """

        await conn.execute(create_tables_sql)
        print("✅ Esquema creado exitosamente!")

        # Verificar que las tablas se crearon
        tables = await conn.fetch("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)

        print("📋 Tablas creadas:")
        for table in tables:
            print(f"   - {table['table_name']}")

        await conn.close()
        return True

    except Exception as e:
        print(f"❌ Error creando esquema: {e}")
        return False

# ⚠️ EJECUTAR DESPUÉS DE PROBAR LA CONEXIÓN
# await create_database_schema()
print("💡 Ejecuta 'await create_database_schema()' después de probar la conexión")

💡 Ejecuta 'await create_database_schema()' después de probar la conexión


## 📤 **Paso 3: Migrar Datos desde CSV**

In [8]:
# 📤 Función para migrar datos
async def migrate_csv_data(sample_size: int = 50000):
    """Migra datos desde CSV a PostgreSQL"""
    try:
        print(f"🔄 Migrando {sample_size} filas desde CSV...")

        # Cargar muestra del CSV
        df = pd.read_csv('train.csv', nrows=sample_size)
        print(f"📊 CSV cargado: {df.shape}")

        # Limpiar datos básico
        df = df.dropna()

        # Filtrar datos válidos (NYC bounds)
        df = df[
            (df['pickup_longitude'] >= -74.3) & (df['pickup_longitude'] <= -73.7) &
            (df['pickup_latitude'] >= 40.5) & (df['pickup_latitude'] <= 40.9) &
            (df['dropoff_longitude'] >= -74.3) & (df['dropoff_longitude'] <= -73.7) &
            (df['dropoff_latitude'] >= 40.5) & (df['dropoff_latitude'] <= 40.9) &
            (df['trip_duration'] >= 30) & (df['trip_duration'] <= 21600) &  # 30 seg a 6 horas
            (df['passenger_count'] >= 1) & (df['passenger_count'] <= 6)
        ]

        print(f"🧹 Datos limpios: {df.shape}")

        # Conectar a la base de datos
        conn = await asyncpg.connect(
            host=AWS_ENDPOINT,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )

        # Insertar datos en lotes
        batch_size = 1000
        total_inserted = 0

        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i+batch_size]

            # Preparar datos para inserción
            trip_data = []
            for _, row in batch.iterrows():
                trip_data.append((
                    str(row['id']),
                    int(row['vendor_id']),
                    pd.to_datetime(row['pickup_datetime']),
                    pd.to_datetime(row['dropoff_datetime']),
                    int(row['passenger_count']),
                    float(row['pickup_longitude']),
                    float(row['pickup_latitude']),
                    float(row['dropoff_longitude']),
                    float(row['dropoff_latitude']),
                    str(row['store_and_fwd_flag']),
                    float(row['trip_duration'])
                ))

            # Insertar lote
            await conn.executemany("""
                INSERT INTO taxi_trips (
                    id, vendor_id, pickup_datetime, dropoff_datetime,
                    passenger_count, pickup_longitude, pickup_latitude,
                    dropoff_longitude, dropoff_latitude, store_and_fwd_flag,
                    trip_duration_seconds
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ON CONFLICT (id) DO NOTHING
            """, trip_data)

            total_inserted += len(trip_data)
            print(f"📤 Insertadas {total_inserted} filas...")

        await conn.close()
        print(f"✅ Migración completa: {total_inserted} viajes")
        return total_inserted

    except Exception as e:
        print(f"❌ Error en migración: {e}")
        return 0

# ⚠️ EJECUTAR DESPUÉS DE CREAR EL ESQUEMA
# total = await migrate_csv_data(50000)
print("💡 Ejecuta 'await migrate_csv_data(50000)' después de crear el esquema")

💡 Ejecuta 'await migrate_csv_data(50000)' después de crear el esquema


## 🔍 **Paso 4: Validar Datos Migrados**

In [9]:
# 🔍 Función para validar datos
async def validate_migrated_data():
    """Valida que los datos se migraron correctamente"""
    try:
        print("🔄 Validando datos migrados...")

        conn = await asyncpg.connect(
            host=AWS_ENDPOINT,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )

        # Estadísticas básicas
        stats = await conn.fetchrow("""
            SELECT
                COUNT(*) as total_trips,
                MIN(pickup_datetime) as earliest_trip,
                MAX(pickup_datetime) as latest_trip,
                AVG(trip_duration_seconds) as avg_duration_seconds,
                AVG(passenger_count) as avg_passengers,
                COUNT(DISTINCT vendor_id) as unique_vendors
            FROM taxi_trips
        """)

        print("📊 **ESTADÍSTICAS DE LA BASE DE DATOS:**")
        print(f"   Total de viajes: {stats['total_trips']:,}")
        print(f"   Primer viaje: {stats['earliest_trip']}")
        print(f"   Último viaje: {stats['latest_trip']}")
        print(f"   Duración promedio: {stats['avg_duration_seconds']/60:.1f} minutos")
        print(f"   Pasajeros promedio: {stats['avg_passengers']:.1f}")
        print(f"   Vendors únicos: {stats['unique_vendors']}")

        # Muestra de datos
        sample = await conn.fetch("SELECT * FROM taxi_trips LIMIT 5")
        print(f"\n🔍 **MUESTRA DE DATOS:**")
        for i, row in enumerate(sample, 1):
            print(f"   {i}. Viaje {row['id']}: {row['trip_duration_seconds']/60:.1f} min, {row['passenger_count']} pasajeros")

        await conn.close()
        print("✅ Validación completa!")
        return True

    except Exception as e:
        print(f"❌ Error en validación: {e}")
        return False

# ⚠️ EJECUTAR DESPUÉS DE LA MIGRACIÓN
# await validate_migrated_data()
print("💡 Ejecuta 'await validate_migrated_data()' después de la migración")

💡 Ejecuta 'await validate_migrated_data()' después de la migración


In [10]:
await create_database_schema()

🔄 Creando esquema de base de datos...
✅ Esquema creado exitosamente!
✅ Esquema creado exitosamente!
📋 Tablas creadas:
   - taxi_trips
   - predictions
📋 Tablas creadas:
   - taxi_trips
   - predictions


True

In [11]:
await migrate_csv_data(50000)

🔄 Migrando 50000 filas desde CSV...
📊 CSV cargado: (50000, 11)
🧹 Datos limpios: (49719, 11)
📤 Insertadas 1000 filas...
📤 Insertadas 1000 filas...
📤 Insertadas 2000 filas...
📤 Insertadas 2000 filas...
📤 Insertadas 3000 filas...
📤 Insertadas 3000 filas...
📤 Insertadas 4000 filas...
📤 Insertadas 4000 filas...
📤 Insertadas 5000 filas...
📤 Insertadas 5000 filas...
📤 Insertadas 6000 filas...
📤 Insertadas 6000 filas...
📤 Insertadas 7000 filas...
📤 Insertadas 7000 filas...
📤 Insertadas 8000 filas...
📤 Insertadas 8000 filas...
📤 Insertadas 9000 filas...
📤 Insertadas 9000 filas...
📤 Insertadas 10000 filas...
📤 Insertadas 10000 filas...
📤 Insertadas 11000 filas...
📤 Insertadas 11000 filas...
📤 Insertadas 12000 filas...
📤 Insertadas 12000 filas...
📤 Insertadas 13000 filas...
📤 Insertadas 13000 filas...
📤 Insertadas 14000 filas...
📤 Insertadas 14000 filas...
📤 Insertadas 15000 filas...
📤 Insertadas 15000 filas...
📤 Insertadas 16000 filas...
📤 Insertadas 16000 filas...
📤 Insertadas 17000 filas...
📤 

49719

In [12]:
await validate_migrated_data()

🔄 Validando datos migrados...
📊 **ESTADÍSTICAS DE LA BASE DE DATOS:**
   Total de viajes: 49,719
   Primer viaje: 2016-01-01 00:08:07
   Último viaje: 2016-06-30 23:45:21
   Duración promedio: 13.9 minutos
   Pasajeros promedio: 1.7
   Vendors únicos: 2

🔍 **MUESTRA DE DATOS:**
   1. Viaje id2875421: 7.6 min, 1 pasajeros
   2. Viaje id2377394: 11.0 min, 1 pasajeros
   3. Viaje id3858529: 35.4 min, 1 pasajeros
   4. Viaje id3504673: 7.2 min, 1 pasajeros
   5. Viaje id2181028: 7.2 min, 1 pasajeros
📊 **ESTADÍSTICAS DE LA BASE DE DATOS:**
   Total de viajes: 49,719
   Primer viaje: 2016-01-01 00:08:07
   Último viaje: 2016-06-30 23:45:21
   Duración promedio: 13.9 minutos
   Pasajeros promedio: 1.7
   Vendors únicos: 2

🔍 **MUESTRA DE DATOS:**
   1. Viaje id2875421: 7.6 min, 1 pasajeros
   2. Viaje id2377394: 11.0 min, 1 pasajeros
   3. Viaje id3858529: 35.4 min, 1 pasajeros
   4. Viaje id3504673: 7.2 min, 1 pasajeros
   5. Viaje id2181028: 7.2 min, 1 pasajeros
✅ Validación completa!
✅ Val

True

## 📋 **Resumen de FASE 2**

### ✅ **Pasos completados:**
1. **Configuración de conexión** a AWS PostgreSQL
2. **Creación de esquema** con tablas optimizadas
3. **Migración de datos** desde CSV
4. **Validación** de integridad

### 🔗 **String de conexión para usar en el proyecto:**
```python
CONNECTION_STRING = "postgresql://taxiuser:TaxiDB2025!@[ENDPOINT]:5432/taxi_db"
```

### 🎯 **Próximos pasos (FASE 3):**
1. **Implementar MLflow** tracking
2. **Crear adaptador ML** 
3. **Pipeline de entrenamiento**
4. **API FastAPI**

### 💡 **Para las diapositivas:**
*"Implementamos el primer adaptador de nuestra arquitectura hexagonal: PostgreSQLAdapter que conecta con AWS RDS, demostrando cómo los puertos y adaptadores nos permiten cambiar de CSV a base de datos real sin afectar el dominio del negocio."*