# PRÁCTICA DE TERREMOTOS UTILIZANDO CASSANDRA

En esta práctica se usará la base de datos NoSQL de Cassandra, para el caso de uso de modelación de una serie de consultas.

En el caso de los datos sísmicos, el volumen, la variedad y, especialmente, la
velocidad de generación de la información (nuevos registros de sensores, actualizaciones de
magnitudes, alertas de tsunami, datos geoespaciales y de profundidad) hacen que un enfoque
relacional resulte poco eficiente para este caso de uso.

A continuación se observará el criterio de elección de Cassandra como base de datos NoSQL para modelar las consultas, así como el modelo de datos de las distintas tablas, inserción y actualización de datos, y la creación y ejecución de las consultas SQL propuestas en la actividad.

## 1. Ventajas y desventajas de Cassandra

Antes de empezar con ventajas y desventajas, se debe de analizar el teorema CAP, para saber de forma general si Cassandra es de utilidad en nuestro caso de uso de Terremotos.

### ===================
### Teorema CAP en Cassandra
### ===================

* ¿Que cumple simpre?:
    * Tolerancia a particiones (P): En un entorno real, Cassandra usa varios nodos a modo de backup siguiendo una topología de anillo. Por tanto, el sistema sigue funcionando aunque haya fallos o cortes de red entre nodos.
    * Disponibilidad (A): Siempre responde a las peticiones (lecturas/escrituras), aunque algunos datos puedan no estar totalmente sincronizados.

* ¿Que no cumple siempre?:
    * Consistencia (C): Se sacrifica parcialmente, ya que los datos pueden tardar un poco en propagarse entre nodos. Esto se conoce como consistencia eventual.

* ¿Realmente sirve Cassandra en este caso de uso?

    * Según el teorema CAP, Cassandra es adecuada para este caso porque en un sistema global y distribuido de datos sísmicos es más importante no perder datos y mantener el servicio disponible que tener consistencia instantánea en todas las réplicas.

### ======
### Ventajas
### ======

* Alta escalabilidad horizontal: permite añadir nodos fácilmente sin afectar la disponibilidad ni el rendimiento.

* Altísima velocidad de escritura: ideal para registrar datos de sensores y eventos sísmicos en tiempo real.

* Disponibilidad continua: sin un único punto de fallo gracias a su arquitectura distribuida y replicación entre nodos.

* Modelo flexible de datos: no se requiere esquema rígido, como pasa en SQL. Cassandra puede adaptarse fácilmente a nuevos tipos de datos, por ejemplo a nuevos parámetros geológicos que previamente no estaban definidos.

* Replicación geográfica: los datos se podrían replicarse entre regiones o continentes, útil para observatorios sísmicos internacionales.

* Lecturas rápidas por clave: muy eficiente para consultas que acceden por identificadores concretos (ID de evento, zona, etc.).

* Tolerancia a fallos: sigue funcionando incluso si algunos nodos o centros de datos quedan fuera de servicio.

### =========
### Desventajas
### =========

* No soporta transacciones complejas ni joins: En el momento que la consulta sea un poco más compleja de lo habitual y requiriese un acceso multitabla.

* Curva de aprendizaje: Requiere entender muy bien el modelo de datos y tener muy bien especificadas las consultas a resolver. Esto genera mucho debate a la hora de construir las tablas que ejecuten de manera óptima la query, si se quiere evitar el allow filtering.

* Consistencia eventual: Como se ha visto antes, puede haber pequeños retrasos en la sincronización entre réplicas.

## 2. CONEXIÓN Y CREACIÓN DE LAS TABLAS

**¡IMPORTANTE!**: Antes de ejecutar las siguientes celdas, debes de seguir todos los pasos indicados en el Readme, para habilitar el servidor de Cassandra. Este cuaderno debe de ser ejecutado con **Jupyter Notebooks en local**, y no en Google Colab.

### ==========================
### Celda 1: Inicialización y Conexión
### ==========================

In [1]:
import time
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra import OperationTimedOut

# --- Configuración ---
CASSANDRA_HOSTS = ['127.0.0.1'] 
PORT = 9042
KEYSPACE = 'seismic_data'

# 1. Conexión al Cluster con reintentos para dar tiempo al contenedor a inicializarse
cluster = None
session = None
RETRY_ATTEMPTS = 15
RETRY_DELAY_SEC = 10 

print(f"Intentando conectar a Cassandra en {CASSANDRA_HOSTS}:{PORT}...")

for i in range(RETRY_ATTEMPTS):
    try:
        # Me conecto desde el host de WSL
        cluster = Cluster(CASSANDRA_HOSTS, port=PORT) 
        session = cluster.connect()
        print(f"✅ Conexión exitosa al cluster en el intento {i+1}.")
        break
    except Exception as e:
        print(f"❌ Error de conexión (Intento {i+1}/{RETRY_ATTEMPTS}): {e}")
        if i < RETRY_ATTEMPTS - 1:
            print(f"Esperando {RETRY_DELAY_SEC} segundos...")
            time.sleep(RETRY_DELAY_SEC)
        else:
            print("🚨 Fallo al conectar después de varios intentos.")
            raise ConnectionError("No se pudo conectar al cluster de Cassandra.")


# 2. Establecer Keyspace y Verificar Versión
if session:
    # Crear Keyspace (se debe hacer en la sesión inicial)
    session.execute(f"""
        CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
        WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
    """)
    session.set_keyspace(KEYSPACE)
    print(f"🔑 KEYSAPCE '{KEYSPACE}' activo.")
    
    # Mostrar versión
    rows = session.execute("SELECT release_version FROM system.local;").one()
    print(f"Versión de Cassandra: {rows.release_version}")

Intentando conectar a Cassandra en ['127.0.0.1']:9042...
❌ Error de conexión (Intento 1/15): ('Unable to connect to any servers', {'127.0.0.1:9042': OperationTimedOut('errors=Timed out creating connection (5 seconds), last_host=None')})
Esperando 10 segundos...
❌ Error de conexión (Intento 2/15): ('Unable to connect to any servers', {'127.0.0.1:9042': OperationTimedOut('errors=Timed out creating connection (5 seconds), last_host=None')})
Esperando 10 segundos...
❌ Error de conexión (Intento 3/15): ('Unable to connect to any servers', {'127.0.0.1:9042': OperationTimedOut('errors=Timed out creating connection (5 seconds), last_host=None')})
Esperando 10 segundos...
❌ Error de conexión (Intento 4/15): ('Unable to connect to any servers', {'127.0.0.1:9042': OperationTimedOut('errors=Timed out creating connection (5 seconds), last_host=None')})
Esperando 10 segundos...
✅ Conexión exitosa al cluster en el intento 5.
🔑 KEYSAPCE 'seismic_data' activo.
Versión de Cassandra: 5.0.5


### =========================
### Creación de las 4 Tablas Optimizadas
### =========================

A continuación se procede a la creación de las 4 tablas, una para cada una de las queies propuestas en la actividad

Las consultas a realizar son las siguientes:
 - 5.a.i: Filtra terremotos por el año del evento mayor de 2015.
 - 5.a.ii: Filtra terremotos en el país donde ocurrió y que empiece por: “Japa...” para el caso de "Japan".
 - 5.b.i: Consulta por un país concreto donde se han producido terremotos, mostrando todos los terremotos con magnitud superior a 7.0.
 - 5.b.ii: Consulta por un país concreto donde se han producido terremotos, incluyendo sólo los que presenten riesgo potencial de tsunami.

In [None]:
print("\nCreando las 4 Tablas optimizadas para las consultas...")

# ----------------------------------------------------------------------
# TABLA 1: earthquakes_by_year_range (Optimiza: 5.a.i -> eq_id y Año > 2015)
# Estrategia: Particionar por rango de años (a partir de 2015) para escalar.
# Lógica PK: year_pk = 'False' (si < 2015) o 'True' (si >= 2015).
# Clustering Key 1: Permite el filtro de rango (event_year > 2015).
# ----------------------------------------------------------------------
session.execute("""
    CREATE TABLE IF NOT EXISTS earthquakes_by_year_range (
        year_pk boolean,      
        event_year int,
        eq_id text,
        
        event_time timestamp, 
        intensity_mmi decimal, 
        country text, 
        duration_sec int,
        
        PRIMARY KEY (year_pk, event_year, eq_id)
    ) WITH CLUSTERING ORDER BY (event_year ASC, eq_id ASC);
""")


# -------------------------------------------------------------------------------
# TABLA 2: earthquakes_by_country_initial (Optimiza: 5.a.ii -> eq_id y País 'Japa...')
# Estrategia: Particionar por la inicial del país (A-Z) para balanceo de carga en != particiones.
# Lógica PK: country_initial_pk = Primera letra del País (ej: 'J' para Japan).
# Clustering Key 1: Permite el filtro de rango (>= 'Japa' AND < 'Japb').
# -------------------------------------------------------------------------------
session.execute("""
    CREATE TABLE IF NOT EXISTS earthquakes_by_country_starting_with_prefix (
        country_initial_pk text,
        country text,
        eq_id text,
        
        event_time timestamp, 
        intensity_mmi decimal,
        duration_sec int,
        
        PRIMARY KEY (country_initial_pk, country, eq_id)
    ) WITH CLUSTERING ORDER BY (country ASC, eq_id ASC);
""")


# ----------------------------------------------------------------------
# TABLA 3: earthquakes_by_country_and_magnitude (Optimiza: 5.b.i -> País y Magnitud > 7.0)
# Estrategia: La PK es el País, ya que la consulta es "por un país concreto".
# Clustering Key 1: Permite el filtro de rango (mw > 7.0).
# Clustering Key 2: Ordenar por evento más reciente.
# ----------------------------------------------------------------------
session.execute("""
    CREATE TABLE IF NOT EXISTS earthquakes_by_country_and_magnitude (
        country text,
        mw decimal,
        event_time timestamp,
        
        eq_id text,
        intensity_mmi decimal,
        duration_sec int,
        
        PRIMARY KEY (country, mw, event_time)
    ) WITH CLUSTERING ORDER BY (mw DESC, event_time DESC); 
""")


# ----------------------------------------------------------------------
# TABLA 4: tsunami_potential_by_country (Optimiza: 5.b.ii -> País y riesgo potencial=TRUE)
# Estrategia: Partición Compuesta (País, Tsunami Boolean) para acceder directamente a TRUE/FALSE.
# Clustering Key 1: Ordenar por evento más reciente.
# ----------------------------------------------------------------------
session.execute("""
    CREATE TABLE IF NOT EXISTS tsunami_potential_by_country (
        country text,
        tsunami_potential boolean,
        event_time timestamp,
        
        -- Datos Desnormalizados
        eq_id text,
        mw decimal,
        intensity_mmi decimal,
        
        PRIMARY KEY ((country, tsunami_potential), event_time) -- PK Compuesta
    ) WITH CLUSTERING ORDER BY (event_time DESC);
""")

print("Status=OK --> Las 4 Tablas han sido creadas/verificadas")


Creando las 4 Tablas optimizadas para las consultas...
Status=OK --> Las 4 Tablas han sido creadas/verificadas


## 3. INSERCIÓN DE DATOS EN CASSANDRA

Vamos a sacar nuestra lista de paises con pycountry, pero dejando 5 paises por defecto para poder hacer ejemplos controlados en el apartado 5.a.ii.

También queremos que China aparezca al menos un 25% de las veces, dado que se va a usar como país de ejemplo para el posterior UPDATE

In [3]:
import pycountry
import random

def random_country(country_set):
    countries = [c.name for c in pycountry.countries if c.name != "Czechia" and  c.name not in country_set]
    return random.choice(countries)


country_set = set(['Antarctica', 'Andorra', 'Chequia', 'Cameroon', 'Canada'])

while len(country_set) < 9:
    country_set.add(random_country(country_set=country_set))

top_9_countries = list(country_set)
print(top_9_countries)

def get_country():
    return random.choice(top_9_countries) if random.random() <= 0.75 else 'China'

['French Guiana', 'Barbados', 'Chequia', 'Cameroon', 'Canada', 'Australia', 'Brunei Darussalam', 'Andorra', 'Antarctica']


Creamos las funciones auxiliares que nos ayudarán a determinar las partition keys de las queries 5.a.i y 5.a.ii, definidas en las tablas

In [4]:
def year_pk(year):
    return True if year >= 2015 else False

def country_initial(country):
    return country[0].upper() if isinstance(country, str) and country else "X"

Ahora escogeremos 100 filas aleatorias del dataset *earthquake_data_tsunami.csv*, un dataset de Kaggle de tsunamis, prepararemos esos datos y los insertaremos en las 4 tablas de Cassandra

In [5]:
import pandas as pd
from datetime import datetime

df = pd.read_csv("./data/earthquake_data_tsunami.csv")
df_sample = df.sample(n=100, random_state=42).reset_index(drop=True)  # 100 filas aleatorias del dataset

# === Insertar registros adaptados a las 4 tablas ===
for idx, row in df_sample.iterrows():
    eq_id = f"eq_{idx+1:03d}"
    event_year = int(row['Year']) if random.random() <= 0.5 else random.choice([2012, 2013, 2014])
    event_time = datetime(event_year, int(row['Month']), 1)
    intensity_mmi = float(row['mmi'])
    country = get_country()
    duration_sec = random.randint(10, 600)
    mw = float(row['magnitude'])
    tsunami = bool(row['tsunami'])

    # --- Tabla 1: earthquakes_by_year_range ---
    session.execute("""
        INSERT INTO earthquakes_by_year_range (year_pk, event_year, eq_id, event_time, intensity_mmi, country, duration_sec)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """, (year_pk(event_year), event_year, eq_id, event_time, intensity_mmi, country, duration_sec))

    # --- Tabla 2: earthquakes_by_country_starting_with_prefix ---
    country_initial_pk = country[0]
    session.execute("""
        INSERT INTO earthquakes_by_country_starting_with_prefix (country_initial_pk, country, eq_id, event_time, intensity_mmi, duration_sec)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (country_initial(country), country, eq_id, event_time, intensity_mmi, duration_sec))

    # --- Tabla 3: earthquakes_by_country_and_magnitude ---
    session.execute("""
        INSERT INTO earthquakes_by_country_and_magnitude (country, mw, event_time, eq_id, intensity_mmi, duration_sec)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (country, mw, event_time, eq_id, intensity_mmi, duration_sec))

    # --- Tabla 4: tsunami_potential_by_country ---
    session.execute("""
        INSERT INTO tsunami_potential_by_country (country, tsunami_potential, event_time, eq_id, mw, intensity_mmi)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (country, tsunami, event_time, eq_id, mw, intensity_mmi))

print(f"Status=OK --> {df_sample.shape[0]} registros insertados en las 4 tablas adaptadas.")


Status=OK --> 100 registros insertados en las 4 tablas adaptadas.


## 4. UPDATE EN CASSANDRA

Elegimos la tabla *earthquakes_by_country_and_magnitude*, en la que vamos a actualizar un registro.

Tal y como nos pone el enunciado, tenemos que actualizar los registros de un país concreto, cambiando el nombre del país a letras mayúsculas. 

 - Ej: China → CHINA.

Usaremos "China" como país a actualizar, dado que en `get_country()` hemos puesto por defecto que retorne "China" un 30% de las veces, para estar bastante seguros que hay datos para actualizar. 

In [6]:

country_used_to_delete = 'China'


row = session.execute("""
    SELECT country, mw, event_time, eq_id, intensity_mmi, duration_sec
    FROM earthquakes_by_country_and_magnitude
    WHERE country = %s
""", (country_used_to_delete,)).one()

if not row:
    print(f"No se encontró ningún registro con country='{country_used_to_delete}'")
else:
    print(" -> Registro encontrado:", row)

    mw = row.mw
    event_time = row.event_time
    eq_id = row.eq_id
    intensity_mmi = row.intensity_mmi
    duration_sec = row.duration_sec

    # Insertamos el registro existente con el nuevo campo 'CHINA'
    insert_cql = """
        INSERT INTO earthquakes_by_country_and_magnitude 
        (country, mw, event_time, eq_id, intensity_mmi, duration_sec)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    session.execute(insert_cql, ('CHINA', mw, event_time, eq_id, intensity_mmi, duration_sec))
    print(f" -> Registro insertado con country='CHINA', eq_id={eq_id}")

    # Borramos el registro antiguo, ya que UPDATE no borra
    delete_cql = """
        DELETE FROM earthquakes_by_country_and_magnitude
        WHERE country=%s AND mw=%s AND event_time=%s
    """
    session.execute(delete_cql, (country_used_to_delete, mw, event_time))
    print(f" -> Registro eliminado para country='{country_used_to_delete}', eq_id={eq_id}")

    # Verificamos
    result = session.execute("""
        SELECT country, eq_id, mw, intensity_mmi, event_time
        FROM earthquakes_by_country_and_magnitude
        WHERE country = %s
    """, ('CHINA',))

    print("\n -> Registros verificados con country='CHINA':")
    for r in result:
        print(r)

 -> Registro encontrado: Row(country='China', mw=Decimal('8.6'), event_time=datetime.datetime(2005, 3, 1, 0, 0), eq_id='eq_048', intensity_mmi=Decimal('8.0'), duration_sec=443)
 -> Registro insertado con country='CHINA', eq_id=eq_048
 -> Registro eliminado para country='China', eq_id=eq_048

 -> Registros verificados con country='CHINA':
Row(country='CHINA', eq_id='eq_048', mw=Decimal('8.6'), intensity_mmi=Decimal('8.0'), event_time=datetime.datetime(2005, 3, 1, 0, 0))


En este flujo podemos ver que nos encuentra un registro que contiene "China" en su atributo country, y se puede ver todo el flujo de ejecución:
 * Primero encuentra un registro y guarda sus valores (el id, mw, event_time, ...)

 * Inserta un nuevo registro en la misma tabla, pero cambiando el atributo de country a "CHINA"

 * Elimina el registro seleccionado en el paso 1, para seguir la lógica UPDATE de cambio completo de un registro

 * Por último, se verifica que el registro haya sido cambiado en el atributo country a CHINA, y que el resto de valores mantengan la consistencia con los valores que tenía el antiguo registro eliminado

 En el output obtenido, se puede observar que sigue todos los pasos y la validación es completamente correcta

## 5. CONSULTAS CQL Y PRUEBAS

### ===================
### Definición de las Queries
### ===================

Se procede a la definición de las 4 queries. Será una query por tabla, ya que en una base de datos NoSQL siempre se debe de plantear el modelo de datos en base a las consultas que se quieren realizar (modelo de datos hecho ya en el paso de creación de las tablas)

Query 5.a.i

In [7]:
def query_earthquake_by_year_range(session, year):  
    query = """
    SELECT * FROM earthquakes_by_year_range 
    WHERE year_pk = ?  
      AND event_year > ?
    """
    prepared = session.prepare(query)

    start_time = time.time()
    rows = session.execute(prepared, (year_pk(year), year))
    end_time = time.time()

    time_ms = (end_time - start_time) * 1000

    return rows, time_ms

Query 5.a.ii

In [8]:
def get_next_prefix(prefix):
    if not prefix:
        return None
    last_char = prefix[-1]
    next_char = chr(ord(last_char) + 1)
    return prefix[:-1] + next_char

def query_earthquake_by_country_prefix(session, country_prefix):
    c_i = country_initial(country_prefix)
    upper_bound = get_next_prefix(country_prefix)

    query = """
    SELECT * FROM earthquakes_by_country_starting_with_prefix 
    WHERE country_initial_pk = ? 
      AND country >= ? 
      AND country < ?
    """
    prepared = session.prepare(query)

    start_time = time.time()
    rows = session.execute(prepared, (c_i, country_prefix, upper_bound))
    end_time = time.time()

    time_ms = (end_time - start_time) * 1000

    return rows, time_ms

Query 5.b.i

In [10]:
from decimal import Decimal
def query_earthquakes_by_magnitude(session, country, min_magnitude):
    query = """
    SELECT * FROM earthquakes_by_country_and_magnitude 
    WHERE country = ? 
      AND mw > ?
    """
    prepared = session.prepare(query)

    start_time = time.time()
    rows = session.execute(prepared, (country, Decimal(str(min_magnitude))))
    end_time = time.time()

    time_ms = (end_time - start_time) * 1000

    return rows, time_ms

Query 5.b.ii

In [11]:
def query_tsunami_potential_by_country(session, country_name):
    
    query = """
    SELECT * FROM tsunami_potential_by_country 
    WHERE country = ? 
      AND tsunami_potential = ?
    """
    prepared = session.prepare(query)

    start_time = time.time()
    rows = session.execute(prepared, (country_name, True))
    end_time = time.time()

    time_ms = (end_time - start_time) * 1000

    return rows, time_ms

### ===================
### Testeo de las Queries
### ===================

#### 5.a.i

In [12]:
year = 2017
rows, elapsed_time = query_earthquake_by_year_range(session, year)

df = pd.DataFrame(list(rows))

if df.shape[0] > 0:
    frec_yearPk = df['year_pk'].value_counts().values
    print(df.head())
    print(f"\n-> Tiempo de ejecución: {elapsed_time} ms; Registros totales obtenidos: {df.shape[0]}")
    print(f'\n-> Frecuencia de registros >= 2015: {df[df['year_pk'] == True].shape[0]}')

else:
    if year < 2015:
        print(f"No hay ningún registro desde {year} hasta 2015")
    else:
        print(f"No hay ningún registro >= {year}")

   year_pk  event_year   eq_id        country  duration_sec event_time  \
0     True        2018  eq_016  French Guiana           383 2018-10-01   
1     True        2018  eq_055       Cameroon           156 2018-08-01   
2     True        2019  eq_043          China           467 2019-08-01   
3     True        2019  eq_091         Canada           266 2019-08-01   
4     True        2020  eq_093        Andorra            40 2020-10-01   

  intensity_mmi  
0           3.0  
1           6.0  
2           4.0  
3           6.0  
4           7.0  

-> Tiempo de ejecución: 7.929325103759766 ms; Registros totales obtenidos: 10

-> Frecuencia de registros >= 2015: 10


Se simula la query filtrando por un año mayor o igual a 2015. Cabe destacar que la lógica ha sido generalizada para soportar también años inferiores a 2015, buscando desde ese año hasta 2015 (Ej: 2013 hasta 2015). Se puede probar con otro año para validar varios casos y la tabla en cuestión.

Si el año es igual o superior a 2015 se observará un otuput con este formato:

    -> Tiempo de ejecución: 4.205226898193359 ms; Registros totales obtenidos: N

    -> Frecuencia de registros >= 2015: N

El número total de registros siempre será igual a la frecuencia de los registros mayores a 2015 --> N registros.

Si el año es inferior a 2015, se observará un output de este estilo:
    
    -> Tiempo de ejecución: 4.205226898193359 ms; Registros totales obtenidos: K

    -> Frecuencia de registros >= 2015: 0

La frecuencia de los registros mayores a 2015 será siempre 0, confirmándonos que las particiones actúan de manera correcta, mandando los registros con años >= 2015 a una partición y los inferiores a la otra partición, balanceando la carga.

Además, podemos observar que en muy pocos minutos la query ha sido resuelta, indicándonos que nuestras particiones y tablas están bien optimizadas para esta consulta

#### 5.a.ii

Aquí, se debe probar con varios prefijos. Yo he probado con los siguientes, ya que en los paises insertados existe probabilidad alta de que haya registros:
 * `Prefix: C --> Output: {Cameroon, China, Chequia, Canada, ...}`
 * `Prefix: Ch --> Output: {China, Chequia, ...}`
 * `Prefix: Ca --> Output: {Cameroon, Canada, ...}`
 * `Prefix: A --> Output: {Andorra, Antartica, ...}`
 * `Prefix: An --> Output: {Andorra, Antartica, ...}`

Como mínimo saldrán esos países al insertar esos prefijos, salvo que hayas tenido muy mala suerte y justo el random no te haya metido alguno de esos paises en ninguno de los 100 registros, pero eso es poco probable

In [17]:
rows, elapsed_time = query_earthquake_by_country_prefix(session, 'A')

df = pd.DataFrame(list(rows))
frecuencia_paises = df['country'].value_counts()
print(df.head())
print(f"\n-> Tiempo de ejecución: {elapsed_time} ms; Filas obtenidas: {df.shape[0]}")
print('\n-> Frecuencias de cada país:')
print(frecuencia_paises)

  country_initial_pk  country   eq_id  duration_sec event_time intensity_mmi
0                  A  Andorra  eq_019           549 2017-10-01           5.0
1                  A  Andorra  eq_021           522 2014-05-01           4.0
2                  A  Andorra  eq_037           296 2014-11-01           6.0
3                  A  Andorra  eq_054           296 2012-09-01           5.0
4                  A  Andorra  eq_066           368 2013-05-01           5.0

-> Tiempo de ejecución: 4.824161529541016 ms; Filas obtenidas: 22

-> Frecuencias de cada país:
country
Andorra       9
Australia     8
Antarctica    5
Name: count, dtype: int64


Se puede observar que en cada partición van a estar agrupados países que tengan la mimsa inicial:

    -> Prefix = 'An' --> 'Andorra', 'Antartica' --> Partición: "A"
    -> Prefix = 'Ch' --> 'Chequia', 'Chile', 'China'  --> Partición:  "C"
    -> Prefix = 'Chi' --> 'Chile', 'China'  --> Partición: "C"

Se puede ver con esto que las particiones por iniciales funcionan bien para balancear la carga y que además sea capaz de buscar paises por prefijos en una misma partición.

También se puede observar la actuación de la Clustering Key en el *df.head*, dado que sacará los registros ordenados alfabéticamente por Country. Se debe a que en la creación de la tabla *earthquakes_by_country_starting_with_prefix*, la primera Clustering Key es country.

#### 5.b.i

In [18]:
mw = 7.1
rows, elapsed_time = query_earthquakes_by_magnitude(session, 'China', mw)

df = pd.DataFrame(list(rows))

if df.shape[0] > 0:
    frecu_escala = df[df['mw'] >= 7.0].value_counts()
    print(df.head())
    print(f"\n-> Tiempo de ejecución: {elapsed_time} ms; Filas obtenidas: {df.shape[0]}")
    print(f'\n-> Frecuencia escalas superiores a 7.0:')
    print(frecu_escala)
else:
    print(f"No hay escala disponible a partir de {mw}")

  country   mw event_time  duration_sec   eq_id intensity_mmi
0   China  7.4 2014-06-01           518  eq_084           7.0
1   China  7.4 2013-03-01           143  eq_028           8.0
2   China  7.4 2003-09-01            46  eq_007           7.0

-> Tiempo de ejecución: 6.452322006225586 ms; Filas obtenidas: 3

-> Frecuencia escalas superiores a 7.0:
country  mw   event_time  duration_sec  eq_id   intensity_mmi
China    7.4  2003-09-01  46            eq_007  7.0              1
              2013-03-01  143           eq_028  8.0              1
              2014-06-01  518           eq_084  7.0              1
Name: count, dtype: int64


En este filtro, si se prueba a ejecutar con distintos paises y distintas escalas de magnitud, se puede observar que algunos de los países no contienen magnitudes superiores a 7.0

De acuerdo al dataset descargado de Kaggle, la mayoría de las magnitudes no son superiores a 7, y al recolectar esos datos de ese dataset, y no haberle aplicado al *mw* ninguna modificación, va a tener esta consecuencia (que tiene sentido ya que una magnitud superior a 7 es bastante catastrófica y no pasa regularmente en la vida real).

También podremos observar (en el caso en el que si que existan datos), que la frecuencia será baja, lo que indica consistencia con la vida real y con el dataset, dado que son magnitudes peligrosísimas que ocurren muy poco en el planeta.

#### 5.b.ii

In [19]:
country = 'Cameroon'
rows, elapsed_time = query_tsunami_potential_by_country(session, country)

df = pd.DataFrame(list(rows))

if df.shape[0] > 0:
    print(df.head())
    print(f"\n-> Tiempo de ejecución: {elapsed_time} ms; Filas obtenidas: {df.shape[0]}")
    print(f'\n-> Frecuencia de riesgos potenciales de sunami en {country}: {df[df['tsunami_potential'] == True].shape[0]}')
else:
    print(f"{country} no tiene riesgo potencial de tsunami")

    country  tsunami_potential event_time   eq_id intensity_mmi   mw
0  Cameroon               True 2022-03-01  eq_025           4.0  6.9
1  Cameroon               True 2021-08-01  eq_081           4.0  6.9
2  Cameroon               True 2018-08-01  eq_055           6.0  7.3
3  Cameroon               True 2017-03-01  eq_074           7.0  6.6
4  Cameroon               True 2014-07-01  eq_039           4.0  6.5

-> Tiempo de ejecución: 5.401849746704102 ms; Filas obtenidas: 6

-> Frecuencia de riesgos potenciales de sunami en Cameroon: 6


Recordemos que en esta consulta, la partition key está compuesta con el país y el riesgo de tsunami, ya que así podemos buscar en la misma partición de una manera muy rápida, el país que tenga riesgo de tsunami, y a parte balanceando mucho más la carga dado que los registro de ese mismo país que no tienen rsiego de sunami van a ir a otra partición.

Como se puede evaluar en la frecuencia de riesgos de tsunami en el filtro, coincide con el total de registros obtenidos, lo que nos indica que la consulta es consistente, ya que solo queremos sacar los riesgos potenciales de dicho país.

#### Evaluación de tiempos de las consultas 

En los outputs obtenidos tras ejecutar las consultas, podemos observar que el tiempo de ejecución de las queries en Cassandra dura entre 6 y 7 milisegundos. Esto demuestra la rapidez de las consultas, por lo que para un case de uso como el de los terremotos, en el que se aplique el Real Time con la recolección de datos, unos tiempos de lectura y escritura a base de datos que sean muy rápidos es crucial.

En términos de balanceo de carga de datos, se logra repartir de forma consistente en distintas particiones de acuerdo a las consultas definidas, y sin lugar a error. Distribuir la carga implicará aumentar la velocidad en el momento en el que nuevos datos sean introducidos, y también mantener la consistencia y coherencia a la hora de sacar los resultados, sin tener que hacer uso del ALLOW FILTERING.

## 6. CERRAR SESIÓN EN CASSANDRA

In [20]:
cluster.shutdown()

Una vez se acabe todo el flujo de sesión, desconecta el cluster para no tener muchas sesiones abiertas.

Si se quiere volver a re ejecutar otra vez alguna celda concreta, tienes que volver a ejecutar la celda 1 de Inicialización y Conexión.