# 🔗 Spark + NoSQL Integration - Análisis Distribuido

## 📋 **Guía Completa de Integración Big Data**

### 🎯 **Objetivos del Notebook:**
1. **Integrar Apache Spark** con HBase y Cassandra
2. **Implementar análisis distribuido** sobre datos NoSQL
3. **Optimizar rendimiento** de consultas híbridas
4. **Casos de uso reales** de Big Data Analytics
5. **Mejores prácticas** de arquitectura distribuida

### 🏗️ **Arquitectura de Integración:**

```
┌─────────────────────────────────────────────────────────────────┐
│                    SPARK + NoSQL ECOSYSTEM                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐    ┌─────────────────┐                    │
│  │   Spark Driver  │    │  Spark Workers  │                    │
│  │                 │    │                 │                    │
│  │ ┌─────────────┐ │    │ ┌─────────────┐ │                    │
│  │ │ SparkSession│ │◄──►│ │  Executors  │ │                    │
│  │ │             │ │    │ │             │ │                    │
│  │ │ • Catalyst  │ │    │ │ • Tasks     │ │                    │
│  │ │ • Optimizer │ │    │ │ • Cache     │ │                    │
│  │ └─────────────┘ │    │ └─────────────┘ │                    │
│  └─────────────────┘    └─────────────────┘                    │
│           │                       │                            │
│           └───────────┬───────────┘                            │
│                       │                                        │
│  ┌────────────────────┼────────────────────┐                   │
│  │              CONNECTORS                │                   │
│  │                    │                    │                   │
│  │  ┌─────────────────┼─────────────────┐  │                   │
│  │  │    HBase        │   Cassandra     │  │                   │
│  │  │   Connector     │   Connector     │  │                   │
│  │  │                 │                 │  │                   │
│  │  │ • spark-hbase   │ • spark-cass    │  │                   │
│  │  │ • TableInputs   │ • CQL Support   │  │                   │
│  │  │ • Bulk Ops      │ • Token Aware   │  │                   │
│  │  └─────────────────┼─────────────────┘  │                   │
│  └────────────────────┼────────────────────┘                   │
│                       │                                        │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    DATA SOURCES                         │   │
│  │                                                         │   │
│  │  ┌─────────────────┐      ┌─────────────────────────┐   │   │
│  │  │     HBase       │      │       Cassandra        │   │   │
│  │  │                 │      │                         │   │   │
│  │  │ • Column Store  │      │ • Wide Column Store     │   │   │
│  │  │ • HDFS Storage  │      │ • Distributed Nodes     │   │   │
│  │  │ • Strong Cons.  │      │ • Eventual Cons.        │   │   │
│  │  │ • Real-time     │      │ • High Throughput       │   │   │
│  │  └─────────────────┘      └─────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```

### 🎯 **Casos de Uso Cubiertos:**

#### **1. 📊 Real-time Analytics**
- **Fuente**: Datos streaming en HBase
- **Procesamiento**: Agregaciones con Spark
- **Destino**: Dashboards en tiempo real

#### **2. 🌐 IoT Data Processing**
- **Fuente**: Sensores → Cassandra
- **Procesamiento**: ML con Spark MLlib
- **Destino**: Alertas y predicciones

#### **3. 📈 Business Intelligence**
- **Fuente**: Transacciones en HBase
- **Procesamiento**: ETL con Spark SQL
- **Destino**: Data Warehouse

#### **4. 🔍 Log Analytics**
- **Fuente**: Logs distribuidos en Cassandra
- **Procesamiento**: Pattern mining con Spark
- **Destino**: Monitoreo y alertas

### 📊 **Beneficios de la Integración:**

| Beneficio | Spark + HBase | Spark + Cassandra |
|-----------|---------------|-------------------|
| **Consistencia** | Strong (ACID) | Tunable (BASE) |
| **Latencia** | Baja (< 100ms) | Ultra-baja (< 10ms) |
| **Throughput** | Alto (GB/s) | Muy Alto (TB/s) |
| **Escalabilidad** | Vertical + Horizontal | Horizontal Lineal |
| **Casos de Uso** | OLTP + Analytics | IoT + Time Series |

---


In [None]:
# 🚀 CONFIGURACIÓN INICIAL - SPARK + NoSQL INTEGRATION

import findspark
findspark.init('/opt/spark')

import os
import sys
import time
import warnings
from datetime import datetime
import subprocess

# Suprimir warnings
warnings.filterwarnings('ignore')

# Importar librerías de Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

print("🔗 INICIANDO CONFIGURACIÓN SPARK + NoSQL INTEGRATION")
print("="*60)

# Verificar servicios requeridos
services_status = {}
required_services = {
    "master": ("Spark Master", "7077"),
    "hbase": ("HBase Thrift", "9090"), 
    "cassandra": ("Cassandra Native", "9042")
}

print("\n1️⃣ VERIFICANDO SERVICIOS REQUERIDOS:")

import socket
for service, (description, port) in required_services.items():
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        result = sock.connect_ex((service, int(port)))
        sock.close()
        
        if result == 0:
            print(f"   ✅ {description}: Disponible ({service}:{port})")
            services_status[service] = True
        else:
            print(f"   ❌ {description}: No disponible ({service}:{port})")
            services_status[service] = False
    except Exception as e:
        print(f"   ⚠️ {description}: Error - {e}")
        services_status[service] = False

# Resumen de servicios
available_services = sum(services_status.values())
total_services = len(services_status)
print(f"\n📊 Servicios disponibles: {available_services}/{total_services}")

if available_services >= 2:
    print("✅ Configuración mínima disponible para continuar")
else:
    print("⚠️ Servicios insuficientes - algunas funciones pueden no estar disponibles")

# Información del entorno
print(f"\n2️⃣ INFORMACIÓN DEL ENTORNO:")
print(f"   🐍 Python: {sys.version.split()[0]}")
print(f"   🏠 Spark Home: {os.environ.get('SPARK_HOME', '/opt/spark')}")
print(f"   🖥️ Hostname: {os.environ.get('HOSTNAME', 'jupyterlab')}")
print(f"   ⏰ Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print("\n🎯 Configuración inicial completada")
print("🚀 Listo para crear sesión Spark integrada...")


In [None]:
# 🚀 CONFIGURACIÓN INICIAL Y VERIFICACIÓN DEL ENTORNO

import findspark
import os
import sys
import subprocess
import warnings
from datetime import datetime, date
import time

# Configurar findspark
findspark.init('/opt/spark')

# Importar librerías de PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

# Suprimir warnings
warnings.filterwarnings('ignore')

print("🔗 SPARK + NOSQL INTEGRATION - CONFIGURACIÓN INICIAL")
print("="*60)

# Información del entorno
print(f"📍 Spark Home: {os.environ.get('SPARK_HOME', '/opt/spark')}")
print(f"🐍 Python Version: {sys.version.split()[0]}")
print(f"🖥️ Hostname: {os.environ.get('HOSTNAME', 'jupyterlab')}")
print(f"⏰ Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Verificar servicios NoSQL
print("\n🔍 VERIFICANDO SERVICIOS NOSQL:")

import socket

services_to_check = {
    "hbase": ("HBase Thrift", "9090"),
    "cassandra": ("Cassandra Native", "9042"),
    "master": ("Spark Master", "7077")
}

services_status = {}
for service, (description, port) in services_to_check.items():
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        result = sock.connect_ex((service, int(port)))
        sock.close()
        
        if result == 0:
            print(f"   ✅ {description}: Disponible en {service}:{port}")
            services_status[service] = True
        else:
            print(f"   ❌ {description}: No disponible en {service}:{port}")
            services_status[service] = False
    except Exception as e:
        print(f"   ⚠️ {description}: Error verificando - {e}")
        services_status[service] = False

print(f"\n📊 Servicios disponibles: {sum(services_status.values())}/{len(services_status)}")
print("✅ Configuración inicial completada")


In [None]:
# 🚀 CREAR SESIÓN SPARK OPTIMIZADA PARA INTEGRACIÓN NOSQL

print("🚀 CREANDO SESIÓN SPARK PARA INTEGRACIÓN NOSQL...")
print("="*55)

# Detener cualquier sesión existente
try:
    spark.stop()
    print("🛑 Sesión anterior detenida correctamente")
except:
    print("🔍 No había sesión activa")

# CONFIGURACIÓN OPTIMIZADA PARA INTEGRACIÓN NOSQL
# Incluye configuraciones específicas para conectores NoSQL
spark = SparkSession.builder \
    .appName("EducacionIT-Spark-NoSQL-Integration") \
    .master("spark://master:7077") \
    .config("spark.driver.memory", "1200m") \
    .config("spark.driver.cores", "2") \
    .config("spark.driver.maxResultSize", "500m") \
    .config("spark.executor.memory", "800m") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.memoryFraction", "0.8") \
    .config("spark.executor.memoryStorageFraction", "0.5") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.shuffle.service.enabled", "false") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.connection.keep_alive_ms", "30000") \
    .config("spark.cassandra.connection.timeout_ms", "30000") \
    .config("spark.hbase.host", "hbase") \
    .config("spark.hbase.port", "9090") \
    .enableHiveSupport() \
    .getOrCreate()

# Configurar nivel de logging
spark.sparkContext.setLogLevel("WARN")

# Información de la sesión
print("\n🎉 ¡SESIÓN SPARK PARA NOSQL CREADA EXITOSAMENTE!")
print("="*60)
print(f"🏷️  Aplicación: {spark.sparkContext.appName}")
print(f"🔗 Master URL: {spark.sparkContext.master}")
print(f"📊 Spark UI: {spark.sparkContext.uiWebUrl}")
print(f"🎯 App ID: {spark.sparkContext.applicationId}")
print(f"🚀 Versión Spark: {spark.version}")

# Verificar configuración específica para NoSQL
print("\n📋 CONFIGURACIÓN NOSQL:")
print(f"   🌟 Cassandra Host: {spark.conf.get('spark.cassandra.connection.host')}")
print(f"   🔌 Cassandra Port: {spark.conf.get('spark.cassandra.connection.port')}")
print(f"   🏛️ HBase Host: {spark.conf.get('spark.hbase.host')}")
print(f"   🔌 HBase Port: {spark.conf.get('spark.hbase.port')}")

# Verificar estado del cluster
print("\n🔍 ESTADO DEL CLUSTER SPARK:")
sc = spark.sparkContext

try:
    # Información de executors
    executor_infos = sc.statusTracker().getExecutorInfos()
    active_executors = len([e for e in executor_infos if e.isActive])
    print(f"   ⚡ Executors activos: {active_executors}")
    print(f"   🖥️ Total executors: {len(executor_infos)}")
    
    # Test rápido de conectividad
    test_rdd = sc.parallelize([1, 2, 3, 4, 5])
    result = test_rdd.sum()
    print(f"   ✅ Test de conectividad: Suma = {result}")
    
except Exception as e:
    print(f"   ⚠️ Error verificando cluster: {e}")

print("\n🌟 URLs de Monitoreo:")
print("   🎛️ Spark Master UI: http://localhost:8080")
print("   📊 Spark Driver UI: http://localhost:4040")

print("\n✅ Spark configurado para integración NoSQL")


## 🔗 **Tutorial Paso a Paso: Integración Spark + HBase**

### 🎯 **Objetivo:**
Aprender a leer datos desde HBase usando Spark, realizar transformaciones distribuidas y escribir resultados de vuelta a HBase.

### 📋 **Casos de Uso Reales:**
1. **ETL desde HBase**: Extraer datos de HBase, transformarlos y cargarlos en otro sistema
2. **Análisis en tiempo real**: Procesar datos de HBase con Spark para analytics
3. **Agregaciones complejas**: Usar Spark SQL para consultas complejas sobre datos HBase
4. **Data Lake Integration**: Combinar datos de HBase con datos de HDFS/S3

### 🏗️ **Arquitectura de Integración:**

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Spark Driver  │◄──►│  HBase Client   │◄──►│  HBase Cluster  │
│                 │    │   (happybase)   │    │                 │
│ • SparkSession  │    │ • Thrift API    │    │ • RegionServers │
│ • DataFrame API │    │ • Connection    │    │ • HDFS Storage  │
│ • SQL Engine   │    │ • Table Ops     │    │ • Zookeeper     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌─────────────────┐
                    │  Spark Workers  │
                    │                 │
                    │ • Data Parallel │
                    │ • Executors     │
                    │ • Task Exec     │
                    └─────────────────┘
```

### 💡 **Mejores Prácticas:**
- **Partitioning**: Diseñar row keys para distribución uniforme
- **Batch Processing**: Usar batch operations para mejor rendimiento
- **Caching**: Cache DataFrames frecuentemente usados
- **Resource Management**: Configurar memoria adecuadamente

---


In [None]:
# 🏛️ INTEGRACIÓN SPARK + HBASE: LECTURA Y ANÁLISIS DE DATOS (CONFIGURACIÓN SIMPLE)

print("🏛️ INICIANDO INTEGRACIÓN SPARK + HBASE...")
print("="*50)

# Instalar happybase si no está disponible
try:
    import happybase
    print("✅ HappyBase ya está disponible")
except ImportError:
    print("📦 Instalando HappyBase...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "happybase"])
    import happybase
    print("✅ HappyBase instalado correctamente")

# Establecer conexión con HBase usando configuración simple
print("\n🔗 CONECTANDO A HBASE CON CONFIGURACIÓN SIMPLE:")

try:
    # CONFIGURACIÓN EXACTAMENTE IGUAL AL TUTORIAL ORIGINAL QUE FUNCIONABA
    hbase_connection = happybase.Connection(
        host='hbase',  # Nombre del servicio en Docker
        port=9090,     # Puerto Thrift
        timeout=10000
    )
    
    # Test de conexión básico (igual al original)
    tables = list(hbase_connection.tables())
    print("🎉 ¡Conexión a HBase exitosa!")
    print(f"📊 Tablas existentes: {len(tables)}")
    
    # PASO 1: LEER DATOS DE HBASE Y CONVERTIR A SPARK DATAFRAME
    print("\n1️⃣ LEYENDO DATOS DE HBASE Y CREANDO SPARK DATAFRAME:")
    
    # Verificar si existe la tabla de productos (del notebook anterior)
    existing_tables = [table.decode('utf-8') for table in hbase_connection.tables()]
    
    if 'productos_ecommerce' not in existing_tables:
        print("⚠️ Tabla 'productos_ecommerce' no existe. Creándola con datos de ejemplo...")
        
        # Crear tabla y datos de ejemplo
        column_families = {
            'info': {'compression': 'SNAPPY'},
            'inventory': {'compression': 'SNAPPY'},
            'sales': {'compression': 'SNAPPY'}
        }
        
        hbase_connection.create_table('productos_ecommerce', column_families)
        table = hbase_connection.table('productos_ecommerce')
        
        # Insertar datos de ejemplo
        sample_data = [
            ('PROD_001', {
                b'info:name': b'Laptop Gaming Pro',
                b'info:category': b'Electronics',
                b'info:price': b'1299.99',
                b'inventory:stock': b'25',
                b'sales:total_sold': b'156',
                b'sales:revenue': b'202798.44'
            }),
            ('PROD_002', {
                b'info:name': b'Smartphone Ultra',
                b'info:category': b'Electronics', 
                b'info:price': b'899.99',
                b'inventory:stock': b'45',
                b'sales:total_sold': b'289',
                b'sales:revenue': b'260097.11'
            }),
            ('PROD_003', {
                b'info:name': b'Wireless Headphones',
                b'info:category': b'Audio',
                b'info:price': b'199.99',
                b'inventory:stock': b'78',
                b'sales:total_sold': b'432',
                b'sales:revenue': b'86395.68'
            })
        ]
        
        for row_key, data in sample_data:
            table.put(row_key, data)
        
        print("✅ Tabla y datos de ejemplo creados")
    else:
        table = hbase_connection.table('productos_ecommerce')
        print("✅ Usando tabla existente 'productos_ecommerce'")
    
    # Leer datos de HBase y convertir a lista para Spark DataFrame
    print("\n📊 EXTRAYENDO DATOS DE HBASE:")
    
    hbase_data = []
    for key, data in table.scan():
        row_dict = {'product_id': key.decode('utf-8')}
        
        # Extraer datos de diferentes column families
        for col_key, col_value in data.items():
            family, qualifier = col_key.decode('utf-8').split(':')
            column_name = f"{family}_{qualifier}"
            row_dict[column_name] = col_value.decode('utf-8')
        
        hbase_data.append(row_dict)
    
    print(f"✅ Extraídos {len(hbase_data)} registros de HBase")
    
    # Mostrar datos extraídos
    print("\n📋 DATOS EXTRAÍDOS DE HBASE:")
    for i, row in enumerate(hbase_data, 1):
        print(f"   {i}. {row['product_id']}: {row.get('info_name', 'N/A')}")
    
    # PASO 2: CREAR SPARK DATAFRAME DESDE DATOS HBASE
    print("\n2️⃣ CREANDO SPARK DATAFRAME DESDE DATOS HBASE:")
    
    # Definir esquema para el DataFrame
    hbase_schema = StructType([
        StructField("product_id", StringType(), False),
        StructField("info_name", StringType(), True),
        StructField("info_category", StringType(), True),
        StructField("info_price", StringType(), True),
        StructField("inventory_stock", StringType(), True),
        StructField("sales_total_sold", StringType(), True),
        StructField("sales_revenue", StringType(), True)
    ])
    
    # Crear DataFrame desde los datos HBase
    hbase_df = spark.createDataFrame(hbase_data, schema=hbase_schema)
    
    print("✅ Spark DataFrame creado desde datos HBase")
    print(f"📊 Registros en DataFrame: {hbase_df.count()}")
    
    # Mostrar esquema y datos
    print("\n📋 ESQUEMA DEL DATAFRAME:")
    hbase_df.printSchema()
    
    print("\n📊 DATOS DEL DATAFRAME:")
    hbase_df.show(truncate=False)
    
    # PASO 3: TRANSFORMACIONES CON SPARK
    print("\n3️⃣ APLICANDO TRANSFORMACIONES CON SPARK:")
    
    # Convertir tipos de datos
    transformed_df = hbase_df \
        .withColumn("price", col("info_price").cast("double")) \
        .withColumn("stock", col("inventory_stock").cast("integer")) \
        .withColumn("total_sold", col("sales_total_sold").cast("integer")) \
        .withColumn("revenue", col("sales_revenue").cast("double")) \
        .withColumn("inventory_value", col("price") * col("stock")) \
        .select(
            col("product_id"),
            col("info_name").alias("name"),
            col("info_category").alias("category"),
            col("price"),
            col("stock"),
            col("total_sold"),
            col("revenue"),
            col("inventory_value")
        )
    
    print("✅ Transformaciones aplicadas")
    print("\n📊 DATAFRAME TRANSFORMADO:")
    transformed_df.show()
    
    # PASO 4: ANÁLISIS CON SPARK SQL
    print("\n4️⃣ ANÁLISIS CON SPARK SQL:")
    
    # Registrar DataFrame como tabla temporal
    transformed_df.createOrReplaceTempView("productos_hbase")
    
    # Análisis 1: Productos más vendidos
    print("\n📈 TOP PRODUCTOS MÁS VENDIDOS:")
    top_selling = spark.sql("""
        SELECT name, category, total_sold, revenue
        FROM productos_hbase
        ORDER BY total_sold DESC
    """)
    top_selling.show()
    
    # Análisis 2: Valor total del inventario por categoría
    print("\n💰 VALOR DEL INVENTARIO POR CATEGORÍA:")
    inventory_by_category = spark.sql("""
        SELECT 
            category,
            COUNT(*) as num_products,
            SUM(stock) as total_stock,
            SUM(inventory_value) as total_inventory_value,
            AVG(price) as avg_price
        FROM productos_hbase
        GROUP BY category
        ORDER BY total_inventory_value DESC
    """)
    inventory_by_category.show()
    
    # Análisis 3: ROI por producto
    print("\n📊 ROI (RETURN ON INVESTMENT) POR PRODUCTO:")
    roi_analysis = spark.sql("""
        SELECT 
            name,
            category,
            price,
            inventory_value,
            revenue,
            ROUND((revenue / inventory_value) * 100, 2) as roi_percentage
        FROM productos_hbase
        WHERE inventory_value > 0
        ORDER BY roi_percentage DESC
    """)
    roi_analysis.show()
    
    # PASO 5: AGREGACIONES AVANZADAS
    print("\n5️⃣ AGREGACIONES AVANZADAS:")
    
    # Estadísticas generales
    stats = transformed_df.agg(
        sum("revenue").alias("total_revenue"),
        sum("inventory_value").alias("total_inventory_value"),
        sum("total_sold").alias("total_units_sold"),
        avg("price").alias("avg_price"),
        count("*").alias("total_products")
    ).collect()[0]
    
    print("\n📊 ESTADÍSTICAS GENERALES:")
    print(f"   💰 Ingresos totales: ${stats['total_revenue']:,.2f}")
    print(f"   📦 Valor total inventario: ${stats['total_inventory_value']:,.2f}")
    print(f"   📈 Unidades vendidas: {stats['total_units_sold']:,}")
    print(f"   💵 Precio promedio: ${stats['avg_price']:,.2f}")
    print(f"   📱 Total productos: {stats['total_products']}")
    
    # Cache del DataFrame para consultas futuras
    transformed_df.cache()
    print("\n✅ DataFrame cacheado para consultas futuras")
    
    print("\n🎉 ¡Integración Spark + HBase completada exitosamente!")

except Exception as e:
    print(f"❌ Error en integración Spark + HBase: {e}")
    import traceback
    traceback.print_exc()


## 🌟 **Tutorial Paso a Paso: Integración Spark + Cassandra**

### 🎯 **Objetivo:**
Aprender a leer datos desde Cassandra usando Spark, realizar análisis distribuidos y aprovechar la escalabilidad de ambas tecnologías.

### 📋 **Casos de Uso Reales:**
1. **Real-time Analytics**: Procesar streams de datos almacenados en Cassandra
2. **Time-series Analysis**: Analizar datos temporales con Spark SQL
3. **Cross-datacenter Analytics**: Consultas distribuidas en múltiples regiones
4. **IoT Data Processing**: Procesar millones de eventos de sensores

### 🏗️ **Arquitectura de Integración:**

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Spark Driver  │◄──►│ Cassandra       │◄──►│  Cassandra Ring │
│                 │    │ Connector       │    │                 │
│ • SparkSession  │    │ • Token Aware   │    │ • Node 1, 2, 3  │
│ • DataFrame API │    │ • Load Balance  │    │ • Replication   │
│ • SQL Engine   │    │ • Pushdown      │    │ • Partitioning  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘
                                 │
                    ┌─────────────────┐
                    │  Spark Workers  │
                    │                 │
                    │ • Parallel Read │
                    │ • Local Compute │
                    │ • Aggregation   │
                    └─────────────────┘
```

### 💡 **Ventajas de la Integración:**
- **Locality Awareness**: Spark ejecuta tareas cerca de los datos
- **Predicate Pushdown**: Filtros aplicados en Cassandra
- **Token Range Splitting**: Paralelización automática
- **Fault Tolerance**: Recuperación automática de fallos

---


In [None]:
# 🌟 INTEGRACIÓN SPARK + CASSANDRA: ANÁLISIS DISTRIBUIDO (CONFIGURACIÓN SIMPLE)

print("🌟 INICIANDO INTEGRACIÓN SPARK + CASSANDRA...")
print("="*52)

# Instalar driver de Cassandra si no está disponible
try:
    from cassandra.cluster import Cluster
    print("✅ Driver de Cassandra ya está disponible")
except ImportError:
    print("📦 Instalando driver de Cassandra...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "cassandra-driver"])
    from cassandra.cluster import Cluster
    print("✅ Driver de Cassandra instalado correctamente")

# Establecer conexión con Cassandra usando configuración simple
print("\n🔗 CONECTANDO A CASSANDRA CON CONFIGURACIÓN SIMPLE:")

try:
    # CONFIGURACIÓN EXACTAMENTE IGUAL AL TUTORIAL ORIGINAL QUE FUNCIONABA
    cluster = Cluster(
        contact_points=['cassandra'],  # Nombre del servicio en Docker
        port=9042
    )
    
    # Crear sesión
    session = cluster.connect()
    
    # Test de conexión (igual al original)
    result = session.execute("SELECT release_version FROM system.local")
    version = result.one()[0]
    
    print("🎉 ¡Conexión a Cassandra exitosa!")
    print(f"🔢 Versión de Cassandra: {version}")
    
    # Verificar si existe el keyspace del tutorial anterior
    keyspaces = session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
    existing_keyspaces = [ks.keyspace_name for ks in keyspaces]
    
    if 'ecommerce_nosql' not in existing_keyspaces:
        print("⚠️ Keyspace 'ecommerce_nosql' no existe. Creándolo con datos de ejemplo...")
        
        # Crear keyspace
        session.execute("""
            CREATE KEYSPACE ecommerce_nosql
            WITH REPLICATION = {
                'class': 'SimpleStrategy',
                'replication_factor': 1
            }
        """)
        
        session.set_keyspace('ecommerce_nosql')
        
        # Crear tabla de productos
        session.execute("""
            CREATE TABLE productos (
                producto_id text PRIMARY KEY,
                nombre text,
                categoria text,
                precio decimal,
                descripcion text,
                fecha_creacion timestamp,
                activo boolean
            )
        """)
        
        # Crear tabla de ventas por fecha
        session.execute("""
            CREATE TABLE ventas_por_fecha (
                fecha_venta date,
                hora_venta time,
                venta_id uuid,
                producto_id text,
                cantidad int,
                monto decimal,
                cliente_id text,
                PRIMARY KEY ((fecha_venta), hora_venta, venta_id)
            ) WITH CLUSTERING ORDER BY (hora_venta DESC, venta_id ASC)
        """)
        
        # Insertar datos de ejemplo
        import uuid
        from datetime import date, datetime
        
        # Productos
        productos_sample = [
            ("PROD_001", "MacBook Pro 16", "Laptops", 2499.99, "Laptop profesional", True),
            ("PROD_002", "iPhone 15 Pro", "Smartphones", 1199.99, "Smartphone premium", True),
            ("PROD_003", "iPad Air", "Tablets", 599.99, "Tablet ligera", True),
            ("PROD_004", "AirPods Pro", "Audio", 249.99, "Auriculares premium", True),
            ("PROD_005", "Apple Watch", "Wearables", 399.99, "Smartwatch avanzado", True)
        ]
        
        for pid, nombre, categoria, precio, desc, activo in productos_sample:
            session.execute("""
                INSERT INTO productos (producto_id, nombre, categoria, precio, descripcion, fecha_creacion, activo)
                VALUES (?, ?, ?, ?, ?, toTimestamp(now()), ?)
            """, [pid, nombre, categoria, precio, desc, activo])
        
        # Ventas (datos más ricos para análisis)
        ventas_sample = []
        for day in range(1, 16):  # 15 días de datos
            for hour in range(9, 18):  # Horario comercial
                for _ in range(2):  # 2 ventas por hora
                    ventas_sample.append((
                        f"2024-01-{day:02d}",
                        f"{hour}:{30 if _ == 0 else 45}:00",
                        str(uuid.uuid4()),
                        f"PROD_{(day + hour + _) % 5 + 1:03d}",
                        1 + (_ % 3),  # Cantidad 1-3
                        round(200 + (day * hour * 10) + (_ * 50), 2),  # Monto variable
                        f"CLIENTE_{(day * 100 + hour * 10 + _):05d}"
                    ))
        
        prepared_venta = session.prepare("""
            INSERT INTO ventas_por_fecha (fecha_venta, hora_venta, venta_id, producto_id, cantidad, monto, cliente_id)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """)
        
        for venta in ventas_sample:
            session.execute(prepared_venta, venta)
        
        print(f"✅ Keyspace creado con {len(productos_sample)} productos y {len(ventas_sample)} ventas")
    else:
        session.set_keyspace('ecommerce_nosql')
        print("✅ Usando keyspace existente 'ecommerce_nosql'")
    
    # PASO 1: LEER DATOS DE CASSANDRA USANDO SPARK
    print("\n1️⃣ LEYENDO DATOS DE CASSANDRA CON SPARK:")
    
    # Leer tabla de productos
    print("\n📊 CARGANDO PRODUCTOS DESDE CASSANDRA:")
    
    # Simular lectura de Cassandra (en un entorno real usaríamos spark-cassandra-connector)
    # Por ahora leemos directamente y convertimos a Spark DataFrame
    productos_cass = session.execute("SELECT * FROM productos")
    productos_data = []
    
    for producto in productos_cass:
        productos_data.append({
            'producto_id': producto.producto_id,
            'nombre': producto.nombre,
            'categoria': producto.categoria,
            'precio': float(producto.precio),
            'descripcion': producto.descripcion,
            'activo': producto.activo
        })
    
    print(f"✅ Extraídos {len(productos_data)} productos de Cassandra")
    
    # Crear DataFrame de Spark desde datos Cassandra
    productos_schema = StructType([
        StructField("producto_id", StringType(), False),
        StructField("nombre", StringType(), True),
        StructField("categoria", StringType(), True),
        StructField("precio", DoubleType(), True),
        StructField("descripcion", StringType(), True),
        StructField("activo", BooleanType(), True)
    ])
    
    productos_df = spark.createDataFrame(productos_data, schema=productos_schema)
    
    print("📋 PRODUCTOS CARGADOS EN SPARK:")
    productos_df.show()
    
    # Leer tabla de ventas
    print("\n💰 CARGANDO VENTAS DESDE CASSANDRA:")
    
    ventas_cass = session.execute("SELECT * FROM ventas_por_fecha")
    ventas_data = []
    
    for venta in ventas_cass:
        ventas_data.append({
            'fecha_venta': str(venta.fecha_venta),
            'hora_venta': str(venta.hora_venta),
            'venta_id': str(venta.venta_id),
            'producto_id': venta.producto_id,
            'cantidad': venta.cantidad,
            'monto': float(venta.monto),
            'cliente_id': venta.cliente_id
        })
    
    print(f"✅ Extraídas {len(ventas_data)} ventas de Cassandra")
    
    # Crear DataFrame de ventas
    ventas_schema = StructType([
        StructField("fecha_venta", StringType(), False),
        StructField("hora_venta", StringType(), False),
        StructField("venta_id", StringType(), False),
        StructField("producto_id", StringType(), True),
        StructField("cantidad", IntegerType(), True),
        StructField("monto", DoubleType(), True),
        StructField("cliente_id", StringType(), True)
    ])
    
    ventas_df = spark.createDataFrame(ventas_data, schema=ventas_schema)
    
    # Convertir fecha a formato Date
    ventas_df = ventas_df.withColumn("fecha", to_date(col("fecha_venta"), "yyyy-MM-dd"))
    
    print("📋 MUESTRA DE VENTAS CARGADAS EN SPARK:")
    ventas_df.select("fecha", "producto_id", "cantidad", "monto", "cliente_id").show(10)
    
    print("\n✅ Datos de Cassandra cargados exitosamente en Spark")

except Exception as e:
    print(f"❌ Error en integración Spark + Cassandra: {e}")
    import traceback
    traceback.print_exc()
    productos_df = None
    ventas_df = None


In [None]:
# 🔬 ANÁLISIS AVANZADO CON SPARK + CASSANDRA

if 'productos_df' in locals() and productos_df and 'ventas_df' in locals() and ventas_df:
    print("🔬 INICIANDO ANÁLISIS AVANZADO CON DATOS DE CASSANDRA...")
    print("="*60)
    
    # PASO 2: JOINS ENTRE DATAFRAMES DE CASSANDRA
    print("\n2️⃣ REALIZANDO JOINS ENTRE TABLAS DE CASSANDRA:")
    
    # Join entre productos y ventas
    ventas_con_productos = ventas_df.join(
        productos_df,
        ventas_df.producto_id == productos_df.producto_id,
        "inner"
    ).select(
        ventas_df.fecha,
        ventas_df.hora_venta,
        ventas_df.venta_id,
        productos_df.nombre.alias("producto_nombre"),
        productos_df.categoria,
        productos_df.precio.alias("precio_unitario"),
        ventas_df.cantidad,
        ventas_df.monto,
        ventas_df.cliente_id
    ).withColumn(
        "revenue_calculado", 
        col("precio_unitario") * col("cantidad")
    )
    
    print("✅ Join completado entre productos y ventas")
    print("\n📊 MUESTRA DE DATOS UNIDOS:")
    ventas_con_productos.show(10, truncate=False)
    
    # PASO 3: ANÁLISIS TEMPORAL DE VENTAS
    print("\n3️⃣ ANÁLISIS TEMPORAL DE VENTAS:")
    
    # Registrar como tablas temporales para SQL
    ventas_con_productos.createOrReplaceTempView("ventas_completas")
    productos_df.createOrReplaceTempView("productos_cassandra")
    ventas_df.createOrReplaceTempView("ventas_cassandra")
    
    # Análisis 1: Ventas por día
    print("\n📈 VENTAS DIARIAS:")
    ventas_diarias = spark.sql("""
        SELECT 
            fecha,
            COUNT(*) as num_transacciones,
            SUM(cantidad) as unidades_vendidas,
            SUM(monto) as revenue_total,
            AVG(monto) as ticket_promedio,
            COUNT(DISTINCT cliente_id) as clientes_unicos
        FROM ventas_completas
        GROUP BY fecha
        ORDER BY fecha
    """)
    
    ventas_diarias.show()
    
    # Análisis 2: Productos más vendidos
    print("\n🏆 TOP PRODUCTOS MÁS VENDIDOS:")
    top_productos = spark.sql("""
        SELECT 
            producto_nombre,
            categoria,
            COUNT(*) as num_ventas,
            SUM(cantidad) as unidades_totales,
            SUM(monto) as revenue_total,
            AVG(monto) as ticket_promedio
        FROM ventas_completas
        GROUP BY producto_nombre, categoria
        ORDER BY revenue_total DESC
    """)
    
    top_productos.show()
    
    # Análisis 3: Patrón horario de ventas
    print("\n⏰ PATRÓN HORARIO DE VENTAS:")
    patron_horario = spark.sql("""
        SELECT 
            SUBSTRING(hora_venta, 1, 2) as hora,
            COUNT(*) as num_ventas,
            SUM(monto) as revenue_total,
            AVG(monto) as ticket_promedio
        FROM ventas_completas
        GROUP BY SUBSTRING(hora_venta, 1, 2)
        ORDER BY hora
    """)
    
    patron_horario.show()
    
    # PASO 4: ANÁLISIS POR CATEGORÍAS
    print("\n4️⃣ ANÁLISIS DETALLADO POR CATEGORÍAS:")
    
    categoria_analysis = spark.sql("""
        SELECT 
            categoria,
            COUNT(DISTINCT producto_nombre) as productos_diferentes,
            COUNT(*) as total_transacciones,
            SUM(cantidad) as unidades_vendidas,
            SUM(monto) as revenue_total,
            AVG(monto) as ticket_promedio,
            MIN(monto) as venta_minima,
            MAX(monto) as venta_maxima,
            COUNT(DISTINCT cliente_id) as clientes_unicos
        FROM ventas_completas
        GROUP BY categoria
        ORDER BY revenue_total DESC
    """)
    
    categoria_analysis.show()
    
    # PASO 5: ANÁLISIS DE CLIENTES
    print("\n5️⃣ ANÁLISIS DE COMPORTAMIENTO DE CLIENTES:")
    
    cliente_analysis = spark.sql("""
        SELECT 
            COUNT(DISTINCT cliente_id) as total_clientes,
            AVG(compras_por_cliente) as avg_compras_por_cliente,
            AVG(gasto_por_cliente) as avg_gasto_por_cliente,
            MAX(compras_por_cliente) as max_compras_cliente,
            MAX(gasto_por_cliente) as max_gasto_cliente
        FROM (
            SELECT 
                cliente_id,
                COUNT(*) as compras_por_cliente,
                SUM(monto) as gasto_por_cliente
            FROM ventas_completas
            GROUP BY cliente_id
        ) cliente_stats
    """)
    
    cliente_analysis.show()
    
    # Top clientes
    print("\n👑 TOP 10 CLIENTES POR REVENUE:")
    top_clientes = spark.sql("""
        SELECT 
            cliente_id,
            COUNT(*) as num_compras,
            SUM(cantidad) as unidades_compradas,
            SUM(monto) as gasto_total,
            AVG(monto) as ticket_promedio,
            COUNT(DISTINCT categoria) as categorias_diferentes
        FROM ventas_completas
        GROUP BY cliente_id
        ORDER BY gasto_total DESC
        LIMIT 10
    """)
    
    top_clientes.show()
    
    # PASO 6: MÉTRICAS DE RENDIMIENTO Y CACHING
    print("\n6️⃣ OPTIMIZACIÓN Y CACHING:")
    
    # Cache de DataFrames frecuentemente usados
    ventas_con_productos.cache()
    print("✅ DataFrame ventas_con_productos cacheado")
    
    # Contar registros (fuerza la evaluación del cache)
    total_registros = ventas_con_productos.count()
    print(f"📊 Total de registros en análisis: {total_registros:,}")
    
    # Estadísticas de particionamiento
    print(f"📊 Número de particiones: {ventas_con_productos.rdd.getNumPartitions()}")
    
    # PASO 7: AGREGACIONES COMPLEJAS CON WINDOW FUNCTIONS
    print("\n7️⃣ ANÁLISIS AVANZADO CON WINDOW FUNCTIONS:")
    
    from pyspark.sql.window import Window
    
    # Ranking de productos por día
    window_spec = Window.partitionBy("fecha").orderBy(col("revenue_total").desc())
    
    ranking_diario = ventas_con_productos.groupBy("fecha", "producto_nombre", "categoria") \
        .agg(
            sum("monto").alias("revenue_total"),
            sum("cantidad").alias("unidades_vendidas"),
            count("*").alias("num_transacciones")
        ) \
        .withColumn("ranking_dia", row_number().over(window_spec)) \
        .filter(col("ranking_dia") <= 3) \
        .orderBy("fecha", "ranking_dia")
    
    print("\n🏆 TOP 3 PRODUCTOS POR DÍA:")
    ranking_diario.show(20, truncate=False)
    
    print("\n🎉 ¡Análisis avanzado completado exitosamente!")
    
else:
    print("❌ No hay datos disponibles para análisis")
    print("💡 Ejecuta las celdas anteriores para cargar los datos")


## 🔄 **Casos de Uso Híbridos: Spark + HBase + Cassandra**

### 🎯 **Escenarios del Mundo Real:**

#### **1. 🏪 E-commerce Multi-canal**
- **HBase**: Catálogo de productos, inventario en tiempo real
- **Cassandra**: Historial de clicks, eventos de usuario, logs
- **Spark**: Análisis de comportamiento, recomendaciones, ETL

#### **2. 🏦 Sistema Bancario**
- **HBase**: Transacciones en tiempo real, balances de cuentas
- **Cassandra**: Logs de auditoría, historial de transacciones
- **Spark**: Detección de fraude, análisis de riesgo, reportes

#### **3. 🌐 IoT y Telemetría**
- **HBase**: Estados actuales de sensores, configuraciones
- **Cassandra**: Series temporales de sensores, métricas
- **Spark**: Análisis de patrones, alertas, predicciones

#### **4. 📱 Aplicación Social**
- **HBase**: Perfiles de usuario, relaciones sociales
- **Cassandra**: Timeline de posts, mensajes, notificaciones
- **Spark**: Análisis de sentimientos, trending topics, ML

### 🏗️ **Arquitectura Híbrida:**

```
┌─────────────────────────────────────────────────────────────────┐
│                    HYBRID BIG DATA ARCHITECTURE                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────┐  │
│  │   Data Sources  │    │  Processing     │    │   Storage   │  │
│  │                 │    │                 │    │             │  │
│  │ • Web Apps      │───►│ ┌─────────────┐ │───►│ ┌─────────┐ │  │
│  │ • Mobile Apps   │    │ │   Apache    │ │    │ │ HBase   │ │  │
│  │ • IoT Sensors   │    │ │   Spark     │ │    │ │ (OLTP)  │ │  │
│  │ • APIs          │    │ │             │ │    │ └─────────┘ │  │
│  │ • Batch Files   │    │ │ • Driver    │ │    │             │  │
│  └─────────────────┘    │ │ • Workers   │ │    │ ┌─────────┐ │  │
│                         │ │ • SQL       │ │    │ │Cassandra│ │  │
│                         │ │ • MLlib     │ │    │ │(Analytics)│ │  │
│                         │ │ • Streaming │ │    │ └─────────┘ │  │
│                         │ └─────────────┘ │    │             │  │
│                         └─────────────────┘    │ ┌─────────┐ │  │
│                                                │ │  HDFS   │ │  │
│                                                │ │(Archive)│ │  │
│                                                │ └─────────┘ │  │
│                                                └─────────────┘  │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    DATA FLOW                            │   │
│  │                                                         │   │
│  │  Real-time ────► HBase ────► Spark ────► Cassandra     │   │
│  │  (OLTP)          (Fast)     (Process)   (Analytics)    │   │
│  │                                                         │   │
│  │  Batch ─────────► HDFS ────► Spark ────► HBase/Cass    │   │
│  │  (ETL)           (Storage)   (Process)   (Serve)       │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```

### 💡 **Mejores Prácticas de Integración:**

#### **🔧 Configuración:**
- **Connection Pooling**: Reutilizar conexiones entre Spark tasks
- **Batch Size**: Optimizar tamaño de lotes para escritura
- **Partitioning**: Alinear particiones Spark con particiones NoSQL
- **Caching**: Cache DataFrames frecuentemente accedidos

#### **⚡ Rendimiento:**
- **Predicate Pushdown**: Filtros aplicados en la fuente
- **Columnar Projection**: Leer solo columnas necesarias
- **Locality Awareness**: Ejecutar tareas cerca de los datos
- **Parallel Processing**: Maximizar paralelización

#### **🛡️ Confiabilidad:**
- **Retry Logic**: Reintentos automáticos en fallos
- **Circuit Breakers**: Protección contra cascading failures
- **Monitoring**: Métricas de rendimiento y salud
- **Backup Strategies**: Estrategias de respaldo híbridas

---


In [None]:
# 🌟 CASO DE USO HÍBRIDO: ANÁLISIS CROSS-PLATFORM

print("🌟 DEMOSTRANDO CASO DE USO HÍBRIDO...")
print("="*45)
print("📊 Combinando datos de HBase y Cassandra en un análisis unificado")

# Verificar que tenemos datos de ambas fuentes
hbase_available = 'transformed_df' in locals() and transformed_df is not None
cassandra_available = 'ventas_con_productos' in locals() and ventas_con_productos is not None

if hbase_available and cassandra_available:
    print("\n✅ Datos disponibles de ambas fuentes:")
    print(f"   🏛️ HBase: DataFrame 'transformed_df'")
    print(f"   🌟 Cassandra: DataFrame 'ventas_con_productos'")
    
    # PASO 1: ANÁLISIS COMPARATIVO DE INVENTARIOS
    print("\n1️⃣ ANÁLISIS COMPARATIVO DE INVENTARIOS:")
    
    # Datos de HBase (inventario actual)
    print("\n📦 INVENTARIO ACTUAL (HBase):")
    hbase_inventory = transformed_df.select(
        col("product_id"),
        col("name").alias("producto_nombre"),
        col("category").alias("categoria"),
        col("price").alias("precio"),
        col("stock"),
        col("inventory_value")
    )
    
    hbase_inventory.show()
    
    # Datos de Cassandra (ventas históricas)
    print("\n💰 RESUMEN DE VENTAS (Cassandra):")
    cassandra_sales = ventas_con_productos.groupBy("producto_nombre", "categoria") \
        .agg(
            sum("cantidad").alias("total_vendido"),
            sum("monto").alias("revenue_total"),
            count("*").alias("num_transacciones"),
            avg("monto").alias("ticket_promedio")
        )
    
    cassandra_sales.show()
    
    # PASO 2: ANÁLISIS HÍBRIDO - ROTACIÓN DE INVENTARIO
    print("\n2️⃣ ANÁLISIS HÍBRIDO - ROTACIÓN DE INVENTARIO:")
    
    # Unir datos de ambas fuentes por nombre de producto
    inventory_turnover = hbase_inventory.join(
        cassandra_sales,
        hbase_inventory.producto_nombre == cassandra_sales.producto_nombre,
        "left_outer"
    ).select(
        hbase_inventory.product_id,
        hbase_inventory.producto_nombre,
        hbase_inventory.categoria,
        hbase_inventory.precio,
        hbase_inventory.stock,
        hbase_inventory.inventory_value,
        coalesce(cassandra_sales.total_vendido, lit(0)).alias("total_vendido"),
        coalesce(cassandra_sales.revenue_total, lit(0.0)).alias("revenue_total"),
        coalesce(cassandra_sales.num_transacciones, lit(0)).alias("num_transacciones")
    ).withColumn(
        "turnover_ratio",
        when(col("stock") > 0, col("total_vendido") / col("stock")).otherwise(0)
    ).withColumn(
        "stock_days",
        when(col("total_vendido") > 0, col("stock") * 30 / col("total_vendido")).otherwise(999)
    ).withColumn(
        "performance_score",
        (col("turnover_ratio") * 0.4) + 
        (when(col("stock_days") < 30, 1.0).otherwise(0.5) * 0.3) +
        (col("revenue_total") / 10000 * 0.3)
    )
    
    print("\n📊 ANÁLISIS DE ROTACIÓN DE INVENTARIO:")
    inventory_turnover.select(
        "product_id", "producto_nombre", "categoria", "stock", 
        "total_vendido", "turnover_ratio", "stock_days", "performance_score"
    ).orderBy(col("performance_score").desc()).show()
    
    # PASO 3: RECOMENDACIONES INTELIGENTES
    print("\n3️⃣ RECOMENDACIONES INTELIGENTES BASADAS EN DATOS HÍBRIDOS:")
    
    # Registrar vista temporal
    inventory_turnover.createOrReplaceTempView("inventory_analysis")
    
    # Productos con alto rendimiento
    high_performers = spark.sql("""
        SELECT 
            product_id,
            producto_nombre,
            categoria,
            stock,
            total_vendido,
            revenue_total,
            ROUND(turnover_ratio, 2) as turnover_ratio,
            ROUND(stock_days, 1) as stock_days,
            ROUND(performance_score, 2) as performance_score,
            CASE 
                WHEN performance_score > 1.5 THEN '🚀 Estrella'
                WHEN performance_score > 1.0 THEN '⭐ Alto rendimiento'
                WHEN performance_score > 0.5 THEN '📈 Promedio'
                ELSE '⚠️ Bajo rendimiento'
            END as clasificacion
        FROM inventory_analysis
        ORDER BY performance_score DESC
    """)
    
    print("\n🏆 CLASIFICACIÓN DE PRODUCTOS POR RENDIMIENTO:")
    high_performers.show(truncate=False)
    
    # Alertas de inventario
    print("\n🚨 ALERTAS DE INVENTARIO:")
    alerts = spark.sql("""
        SELECT 
            product_id,
            producto_nombre,
            categoria,
            stock,
            stock_days,
            CASE 
                WHEN stock_days < 7 THEN '🔴 CRÍTICO: Restock inmediato'
                WHEN stock_days < 15 THEN '🟡 ADVERTENCIA: Planificar restock'
                WHEN stock_days > 90 THEN '🔵 INFO: Exceso de inventario'
                ELSE '✅ OK: Niveles normales'
            END as alerta
        FROM inventory_analysis
        WHERE stock_days < 15 OR stock_days > 90
        ORDER BY stock_days ASC
    """)
    
    alerts.show(truncate=False)
    
    # PASO 4: ESTADÍSTICAS FINALES DEL ANÁLISIS HÍBRIDO
    print("\n4️⃣ ESTADÍSTICAS FINALES DEL ANÁLISIS HÍBRIDO:")
    
    final_stats = spark.sql("""
        SELECT 
            COUNT(*) as total_productos,
            SUM(stock) as total_stock_units,
            SUM(inventory_value) as total_inventory_value,
            SUM(total_vendido) as total_units_sold,
            SUM(revenue_total) as total_revenue,
            AVG(turnover_ratio) as avg_turnover_ratio,
            AVG(stock_days) as avg_stock_days,
            COUNT(CASE WHEN stock_days < 15 THEN 1 END) as productos_criticos,
            COUNT(CASE WHEN performance_score > 1.0 THEN 1 END) as productos_alto_rendimiento
        FROM inventory_analysis
    """).collect()[0]
    
    print("\n📊 RESUMEN EJECUTIVO:")
    print(f"   📦 Total productos analizados: {final_stats['total_productos']}")
    print(f"   📈 Total unidades en stock: {final_stats['total_stock_units']:,}")
    print(f"   💰 Valor total inventario: ${final_stats['total_inventory_value']:,.2f}")
    print(f"   📊 Total unidades vendidas: {final_stats['total_units_sold']:,}")
    print(f"   💵 Revenue total: ${final_stats['total_revenue']:,.2f}")
    print(f"   🔄 Ratio rotación promedio: {final_stats['avg_turnover_ratio']:.2f}")
    print(f"   📅 Días promedio de stock: {final_stats['avg_stock_days']:.1f}")
    print(f"   🚨 Productos críticos: {final_stats['productos_criticos']}")
    print(f"   ⭐ Productos alto rendimiento: {final_stats['productos_alto_rendimiento']}")
    
    # PASO 5: EXPORTAR RESULTADOS
    print("\n5️⃣ PREPARANDO RESULTADOS PARA EXPORT:")
    
    # Cache del análisis final para uso posterior
    inventory_turnover.cache()
    high_performers.cache()
    
    print("✅ DataFrames de análisis híbrido cacheados")
    print("💾 Listos para exportar a sistemas downstream")
    
    print("\n🎉 ¡ANÁLISIS HÍBRIDO COMPLETADO EXITOSAMENTE!")
    print("🔗 Datos de HBase y Cassandra integrados con éxito")
    print("📈 Insights accionables generados para el negocio")

elif hbase_available:
    print("⚠️ Solo datos de HBase disponibles")
    print("💡 Ejecuta las celdas de integración con Cassandra para análisis completo")
    
elif cassandra_available:
    print("⚠️ Solo datos de Cassandra disponibles")
    print("💡 Ejecuta las celdas de integración con HBase para análisis completo")
    
else:
    print("❌ No hay datos disponibles de ninguna fuente")
    print("💡 Ejecuta las celdas anteriores para cargar datos de HBase y Cassandra")


## 🎉 **Resumen y Conclusiones del Tutorial**

### ✅ **Lo que Hemos Logrado:**

#### **1. 🚀 Configuración Spark Optimizada**
- ✅ Sesión Spark configurada para integración NoSQL
- ✅ Recursos optimizados para cluster mode
- ✅ Configuraciones específicas para conectores NoSQL
- ✅ Monitoreo y verificación de estado del cluster

#### **2. 🏛️ Integración HBase + Spark**
- ✅ Conexión exitosa entre Spark y HBase
- ✅ Lectura de datos desde HBase a DataFrames
- ✅ Transformaciones distribuidas con Spark SQL
- ✅ Análisis de ROI y agregaciones avanzadas
- ✅ Caching para optimización de rendimiento

#### **3. 🌟 Integración Cassandra + Spark**
- ✅ Conexión exitosa entre Spark y Cassandra
- ✅ Análisis temporal de series de datos
- ✅ Joins complejos entre tablas distribuidas
- ✅ Window functions para rankings dinámicos
- ✅ Análisis de comportamiento de clientes

#### **4. 🔄 Casos de Uso Híbridos**
- ✅ Integración de datos de múltiples fuentes NoSQL
- ✅ Análisis de rotación de inventario cross-platform
- ✅ Generación de alertas inteligentes
- ✅ Recomendaciones basadas en datos híbridos
- ✅ Métricas de rendimiento empresarial

### 📊 **Arquitectura Final Implementada:**

```
┌─────────────────────────────────────────────────────────────────┐
│                  ECOSISTEMA BIG DATA COMPLETO                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────┐  │
│  │   SPARK 3.5.3   │◄──►│     HBASE       │    │  CASSANDRA  │  │
│  │                 │    │                 │    │             │  │
│  │ • Driver: 1.2GB │    │ • Thrift API    │    │ • CQL API   │  │
│  │ • Workers: 2x   │    │ • Column Store  │    │ • Ring Arch │  │
│  │ • Executors:800M│    │ • HDFS Backend  │    │ • P2P Nodes │  │
│  │ • Hive Support  │    │ • Strong Cons.  │    │ • Eventual  │  │
│  │ • SQL Engine    │    │ • Real-time     │    │ • Scalable  │  │
│  └─────────────────┘    └─────────────────┘    └─────────────┘  │
│           │                       │                      │       │
│           └───────────────────────┼──────────────────────┘       │
│                                   │                              │
│  ┌─────────────────────────────────────────────────────────┐     │
│  │                  JUPYTER ECOSYSTEM                      │     │
│  │                                                         │     │
│  │  📓 01_spark_cluster_professional.ipynb               │     │
│  │  📓 02_nosql_foundations.ipynb                         │     │
│  │  📓 03_spark_nosql_integration.ipynb                  │     │
│  │                                                         │     │
│  │  • Tutoriales completos CRUD                           │     │
│  │  • Casos de uso reales                                 │     │
│  │  • Mejores prácticas                                   │     │
│  │  • Análisis híbridos                                   │     │
│  └─────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘
```

### 🎯 **Casos de Uso Implementados:**

#### **🏪 E-commerce Analytics**
- Análisis de inventario en tiempo real
- Tracking de ventas y comportamiento de clientes
- Recomendaciones de productos
- Alertas de stock crítico

#### **📊 Business Intelligence**
- Dashboards en tiempo real
- Análisis de tendencias temporales
- Métricas de rendimiento KPI
- Reportes ejecutivos automatizados

#### **🔍 Data Science & ML**
- Feature engineering distribuido
- Análisis exploratorio de datos
- Preparación de datasets para ML
- Análisis de patrones complejos

### 💡 **Mejores Prácticas Aplicadas:**

#### **⚡ Rendimiento:**
- ✅ Caching estratégico de DataFrames
- ✅ Particionamiento optimizado
- ✅ Pushdown de predicados
- ✅ Configuración de memoria ajustada

#### **🛡️ Confiabilidad:**
- ✅ Manejo de errores robusto
- ✅ Verificación de conectividad
- ✅ Timeouts configurados
- ✅ Logging estructurado

#### **🔧 Mantenibilidad:**
- ✅ Código bien documentado
- ✅ Estructura modular
- ✅ Configuración externalizada
- ✅ Ejemplos reproducibles

### 🚀 **Próximos Pasos Sugeridos:**

1. **📈 Escalabilidad**: Implementar en clusters multi-nodo
2. **🔄 Streaming**: Agregar Spark Streaming para datos en tiempo real  
3. **🤖 Machine Learning**: Integrar MLlib para modelos predictivos
4. **📊 Visualización**: Conectar con herramientas como Grafana/Tableau
5. **🔐 Seguridad**: Implementar autenticación y autorización
6. **📦 Productización**: Containerización con Kubernetes

### 🎓 **Conocimientos Adquiridos:**

- ✅ Configuración avanzada de Spark para entornos distribuidos
- ✅ Operaciones CRUD completas en HBase y Cassandra
- ✅ Integración seamless entre tecnologías Big Data
- ✅ Análisis de datos híbridos y cross-platform
- ✅ Optimización de rendimiento en ecosistemas complejos
- ✅ Casos de uso reales de la industria

---

## 🎉 **¡Felicitaciones!**

Has completado exitosamente un tutorial integral de **Apache Spark + NoSQL** que cubre desde conceptos fundamentales hasta implementaciones avanzadas de casos de uso híbridos. Este conocimiento te permitirá diseñar e implementar soluciones de Big Data robustas y escalables en entornos de producción.

**¡Continúa explorando y construyendo soluciones increíbles con Big Data! 🚀**


In [None]:
# 🚀 CREACIÓN DE SESIÓN SPARK INTEGRADA CON NoSQL

print("🚀 CREANDO SESIÓN SPARK PARA INTEGRACIÓN NoSQL...")
print("="*55)

# Detener cualquier sesión existente
try:
    spark.stop()
    print("🛑 Sesión anterior detenida")
    time.sleep(3)
except:
    print("🔍 No había sesión activa")

# CONFIGURACIÓN OPTIMIZADA PARA INTEGRACIÓN NoSQL
print("\n⚙️ Configurando Spark con conectores NoSQL...")

spark = SparkSession.builder \
    .appName("EducacionIT-Spark-NoSQL-Integration") \
    .master("spark://master:7077") \
    .config("spark.driver.memory", "1200m") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "800m") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "2") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.connection.keepAlive", "true") \
    .config("spark.cassandra.connection.timeout_ms", "10000") \
    .config("spark.cassandra.read.timeout_ms", "120000") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Información de la sesión integrada
print("\n🎉 ¡SESIÓN SPARK + NoSQL CREADA EXITOSAMENTE!")
print("="*60)
print(f"🏷️  Aplicación: {spark.sparkContext.appName}")
print(f"🔗 Master URL: {spark.sparkContext.master}")
print(f"📊 Spark UI: {spark.sparkContext.uiWebUrl}")
print(f"🚀 Versión Spark: {spark.version}")

# Verificar configuración de conectores
print(f"\n🔧 CONFIGURACIÓN DE CONECTORES:")
print(f"   🌟 Cassandra Host: {spark.conf.get('spark.cassandra.connection.host')}")
print(f"   🔌 Cassandra Port: {spark.conf.get('spark.cassandra.connection.port')}")
print(f"   ⏰ Connection Timeout: {spark.conf.get('spark.cassandra.connection.timeout_ms')}ms")
print(f"   📖 Read Timeout: {spark.conf.get('spark.cassandra.read.timeout_ms')}ms")

# Verificar conectividad con executors
print(f"\n📊 ESTADO DEL CLUSTER:")
try:
    executors = spark.sparkContext.statusTracker().getExecutorInfos()
    active_executors = [e for e in executors if e.isActive]
    print(f"   ✅ Executors activos: {len(active_executors)}")
    
    total_cores = sum([e.totalCores for e in active_executors])
    total_memory = sum([e.maxMemory for e in active_executors])
    
    print(f"   🖥️ Total cores: {total_cores}")
    print(f"   💾 Total memoria: {total_memory / (1024**3):.2f} GB")
    
except Exception as e:
    print(f"   ⚠️ Error obteniendo información de executors: {e}")

print("\n✅ Spark configurado para integración con HBase y Cassandra")
print("🔗 Listo para análisis distribuido híbrido...")
