Ejercicio: Diseño de arquitectura híbrida para plataforma de streaming de video

Análisis de requisitos y patrones de datos:

In [None]:
# Requisitos para plataforma de streaming
requisitos_streaming = {
    'datos_transaccionales': {
        'suscripciones': 'ACID crítico, joins complejos',
        'pagos': 'Consistencia fuerte requerida',
        'usuarios': 'Datos maestros normalizados'
    },
    'datos_analiticos': {
        'eventos_reproduccion': 'Volumen masivo, time-series',
        'recomendaciones': 'Relaciones complejas entre usuarios/contenido',
        'analytics_contenido': 'Métricas agregadas variables'
    },
    'requisitos_performance': {
        'latencia_vista': '< 100ms para recomendaciones',
        'throughput_eventos': '1M eventos/segundo',
        'almacenamiento': '100PB datos históricos'
    }
}

print("REQUISITOS PLATAFORMA DE STREAMING")
print("=" * 35)

for categoria, detalles in requisitos_streaming.items():
    print(f"\n{categoria.upper().replace('_', ' ')}:")
    if isinstance(detalles, dict):
        for subcat, desc in detalles.items():
            print(f"  {subcat.title()}: {desc}")
    else:
        print(f"  {detalles}")

Selección de tecnologías por caso de uso:

In [None]:
# Arquitectura híbrida seleccionada
arquitectura_hibrida = {
    'postgresql': {
        'rol': 'Base de datos transaccional principal',
        'casos_uso': ['Suscripciones', 'Pagos', 'Perfiles de usuario'],
        'justificacion': 'ACID para finanzas, joins complejos para billing',
        'escalabilidad': 'Vertical (hasta ~10TB)',
        'limitaciones': 'Escalabilidad horizontal limitada'
    },
    
    'cassandra': {
        'rol': 'Base de datos de eventos y analytics',
        'casos_uso': ['Eventos de reproducción', 'Métricas de usuario', 'Logs'],
        'justificacion': 'Escalabilidad horizontal masiva, writes de alto throughput',
        'escalabilidad': 'Horizontal ilimitada',
        'limitaciones': 'Queries complejas limitadas'
    },
    
    'neo4j': {
        'rol': 'Motor de recomendaciones y relaciones',
        'casos_uso': ['Sistema de recomendaciones', 'Análisis de afinidad', 'Detección de fraude'],
        'justificacion': 'Queries de relaciones complejas, algoritmos de grafos',
        'escalabilidad': 'Hasta ~100B nodos/relaciones',
        'limitaciones': 'No optimizado para agregaciones masivas'
    },
    
    'redis': {
        'rol': 'Caché y sesiones de alto performance',
        'casos_uso': ['Sesiones de usuario', 'Caché de recomendaciones', 'Leaderboards'],
        'justificacion': 'Latencia < 1ms, estructuras de datos ricas',
        'escalabilidad': 'Cluster horizontal',
        'limitaciones': 'Datos volátiles (reinicio borra datos)'
    },
    
    'elasticsearch': {
        'rol': 'Búsqueda y analytics de contenido',
        'casos_uso': ['Búsqueda de contenido', 'Analytics de catálogo', 'Logs estructurados'],
        'justificacion': 'Búsqueda full-text, agregaciones complejas, APIs REST',
        'escalabilidad': 'Horizontal con sharding',
        'limitaciones': 'No transaccional, eventual consistency'
    }
}

print("ARQUITECTURA HÍBRIDA SELECCIONADA") 
print("=" * 40)
for tecnologia, detalles in arquitectura_hibrida.items(): 
    print(f"\n{tecnologia.upper()}:") 
    print(f" Rol: {detalles['rol']}") 
    print(f" Casos de uso: {', '.join(detalles['casos_uso'])}") 
    print(f" Escalabilidad: {detalles['escalabilidad']}")



Implementación de patrón CQRS:

In [13]:
import json

class PostgreSQL:
    def user_exists(self, user_id):
        return True
    
    def plan_available(self, plan_id):
        return True
    
    def create_subscription(self, data):
        return {
            **data,
            'id': 1,
            'created_at': '2026-01-01'
        }

class Cassandra:
    def get_user_recommendations(self, user_id):
        return ['movie_1', 'movie_2']
    
    def update_user_profile(self, user_id, data):
        pass

class Redis:
    def __init__(self):
        self.store = {}
    
    def get(self, key):
        return self.store.get(key)
    
    def setex(self, key, ttl, value):
        self.store[key] = value
    
    def delete(self, key):
        self.store.pop(key, None)

class Elasticsearch:
    pass


# Implementación CQRS para plataforma de streaming
class StreamingCQRS:
    def __init__(self):
        self.command_db = PostgreSQL()   # Writes normalizados
        self.query_db = Cassandra()      # Reads optimizados
        self.cache = Redis()
        self.search = Elasticsearch()
    
    # ---------- Command side ----------
    def create_subscription(self, user_id, plan_id, payment_info):
        """Crear suscripción - lado comando"""

        if not self.command_db.user_exists(user_id):
            raise ValueError("Usuario no existe")
        
        if not self.command_db.plan_available(plan_id):
            raise ValueError("Plan no disponible")
        
        payment_result = self.process_payment(payment_info)
        
        if not payment_result['success']:
            raise ValueError("Pago fallido")

        subscription = self.command_db.create_subscription({
            'user_id': user_id,
            'plan_id': plan_id,
            'payment_id': payment_result['id']
        })
        
        self.publish_event('SubscriptionCreated', subscription)
        return subscription

    def process_payment(self, payment_info):
        """Simulación de pago externo"""
        return {
            'success': True,
            'id': 'PAY-001'
        }

    def publish_event(self, event_type, payload):
        """Simulación de publicación de eventos"""
        if event_type == 'SubscriptionCreated':
            self.handle_subscription_created(payload)

    # ---------- Query side ----------
    def get_user_recommendations(self, user_id):
        """Obtener recomendaciones - lado query"""

        cache_key = f"recommendations:{user_id}"
        cached = self.cache.get(cache_key)
        
        if cached:
            return json.loads(cached)
        
        recommendations = self.query_db.get_user_recommendations(user_id)
        self.cache.setex(cache_key, 3600, json.dumps(recommendations))
        return recommendations

    # ---------- Event handler ----------
    def handle_subscription_created(self, event):
        """Actualizar read models"""

        self.query_db.update_user_profile(event['user_id'], {
            'subscription_active': True,
            'plan_id': event['plan_id'],
            'subscription_date': event['created_at']
        })
        
        self.cache.delete(f"user_profile:{event['user_id']}")
        self.cache.delete(f"recommendations:{event['user_id']}")


# ---------------- USO DEL SISTEMA ----------------
cqrs = StreamingCQRS()

subscription = cqrs.create_subscription(
    user_id=123,
    plan_id=1,
    payment_info={}
)

recommendations = cqrs.get_user_recommendations(user_id=123)

print("Suscripción creada:", subscription)
print("Recomendaciones:", recommendations)


Suscripción creada: {'user_id': 123, 'plan_id': 1, 'payment_id': 'PAY-001', 'id': 1, 'created_at': '2026-01-01'}
Recomendaciones: ['movie_1', 'movie_2']


In [None]:
Verificación: ¿Por qué elegirías esta arquitectura híbrida sobre un sistema SQL puro o NoSQL puro? 
¿Qué desafíos introduciría esta complejidad adicional y cómo los mitigarías?

Respuesta 1: ¿Por qué elegir una arquitectura híbrida y no solo SQL o NoSQL?

Porque una plataforma de streaming maneja distintos tipos de datos con patrones de acceso muy diferentes:

SQL (PostgreSQL) es ideal para datos transaccionales críticos (suscripciones, pagos) por su consistencia 
ACID y soporte de joins.

NoSQL (Cassandra, Neo4j, Redis, Elasticsearch) es más adecuado para alto volumen, baja latencia, eventos 
time-series, relaciones complejas y búsqueda.

Una arquitectura híbrida permite usar cada tecnología donde es más eficiente, algo que un sistema único 
no puede resolver de forma óptima.

Respuesta 2: Qué desafíos introduce esta complejidad y cómo se mitigan?

Mayor complejidad operativa
Mitigación: separación de responsabilidades mediante CQRS y arquitectura por servicios.

Consistencia eventual entre sistemas
Mitigación: uso de eventos, caché con TTL y aceptación de consistencia eventual en lecturas no críticas.

Curva de aprendizaje tecnológica
Mitigación: modelar primero las consultas, documentar esquemas y comenzar con casos de uso simples.