Recursos:
 - https://docs.datastax.com/en/developer/python-driver/3.25/getting_started/
 - https://cassandra.apache.org/doc/latest/cassandra/faq/
 - https://openwebinars.net/blog/como-usar-apache-cassandra-con-python/
 - https://docs.datastax.com/en/cql-oss/3.3/cql/cql_reference/cqlCreateTable.html
 - https://www.datastax.com/blog/most-important-thing-know-cassandra-data-modeling-primary-key

## Testeando la conexión a cassandra y algunas queries

In [1]:
import pandas as pd
import numpy as np

In [2]:
from cassandra.cluster import Cluster

#Conexion al cluster
#Es importante haber mapeado el puerto usando -p 9042:9042 al crear el contenedor

cluster = Cluster(['127.0.0.1'], port=9042)
#session = cluster.connect('bdnosql') #El keyspace lo creé desde la terminal

In [21]:
session = cluster.connect()
session.execute(
    """
    DROP KEYSPACE IF EXISTS bdnosql;
    """
)
session.execute(
    """
    CREATE KEYSPACE bdnosql 
    WITH replication = {'class' : 'SimpleStrategy', 'replication_factor':1};
    """
)
session.set_keyspace('bdnosql')

session.execute(
    """
    CREATE TABLE estaciones(
    id int PRIMARY KEY,
    nombre text
    );
    """
)

<cassandra.cluster.ResultSet at 0x26d65dcc970>

In [23]:
# Insertar cosas a la tabla estaciones 
sentencia = session.prepare( 
    """
    INSERT INTO estaciones (id, nombre)
    VALUES (?, ?)
    """
)

row = (2, 'Verano')
session.execute(sentencia, row)

row2 = (3, 'Otoño')
resultado = session.execute(sentencia, row2)

In [24]:
# Leer cosas de la tabla
rows = session.execute('SELECT id, nombre FROM estaciones')
for row in rows:
    print(row.id, row.nombre)

2 Verano
3 Otoño


In [25]:
sentencia_2 = session.prepare("SELECT id, nombre FROM estaciones WHERE id=?")
rows = session.execute(sentencia_2, [3])
for row in rows:
    print(row[0], row[1])

3 Otoño


In [26]:
#Insertando desde un dataFrame de pandas
import pandas as pd

data = [[4, 'Invierno'], [5, 'OtraEstacion']]
data = pd.DataFrame(data)

for row in data.values:
    session.execute(sentencia, row)

## Reconstrucción de tablas para precargarlas a base de datos

Para cargar los datos queremos:
 - Definir las tablas dentro del keyspace
 - Definir los métodos para popularlas
 - Cargar los datos 

 Dudas:
- ¿Las categorías están preestablecidas o cada usuario puede crear nuevas?

Consideraciones:
- El usuario al ingresar un nuevo libro no conoce el id, solo el título. Debe ser posible acceder al id del libro a partir de su nombre.

In [27]:
data = pd.read_csv('DatosBBDDE_test.csv')
data.columns = ['categoria', 'usuario', 'libro', 'calificacion']
data = data[['usuario', 'libro', 'categoria', 'calificacion']]

In [28]:
data.head()

Unnamed: 0,usuario,libro,categoria,calificacion
0,Yeudiel,Harry Potter,Fantasia,5
1,Eduardo,One Shot,Suspenso,5
2,FernandoA,On the road,Novela,5
3,Edgar,The Pillars of the Earth,Novela,4
4,Daniel,100 años de soledad,Realismo mágico,5


---------

## Creación de keyspace y tablas

In [59]:
session = cluster.connect()

session.execute(
    """
    DROP KEYSPACE IF EXISTS bd_libros;
    """
)

# Creación del keyspace
session.execute(
    """
    CREATE KEYSPACE bd_libros 
    WITH replication = {'class' : 'SimpleStrategy', 'replication_factor':1};
    """
)
session.set_keyspace('bd_libros')


### Creación de las tablas

# LIBROS POR CLIENTE (Queries 1,2,3)
session.execute(
    """
    CREATE TABLE libros_por_cliente (
    id_cliente text,
    titulo_libro text,
    autor_libro text,
    categoria text,
    calificacion int,
    nombre_cliente text STATIC,
    pais text STATIC,
    membresia text STATIC,
    PRIMARY KEY(id_cliente, titulo_libro, autor_libro)
    );
    """
)

## CATEGORIAS POR CLIENTE
#session.execute(
#    """
#    CREATE MATERIALIZED VIEW categorias_por_cliente AS
#    SELECT id_cliente, categoria, titulo_libro, autor_libro, calificacion
#    FROM libros_por_cliente
#    WHERE categoria IS NOT NULL
#    AND titulo_libro IS NOT NULL
#    AND autor_libro IS NOT NULL
#    PRIMARY KEY(id_cliente, categoria, titulo_libro, autor_libro);
#    """
#)

#CATEGORIAS POR CLIENTE
session.execute(
    """
    CREATE TABLE categorias_por_cliente(
    id_cliente text,    
    categoria text,
    titulo_libro text,
    autor_libro text,
    calificacion int,
    PRIMARY KEY(id_cliente,categoria, titulo_libro, autor_libro, calificacion)
    );
    """
)


# CLIENTES POR LIBRO (Query 4)
session.execute(
    """
    CREATE TABLE clientes_por_libro (
    titulo_libro text,
    autor_libro text,
    calificacion int,
    id_cliente text,
    categoria text,
    PRIMARY KEY((titulo_libro, autor_libro), calificacion, id_cliente)
    );
    """
)


# LIBROS POR CATEGORIA (Query 5)
session.execute(
    """
    CREATE TABLE libros_por_categoria (
    categoria text,
    titulo_libro text,
    autor_libro text,
    calificacion_promedio float,
    PRIMARY KEY (categoria, titulo_libro, autor_libro, calificacion_promedio)
    );
    """
)
session.execute(
    """
    CREATE INDEX ON libros_por_categoria(calificacion_promedio)"""
)

<cassandra.cluster.ResultSet at 0x7fe32874a4f0>

Quizá no es necesario tener un id_libro y es suficiente con usar el nombre del libro como identificador único.

## Queries

In [60]:
def Q1_alta_usuario(id_cliente:str, nombre_cliente:str, pais:str, membresia:str):
    existe_usuario = session.execute(
        """
        SELECT * FROM libros_por_cliente
        WHERE id_cliente = %s
        LIMIT 1
        """,
        [id_cliente]
    )

    if existe_usuario:
        return None

    session.execute(
        """
        INSERT INTO libros_por_cliente
        (id_cliente, nombre_cliente, pais, membresia)
        VALUES (%s, %s, %s, %s)
        """,
        (id_cliente, nombre_cliente, pais, membresia)
    )
    return 

In [72]:
def Q2_agregar_calificacion(id_cliente:str , titulo_libro:str, autor_libro:str, categoria:str, calificacion:int):
    existe_calif= session.execute(
        """
        SELECT categoria, calificacion FROM libros_por_cliente
        WHERE id_cliente=%s AND titulo_libro=%s  AND autor_libro=%s
        LIMIT 1
        """,
        [id_cliente, titulo_libro, autor_libro]
    )
    info_previa = existe_calif.one()
    
    if existe_calif:
        # pedir a usuario confirma que desea cambiar la calificación
        confirmacion_del_usuario = 1 #respuesta del usuario
        if not confirmacion_del_usuario:
            return

        info_previa = existe_calif.one()
        
        #Borrar la calificación anterior
        session.execute(
            """
            DELETE FROM clientes_por_libro
            WHERE titulo_libro=%s
            AND autor_libro=%s
            AND calificacion=%s
            AND id_cliente=%s
            """,
            (titulo_libro, autor_libro, info_previa.calificacion, id_cliente)
        )
        session.execute(
            """
            DELETE FROM categorias_por_cliente
            WHERE id_cliente=%s 
            AND categoria=%s
            AND titulo_libro=%s
            AND autor_libro=%s
            AND calificacion=%s
            """,
            (id_cliente, info_previa.categoria, titulo_libro, autor_libro, info_previa.calificacion)
        )
        

    #Agregar la nueva calificación
    session.execute(
        """
        INSERT INTO libros_por_cliente 
        (id_cliente, titulo_libro, autor_libro, categoria, calificacion)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (id_cliente, titulo_libro, autor_libro, categoria, calificacion)
    )
    session.execute(
        """
        INSERT INTO clientes_por_libro 
        (titulo_libro, autor_libro, id_cliente, calificacion, categoria)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (titulo_libro, autor_libro, id_cliente, calificacion, categoria)
    )
    session.execute(
        """
        INSERT INTO categorias_por_cliente 
        (id_cliente, titulo_libro, autor_libro, categoria, calificacion)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (id_cliente, titulo_libro, autor_libro, categoria, calificacion)
    )
    

    #Actualizar calificaciones promedio
    calificacion_promedio = session.execute(
        """
        SELECT AVG(calificacion) FROM clientes_por_libro
        WHERE titulo_libro=%s
        AND autor_libro=%s
        """,
        (titulo_libro, autor_libro)
    )
    calificacion_promedio = round(calificacion_promedio.one()[0], 2)

    categorias_del_libro = session.execute(
            """
            SELECT categoria FROM clientes_por_libro
            WHERE titulo_libro =%s
            AND autor_libro = %s
            """,
            (titulo_libro, autor_libro)
    )
    
    #En caso de que sea la única review que asigna esa categoría al libro y se cambie la categoría, necesitamos
    #eliminar la fila de la tabla libros_por_categoría
    if info_previa is not None:
        if  info_previa.categoria != categoria:
            unica_calificacion = True
    
            for row in categorias_del_libro:
                if row.categoria == info_previa.categoria: #entonces hay otras reviews que categorizan igual al libro
                    unica_calificacion = False
                session.execute(
                    """
                    DELETE FROM libros_por_categoria
                    WHERE categoria=%s
                    AND titulo_libro=%s
                    AND autor_libro= %s
                    """,
                    (row.categoria, titulo_libro, autor_libro)
                )
                session.execute(
                    """
                    INSERT INTO libros_por_categoria
                    (categoria, titulo_libro, autor_libro, calificacion_promedio)
                    VALUES (%s, %s, %s, %s)
                    """,
                    (row.categoria, titulo_libro, autor_libro, calificacion_promedio)
                )
            
            if unica_calificacion:
                session.execute(
                    """
                    DELETE FROM libros_por_categoria
                    WHERE categoria=%s
                    AND titulo_libro=%s
                    AND autor_libro=%s
                    """,
                    (info_previa.categoria, titulo_libro, autor_libro)
                )
    
    else:
        for row in categorias_del_libro:
            session.execute(
                """
                DELETE FROM libros_por_categoria
                WHERE categoria=%s
                AND titulo_libro=%s
                AND autor_libro= %s
                """,
                (row.categoria, titulo_libro, autor_libro)
            )
            session.execute(
                """
                INSERT INTO libros_por_categoria
                (categoria, titulo_libro, autor_libro, calificacion_promedio)
                VALUES (%s, %s, %s, %s)
                """,
                (row.categoria, titulo_libro, autor_libro, calificacion_promedio)
            )

    return 

In [83]:
def Q3_categoría_preferida_por_cliente(id_cliente:str):
    
    resultado = session.execute(
        """
        SELECT categoria, AVG(calificacion) FROM categorias_por_cliente
        WHERE id_cliente=%s
        GROUP BY categoria
        """,
        [id_cliente]
    )

    resultado = pd.DataFrame(resultado.all(), columns=['cateogoria', 'avg_calificacion'])
    resultado = resultado.sort_values(by='avg_calificacion', ascending=False)
    resultado = resultado.values[0]

    return (resultado[0], resultado[1])

In [63]:
def Q4_mas_disfrutaron_libro(titulo_libro:str, autor_libro:str):
    
    resultado_max_calif = session.execute(
        """
        SELECT MAX(calificacion) FROM clientes_por_libro
        WHERE titulo_libro=%s
        AND autor_libro=%s
        LIMIT 1
        """,
        (titulo_libro, autor_libro)
    )

    if resultado_max_calif is None:
        return None

    max_calif = resultado_max_calif.one()[0]

    resultado_clientes = session.execute(
        """
        SELECT id_cliente, calificacion FROM clientes_por_libro
        WHERE titulo_libro=%s
        AND autor_libro=%s
        AND calificacion=%s
        """, 
        (titulo_libro, autor_libro, max_calif)
    )
        
    return pd.DataFrame(resultado_clientes.all())

In [85]:
# Falta seleccionar el libro de mejor rating
def Q5_mejores_libros_por_categoria(categoria:str, n:int=10):
    
    res = session.execute(
        """
        SELECT titulo_libro, autor_libro, calificacion_promedio FROM libros_por_categoria
        WHERE categoria=%s
        ORDER BY calificacion_promedio DESC
        LIMIT %s;
        """,
        (categoria, n)
    )

    return pd.DataFrame(res.all())

## Testing las queries

In [65]:
#LLENANDO LAS TABLAS

# Dar de alta usuarios
Q1_alta_usuario('MarceCL', 'Marcela Cruz', 'México', 'Estudiante')
Q1_alta_usuario('Yeu_L', 'Yeudiel Lara', 'Francia', 'Profesor')
Q1_alta_usuario('pabs', 'Pablo Castillo', 'Chile', 'Premium')

# Añadir libros y calificaciones
Q2_agregar_calificacion('MarceCL', 'Duna', 'Frank Herbert', 'Ciencia Ficción', 5)
Q2_agregar_calificacion('MarceCL', 'Duna 2', 'Frank Herbert', 'Ciencia Ficción', 1)
Q2_agregar_calificacion('Yeu_L', 'Duna', 'Frank Herbert', 'Ciencia Ficción', 3)
Q2_agregar_calificacion('pabs', 'Duna', 'Frank Herbert', 'Ciencia Ficción', 4)
Q2_agregar_calificacion('pabs', 'Duna', 'Frank Herbert', 'Ciencia Ficción', 5)
Q2_agregar_calificacion('pabs', 'La Máquina del Tiempo', 'Herbert George Wells', 'Ciencia Ficción', 5)
Q2_agregar_calificacion('MarceCL', 'El psicoanalista', 'John Katzenbach', 'Thriller', 5)

Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Ciencia Ficción
Thriller


In [66]:
res = session.execute(
        """
        SELECT * FROM libros_por_cliente
        """
    )
res = pd.DataFrame(res.all())
res

Unnamed: 0,id_cliente,titulo_libro,autor_libro,membresia,nombre_cliente,pais,calificacion,categoria
0,pabs,Duna,Frank Herbert,Premium,Pablo Castillo,Chile,5,Ciencia Ficción
1,pabs,La Máquina del Tiempo,Herbert George Wells,Premium,Pablo Castillo,Chile,5,Ciencia Ficción
2,MarceCL,Duna,Frank Herbert,Estudiante,Marcela Cruz,México,5,Ciencia Ficción
3,MarceCL,Duna 2,Frank Herbert,Estudiante,Marcela Cruz,México,1,Ciencia Ficción
4,MarceCL,El psicoanalista,John Katzenbach,Estudiante,Marcela Cruz,México,5,Thriller
5,Yeu_L,Duna,Frank Herbert,Profesor,Yeudiel Lara,Francia,3,Ciencia Ficción


In [67]:
res = session.execute(
        """
        SELECT * FROM clientes_por_libro
        """
    )
res = pd.DataFrame(res.all())
res

Unnamed: 0,titulo_libro,autor_libro,calificacion,id_cliente,categoria
0,El psicoanalista,John Katzenbach,5,MarceCL,Thriller
1,Duna,Frank Herbert,3,Yeu_L,Ciencia Ficción
2,Duna,Frank Herbert,5,MarceCL,Ciencia Ficción
3,Duna,Frank Herbert,5,pabs,Ciencia Ficción
4,Duna 2,Frank Herbert,1,MarceCL,Ciencia Ficción
5,La Máquina del Tiempo,Herbert George Wells,5,pabs,Ciencia Ficción


In [68]:
res = session.execute(
        """
        SELECT * FROM libros_por_categoria
        """
    )
res = pd.DataFrame(res.all())
res

Unnamed: 0,categoria,titulo_libro,autor_libro,calificacion_promedio
0,Ciencia Ficción,Duna,Frank Herbert,4.0
1,Ciencia Ficción,Duna 2,Frank Herbert,1.0
2,Ciencia Ficción,La Máquina del Tiempo,Herbert George Wells,5.0
3,Thriller,El psicoanalista,John Katzenbach,5.0


In [86]:
# CONSULTANDO LAS TABLAS

res1 = Q3_categoría_preferida_por_cliente('MarceCL')
print(res1)

res2 = Q4_mas_disfrutaron_libro('Duna', 'Frank Herbert')
print(res2)

res3 = Q5_mejores_libros_por_categoria('Ciencia Ficción')
print(res3)

('Thriller', 5)
  id_cliente  calificacion
0    MarceCL             5
1       pabs             5


InvalidRequest: Error from server: code=2200 [Invalid query] message="Order by currently only supports the ordering of columns following their declared order in the PRIMARY KEY"