In [None]:
!pip install cassandra-driver

# Módulo de mensajeria - Obligatorio 2023
Para el contexto del obligatorio de Modelo Avanzados de base de datos se presento la realidad que tiene el servidor de discord y sus distintos módulos. Dentro de esta realidad se presento la próblematica del módulo de mensajería para el cual se eligio el motor de base de datos Cassandra. 

Se busco la estrategía para poder modelar las tablas de cassandra (column family) messages teniendo en cuenta las siguientes restricciones:
- Es un subsistema que debe manejar cientos de millones de mensajes diarios generados por los usuarios en los miles de servidores y canales disponibles.
- Los mensajes son generados por usuarios en cada uno de los canales de los servidores a los
cuales está suscrito.
- Sobre este sistema no se harán búsquedas complejas de texto si no que lecturas específicas
de mensajes ya sea por su id o por el momento de tiempo en el que fueron generados.

Dado las características de cassandra es muy relevante definir la estructura de datos en base a como esta va a ser consultada. A su vez las particiones se tienen que buscar estrategía para que no crezcan de forma descontrolada ya que hay limites establecidos donde complica la operación.

Teniendo esto en cuenta se definio el siguiente esquema:

```sql
CREATE TABLE messages (
        channel_id      UUID,
        date            DATE,
        message_id      TIMEUUID,
        datetime        TIMESTAMP,
        message         VARCHAR,
        user_id         UUID,
        links           SET<VARCHAR>,
        hashtags        SET<VARCHAR>,
        user_references SET<VARCHAR>,
        PRIMARY KEY ((channel_id, date), message_id) 
) WITH CLUSTERING ORDER BY (message_id DESC);

CREATE TABLE message_dates (
    channel_id        UUID,
    date              DATE,
  PRIMARY KEY (channel_id, date)
) WITH CLUSTERING ORDER BY (date DESC);

CREATE TABLE pinned_messages(
    channel_id        UUID,
    message_id      TIMEUUID,
    datetime           TIMESTAMP,
    message           VARCHAR,
    user_id              UUID,
    PRIMARY KEY (channel_id, message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)

```
Esta propuesta de la tabla mensajes tiene la particularidad que particiona por canal y por día, buscando contener el crecimiento de la partición. Y a su vez se define la clave de cluster message_id del tipo TIMEUUID con ORDER BY para tener los mensajes ordenados.
El equipo en este esquema se preocupo en intentar analizar cuales eran las opciones disponibles para poder brindar el resultado de mensajes historicos, con la particularidad que no se puedan realizar consultas agresivas que iteren sobre todas las particiones del cluster.

Para probar esto se realizó una prueba de concepto utilizando el driver de cassandra para python (https://pypi.org/project/cassandra-driver/)

In [16]:
import pandas as pd
import uuid
from datetime import date, timedelta
from cassandra.cluster import Cluster
from datetime import datetime

In [3]:
cluster = Cluster(contact_points=['localhost'], port=9042)


In [5]:
session = cluster.connect()


Chequeo si se puede obtener la versión del servidor de cassandra

In [6]:
row = session.execute("SELECT release_version FROM system.local").one()
if row:
    print(row[0])

3.11.13


Se genera una consulta simple contra la estructura de mensajes y se carga los datos en un dataframe

In [7]:
result = session.execute("select * from obligatorio.messages")

In [8]:
df = pd.DataFrame(list(result))


In [10]:
df.head(10)

Unnamed: 0,channel_id,date,message_id,datetime,hashtags,links,message,user_id,user_references
0,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,a2c16120-2032-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
1,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,9444bb10-2032-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
2,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,1a5d54e0-202f-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
3,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,302d2fd0-1f70-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola como estas,1e934199-fe55-4f61-a90c-6313f9d1c3a8,
4,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,24b6ebf0-1f70-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola mundo,25445c5c-be7f-43e6-8614-e9637b6acfc1,
5,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,a6a0efe0-1f6f-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola mundo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
6,59a46093-dd92-4b57-8e62-b50502d59736,2023-09-21,f0bee930-202b-11ee-8dd0-5d8cce29f123,2023-09-21 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
7,59a46093-dd92-4b57-8e62-b50502d59736,2023-09-18,7eb15ee0-202b-11ee-8dd0-5d8cce29f123,2023-09-18 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
8,59a46093-dd92-4b57-8e62-b50502d59736,2023-06-11,035a10d0-202f-11ee-8dd0-5d8cce29f123,2023-06-11 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
9,59a46093-dd92-4b57-8e62-b50502d59736,2023-06-11,f4f95730-202e-11ee-8dd0-5d8cce29f123,2023-06-11 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,


Se crea un conjunto de operaciones de utilidades que permita validar que la estructura funciona de forma correcta. 

In [18]:
def save_message(channel_id, message_datetime, message, user_id, hashtags, links, user_references):    
    """
    Guarda un mensaje en un canal especificado.

    Args:
        channel_id (str): El ID del canal.
        message_datetime (datetime): La fecha y hora del mensaje.
        message (str): El contenido del mensaje.
        user_id (str): El ID del usuario que envía el mensaje.
        hashtags (list): Lista de hashtags asociados al mensaje.
        links (list): Lista de enlaces asociados al mensaje.
        user_references (list): Lista de usuarios mencionados en el mensaje.
    """    
    date_str = message_datetime.strftime('%Y-%m-%d')
    channel_id_uuid = uuid.UUID(channel_id)
    user_id_uuid = uuid.UUID(user_id)
    
    # escribe un menssaje
    
    session.execute("""
        insert into obligatorio.messages (channel_id, date, message_id, datetime, hashtags, links, 
            message, user_id, user_references) values (
            %s, %s, now(), %s, %s, %s, %s, %s, %s);""", [channel_id_uuid, date_str, date_str, hashtags, links, message, user_id_uuid, 
                                                         user_references])
    
    # Obtiene la maxima fecha
    
    result = session.execute("""
        select date from obligatorio.message_dates where channel_id = %s and date = %s""", [channel_id_uuid, date_str]).one()

    if result is None:
        session.execute("insert into obligatorio.message_dates (channel_id, date) values (%s, %s)", [channel_id_uuid, date_str])
        
def pin_message(channel_id, message_id, message_datetime, message, user_id):
    """
    Pinnea un mensaje en un canal especificado.

    Args:
        channel_id (str): El ID del canal.
        message_id (str): El ID del mensaje a pinneanr.
        message_datetime (datetime): La fecha y hora del mensaje.
        message (str): El contenido del mensaje.
        user_id (str): El ID del usuario que pinnea el mensaje.
    """    
    channel_id_uuid = uuid.UUID(channel_id)
    message_id_uuid = uuid.UUID(message_id)
    user_id_uuid = uuid.UUID(user_id)
    date_str = message_datetime.strftime('%Y-%m-%d')

    result = session.execute("""
        select * from obligatorio.pinned_messages where channel_id = %s and message_id = %s""", [channel_id_uuid, message_id_uuid]).one()
    
    if result is None:
        session.execute("insert into obligatorio.pinned_messages (channel_id, message_id, datetime, message, user_id) values (%s, %s, %s, %s, %s)", 
                        [channel_id_uuid, message_id_uuid, date_str, message, user_id_uuid])

def undo_pin_message(channel_id, message_id):
    """
    Deshace el pin de un mensaje en un canal especificado.

    Args:
        channel_id (str): El ID del canal.
        message_id (str): El ID del mensaje a despinneanr.
    """    
    channel_id_uuid = uuid.UUID(channel_id)
    message_id_uuid = uuid.UUID(message_id)
    
    session.execute("""delete from obligatorio.pinned_messages where channel_id = %s and message_id = %s""", [channel_id_uuid, message_id_uuid])
    

In [None]:
def read_messages(channel_id, date, last_message_id, limit):
    """
    Lee los mensajes de un canal en una fecha y límite específicos.

    Args:
        channel_id (str): El ID del canal.
        date (datetime): La fecha de los mensajes.
        last_message_id (str): El ID del último mensaje obtenido anteriormente.
        limit (int): El límite de mensajes a recuperar.

    Returns:
        list: Una lista de mensajes del canal.
    """
    date_str = date.strftime('%Y-%m-%d')
    channel_id_uuid = uuid.UUID(channel_id)
    
    if last_message_id is None:
        result = session.execute("""
            select * from obligatorio.messages where channel_id = %s and date = %s limit %s""", [channel_id_uuid, date_str, limit])        
    else:
        message_id_uuid = uuid.UUID(last_message_id)

        result = session.execute("""
            select * from obligatorio.messages where channel_id = %s and date = %s and message_id < %s  limit %s""", [channel_id_uuid, date_str, message_id_uuid, limit])
    
    return list(result)

def read_all_messages(channel_id, pagination_cursor, page_size):
    """
    Lee los mensajes de forma paginada, esta operación no fue realizada y solo se presenta un esbozo de como podría ser
    realizada dada la complejidad de la tabla elegida.
    """

    pass
    # Se consulta la lista de fechas para la cual hay mensajes en el canal en message_dates.
    # Si pagination_cursor es vacio (primera vez)
    #    Se itera en las fechas y channel id, se obtiene los mensajes hasta llegar a page_size (se hace una consulta por fecha, channel_id)
    # Si no 
    #    Se descodifica el cursor obteniendo fecha y max(message_id) de la pagina anterior
    #    Se consulta los mensajes restantes de fecha y max(message_id) y se sigue obteniendo mensajes por cada fecha hasta page_size
    # Se encodea un cursor pasando fecha y max(message_id) y se devuelve el resultado.                          


Se prueba guardar un mensaje y luego leer para esa fecha de forma paginada usando limit y last_messageid con éxito.  La tabla ya contenia registros.

In [23]:
channel_id = "59a46093-dd92-4b57-8e62-b50502d59736"
user_id = "4ff38b30-3559-41f1-8874-42cf15bf9b3b"
datetime = datetime.strptime('2023-07-10', '%Y-%m-%d')

save_message(channel_id, datetime, "Hola Juan", user_id, None, None, None)

In [38]:
# Se lee la primer pagina de tamaño 2 para los mensajes del 10/7
messages = read_messages(channel_id, datetime.date(), None, 2)

In [39]:
df = pd.DataFrame(list(messages))
df

Unnamed: 0,channel_id,date,message_id,datetime,hashtags,links,message,user_id,user_references
0,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,cf5eaa20-20d3-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Juan,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
1,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,a2c16120-2032-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,


In [43]:
# Se solicita una segunda página tomando en cuenta el ultimo message id enviado.
messages = read_messages(channel_id, datetime.date(), str(df.loc[1, "message_id"]), 10)

In [44]:
df = pd.DataFrame(list(messages))
df

Unnamed: 0,channel_id,date,message_id,datetime,hashtags,links,message,user_id,user_references
0,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,9444bb10-2032-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
1,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,1a5d54e0-202f-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola Mateo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,
2,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,302d2fd0-1f70-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola como estas,1e934199-fe55-4f61-a90c-6313f9d1c3a8,
3,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,24b6ebf0-1f70-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola mundo,25445c5c-be7f-43e6-8614-e9637b6acfc1,
4,59a46093-dd92-4b57-8e62-b50502d59736,2023-07-10,a6a0efe0-1f6f-11ee-8dd0-5d8cce29f123,2023-07-10 03:00:00,,,Hola mundo,4ff38b30-3559-41f1-8874-42cf15bf9b3b,


Se solicitan todos los mensajes para chequear que pagino bien

In [45]:
messages = read_messages(channel_id, datetime.date(), None, 10)

In [None]:
df = pd.DataFrame(list(messages))
df

Por último se prueban los mensaje el pin y el unpin

In [30]:
pin_message('59a46093-dd92-4b57-8e62-b50502d59736', '9444bb10-2032-11ee-8dd0-5d8cce29f123', datetime, 'Hola Mateo', user_id)

In [31]:
undo_pin_message('59a46093-dd92-4b57-8e62-b50502d59736', '9444bb10-2032-11ee-8dd0-5d8cce29f123')