# Lab15

## P1. (7 pts) Medir la temperatura y la humedad:
### Consultas requeridas
1. Obtener todas las mediciones de humedad del sensor 'SENS001' del último día. La consulta debe incluir el tiempo restante de vida (TTL) de cada registro.

2. Detectar valores anómalos fuera del rango permitido en la última hora. Implementar una consulta que identifique mediciones de temperatura o humedad fuera del rango normal. La consulta debe permitir filtrar por sensor específico.

3. Verificar el tiempo restante de vida de los datos usando la función TTL. Implementar una consulta que muestre el TTL en diferentes unidades (segundos, horas, días). Crear una consulta para identificar datos que están próximos a expirar (ej: en las próximas 24 horas).

### Estructura de tablas propuestas
La tabla *sensor_readings* tendrá un propósito general que permite filtrar por tipo de medicion, sensor y día. Esta tabla se usará para la consulta 1.

La tabla *sensor_anomalies* será destinada a optimizar la consulta 2, esta nos permite filtrar por hora ya que se incluye este dato en el partition key, además de tener clustering por el valor de la medición lo que facilita hallar los valores anómalos.

La tabla *sensor_by_date* fue pensada para usarse con la consulta 3, porque particiona solo por la fecha y con esto poder filtrar los datos con más de 6 días de antigüedad.

Se respeta el tiempo de vida de los datos de 7 días de acuerdo a lo solicitado, y se considera adicionalmente un tiempo de vida de solo 2 horas para el seguimiento de datos anómalos ya que se espera que siempre se consulten los insertados en la hora pasada.

In [None]:
create table if not exists sensor_readings
(
    measurement_type text,
    sensor_id        text,
    date             text,
    event_time       timestamp,
    measurement      double,
    primary key ( (measurement_type, sensor_id, date), event_time )
) with clustering order by (event_time desc) and
        default_time_to_live = 604800; -- 7 days

create table if not exists sensor_anomalies
(
    measurement_type text,
    sensor_id        text,
    hour             text,
    event_time       timestamp,
    measurement      double,
    primary key ( (measurement_type, sensor_id, hour), measurement, event_time)
) with clustering order by (measurement asc, event_time desc) and
        default_time_to_live = 7200; -- 2 hour

create table if not exists sensor_by_date
(
    measurement_type text,
    sensor_id        text,
    date             text,
    event_time       timestamp,
    measurement      double,
    primary key ( date, event_time )
) with clustering order by (event_time asc) and
        default_time_to_live = 604800; -- 7 days

### Configuración del entorno de trabajo

Instalación de driver de cassandra para python, se requiere haber configurado un entorno local de conda previamente.

In [22]:
%conda update -n base -c defaults conda
%conda install -c anaconda libev
%conda install -c msys2 m2-make
%conda install -c conda-forge pkg-config
%pip install cassandra-driver

Channels:
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.




    current version: 25.3.1
    latest version: 25.5.1

Please update conda by running

    $ conda update -n base -c defaults conda




Channels:
 - anaconda
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: failed

Note: you may need to restart the kernel to use updated packages.



PackagesNotFoundError: The following packages are not available from current channels:

  - libev

Current channels:

  - https://conda.anaconda.org/anaconda
  - defaults

To search for alternate channels that may provide the conda package you're
looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.




Channels:
 - msys2
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.




    current version: 25.3.1
    latest version: 25.5.1

Please update conda by running

    $ conda update -n base -c defaults conda




Channels:
 - conda-forge
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.




    current version: 25.3.1
    latest version: 25.5.1

Please update conda by running

    $ conda update -n base -c defaults conda




Note: you may need to restart the kernel to use updated packages.


### Generación de datos y pruebas

In [None]:
import random
import time
from datetime import datetime, timedelta
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, BatchType

# Parametros de simulación
N_SENSORS = 5
SAMPLING_RATE = 60 # 1 minuto en segundos
SAMPLING_TIME = 604800  # 7 días en segundos
ID_PREFIX = 'SENS'
MESUREMENTS_TYPES = ('temperatura', 'humedad')
NORMAL_RANGE = {'temperatura': (15, 35),
                'humedad': (30, 80)}

# Precomputar rango extendido de valores para simulación de anomalías
EXTENDED_RANGE = {
    type: (low - (high - low) * 0.2,
        high + (high - low) * 0.2
    )
    for type, (low, high) in NORMAL_RANGE.items()
}

# Conexión a Cassandra en Docker
cluster = Cluster(
  ['localhost'], port=9042,
  protocol_version=4,
  connect_timeout=5,
  idle_heartbeat_interval=30,
  control_connection_timeout=10
)
cassandra = cluster.connect('my_keyspace')

def create_tables() -> None:
    # Crear tabla para lecturas de sensores
    cassandra.execute("""
        create table if not exists sensor_readings
        (
            measurement_type text,
            sensor_id        text,
            date             text,
            event_time       timestamp,
            measurement      double,
            primary key ( (measurement_type, sensor_id, date), event_time )
        ) with clustering order by (event_time desc) and
                default_time_to_live = 604800
    """)
    cassandra.execute("""
        create table if not exists sensor_anomalies
        (
            measurement_type text,
            sensor_id        text,
            hour             text,
            event_time       timestamp,
            measurement      double,
            primary key ( (measurement_type, sensor_id, hour), measurement, event_time)
        ) with clustering order by (measurement asc, event_time desc) and
                default_time_to_live = 7200
    """)
    cassandra.execute("""
        create table if not exists sensor_by_date
        (
            measurement_type text,
            sensor_id        text,
            date             text,
            event_time       timestamp,
            measurement      double,
            primary key ( date, event_time )
        ) with clustering order by (event_time asc) and
                default_time_to_live = 604800;
    """)

def drop_tables() -> None:
    # Eliminar tablas si existen
    cassandra.execute("drop table if exists sensor_readings")
    cassandra.execute("drop table if exists sensor_anomalies")
    cassandra.execute("drop table if exists sensor_by_date")
    
def generate_sensor_data() -> None:
    # Generar indetificadores únicos para cada sensor
    ids: list[str] = [f"{ID_PREFIX}{str(i).zfill(3)}" for i in range(1, N_SENSORS + 1)]

    # Preparar queries
    insert_reading = cassandra.prepare("""
        INSERT INTO sensor_readings (measurement_type, sensor_id, date, event_time, measurement)
        VALUES (?, ?, ?, ?, ?)
    """)
    insert_anomaly = cassandra.prepare("""
        INSERT INTO sensor_anomalies (measurement_type, sensor_id, hour, event_time, measurement)
        VALUES (?, ?, ?, ?, ?)
    """)
    insert_by_date = cassandra.prepare("""
        INSERT INTO sensor_by_date (measurement_type, sensor_id, date, event_time, measurement)
        VALUES (?, ?, ?, ?, ?)
    """)

    # Definir el tiempo de inicio y fin para la generación de datos
    start_time: datetime = datetime.now() - timedelta(seconds=SAMPLING_TIME)
    end_time: datetime = datetime.now()
    current_time: datetime = end_time   # Insertar en orden descendente

    # Usar BatchStatement para agrupar inserciones
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)

    # Lista para almacenar las futuras ejecuciones asíncronas
    futures = []

    # Limite de inflight para evitar sobrecargar Cassandra
    max_inflight = 16

    while current_time >= start_time:
        date = current_time.strftime('%Y-%m-%d')
        hour = current_time.strftime('%Y-%m-%dT%H')

        for id in ids:
            for type in MESUREMENTS_TYPES:
                ext_low, ext_high = EXTENDED_RANGE[type]
                measurement = random.uniform(ext_low, ext_high)

                batch.add(insert_reading, (type, id, date, current_time, measurement))
                batch.add(insert_anomaly, (type, id, hour, current_time, measurement))
                batch.add(insert_by_date, (type, id, date, current_time, measurement))

        # Ejecutar el batch de forma asíncrona
        futures.append(cassandra.execute_async(batch))
        batch.clear()  # Limpiar el batch para la siguiente iteración

        # Si el batch alcanza el límite de inflight, esperar a que se completen
        if len(futures) >= max_inflight:
            for f in futures:
                f.result()
            futures.clear()

        # Retroceder el tiempo para la siguiente iteración
        current_time -= timedelta(seconds=SAMPLING_RATE)
    
    # Esperar batchs restantes
    for f in futures:
        f.result()

    result = cassandra.execute("select count(*) from sensor_readings")
    count = list(result)[0][0]
    print("Inserción de datos de prueba completada.\nTotal de registros insertados:", count)

def query_1(type: str, id: str) -> None:
    print("\nQuery 1:")
    # Calcular el bucket del día anterior
    target_date: str = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Ejecutar consulta y medir el tiempo de ejecución
    start_time = time.time()
    rows = cassandra.execute("""
        select event_time, measurement, ttl(measurement) as ttl
        from sensor_readings
        where measurement_type = %s
            and sensor_id = %s
            and date = %s
    """, (type, id, target_date))
    end_time = time.time()
    query_time = end_time - start_time
    rows: list = list(rows)  # Convertir a lista para poder usar len() y slicing
    print(f"Lecturas de {type} para el sensor {id} del día de ayer {target_date}:")
    print(f"{len(rows)} resultados encontrados en {query_time:.4f} segundos")
    for row in rows[:5]:  # Limitar a las primeras 5 lecturas
        event_time = row.event_time.strftime('%Y-%m-%d %H:%M:%S')
        measurement = row.measurement
        ttl_seconds = row.ttl
        print(f"  - {event_time}: {measurement} (TTL: {ttl_seconds} segundos)")

def query_2(type: str, id: str) -> None:
    print("\nQuery 2:")
    # Calcular el bucket de la hora anterior
    target_hour: str = (datetime.now() - timedelta(hours=1)).strftime('%Y-%m-%dT%H')
    # Obtener el rango normal para el tipo de medición
    low, high = NORMAL_RANGE[type]
    # Ejecutar consulta y medir el tiempo de ejecución
    start_time = time.time()
    rows_low = cassandra.execute("""
        select event_time, measurement
        from sensor_anomalies
        where measurement_type = %s
            and sensor_id = %s
            and hour = %s
            and measurement < %s
    """, (type, id, target_hour, low))

    rows_high = cassandra.execute("""
        select event_time, measurement
        from sensor_anomalies
        where measurement_type = %s
            and sensor_id = %s
            and hour = %s
            and measurement > %s
    """, (type, id, target_hour, high))
    end_time = time.time()
    query_time = end_time - start_time
    rows: list = list(rows_low) + list(rows_high)
    rows.sort(key=lambda row: row.event_time, reverse=True)
    print(f"Anomalías de {type} para el sensor {id} de la hora pasada {target_hour}:")
    print(f"{len(rows)} resultados encontrados en {query_time:.4f} segundos")
    for row in rows[:5]:  # Limitar a las primeras 5 anomalías
        event_time = row.event_time.strftime('%Y-%m-%d %H:%M:%S')
        measurement = row.measurement
        print(f"  - {event_time}: {measurement}")

def query_3() -> None:
    print("\nQuery 3:")
    # Calcular el bucket de la fecha de hace 6 días
    target_date: str = (datetime.now() - timedelta(days=6)).strftime('%Y-%m-%d')
    # Ejecutar consulta y medir el tiempo de ejecución
    start_time = time.time()
    rows = cassandra.execute("""
        select measurement_type, sensor_id, date, event_time, ttl(measurement) as ttl
        from sensor_by_date
        where date = %s
    """, (target_date,))
    end_time = time.time()
    query_time = end_time - start_time
    rows: list = list(rows)  # Convertir a lista para poder usar len() y slicing
    print(f"Datos de más de 6 días de antigüedad ({target_date}):")
    print(f"{len(rows)} resultados encontrados en {query_time:.4f} segundos")
    for row in rows[:5]:  # Limitar a las primeras 5 lecturas
        measurement_type = row.measurement_type
        sensor_id = row.sensor_id
        date = row.date
        ttl_seconds = row.ttl
        ttl_minutes = ttl_seconds // 60
        ttl_hours = ttl_minutes // 60
        ttl_days = ttl_hours // 24
        print(f"  - {measurement_type} del sensor {sensor_id} del día {date} (TTL: {ttl_seconds} segundos, {ttl_minutes} minutos, {ttl_hours} horas, {ttl_days} días)")

def test() -> None:
    # Crear tablas
    create_tables()
    
    # Generar datos de prueba
    generate_sensor_data()
    
    # Ejecutar consultas de prueba
    query_1('temperatura', 'SENS001')
    query_2('humedad', 'SENS002')
    query_3()
    
    # Limpiar tablas
    drop_tables()

test()
# Cerrar conexión a Cassandra
cluster.shutdown()

Inserción de datos de prueba completada.
Total de registros insertados: 100810

Query 1:
Lecturas de temperatura para el sensor SENS001 del día de ayer 2025-07-07:
1440 resultados encontrados en 0.0445 segundos
  - 2025-07-07 23:59:23: 29.201208291475858 (TTL: 604789 segundos)
  - 2025-07-07 23:58:23: 26.325772840598976 (TTL: 604789 segundos)
  - 2025-07-07 23:57:23: 16.36373289016412 (TTL: 604789 segundos)
  - 2025-07-07 23:56:23: 15.755248981596546 (TTL: 604789 segundos)
  - 2025-07-07 23:55:23: 29.982972045719457 (TTL: 604789 segundos)

Query 2:
Anomalías de humedad para el sensor SENS002 de la hora pasada 2025-07-08T15:
20 resultados encontrados en 0.0318 segundos
  - 2025-07-08 15:59:23: 26.105229674183267
  - 2025-07-08 15:57:23: 84.11479523132168
  - 2025-07-08 15:52:23: 29.90490526548601
  - 2025-07-08 15:50:23: 29.105740757183067
  - 2025-07-08 15:49:23: 26.49023812956321

Query 3:
Datos de más de 6 días de antigüedad (2025-07-02):
1440 resultados encontrados en 0.0457 segundo

## P2. (13 pts) Evaluación Experimental

### Cluster de Cassandra con Docker Compose:

In [10]:
with open("docker-compose.yml") as f:
    print(f.read())

version: '3.8'

services:
  cassandra1:
    image: cassandra:4.1
    container_name: cassandra1
    hostname: cassandra1
    networks:
      - cassandra-net
    ports:
      - "9042:9042"
    environment:
      CASSANDRA_CLUSTER_NAME: "CassandraCluster"
      CASSANDRA_DC: DC1
      CASSANDRA_RACK: RAC1
      CASSANDRA_SEEDS: "cassandra1,cassandra2,cassandra3"
      MAX_HEAP_SIZE: 1024M
      HEAP_NEWSIZE: 256M
    mem_limit: 1536m

  cassandra2:
    image: cassandra:4.1
    container_name: cassandra2
    hostname: cassandra2
    networks:
      - cassandra-net
    depends_on:
      - cassandra1
    environment:
      CASSANDRA_CLUSTER_NAME: "CassandraCluster"
      CASSANDRA_DC: DC1
      CASSANDRA_RACK: RAC1
      CASSANDRA_SEEDS: "cassandra1,cassandra2,cassandra3"
      MAX_HEAP_SIZE: 1024M
      HEAP_NEWSIZE: 256M
    mem_limit: 1536m

  cassandra3:
    image: cassandra:4.1
    container_name: cassandra3
    hostname: cassandra3
    networks:
      - cassandra-net
    depends_on:
 

In [15]:
!docker compose down -v
!docker compose up -d

 Container cassandra2  Stopping
 Container cassandra3  Stopping
 Container cassandra3  Stopped
 Container cassandra3  Removing
 Container cassandra3  Removed
 Container cassandra2  Stopped
 Container cassandra2  Removing
 Container cassandra2  Removed
 Container cassandra1  Stopping
 Container cassandra1  Stopped
 Container cassandra1  Removing
 Container cassandra1  Removed
 Network lab15_cassandra-net  Removing
 Network lab15_cassandra-net  Removed
 Network lab15_cassandra-net  Creating
 Network lab15_cassandra-net  Created
 Container cassandra1  Creating
 Container cassandra1  Created
 Container cassandra3  Creating
 Container cassandra2  Creating
 Container cassandra3  Created
 Container cassandra2  Created
 Container cassandra1  Starting
 Container cassandra1  Started
 Container cassandra2  Starting
 Container cassandra3  Starting
 Container cassandra3  Started
 Container cassandra2  Started


In [None]:
CREATE KEYSPACE IF NOT EXISTS my_keyspace
    WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'datacenter1': '3'
    }

In [25]:
from cassandra.cluster import Cluster

cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

# Crear keyspace (si no existe)
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS my_keyspace
    WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'datacenter1': '3'
    }
""")

# Verificar la creación del keyspace
rows = session.execute("""
    SELECT keyspace_name, replication
    FROM system_schema.keyspaces
    WHERE keyspace_name = 'my_keyspace'
""")

for row in rows:
    print("Keyspace:", row.keyspace_name)
    print("Replicación:", row.replication)


Keyspace: my_keyspace
Replicación: {'class': 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'datacenter1': '3'}


In [26]:
!docker ps

CONTAINER ID   IMAGE           COMMAND                  CREATED          STATUS          PORTS                                                       NAMES
5674d14e8152   cassandra:4.1   "docker-entrypoint.s…"   10 minutes ago   Up 10 minutes   7000-7001/tcp, 7199/tcp, 9042/tcp, 9160/tcp                 cassandra2
3ba274a98a65   cassandra:4.1   "docker-entrypoint.s…"   10 minutes ago   Up 10 minutes   7000-7001/tcp, 7199/tcp, 9042/tcp, 9160/tcp                 cassandra3
c62be9070c46   cassandra:4.1   "docker-entrypoint.s…"   10 minutes ago   Up 10 minutes   7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp   cassandra1


In [27]:
!docker exec cassandra1 nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.21.0.4  75.44 KiB  16      100.0%            91278d84-a6f5-4c4a-a15d-4d130f69f9c3  rack1
UN  172.21.0.2  75.45 KiB  16      100.0%            911df817-5bbb-4120-8416-5a526e0645ae  rack1
UN  172.21.0.3  75.44 KiB  16      100.0%            7d2e0357-0bdc-4cfe-be43-3236e4121186  rack1



### PostgreSQL
Se usa una instancia local en el puerto 5432.

In [56]:
%conda install -c conda-forge psycopg2
%conda install -c conda-forge pandas

Channels:
 - conda-forge
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: c:\Users\Jvnc\Documents\BD2\BD2-Labs\Lab15\.conda

  added / updated specs:
    - psycopg2


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2025.7.9   |       h4c7d964_0         149 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         149 KB

The following packages will be UPDATED:

  ca-certificates                      2025.6.15-h4c7d964_0 --> 2025.7.9-h4c7d964_0 



Downloading and Extracting Packages: ...working...
ca-certificates-2025 | 149 KB    |            |   0% 
ca-certificates-2025 | 149 KB    | #          |  11% 
ca-certificates-2025 | 149 KB    | ########## | 100% 
ca-certificates-2025 | 149 KB    | ##



    current version: 25.3.1
    latest version: 25.5.1

Please update conda by running

    $ conda update -n base -c defaults conda




Channels:
 - conda-forge
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: c:\Users\Jvnc\Documents\BD2\BD2-Labs\Lab15\.conda

  added / updated specs:
    - pandas


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    libblas-3.9.0              |  32_h641d27c_mkl         3.6 MB  conda-forge
    libcblas-3.9.0             |  32_h5e41251_mkl         3.6 MB  conda-forge
    libhwloc-2.9.1             |       h51c2c0f_0         2.4 MB  conda-forge
    liblapack-3.9.0            |  32_h1aa476e_mkl         3.6 MB  conda-forge
    libxml2-2.13.8             |       h866ff63_0         2.8 MB
    numpy-1.24.3               |  py311h0b4df5a_0         6.7 MB  conda-forge
    pandas-2.0.1               |  py311hf63dbb6_0        12.8 MB  conda-forge
    pthreads-win32-2.9.1       |       hfa6e2cd_3        



    current version: 25.3.1
    latest version: 25.5.1

Please update conda by running

    $ conda update -n base -c defaults conda





### Conexión desde Jupyter Notebook

In [5]:
import psycopg2
import random
import time
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from datetime import datetime, timedelta
from psycopg2.extras import execute_values
from collections import defaultdict

cluster = Cluster(
  ['localhost'], port=9042,
  protocol_version=4,
  connect_timeout=5,
  idle_heartbeat_interval=30,
  control_connection_timeout=10
)
cassandra = cluster.connect('my_keyspace')

connect = psycopg2.connect(dbname="lab15",user="postgres",password="postgres",host="localhost",port=5432)
connect.autocommit = True
postgres = connect.cursor()

def create_table_cassandra() -> None:
    cassandra.execute("""
        create table if not exists temperature_measurements
        (
            sensor_id   text,
            date        text,
            event_time  timestamp,
            temperature double,
            humidity    double,
            primary key ((sensor_id, date), event_time)
        )
    """)

def create_table_postgres() -> None:
    postgres.execute("""
        create table if not exists temperature_measurements
        (
            sensor_id   varchar(20),
            date        varchar(10),
            event_time  timestamp,
            temperature double precision,
            humidity    double precision,
            primary key (sensor_id, date, event_time)
        )
    """)

def drop_table_cassandra() -> None:
    cassandra.execute("drop table if exists temperature_measurements")

def drop_table_postgres() -> None:
    postgres.execute("drop table if exists temperature_measurements")

NORMAL_RANGE = [(15, 35), (30, 80)]  # Rango normal para temperatura y humedad
EXTENDED_RANGE = [(low - (high - low) * 0.2, high + (high - low) * 0.2) for low, high in NORMAL_RANGE]

def generate_data(sensors: int, days: int) -> list[tuple]:
    now = datetime.now()
    ids = [f"SENS{str(i).zfill(3)}" for i in range(1, sensors + 1)]
    data: list[tuple] = []
    for id in ids:
        for day in range(days):
            date_obj = now - timedelta(days=day)
            date_str = date_obj.strftime('%Y-%m-%d')
            for minute in range(24 * 60):
                timestamp = date_obj.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(minutes=minute)
                temperature = random.uniform(*EXTENDED_RANGE[0])
                humidity = random.uniform(*EXTENDED_RANGE[1])
                data.append((id, date_str, timestamp, temperature, humidity))
    return data

def insert_postgres(data: list[tuple]) -> float:
    drop_table_postgres()
    create_table_postgres()
    query = """
        INSERT INTO temperature_measurements (sensor_id, date, event_time, temperature, humidity)
        VALUES %s
    """
    start = time.time()
    execute_values(postgres, query, data)
    end = time.time()
    return end - start

def insert_cassandra(data: list[tuple], batch_size: int) -> float:
    drop_table_cassandra()
    create_table_cassandra()
    partitioned = defaultdict(list)
    for row in data:
        partitioned[(row[0], row[1])].append(row)
    
    prepared = cassandra.prepare("""
        INSERT INTO temperature_measurements (sensor_id, date, event_time, temperature, humidity)
        VALUES (?, ?, ?, ?, ?)
    """)
    
    batch = BatchStatement()
    start = time.time()
    for rows in partitioned.values():
        for i in range(0, len(rows), batch_size):
            for row in rows[i:i + batch_size]:
                batch.add(prepared, row)
            cassandra.execute(batch)
            batch.clear()
    end = time.time()
    return end - start

def insert_test() -> None:
    print("Prueba de inserción de datos:")
    volumes: list = [7, 15, 30, 60] # Volúmenes de datos en días
    batch_sizes: list = [100, 200, 500, 1000]
    columns = ['dias', 'postgres'] + [f'cassandra({size})' for size in batch_sizes]
    results = pd.DataFrame(columns=columns)
    for volume in volumes:
        data: list[tuple] = generate_data(5, volume)
        row = {'dias': volume}
        row['postgres'] = insert_postgres(data)
        for size in batch_sizes:
            row[f'cassandra({size})'] = insert_cassandra(data, size)
        results = pd.concat([results, pd.DataFrame([row])], ignore_index=True)
    display(results)

### Pruebas de Escritura (INSERT)
Se realizaron pruebas preliminares de inserción individual en Cassandra utilizando un volumen reducido de 5 sensores durante 7 días. El tiempo requerido para completar la inserción fue excesivo, por lo que se descartó esta estrategia y se optó por evaluar únicamente la inserción por lotes (batch).

Para los experimentos se emplearon los siguientes parámetros:
- Sensores: 5
- Volúmenes de datos en días: 7, 15, 30, 60

En el caso de PostgreSQL, el tamaño del batch corresponde siempre al total de datos generados para cada volumen, es decir, toda la inserción se realiza en una sola operación masiva. Para Cassandra, se evaluaron diferentes tamaños de batch (100, 200, 500 y 1000) para analizar su impacto en el rendimiento.

La tabla de resultados presenta una columna para PostgreSQL y una columna para cada tamaño de batch en Cassandra, agrupando los resultados por volumen de datos (días).

Este enfoque permite comparar de manera clara el efecto del tamaño de batch en Cassandra y la diferencia de desempeño respecto a PostgreSQL.

In [70]:
insert_test()

Prueba de inserción de datos:


Prueba de inserción de datos:


Unnamed: 0,dias,postgres,cassandra(100),cassandra(200),cassandra(500),cassandra(1000)
0,7,2.109822,30.822227,15.861103,4.538761,4.344583
1,15,3.271147,66.552932,34.268575,15.353234,8.453861
2,30,143.133282,130.770741,5650.858164,30.523972,17.589561
3,60,13.444289,999.740117,509.043554,61.017116,36.38892


#### Propuesta de Optimización: Inserción Asíncrona y Concurrente
Para mejorar el rendimiendto en la inserción masiva de datos se propone el uso de *execute_async* que permite enviar múltiples lotes en paralelto y aprovechar mejor los recursos del clúster.

In [None]:
def insert_cassandra_async(data: list[tuple], batch_size: int, max_workers: int) -> float:
    drop_table_cassandra()
    create_table_cassandra()
    partitioned = defaultdict(list)
    for row in data:
        partitioned[(row[0], row[1])].append(row)
    prepared = cassandra.prepare("""
        INSERT INTO temperature_measurements (sensor_id, date, event_time, temperature, humidity)
        VALUES (?, ?, ?, ?, ?)
    """)
    batches: list[BatchStatement] = []
    for rows in partitioned.values():
        for i in range(0, len(rows), batch_size):
            batch = BatchStatement()
            for row in rows[i:i + batch_size]:
                batch.add(prepared, row)
            batches.append(batch)
    start = time.time()
    futures = []
    for batch in batches:
        futures.append(cassandra.execute_async(batch))
        if len(futures) >= max_workers:
            for future in futures:
                future.result()
            futures.clear()
    for future in futures:
        future.result()
    end = time.time()
    return end - start

def insert_test_async() -> None:
    print("Prueba de inserción de datos batch + async:")
    volumes: list[int] = [7, 15, 30, 60] # Volúmenes de datos en días
    workers_amounts: list[int] = [2, 4, 8, 16]   # Cantidad de workers para la inserción asíncrona
    columns = ['dias'] + [f'cassandra({workers})' for workers in workers_amounts]
    results = pd.DataFrame(columns=columns, dtype=float)
    for volume in volumes:
        data: list[tuple] = generate_data(5, volume)
        row = {'dias': float(volume)}
        for workers in workers_amounts:
            row[f'cassandra({workers})'] = insert_cassandra_async(data, 1000, workers)
        results = pd.concat([results, pd.DataFrame([row])], ignore_index=True)
    display(results)

insert_test_async()

Prueba de inserción de datos batch + async:


Unnamed: 0,dias,cassandra(2),cassandra(4),cassandra(8),cassandra(16)
0,7.0,1.214091,0.446585,0.332718,0.309937
1,15.0,2.972985,0.84392,0.717132,1.149184
2,30.0,5.566646,1.569872,1.227481,1.315997
3,60.0,11.021844,3.316874,2.647669,2.561033


La tabla de datos muestra los resultados de tiempo en segundos agrupados por volumen de datos para Cassandra con distintas cantidad de workers simultáneos.