# Arquitecturas NoSQL vs SQL para Diferentes Casos de Uso 

## 1. Análisis de requisitos y patrones de datos

In [1]:
# Requisitos para plataforma de streaming
requisitos_streaming = {
    'datos_transaccionales': {
        'suscripciones': 'ACID crítico, joins complejos',
        'pagos': 'Consistencia fuerte requerida',
        'usuarios': 'Datos maestros normalizados'
    },
    'datos_analiticos': {
        'eventos_reproduccion': 'Volumen masivo, time-series',
        'recomendaciones': 'Relaciones complejas entre usuarios/contenido',
        'analytics_contenido': 'Métricas agregadas variables'
    },
    'requisitos_performance': {
        'latencia_vista': '< 100ms para recomendaciones',
        'throughput_eventos': '1M eventos/segundo',
        'almacenamiento': '100PB datos históricos'
    }
}

print("REQUISITOS PLATAFORMA DE STREAMING")
print("=" * 35)

for categoria, detalles in requisitos_streaming.items():
    print(f"\n{categoria.upper().replace('_', ' ')}:")
    if isinstance(detalles, dict):
        for subcat, desc in detalles.items():
            print(f"  {subcat.title()}: {desc}")
    else:
        print(f"  {detalles}")

REQUISITOS PLATAFORMA DE STREAMING

DATOS TRANSACCIONALES:
  Suscripciones: ACID crítico, joins complejos
  Pagos: Consistencia fuerte requerida
  Usuarios: Datos maestros normalizados

DATOS ANALITICOS:
  Eventos_Reproduccion: Volumen masivo, time-series
  Recomendaciones: Relaciones complejas entre usuarios/contenido
  Analytics_Contenido: Métricas agregadas variables

REQUISITOS PERFORMANCE:
  Latencia_Vista: < 100ms para recomendaciones
  Throughput_Eventos: 1M eventos/segundo
  Almacenamiento: 100PB datos históricos


## 2. Selección de tecnologías por caso de uso

In [6]:
    # Arquitectura híbrida seleccionada
    arquitectura_hibrida = {
        'postgresql': {
            'rol': 'Base de datos transaccional principal',
            'casos_uso': ['Suscripciones', 'Pagos', 'Perfiles de usuario'],
            'justificacion': 'ACID para finanzas, joins complejos para billing',
            'escalabilidad': 'Vertical (hasta ~10TB)',
            'limitaciones': 'Escalabilidad horizontal limitada'
        },
        
        'cassandra': {
            'rol': 'Base de datos de eventos y analytics',
            'casos_uso': ['Eventos de reproducción', 'Métricas de usuario', 'Logs'],
            'justificacion': 'Escalabilidad horizontal masiva, writes de alto throughput',
            'escalabilidad': 'Horizontal ilimitada',
            'limitaciones': 'Queries complejas limitadas'
        },
        
        'neo4j': {
            'rol': 'Motor de recomendaciones y relaciones',
            'casos_uso': ['Sistema de recomendaciones', 'Análisis de afinidad', 'Detección de fraude'],
            'justificacion': 'Queries de relaciones complejas, algoritmos de grafos',
            'escalabilidad': 'Hasta ~100B nodos/relaciones',
            'limitaciones': 'No optimizado para agregaciones masivas'
        },
        
        'redis': {
            'rol': 'Caché y sesiones de alto performance',
            'casos_uso': ['Sesiones de usuario', 'Caché de recomendaciones', 'Leaderboards'],
            'justificacion': 'Latencia < 1ms, estructuras de datos ricas',
            'escalabilidad': 'Cluster horizontal',
            'limitaciones': 'Datos volátiles (reinicio borra datos)'
        },
        
        'elasticsearch': {
            'rol': 'Búsqueda y analytics de contenido',
            'casos_uso': ['Búsqueda de contenido', 'Analytics de catálogo', 'Logs estructurados'],
            'justificacion': 'Búsqueda full-text, agregaciones complejas, APIs REST',
            'escalabilidad': 'Horizontal con sharding',
            'limitaciones': 'No transaccional, eventual consistency'
        }
    }

    print("ARQUITECTURA HÍBRIDA SELECCIONADA") 
    print("=" * 40)
 
    for tecnologia, detalles in arquitectura_hibrida.items():
        print(f"\n{tecnologia.upper()}:")
        print(f"  Rol: {detalles['rol']}")
        print(f"  Casos de uso: {', '.join(detalles['casos_uso'])}")
        print(f"  Escalabilidad: {detalles['escalabilidad']}")
        print(f"  Limitaciones: {detalles['limitaciones']}")

ARQUITECTURA HÍBRIDA SELECCIONADA

POSTGRESQL:
  Rol: Base de datos transaccional principal
  Casos de uso: Suscripciones, Pagos, Perfiles de usuario
  Escalabilidad: Vertical (hasta ~10TB)
  Limitaciones: Escalabilidad horizontal limitada

CASSANDRA:
  Rol: Base de datos de eventos y analytics
  Casos de uso: Eventos de reproducción, Métricas de usuario, Logs
  Escalabilidad: Horizontal ilimitada
  Limitaciones: Queries complejas limitadas

NEO4J:
  Rol: Motor de recomendaciones y relaciones
  Casos de uso: Sistema de recomendaciones, Análisis de afinidad, Detección de fraude
  Escalabilidad: Hasta ~100B nodos/relaciones
  Limitaciones: No optimizado para agregaciones masivas

REDIS:
  Rol: Caché y sesiones de alto performance
  Casos de uso: Sesiones de usuario, Caché de recomendaciones, Leaderboards
  Escalabilidad: Cluster horizontal
  Limitaciones: Datos volátiles (reinicio borra datos)

ELASTICSEARCH:
  Rol: Búsqueda y analytics de contenido
  Casos de uso: Búsqueda de contenido,

## 3. Diseño de esquemas y patrones de consulta

In [8]:
postgresql_schema = """
-- PostgreSQL: Datos transaccionales críticos
CREATE TABLE suscripciones (
    id SERIAL PRIMARY KEY,
    usuario_id INTEGER REFERENCES usuarios(id),
    plan_id INTEGER REFERENCES planes(id),
    fecha_inicio DATE,
    fecha_fin DATE,
    estado VARCHAR(20),
    precio_mensual DECIMAL(8,2),
    metodo_pago VARCHAR(50)
);
"""

cassandra_schema = """
-- Cassandra: Eventos de reproducción (time-series)
CREATE KEYSPACE streaming WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3
};

CREATE TABLE eventos_reproduccion (
    usuario_id UUID,
    contenido_id UUID,
    timestamp TIMESTAMP,
    duracion_reproducida INT,
    posicion_actual INT,
    dispositivo TEXT,
    calidad TEXT,
    PRIMARY KEY ((usuario_id, contenido_id), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
"""

neo4j_schema = """
// Neo4j: Grafo de relaciones usuario-contenido
CREATE (u:Usuario {id: 1, nombre: "Ana"});
CREATE (c:Contenido {id: 100, titulo: "Serie Drama", genero: "Drama"});

CREATE (u)-[:VIO {rating: 5, tiempo_completo: true}]->(c);
CREATE (u)-[:BUSCO_GENERO]->(:Genero {nombre: "Drama"});

// Query de recomendaciones
MATCH (u:Usuario {id: 1})-[:VIO]->(c1:Contenido)-[:DEL_GENERO]->(g:Genero)<-[:DEL_GENERO]-(c2:Contenido)
WHERE NOT (u)-[:VIO]->(c2)
RETURN c2.titulo, COUNT(*) AS afinidad
ORDER BY afinidad DESC
LIMIT 10;
"""

redis_commands = """
# Redis: Caché de recomendaciones
HSET recomendaciones:usuario:1 serie:100 0.95 serie:200 0.87 serie:150 0.82

# Sorted sets para trending
ZADD trending:series 154 serie:100
ZADD trending:series 128 serie:200
"""

elasticsearch_commands = """
# Elasticsearch: Búsqueda de contenido
PUT /contenido/_doc/100
{
  "titulo": "Serie Drama Completa",
  "genero": ["Drama", "Suspenso"],
  "actores": ["Actor A", "Actor B"],
  "descripcion": "Serie de drama intenso...",
  "rating_promedio": 4.5,
  "temporadas": 3
}

# Query de búsqueda
GET /contenido/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "drama",
            "fields": ["titulo", "descripcion"]
          }
        },
        {
          "terms": {
            "genero": ["Drama", "Suspenso"]
          }
        }
      ],
      "filter": {
        "range": {
          "rating_promedio": { "gte": 4.0 }
        }
      }
    }
  }
}
"""

# ==============================
# Mostrar contenido (ejecutable)
# ==============================

print("=== POSTGRESQL ===")
print(postgresql_schema)

print("\n=== CASSANDRA ===")
print(cassandra_schema)

print("\n=== NEO4J ===")
print(neo4j_schema)

print("\n=== REDIS ===")
print(redis_commands)

print("\n=== ELASTICSEARCH ===")
print(elasticsearch_commands)


=== POSTGRESQL ===

-- PostgreSQL: Datos transaccionales críticos
CREATE TABLE suscripciones (
    id SERIAL PRIMARY KEY,
    usuario_id INTEGER REFERENCES usuarios(id),
    plan_id INTEGER REFERENCES planes(id),
    fecha_inicio DATE,
    fecha_fin DATE,
    estado VARCHAR(20),
    precio_mensual DECIMAL(8,2),
    metodo_pago VARCHAR(50)
);


=== CASSANDRA ===

-- Cassandra: Eventos de reproducción (time-series)
CREATE KEYSPACE streaming WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3
};

CREATE TABLE eventos_reproduccion (
    usuario_id UUID,
    contenido_id UUID,
    timestamp TIMESTAMP,
    duracion_reproducida INT,
    posicion_actual INT,
    dispositivo TEXT,
    calidad TEXT,
    PRIMARY KEY ((usuario_id, contenido_id), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);


=== NEO4J ===

// Neo4j: Grafo de relaciones usuario-contenido
CREATE (u:Usuario {id: 1, nombre: "Ana"});
CREATE (c:Contenido {id: 100, titulo: "Serie Drama", genero: "

## 4. Define las clases MOCK

In [19]:
import json
from datetime import datetime

# ==============================
# MOCKS de infraestructura
# ==============================

class PostgreSQL:
    """Simulación de base de datos transaccional"""
    def __init__(self):
        self.users = {123: {"id": 123, "name": "Usuario Demo"}}
        self.plans = {1: {"id": 1, "active": True}}
        self.subscriptions = []

    def user_exists(self, user_id):
        return user_id in self.users

    def plan_available(self, plan_id):
        return plan_id in self.plans and self.plans[plan_id]["active"]

    def create_subscription(self, data):
        subscription = {
            "id": len(self.subscriptions) + 1,
            "user_id": data["user_id"],
            "plan_id": data["plan_id"],
            "payment_id": data["payment_id"],
            "created_at": datetime.utcnow().isoformat()
        }
        self.subscriptions.append(subscription)
        return subscription


class Cassandra:
    """Simulación de read model"""
    def __init__(self):
        self.user_profiles = {}
        self.recommendations = {
            123: [
                {"content_id": 100, "score": 0.95},
                {"content_id": 200, "score": 0.87}
            ]
        }

    def update_user_profile(self, user_id, data):
        self.user_profiles[user_id] = data

    def get_user_recommendations(self, user_id):
        return self.recommendations.get(user_id, [])


class Redis:
    """Simulación de cache"""
    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def setex(self, key, ttl, value):
        self.store[key] = value

    def delete(self, key):
        self.store.pop(key, None)


class Elasticsearch:
    """Simulación de motor de búsqueda"""
    def search(self, query):
        return [{"content_id": 100, "title": "Serie Drama Completa"}]


## 4.1 Implementación de patrón CQRS

In [None]:
# Implementación CQRS para plataforma de streaming
class StreamingCQRS:
    def __init__(self):
        self.command_db = PostgreSQL()  # Writes normalizados
        self.query_db = Cassandra()     # Reads optimizados
        self.cache = Redis()
        self.search = Elasticsearch()

    def process_payment(self, payment_info):
        if payment_info.get("token"):
            return {"success": True, "id": "PAY_" + datetime.utcnow().strftime("%Y%m%d%H%M%S")}
        return {"success": False}

    def publish_event(self, event_name, payload):
        if event_name == "SubscriptionCreated":
            self.handle_subscription_created(payload)

    # Command side
    def create_subscription(self, user_id, plan_id, payment_info):
        if not self.command_db.user_exists(user_id):
            raise ValueError("Usuario no existe")

        if not self.command_db.plan_available(plan_id):
            raise ValueError("Plan no disponible")

        payment_result = self.process_payment(payment_info)

        if payment_result["success"]:
            subscription = self.command_db.create_subscription({
                "user_id": user_id,
                "plan_id": plan_id,
                "payment_id": payment_result["id"]
            })
            self.publish_event("SubscriptionCreated", subscription)
            return subscription

        raise ValueError("Pago fallido")

    # Query side
    def get_user_recommendations(self, user_id):
        cache_key = f"recommendations:{user_id}"
        cached = self.cache.get(cache_key)

        if cached:
            return json.loads(cached)

        recommendations = self.query_db.get_user_recommendations(user_id)
        self.cache.setex(cache_key, 3600, json.dumps(recommendations))
        return recommendations

    # Event handler
    def handle_subscription_created(self, event):
        self.query_db.update_user_profile(event["user_id"], {
            "subscription_active": True,
            "plan_id": event["plan_id"],
            "subscription_date": event["created_at"]
        })
        self.cache.delete(f"user_profile:{event['user_id']}")
        self.cache.delete(f"recommendations:{event['user_id']}")


## 4.2 Uso del sistema

In [21]:
cqrs = StreamingCQRS()

subscription = cqrs.create_subscription(
    user_id=123,
    plan_id=1,
    payment_info={"token": "tok_test_123"}
)

recommendations = cqrs.get_user_recommendations(user_id=123)

print("✅ Suscripción creada:")
print(subscription)

print("\n✅ Recomendaciones:")
print(recommendations)

print("\n✅ Perfil en Read Model:")
print(cqrs.query_db.user_profiles)


✅ Suscripción creada:
{'id': 1, 'user_id': 123, 'plan_id': 1, 'payment_id': 'PAY_20260110025322', 'created_at': '2026-01-10T02:53:22.745676'}

✅ Recomendaciones:
[{'content_id': 100, 'score': 0.95}, {'content_id': 200, 'score': 0.87}]

✅ Perfil en Read Model:
{123: {'subscription_active': True, 'plan_id': 1, 'subscription_date': '2026-01-10T02:53:22.745676'}}


## 5. Verificación: 
¿Por qué elegirías esta arquitectura híbrida sobre un sistema SQL puro o NoSQL puro? 
- Se elige la arquitectura híbrida porque una plataforma de streaming maneja datos con requerimientos muy distintos. Los datos transaccionales críticos, como pagos y suscripciones, requieren consistencia fuerte y garantías ACID, lo que justifica el uso de bases de datos SQL. En cambio, los eventos de reproducción y métricas generan grandes volúmenes de datos que necesitan alta escalabilidad y throughput, adecuados para sistemas NoSQL. Además, las recomendaciones se benefician de bases de datos de grafos y la búsqueda de motores especializados. 

¿Qué desafíos introduciría esta complejidad adicional y cómo los mitigarías?
- La arquitectura híbrida incrementa la complejidad operativa y la dificultad de mantener la consistencia entre sistemas. También introduce desafíos en el monitoreo y el diagnóstico de errores. Estos riesgos se mitigan aplicando patrones como CQRS y arquitectura orientada a eventos. El uso de caché, automatización y la observación permite controlar la complejidad y asegurar la estabilidad del sistema.