In [1]:
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (6.9 kB)
Collecting geomet>=1.1 (from cassandra-driver)
  Downloading geomet-1.1.0-py3-none-any.whl.metadata (11 kB)
Downloading cassandra_driver-3.29.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (374 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.3/374.3 kB[0m [31m654.0 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading geomet-1.1.0-py3-none-any.whl (31 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.3 geomet-1.1.0


In [2]:
import time
import uuid
import random
from datetime import date, timedelta
from cassandra.cluster import Cluster

# --- CONFIGURACIÓN ---
# Conectamos al servicio 'cassandra' definido en docker-compose
print("1. Conectando a Cassandra...")
cluster = Cluster(['cassandra']) 
session = cluster.connect()

# Crear Keyspace y Tabla si no existen (por seguridad)
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS bigdata 
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
session.set_keyspace('bigdata')

session.execute("""
    CREATE TABLE IF NOT EXISTS ventas_crudas (
        id_venta UUID,
        fecha_venta DATE,
        id_producto INT,
        categoria TEXT,
        monto_total FLOAT,
        id_cliente INT,
        PRIMARY KEY ((fecha_venta), id_venta)
    )
""")

# Preparar la query de inserción para optimizar velocidad
prepared = session.prepare("""
    INSERT INTO ventas_crudas (id_venta, fecha_venta, id_producto, categoria, monto_total, id_cliente)
    VALUES (?, ?, ?, ?, ?, ?)
""")

# --- GENERADOR DE DATOS ---
categorias = ['Electronica', 'Ropa', 'Hogar', 'Juguetes', 'Deportes','Electrodomesticos','Gaming','Vehiculos','Telefonos','Computacion']
start_date = date(2026, 1, 1)

def generar_lote(n):
    datos = []
    for _ in range(n):
        datos.append((
            uuid.uuid4(),
            start_date + timedelta(days=random.randint(0, 30)),
            random.randint(1, 100),
            random.choice(categorias),
            round(random.uniform(10.0, 500.0), 2),
            random.randint(1000, 5000)
        ))
    return datos

# --- EJECUCIÓN Y MEDICIÓN (Fase 4.3) ---
CANTIDAD = 100000
print(f"2. Iniciando ingesta de {CANTIDAD} registros...")

inicio = time.time()

# Insertamos en lotes pequeños para no saturar la memoria del notebook
for i in range(CANTIDAD):
    # Generamos datos al vuelo
    d = (
        uuid.uuid4(),
        start_date + timedelta(days=random.randint(0, 30)),
        random.randint(1, 100),
        random.choice(categorias),
        round(random.uniform(10.0, 500.0), 2),
        random.randint(1000, 5000)
    )
    session.execute(prepared, d)
    
    if (i+1) % 10000 == 0:
        print(f"   -> Insertados {i+1} registros...")

fin = time.time()
tiempo_ingesta = fin - inicio

print("-" * 30)
print(f"RESULTADO FASE 2 (INGESTA):")
print(f"Tiempo Total: {tiempo_ingesta:.4f} segundos")
print(f"Velocidad: {CANTIDAD/tiempo_ingesta:.2f} inserts/seg")
print("-" * 30)

# Validación simple (Fase 2.3)
row = session.execute("SELECT COUNT(*) FROM ventas_crudas").one()
print(f"Registros en Cassandra: {row[0]}")

cluster.shutdown()

1. Conectando a Cassandra...
2. Iniciando ingesta de 100000 registros...
   -> Insertados 10000 registros...
   -> Insertados 20000 registros...
   -> Insertados 30000 registros...
   -> Insertados 40000 registros...
   -> Insertados 50000 registros...
   -> Insertados 60000 registros...
   -> Insertados 70000 registros...
   -> Insertados 80000 registros...
   -> Insertados 90000 registros...
   -> Insertados 100000 registros...
------------------------------
RESULTADO FASE 2 (INGESTA):
Tiempo Total: 423.0338 segundos
Velocidad: 236.39 inserts/seg
------------------------------
Registros en Cassandra: 100000


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col

# --- CONFIGURACIÓN DE SPARK CON DEPENDENCIAS ---
# Aquí definimos los paquetes Maven necesarios para Cassandra y ClickHouse
spark = SparkSession.builder \
    .appName("ProyectoBigData_UNEG") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,ru.yandex.clickhouse:clickhouse-jdbc:0.3.2") \
    .getOrCreate()

print("Spark Inicializado correctamente.")

# --- FASE 3: LECTURA Y TRANSFORMACIÓN ---
print("1. Leyendo datos desde Cassandra (ventas_crudas)...")
# Leemos la tabla completa
df_raw = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="ventas_crudas", keyspace="bigdata") \
    .load()

# Transformación (Requisito 3.2): Agrupar por fecha y categoría
print("2. Procesando/Transformando datos...")
df_analitico = df_raw.groupBy("fecha_venta", "categoria") \
    .agg(
        sum("monto_total").alias("total_ventas"),
        count("id_venta").alias("cantidad_transacciones")
    ) \
    .orderBy("fecha_venta", "categoria")

# Mostramos un adelanto para verificar
df_analitico.show(10)

# --- FASE 4.1: CARGA A CLICKHOUSE ---
print("3. Escribiendo resultados en ClickHouse (dw_analitico)...")

# Propiedades JDBC para ClickHouse
jdbc_url = "jdbc:clickhouse://clickhouse:8123/dw_analitico"
propiedades = {
    "driver": "ru.yandex.clickhouse.ClickHouseDriver",
    "user": "default",
    "password": ""  # Si no pusiste clave en docker-compose
}

# Escritura
try:
    df_analitico.write \
        .mode("append") \
        .jdbc(url=jdbc_url, table="ventas_resumen", properties=propiedades)
    print("¡Carga a Data Warehouse exitosa!")
except Exception as e:
    print(f"Error escribiendo a ClickHouse: {e}")

spark.stop()

Spark Inicializado correctamente.
1. Leyendo datos desde Cassandra (ventas_crudas)...
2. Procesando/Transformando datos...
+-----------+-----------------+-----------------+----------------------+
|fecha_venta|        categoria|     total_ventas|cantidad_transacciones|
+-----------+-----------------+-----------------+----------------------+
| 2026-01-01|      Computacion|98283.70983600616|                   387|
| 2026-01-01|         Deportes| 85681.6800327301|                   325|
| 2026-01-01|Electrodomesticos|83727.56995487213|                   324|
| 2026-01-01|      Electronica|80011.86010074615|                   328|
| 2026-01-01|           Gaming|78706.82979297638|                   307|
| 2026-01-01|            Hogar|83039.83976268768|                   323|
| 2026-01-01|         Juguetes|81142.97006416321|                   321|
| 2026-01-01|             Ropa|72797.10990810394|                   304|
| 2026-01-01|        Telefonos|84619.85998058319|                   340|
|