In [1]:
import pymongo
import json
import pandas as pd
import sqlite3
import psycopg2
import duckdb
import matplotlib.pyplot as plt
import time
from memory_profiler import memory_usage
import numpy as np
from sqlalchemy import create_engine, text
import json
from functools import partial
from pymemcache.client import base
from pymemcache import serde
import re
from abc import ABC, abstractmethod
import warnings

warnings.filterwarnings('ignore')

In [2]:
# Shared memcached client used by every DB wrapper to store query results.
# Values are serialized with pickle, so only data produced by this notebook
# should ever be stored under these keys.
cache = base.Client(
    ("127.0.0.1", 11211),
    serde=serde.pickle_serde,
)

In [4]:
# Abstract base class with the functionality shared by every database backend
class DB(ABC):
    """Template for the benchmarked database wrappers.

    The public methods (``insert``, ``read``, ``update``, ``to_df``,
    ``create_index``) take care of connection handling, the operation-id
    guard and the memcached bookkeeping, and delegate the backend-specific
    work to the abstract ``_``-prefixed hooks implemented by subclasses.

    Args:
        dbname: Name of the database, passed through to the backend.
        table_name: Table (or collection) the wrapper operates on.
        unique_cols: Columns used as default index columns; ``None`` is
            treated as an empty list.
        query_type: ``"sql"`` or another tag; selects the default query
            used by ``read`` (a ``SELECT *`` string for ``"sql"``, an empty
            dict otherwise).
        close_conn_after_op: When true, a connection is opened before and
            closed after every operation; otherwise a single connection is
            opened in the constructor and kept until the object is deleted.
    """

    def __init__(self, dbname, table_name, unique_cols, query_type="sql", close_conn_after_op=False):
        self.dbname = dbname
        self.table_name = table_name
        self._unique_cols = unique_cols or []
        self.oid = 0
        self._cached_queries = set()
        self._query_type = query_type
        self.close_conn_after_op = close_conn_after_op
        # In persistent mode the connection is opened once, here.
        if not self.close_conn_after_op:
            self._open_connection()
        # Start from a clean slate: drop any table left by a previous run.
        self._run_with_connection(self._delete_table)

    def _run_with_connection(self, operation, *args):
        """Run ``operation(*args)`` honouring ``close_conn_after_op``.

        In close-after-op mode the connection is opened first and always
        closed afterwards. The open call sits outside the ``try`` so a
        failed open is reported as-is instead of being followed by a close
        on a connection that was never created.
        """
        if not self.close_conn_after_op:
            return operation(*args)
        self._open_connection()
        try:
            return operation(*args)
        finally:
            self._close_connection()

    def _is_stale(self, oid):
        """Return True when ``oid`` is older than the internal operation counter."""
        return oid is not None and oid < self.oid

    @abstractmethod
    def _delete_table(self):
        pass

    def _get_cache_key(self, query):
        # The key is prefixed with the concrete class name so that several
        # backends can be tested at once against the same memcached server.
        # Whitespace runs are replaced with '#'.
        return self.__class__.__name__ + "/" + re.sub(r'\s+', '#', str(query))

    def _clear_cache(self):
        """Delete from memcached every key this object has stored."""
        for key in list(self._cached_queries):
            cache.delete(key)
        self._cached_queries = set()

    def insert(self, data_csv, data_json, *_, oid=None, batch_size=None, **__):
        """Insert the dataset, invalidating cached reads first.

        Args:
            data_csv: Tabular form of the data, forwarded to ``_insert``.
            data_json: Record-list form of the data, forwarded to ``_insert``.
            oid: Operation id; the call is skipped (returns ``None``) when it
                is lower than the internal counter.
            batch_size: Rows per batch. Defaults to ``len(data_json)``
                (everything at once), with a floor of 1 so an empty dataset
                does not produce a zero batch size.

        Returns:
            Whatever the backend ``_insert`` returns, or ``None`` if skipped.
        """
        if self._is_stale(oid):
            return None
        self.oid += 1
        self._clear_cache()
        if batch_size is None:
            # A batch size of 0 would make range(0, n, 0) raise in subclasses.
            batch_size = max(len(data_json), 1)
        return self._run_with_connection(self._insert, data_csv, data_json, batch_size)

    @abstractmethod
    def _insert(self, data_csv, data_json, batch_size):
        pass

    def read(self, *_, oid=None, query=None, use_cache=True, **__):
        """Run ``query`` and return its result, optionally served from memcached.

        Positional arguments are ignored; ``query`` must be passed by keyword.

        Args:
            oid: Operation id; the call is skipped (returns ``None``) when it
                is lower than the internal counter.
            query: Backend query. A falsy value selects the default
                "everything" query for the configured ``query_type``.
            use_cache: When true, a cached result is returned if present.
                When false, memcached is not consulted at all, so the lookup
                cost does not leak into uncached measurements.

        Returns:
            The cached or freshly read data. Fresh results are always
            stored in memcached afterwards.
        """
        if self._is_stale(oid):
            return None
        self.oid += 1
        # Fall back to a select-all query when none is given.
        query = query or (f'SELECT * FROM {self.table_name};' if self._query_type == "sql" else {})
        key = self._get_cache_key(query)
        if use_cache:
            cached_data = cache.get(key)
            if cached_data is not None:
                return cached_data
        data = self._run_with_connection(self._read, query)
        cache.set(key, data)
        self._cached_queries.add(key)
        return data

    @abstractmethod
    def _read(self, query):
        pass

    def update(self, *_, oid=None, **__):
        """Run the backend update, invalidating cached reads first.

        Returns the value of ``_update`` in both connection modes, or
        ``None`` when the call is skipped because ``oid`` is stale.
        """
        if self._is_stale(oid):
            return None
        self.oid += 1
        self._clear_cache()
        return self._run_with_connection(self._update)

    @abstractmethod
    def _update(self):
        pass

    def to_df(self):
        """Return the table contents as produced by the backend ``_to_df``."""
        return self._run_with_connection(self._to_df)

    @abstractmethod
    def _to_df(self):
        pass

    @abstractmethod
    def _open_connection(self):
        pass

    @abstractmethod
    def _close_connection(self):
        pass

    def create_index(self, index_cols=None):
        """Create indexes on ``index_cols`` (defaults to the unique columns)."""
        if index_cols is None:
            index_cols = self._unique_cols
        return self._run_with_connection(self._create_index, index_cols)

    @abstractmethod
    def _create_index(self, index_cols):
        pass

    def __del__(self):
        # Finalizers may run on a half-constructed object (e.g. the first
        # connection attempt failed) or during interpreter shutdown, so each
        # clean-up step is best-effort and independent of the other.
        try:
            self._clear_cache()
        except Exception:
            pass
        try:
            if not self.close_conn_after_op:
                self._close_connection()
        except Exception:
            pass

In [5]:
# Wrapper used to time operations against MongoDB
class MongoDB(DB):
    """MongoDB backend: the "table" is a collection and queries are filter dicts."""

    def __init__(self, dbname, table_name, unique_cols=None, close_conn_after_op=False):
        super().__init__(dbname, table_name, unique_cols, query_type="mongo", close_conn_after_op=close_conn_after_op)

    def _delete_table(self):
        """Drop the collection."""
        self.db.drop_collection(self.table_name)

    def _insert(self, _, data_json, batch_size):
        """Insert the records of ``data_json`` in slices of ``batch_size``."""
        for start in range(0, len(data_json), batch_size):
            self.collection.insert_many(data_json[start:start + batch_size])

    def _read(self, query):
        """Return every document matching the filter ``query`` as a list."""
        return list(self.collection.find(query))

    def _update(self):
        """Append a '0' to the 'dni' field of every document in the collection.

        All the per-document updates are sent in a single bulk write.
        """
        # Only the 'dni' field (plus '_id') is fetched for each document.
        bulk_updates = [
            pymongo.UpdateOne(
                {"_id": document["_id"]},  # match the document by its id
                {"$set": {"dni": str(document.get('dni', '')) + '0'}},
            )
            for document in self.collection.find({}, {"dni": 1})
        ]

        # bulk_write is skipped when the collection is empty.
        if bulk_updates:
            self.collection.bulk_write(bulk_updates)

    def _close_connection(self):
        """Close the client if one is open; safe to call when none was created."""
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()
            self._client = None

    def _open_connection(self):
        """Create the client and bind the database and collection handles."""
        self._client = pymongo.MongoClient('mongodb://localhost:27017/')
        self.db = self._client[self.dbname]
        self.collection = self.db[self.table_name]

    def _to_df(self):
        """Return the whole collection as a DataFrame.

        Reads through ``_read`` on the connection the caller already opened.
        Going through the public ``read`` would, in close-after-op mode, open
        a second client over the first one and leave the first unclosed.
        """
        return pd.DataFrame(self._read({}))

    def _create_index(self, index_cols):
        """Create an ascending unique index on each column of ``index_cols``."""
        for col in index_cols or []:
            self.collection.create_index([(col, pymongo.ASCENDING)], unique=True)


# Wrapper used to time operations against PostgreSQL
class PostgresqlDB(DB):
    """PostgreSQL backend accessed through SQLAlchemy.

    ``self.conn`` holds the SQLAlchemy engine and ``self.cursor`` the
    connection obtained from it (the attribute names mirror the other
    backends, not the SQLAlchemy terminology).
    """

    def __init__(self, dbname, table_name, unique_cols=None, close_conn_after_op=False):
        super().__init__(dbname, table_name, unique_cols, close_conn_after_op=close_conn_after_op)

    def _delete_table(self):
        """Drop the table if it exists and commit."""
        self.cursor.execute(text(f"DROP TABLE IF EXISTS {self.table_name};"))
        self.cursor.commit()

    def _create_table(self, data_csv):
        """Create the table from the DataFrame's column names and dtypes.

        No unique constraints are added here; indexes are created separately
        by ``_create_index``.
        """
        # pandas dtype name -> PostgreSQL column type
        type_mapping = {
            'int64': 'INTEGER',
            'float64': 'FLOAT',
            'object': 'TEXT',
            'bool': 'BOOLEAN',
            'datetime64[ns]': 'TIMESTAMP',
            # pandas spells this dtype 'timedelta64[ns]'
            'timedelta64[ns]': 'INTERVAL'
        }

        # Unknown dtypes fall back to TEXT.
        columns_with_types = [
            f"{col} {type_mapping.get(str(dtype), 'TEXT')}"
            for col, dtype in data_csv.dtypes.items()
        ]

        create_table_query = f"""
        CREATE TABLE {self.table_name} (
            {', '.join(columns_with_types)}
        );
        """

        self.cursor.execute(text(create_table_query))
        self.cursor.commit()

    def _insert(self, data_csv, _, batch_size):
        """Create the table and insert the rows of the DataFrame ``data_csv``.

        Rows are sent in slices of ``batch_size``, committing after each one.
        """
        self._create_table(data_csv)

        # Named placeholders (":col") are bound from each record dict.
        columns = ', '.join(data_csv.columns)
        placeholders = ', '.join([f':{col}' for col in data_csv.columns])
        insert_query = f"""
        INSERT INTO {self.table_name} ({columns})
        VALUES ({placeholders})
        """

        records = data_csv.to_dict('records')

        for start in range(0, len(records), batch_size):
            self.cursor.execute(text(insert_query), records[start:start + batch_size])
            self.cursor.commit()

    def _read(self, query):
        """Run ``query`` through pandas against the engine and return a DataFrame."""
        return pd.read_sql_query(query, self.conn)

    def _update(self):
        """Append a '0' to the 'dni' column of every row in the table."""
        update_query = f"""
        UPDATE {self.table_name}
        SET dni = dni || '0';
        """
        self.cursor.execute(text(update_query))
        self.cursor.commit()

    def _close_connection(self):
        """Close the connection and dispose of the engine, if they exist.

        Every ``_open_connection`` builds a new engine, so the engine is
        disposed here as well; otherwise each reopen would leave the previous
        engine's connection pool behind.
        """
        connection = getattr(self, "cursor", None)
        if connection is not None:
            connection.close()
            self.cursor = None
        engine = getattr(self, "conn", None)
        if engine is not None:
            engine.dispose()
            self.conn = None

    def _open_connection(self):
        """Create the engine and open a connection from it."""
        self.conn = create_engine(f'postgresql+psycopg2://postgres:postgres@localhost:5432/{self.dbname}')
        self.cursor = self.conn.connect()

    def _to_df(self):
        """Return the whole table as a DataFrame."""
        return pd.read_sql_query(f'SELECT * FROM {self.table_name};', self.conn)

    def _create_index(self, index_cols=None):
        """Create one (non-unique) index per column of ``index_cols`` and commit."""
        for col in index_cols or []:
            index_query = f"CREATE INDEX idx_{self.table_name}_{col} ON {self.table_name} ({col});"
            self.cursor.execute(text(index_query))
        self.cursor.commit()


# Wrapper used to time operations against SQLite
class Sqlite3DB(DB):
    """SQLite backend; the database file is ``<dbname>.sqlite3``."""

    def __init__(self, dbname, table_name, unique_cols=None, close_conn_after_op=False):
        super().__init__(dbname + ".sqlite3", table_name, unique_cols, close_conn_after_op=close_conn_after_op)

    def _delete_table(self):
        """Drop the table if it exists and commit."""
        self.conn.execute(f"DROP TABLE IF EXISTS {self.table_name};")
        self.conn.commit()

    def _create_table(self, data_csv):
        """Create the table from the DataFrame's column names and dtypes.

        No unique constraints are added here; indexes are created separately
        by ``_create_index``.
        """
        # pandas dtype name -> SQLite column type
        type_mapping = {
            'int64': 'INTEGER',
            'float64': 'REAL',
            'object': 'TEXT',
            'bool': 'BOOLEAN',
            'datetime64[ns]': 'TEXT'
        }

        # Unknown dtypes fall back to TEXT.
        columns_with_types = [
            f"{col} {type_mapping.get(str(dtype), 'TEXT')}"
            for col, dtype in data_csv.dtypes.items()
        ]

        create_table_query = f"""
        CREATE TABLE {self.table_name} (
            {', '.join(columns_with_types)}
        );
        """

        self.conn.execute(create_table_query)
        self.conn.commit()

    def _insert(self, data_csv, _, batch_size):
        """Create the table and insert the DataFrame rows in slices of ``batch_size``."""
        self._create_table(data_csv)

        columns = ', '.join(data_csv.columns)
        placeholders = ', '.join(['?' for _ in data_csv.columns])
        insert_query = f"""
        INSERT INTO {self.table_name} ({columns})
        VALUES ({placeholders})
        """

        rows = [tuple(row) for row in data_csv.to_numpy()]

        # One commit per batch.
        for start in range(0, len(rows), batch_size):
            self.cursor.executemany(insert_query, rows[start:start + batch_size])
            self.conn.commit()

    def _read(self, query):
        """Execute ``query`` and return all rows as a list of tuples."""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def _update(self):
        """Append a '0' to the 'dni' column of every row in the table."""
        update_query = f"""
        UPDATE {self.table_name}
        SET dni = dni || '0';
        """
        self.cursor.execute(update_query)
        self.conn.commit()

    def _close_connection(self):
        """Close the connection if one is open; safe when none was created."""
        connection = getattr(self, "conn", None)
        if connection is not None:
            connection.close()
            self.conn = None

    def _open_connection(self):
        """Open the database file and create a cursor on it."""
        self.conn = sqlite3.connect(self.dbname)
        self.cursor = self.conn.cursor()

    def _to_df(self):
        """Return the whole table as a DataFrame.

        The SELECT is executed on this object's cursor right here, so the
        column names in ``cursor.description`` always describe the rows
        being returned. Going through the public ``read`` could serve
        cached rows (leaving ``description`` stale or ``None``) and, in
        close-after-op mode, would reopen the connection underneath us.
        """
        rows = self._read(f'SELECT * FROM {self.table_name};')
        column_names = [desc[0] for desc in self.cursor.description]
        return pd.DataFrame(rows, columns=column_names)

    def _create_index(self, index_cols=None):
        """Create one (non-unique) index per column of ``index_cols`` and commit."""
        for col in index_cols or []:
            index_query = f"CREATE INDEX idx_{self.table_name}_{col} ON {self.table_name} ({col});"
            self.conn.execute(index_query)
        self.conn.commit()


# Wrapper used to time operations against DuckDB
class DuckDB(DB):
    """DuckDB backend.

    "Creating an index" is implemented by dropping the table and reloading
    the last inserted DataFrame into a table declared with UNIQUE columns.
    """

    def __init__(self, dbname, table_name, unique_cols=None, close_conn_after_op=False):
        # Last DataFrame passed to _insert; needed by _create_index to reload.
        self.data = None
        super().__init__(dbname, table_name, unique_cols, close_conn_after_op=close_conn_after_op)

    def _delete_table(self):
        """Drop the table if it exists."""
        self.conn.execute(f"DROP TABLE IF EXISTS {self.table_name};")

    def _create_table(self, data_csv, index_cols=None):
        """Create the table from the DataFrame's column names and dtypes.

        When ``index_cols`` is given, a ``UNIQUE(col)`` constraint is added
        for each of those columns.
        """
        # pandas dtype name -> DuckDB column type
        type_mapping = {
            'int64': 'INTEGER',
            'float64': 'DOUBLE',
            'object': 'TEXT',
            'bool': 'BOOLEAN',
            'datetime64[ns]': 'TIMESTAMP'
        }

        # Unknown dtypes fall back to TEXT.
        columns_with_types = [
            f"{col} {type_mapping.get(str(dtype), 'TEXT')}"
            for col, dtype in data_csv.dtypes.items()
        ]

        if index_cols:
            unique_str = ", ".join([f"UNIQUE({col})" for col in index_cols])
            columns_with_types.append(unique_str)

        create_table_query = f"""
        CREATE TABLE {self.table_name} (
            {', '.join(columns_with_types)}
        );
        """

        self.conn.execute(create_table_query)

    def _insert(self, data_csv, _, batch_size, index_cols=None):
        """Create the table and insert the DataFrame rows in slices of ``batch_size``.

        The DataFrame is remembered in ``self.data`` so ``_create_index`` can
        rebuild the table later.
        """
        self._create_table(data_csv, index_cols)
        self.data = data_csv

        columns = ', '.join(data_csv.columns)
        placeholders = ', '.join(['?' for _ in data_csv.columns])
        insert_query = f"""
        INSERT INTO {self.table_name} ({columns})
        VALUES ({placeholders})
        """

        rows = [tuple(row) for row in data_csv.to_numpy()]

        # One commit per batch.
        for start in range(0, len(rows), batch_size):
            self.conn.executemany(insert_query, rows[start:start + batch_size])
            self.conn.commit()

    def _read(self, query):
        """Execute ``query`` and return all rows as a list of tuples."""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def _update(self):
        """Append a '0' to the 'dni' column of every row in the table."""
        update_query = f"""
        UPDATE {self.table_name}
        SET dni = dni || '0';
        """
        self.cursor.execute(update_query)
        self.conn.commit()

    def _close_connection(self):
        """Close the cursor and the connection, if they exist.

        NOTE(review): the cursor returned by ``conn.cursor()`` is closed
        explicitly on the assumption that it holds its own handle on the
        database - confirm against the DuckDB Python API docs.
        """
        cursor = getattr(self, "cursor", None)
        if cursor is not None:
            cursor.close()
            self.cursor = None
        connection = getattr(self, "conn", None)
        if connection is not None:
            connection.close()
            self.conn = None

    def _open_connection(self):
        """Connect to the database and create a cursor on it."""
        self.conn = duckdb.connect(self.dbname)
        self.cursor = self.conn.cursor()

    def _to_df(self):
        """Return the whole table as a DataFrame.

        The SELECT is executed on this object's cursor right here, so the
        column names in ``cursor.description`` always describe the rows
        being returned. Going through the public ``read`` could serve
        cached rows (leaving ``description`` unrelated to them) and, in
        close-after-op mode, would reopen the connection underneath us.
        """
        rows = self._read(f'SELECT * FROM {self.table_name};')
        column_names = [desc[0] for desc in self.cursor.description]
        return pd.DataFrame(rows, columns=column_names)

    def _create_index(self, index_cols=None):
        """Rebuild the table with UNIQUE constraints on ``index_cols``.

        Raises:
            RuntimeError: If nothing has been inserted yet, since there is
                no data to reload into the rebuilt table.
        """
        if self.data is None:
            raise RuntimeError("create_index() requires a previous insert(): there is no data to reload")
        self._delete_table()
        # Reload everything in a single batch; the floor of 1 keeps the
        # batch size valid for an empty DataFrame.
        self._insert(self.data, None, max(self.data.shape[0], 1), index_cols)

In [6]:
class Measurements:
    """Benchmark driver: times the database wrappers and plots the results.

    Args:
        classes: Mapping of display label -> wrapper class to benchmark.
        dbname: Database name passed to every wrapper.
        table_name: Table name passed to every wrapper; also used to locate
            the input files under ``results/<table_name>/``.
        unique_cols: Columns passed to every wrapper as its unique columns.
        sizes: Dataset sizes to benchmark; one CSV/JSON file pair is read
            per size.
    """

    def __init__(self, classes, dbname, table_name, unique_cols, sizes):
        self.classes = classes
        self.sizes = sizes
        self.dbname = dbname
        self.table_name = table_name
        self._unique_cols = unique_cols

    def _read_csv_files(self, n):
        """Load the CSV dataset of size ``n`` as a DataFrame."""
        return pd.read_csv(f'results/{self.table_name}/csv/{self.table_name}_{n}.csv')

    def _read_json_files(self, n):
        """Load the JSON dataset of size ``n``."""
        with open(f'results/{self.table_name}/json/{self.table_name}_{n}.json', 'r', encoding='utf-8') as f:
            return json.load(f)

    def _measure_time(self, function):
        """Call ``function()`` once and return ``(wall_seconds, cpu_seconds)``."""
        t1 = time.perf_counter(), time.process_time()
        function()
        t2 = time.perf_counter(), time.process_time()
        return t2[0] - t1[0], t2[1] - t1[1]

    def _measure_time_and_memory(self, function):
        """Return ``(wall_seconds, cpu_seconds, average_memory)`` for one call.

        The memory profiler wraps the timing function rather than the other
        way round. According to the original author's note, doing it the
        opposite way adds roughly 6 seconds of ``memory_usage`` start-up
        time to every measurement.
        """
        # A list is used to get the timings out of the profiled closure.
        timings = []

        def _timed():
            timings.append(self._measure_time(function))

        memory = memory_usage(_timed)
        t_real, t_cpu = timings[0]
        return t_real, t_cpu, np.average(memory)

    @staticmethod
    def _build_lookup_query(query_type, table_name, column, value):
        """Build a single-value equality lookup for the given backend type.

        Returns a ``SELECT`` string for ``"sql"`` (single quotes in the
        value are doubled) and a ``{column: value}`` filter dict otherwise.
        """
        if query_type == "sql":
            escaped = str(value).replace("'", "''")
            return f"SELECT * FROM {table_name} WHERE {column} = '{escaped}'"
        return {column: value}

    def _get_time_memory(self, n, close_conn_after_op):
        """Measure insert, read and update for every backend at dataset size ``n``.

        Returns:
            ``{label: {operation: [(wall_seconds, cpu_seconds, memory)]}}``.
        """
        data_json = self._read_json_files(n)
        data_csv = self._read_csv_files(n)
        results = {cl: {"insert": [], "read": [], "update": []} for cl in self.classes}

        for cl, cl_class in self.classes.items():
            cl_obj = cl_class(self.dbname, self.table_name, self._unique_cols, close_conn_after_op=close_conn_after_op)
            print(f"\n{cl}{' ' * (20 - len(cl) - len(str(n)))}{n}", end="")

            # Time and memory of each operation for this dataset size.
            for operation in ['insert', 'read', 'update']:
                print(f" | {operation} ", end="")
                # The current operation id is captured so the wrapper can
                # ignore the call if it is invoked again with the same id.
                oid = cl_obj.oid
                t_real, t_cpu, memory = self._measure_time_and_memory(
                    partial(getattr(cl_obj, operation), data_csv, data_json, oid=oid, use_cache=False)
                )
                results[cl][operation].append((t_real, t_cpu, memory))
                t_str = f"{t_real:.5f}"
                print(f"{t_str:>10}s", end="")

        return results

    def _get_cache_times(self, n):
        """Measure point lookups without cache, with an index, and with cache.

        For each backend the dataset of size ``n`` is inserted, then the
        last 200 values of the first unique column are looked up one by one
        under each configuration.

        Returns:
            ``{label: {configuration: [(mean_wall_seconds, mean_cpu_seconds)]}}``.
        """
        data_json = self._read_json_files(n)
        data_csv = self._read_csv_files(n)
        results = {cl: {"no cache": [], "index": [], "cache": []} for cl in self.classes}

        for cl, cl_class in self.classes.items():
            cl_obj = cl_class(self.dbname, self.table_name, self._unique_cols)
            print(f"\n{cl}{' ' * (20 - len(cl) - len(str(n)))}{n}", end="")
            cl_obj.insert(data_csv, data_json, oid=cl_obj.oid)

            column = cl_obj._unique_cols[0]
            for cache_type in ["no cache", "index", "cache"]:
                if cache_type == "index":
                    cl_obj.create_index()

                print(f" | {cache_type} ", end="")
                t = time.time()
                times = []
                for value in data_csv[column][-200:]:
                    query = self._build_lookup_query(cl_obj._query_type, cl_obj.table_name, column, value)
                    # The query must go by keyword: read() discards
                    # positional arguments and would otherwise fall back to
                    # its default select-all query.
                    t_real, t_cpu = self._measure_time(
                        partial(cl_obj.read, query=query, oid=cl_obj.oid, use_cache=(cache_type == "cache"))
                    )
                    times.append((t_real, t_cpu))
                t_str = f"{time.time() - t:.5f}"
                print(f"{t_str:>10}s", end="")
                results[cl][cache_type].append((np.mean([t[0] for t in times]), np.mean([t[1] for t in times])))

        return results

    def plot_time_memory(self, by_database=False, show_cpu=False):
        """Benchmark insert/read/update for every size and plot time and memory."""
        time_memory_data = {size: self._get_time_memory(size, False) for size in self.sizes}
        self._plot_results(time_memory_data, plot_types=["time_real", "time_cpu", "memory"] if show_cpu else ["time_real", "memory"], by_database=by_database)

    def plot_cache_times(self, by_database=True, show_cpu=False):
        """Benchmark the lookup configurations for every size and plot the times."""
        cache_time_data = {size: self._get_cache_times(size) for size in self.sizes}
        self._plot_results(cache_time_data, plot_types=["time_real", "time_cpu"] if show_cpu else ["time_real"], by_database=by_database)

    # General plotting function
    def _plot_results(self, data, plot_types, by_database=False):
        """Draw one figure per plot type, with one subplot per panel.

        Args:
            data: ``{size: {label: {operation: [measurement_tuple]}}}``.
            plot_types: Any of ``"time_real"``, ``"time_cpu"``, ``"memory"``.
            by_database: When true there is one subplot per backend with a
                line per operation; otherwise one subplot per operation with
                a line per backend.
        """
        db_names = list(self.classes.keys())
        operations = list(data[self.sizes[0]][db_names[0]].keys())

        # Position of each metric inside a measurement tuple.
        value_index = {"time_real": 0, "time_cpu": 1, "memory": 2}
        titles = {
            "memory": "Uso de Memoria",
            "time_real": "Tiempo Real",
            "time_cpu": "Tiempo de CPU"
        }
        labels = {
            "memory": "Memoria (MB)",
            "time_real": "Tiempo (segundos)",
            "time_cpu": "Tiempo (segundos)"
        }

        if by_database:
            panels, series, xlabel = db_names, operations, 'Tamaño del dataset'
        else:
            panels, series, xlabel = operations, db_names, 'Tamaño del Dataset'

        for plot_type in plot_types:
            idx = value_index.get(plot_type, 0)
            fig, axs = plt.subplots(1, len(panels), figsize=(6 * len(panels), 6))
            # With a single panel, subplots returns a bare Axes, not an array.
            if len(panels) == 1:
                axs = [axs]

            for ax, panel in zip(axs, panels):
                for serie in series:
                    db_name, operation = (panel, serie) if by_database else (serie, panel)
                    y_values = [data[size][db_name][operation][0][idx] for size in self.sizes]
                    ax.plot(self.sizes, y_values, label=serie.capitalize(), marker='o')

                ax.set_title(f'{panel.capitalize()}', fontsize=16)
                ax.set_xlabel(xlabel, fontsize=14)
                # The scale is set before the ticks: changing the scale
                # installs its default tick locator, which would discard
                # ticks set earlier.
                ax.set_xscale("log")
                ax.set_xticks(self.sizes)
                ax.set_xticklabels(self.sizes)
                ax.grid(True)
                ax.legend()

            fig.suptitle(titles[plot_type], fontsize=18)
            fig.supylabel(labels[plot_type], x=0, fontsize=18)
            fig.tight_layout()
            plt.show()

In [7]:
# Backends under test, keyed by the label used in the progress log and plots
classes = dict(
    mongo=MongoDB,
    sqlite=Sqlite3DB,
    duckdb=DuckDB,
    postgres=PostgresqlDB,
)

measurements = Measurements(
    classes,
    "Practica_1",
    "cars",
    ["vin", "plate"],
    # Dataset sizes 10^3, 10^4 and 10^5. The original note "3, 7" presumably
    # records the range bounds for a fuller run - TODO confirm.
    [10 ** exponent for exponent in range(3, 6)],
)

In [None]:
# Run the insert/read/update benchmark for every size and plot wall time, CPU time and memory
measurements.plot_time_memory(show_cpu=True)

In [None]:
# Run the lookup benchmark (no cache / index / cache) for every size and plot wall time per backend
measurements.plot_cache_times()