In [1]:
# Setup
!pip install -q -U pydantic pydantic[email]

# ‚ö° Proyecto Integrador Nivel 4: Cortex Async Pipeline

En los niveles anteriores, Cortex procesaba tareas una por una. Esto es inaceptable para un Agente de IA moderno que debe:
1.  Leer el mensaje del usuario.
2.  Consultar 3 documentos en la base de datos vectorial.
3.  Buscar en Google.
4.  Llamar a GPT-4.

Si hacemos esto en orden, tardar√≠amos 10+ segundos. Con **Asyncio**, podemos hacerlo en 3 segundos.

Adem√°s, integramos **Pydantic**. Los LLMs a menudo alucinan o devuelven JSON mal formado. Pydantic act√∫a como un "filtro de calidad" (Guardrail) que asegura que solo datos perfectos entren en nuestro sistema.

## Componentes Clave

### 1. Modelos de Datos (`data_models.py`)
Usamos la clase `BaseModel` de Pydantic.
* `EmailStr`: Verifica autom√°ticamente si es un formato de correo v√°lido (ej: contiene @).
* `@field_validator`: Nos permite crear reglas de negocio complejas (ej: "El rol solo puede ser admin o user").

### 2. Pipeline As√≠ncrono (`main.py`)
* `mock_api_enrichment`: Simula una tarea lenta (I/O Bound). Usamos `await asyncio.sleep()` para no bloquear el procesador.
* `asyncio.gather(*tasks)`: La funci√≥n m√°gica. Recibe una lista de 50 tareas y las dispara todas simult√°neamente. El script solo avanza cuando todas han terminado.

## Resultado
Observa c√≥mo el tiempo total de ejecuci√≥n es apenas superior a la tarea m√°s lenta individual, en lugar de ser la suma de todas. ¬°Eso es eficiencia!

In [2]:
%%writefile data_models.py
# con %%writefile se crea el archivo data_models.py en colab.
# En local este codigo debe ser guardado como data_models.py
# dentro de la carpeta proyecto_cortex_v4

"""
Definici√≥n de Esquemas de Datos usando Pydantic.
Esto garantiza que los datos que fluyen por el pipeline sean v√°lidos.
"""
from pydantic import BaseModel, Field, EmailStr, field_validator
from typing import List, Optional

class UserData(BaseModel):
    # Definici√≥n estricta de tipos
    user_id: int
    username: str = Field(min_length=3, max_length=20)
    email: EmailStr # Valida autom√°ticamente que sea un email real
    role: str = "user"

    # Validaci√≥n personalizada
    @field_validator('role')
    def role_must_be_valid(cls, v):
        allowed = ['admin', 'user', 'guest']
        if v not in allowed:
            raise ValueError(f'Rol debe ser uno de {allowed}')
        return v

class ProcessingResult(BaseModel):
    user_id: int
    status: str
    processed_at: float

Writing data_models.py


In [4]:
# En local este codigo debe ser guardado como main.py
# dentro de la carpeta proyecto_cortex_v4
# Para probar en local ejecuta el main.py
# Nota: Necesitar√°s instalar pydantic y pydantic[email]

import asyncio
import random
import time
from typing import List
from data_models import UserData, ProcessingResult # Importamos nuestros modelos

# --- DECORADOR PARA MEDIR TIEMPO ---
def timer_decorator(func):
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        end = time.time()
        print(f"üèÅ Pipeline finalizado en {end - start:.2f} segundos.")
        return result
    return wrapper

# --- SIMULACI√ìN DE API ---
async def mock_api_enrichment(user: UserData) -> ProcessingResult:
    """
    Simula una llamada a una API externa (ej: buscar info del usuario)
    que tarda tiempo variable.
    """
    delay = random.uniform(0.5, 2.0)
    # print(f"‚è≥ Procesando {user.username} (Espere {delay:.1f}s)...")
    await asyncio.sleep(delay) # Non-blocking sleep

    return ProcessingResult(
        user_id=user.user_id,
        status="success",
        processed_at=time.time()
    )

# --- N√öCLEO DEL PIPELINE ---
@timer_decorator
async def run_pipeline(raw_data: List[dict]):
    print(f"üöÄ Iniciando procesamiento de {len(raw_data)} usuarios...")

    tasks = []

    for entry in raw_data:
        try:
            # 1. VALIDACI√ìN (Pydantic)
            # Si el diccionario 'entry' no cumple el esquema, explota aqu√≠.
            user_model = UserData(**entry)

            # 2. CREACI√ìN DE TAREA AS√çNCRONA
            # No esperamos (await) aqu√≠, solo agendamos la tarea.
            task = mock_api_enrichment(user_model)
            tasks.append(task)

        except Exception as e:
            print(f"‚ùå Error de validaci√≥n para entrada {entry.get('user_id')}: {e}")

    # 3. EJECUCI√ìN CONCURRENTE
    # Lanzamos todas las tareas a la vez.
    results = await asyncio.gather(*tasks)

    print(f"‚úÖ Procesados correctamente: {len(results)}")
    return results

# --- PUNTO DE ENTRADA ---
if __name__ == "__main__":
    # Datos simulados (algunos v√°lidos, otros inv√°lidos)
    raw_dataset = [
        {"user_id": 1, "username": "Neo", "email": "neo@matrix.com", "role": "admin"},
        {"user_id": 2, "username": "Trinity", "email": "trin@matrix.com", "role": "user"},
        {"user_id": 3, "username": "Morpheus", "email": "morph@matrix.com", "role": "admin"},
        {"user_id": 4, "username": "Cypher", "email": "bad-email", "role": "traitor"}, # Email inv√°lido
        {"user_id": 5, "username": "Smith", "email": "smith@matrix.com", "role": "virus"}, # Rol inv√°lido
    ]

    # Ejecuci√≥n del Event Loop
    # asyncio.run(run_pipeline(raw_dataset)) # Original line
    import nest_asyncio
    nest_asyncio.apply()
    await run_pipeline(raw_dataset) # Corrected for Colab/Jupyter environments

üöÄ Iniciando procesamiento de 5 usuarios...
‚ùå Error de validaci√≥n para entrada 4: 2 validation errors for UserData
email
  value is not a valid email address: An email address must have an @-sign. [type=value_error, input_value='bad-email', input_type=str]
role
  Value error, Rol debe ser uno de ['admin', 'user', 'guest'] [type=value_error, input_value='traitor', input_type=str]
    For further information visit https://errors.pydantic.dev/2.12/v/value_error
‚ùå Error de validaci√≥n para entrada 5: 1 validation error for UserData
role
  Value error, Rol debe ser uno de ['admin', 'user', 'guest'] [type=value_error, input_value='virus', input_type=str]
    For further information visit https://errors.pydantic.dev/2.12/v/value_error
‚úÖ Procesados correctamente: 3
üèÅ Pipeline finalizado en 2.00 segundos.
