In [1]:
from abc import ABC, abstractmethod
from typing import List, Dict, Tuple, Union
import csv
import itertools
from datetime import datetime

class DatabaseTestingInterface(ABC):
    """Common contract for the database adapters exercised by the benchmark.

    Every concrete adapter exposes the four CRUD operations, a way to return
    to a clean state, and a human-readable name. The abstract methods here
    have no behavior of their own; calling them through ``super()`` returns
    ``None``.
    """

    @abstractmethod
    def create(self, rows_created: int = 1, transactions: int = 1):
        """Insert rows; ``rows_created`` per transaction, ``transactions`` times."""

    @abstractmethod
    def read(self, rows_read: int = 1, transactions: int = 1):
        """Read rows; ``rows_read`` per transaction, ``transactions`` times."""

    @abstractmethod
    def update(self, rows_updated: int = 1, transactions: int = 1):
        """Update rows; ``rows_updated`` per transaction, ``transactions`` times."""

    @abstractmethod
    def delete(self, rows_deleted: int = 1, transactions: int = 1):
        """Delete rows; ``rows_deleted`` per transaction, ``transactions`` times."""

    @abstractmethod
    def reset(self):
        """Bring the backing store back to its initial state."""

    @abstractmethod
    def getName(self) -> str:
        """Return the display name of the database under test."""

In [2]:
import glob
import os

def get_csv_file_paths(folder_path: str) -> List[str]:
    """Return the paths of all ``*.csv`` files directly inside ``folder_path``.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        The matching paths, each prefixed with ``folder_path``, in sorted
        order. An empty list when the folder has no CSV files or does not
        exist.
    """
    # glob yields entries in filesystem order, which varies between machines;
    # sorting keeps the table/collection order reproducible across runs.
    return sorted(glob.glob(os.path.join(folder_path, "*.csv")))

In [3]:
import sqlite3

class SqliteDatabaseTesting(DatabaseTestingInterface):
    """SQLite adapter for the CRUD benchmark.

    Each CSV file becomes one table named after the file (extension
    stripped). The header line of the file provides the four column names and
    up to 1000 following lines are kept in memory as the benchmark rows.
    """

    def __init__(self, connection_string: str, csv_file_paths: List[str]):
        """Open the database, load the CSV fixtures and (re)create the tables.

        Args:
            connection_string: Database path passed to ``sqlite3.connect``.
            csv_file_paths: Pipe-delimited CSV files, one per table.
        """
        self.connection = sqlite3.connect(connection_string)
        self.csv_data = {}
        self.table_names = []
        self.table_column_names = {}

        for file_path in csv_file_paths:
            table_name = os.path.splitext(os.path.basename(file_path))[0]
            self.table_names.append(table_name)
            self.table_column_names[table_name], self.csv_data[table_name] = self.__parse_csv_data(file_path)

        self.__normalize_table_column_names()
        self.__create_tables()
        self.connection.commit()

    def __create_tables(self):
        """Drop and recreate one empty four-column table per loaded CSV file."""
        cursor = self.connection.cursor()
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
            column_definitions_str = ", ".join([
                f"{columns[0]} timestamp NOT NULL",
                f"{columns[1]} timestamp NOT NULL",
                f"{columns[2]} numerical NOT NULL",
                f"{columns[3]} int",
            ])
            cursor.execute(f"CREATE TABLE {table_name} ({column_definitions_str})")
        self.connection.commit()

    def __normalize_table_column_names(self):
        """Strip every column name down to alphanumerics and underscores."""
        for table_name, column_names in self.table_column_names.items():
            self.table_column_names[table_name] = [
                ''.join(ch for ch in name.strip() if ch.isalnum() or ch == '_')
                for name in column_names
            ]

    def __parse_csv_data(self, file_path: str) -> Tuple[List[str], List[Tuple[str, ...]]]:
        """Read the header and at most ``MAX_DATA_READ`` rows of a CSV file.

        Returns:
            ``(column_names, rows)`` where every row is a tuple of the raw
            string fields; no type conversion is performed here.
        """
        MAX_DATA_READ = 1000
        with open(file_path, "r") as f:
            reader = csv.reader(f, delimiter="|")
            column_names = next(reader)
            data = [tuple(row) for row in itertools.islice(reader, MAX_DATA_READ)]
        return column_names, data

    def create(self, rows_created: int = 1, transactions: int = 1):
        """Insert ``rows_created`` rows per transaction into every table.

        Transaction ``t`` inserts rows ``t*rows_created`` up to (but not
        including) ``(t+1)*rows_created`` of the loaded CSV data and commits.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name, data in self.csv_data.items():
            statement = f"INSERT INTO {table_name} VALUES (?, ?, ?, ?)"
            for transaction in range(transactions):
                cursor = self.connection.cursor()
                offset = transaction * rows_created
                for i in range(rows_created):
                    cursor.execute(statement, data[offset + i])
                self.connection.commit()

    def read(self, rows_read: int = 1, transactions: int = 1):
        """Select and fetch up to ``rows_read`` rows from every table.

        The query is repeated ``transactions`` times per table. The fetched
        rows are discarded; nothing is returned.
        """
        for table_name in self.table_names:
            statement = f"SELECT * FROM {table_name} LIMIT {rows_read}"
            for transaction in range(transactions):
                cursor = self.connection.cursor()
                cursor.execute(statement)
                # fetchall() makes the cursor actually step through all
                # selected rows; fetchone() would stop after the first one.
                cursor.fetchall()

    def update(self, rows_updated: int = 1, transactions: int = 1):
        """Set the third column to 0 for rows matched by their first column.

        Uses the same row windowing per transaction as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            rows = self.csv_data[table_name]
            # The key is bound as a parameter instead of being pasted into
            # the SQL text inside double quotes (which SQLite parses as an
            # identifier first and which breaks on embedded quotes).
            statement = f"UPDATE {table_name} SET {columns[2]} = 0 WHERE {columns[0]} = ?"
            for transaction in range(transactions):
                cursor = self.connection.cursor()
                offset = transaction * rows_updated
                for i in range(rows_updated):
                    cursor.execute(statement, (rows[offset + i][0],))
                self.connection.commit()

    def delete(self, rows_deleted: int = 1, transactions: int = 1):
        """Delete rows matched by their first column value.

        Uses the same row windowing per transaction as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            rows = self.csv_data[table_name]
            # Parameter binding, for the same reason as in update().
            statement = f"DELETE FROM {table_name} WHERE {columns[0]} = ?"
            for transaction in range(transactions):
                cursor = self.connection.cursor()
                offset = transaction * rows_deleted
                for i in range(rows_deleted):
                    cursor.execute(statement, (rows[offset + i][0],))
                self.connection.commit()

    def reset(self):
        """Drop every table listed in ``sqlite_master`` and recreate the benchmark tables.

        Note that this removes all tables in the database file, not only the
        ones created from the CSV fixtures.
        """
        cursor = self.connection.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        existing_tables = [row[0] for row in cursor.fetchall()]
        for table_name in existing_tables:
            cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        self.connection.commit()
        self.__create_tables()

    def __del__(self):
        """Close the connection, if one was ever opened."""
        # __del__ also runs when __init__ failed before `connection` was set.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()

    def getName(self) -> str:
        """Return the display name of this backend."""
        return "SQLite"

In [4]:
import psycopg2

class PostgresDatabaseTesting(DatabaseTestingInterface):
    """PostgreSQL adapter for the CRUD benchmark.

    Each CSV file becomes one table named after the file (extension
    stripped). The header line of the file provides the four column names and
    up to 1000 following lines are kept in memory as the benchmark rows.
    """

    def __init__(self, connection_string: str, csv_file_paths: List[str]):
        """Connect, load the CSV fixtures and (re)create the tables.

        Args:
            connection_string: DSN passed to ``psycopg2.connect``.
            csv_file_paths: Pipe-delimited CSV files, one per table.
        """
        self.connection = psycopg2.connect(connection_string)
        self.csv_data = {}
        self.table_names = []
        self.table_column_names = {}

        for file_path in csv_file_paths:
            table_name = os.path.splitext(os.path.basename(file_path))[0]
            self.table_names.append(table_name)
            self.table_column_names[table_name], self.csv_data[table_name] = self.__parse_csv_data(file_path)

        self.__normalize_table_column_names()
        self.__create_tables()

    def __create_tables(self):
        """Drop and recreate one empty four-column table per loaded CSV file."""
        with self.connection.cursor() as cursor:
            for table_name in self.table_names:
                columns = self.table_column_names[table_name]
                cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                column_definitions_str = ", ".join([
                    f"{columns[0]} timestamp NOT NULL",
                    f"{columns[1]} timestamp NOT NULL",
                    f"{columns[2]} numeric NOT NULL",
                    f"{columns[3]} integer",
                ])
                cursor.execute(f"CREATE TABLE {table_name} ({column_definitions_str})")
        self.connection.commit()

    def __normalize_table_column_names(self):
        """Strip every column name down to alphanumerics and underscores."""
        for table_name, column_names in self.table_column_names.items():
            self.table_column_names[table_name] = [
                ''.join(ch for ch in name.strip() if ch.isalnum() or ch == '_')
                for name in column_names
            ]

    def __parse_csv_data(self, file_path: str) -> Tuple[List[str], List[Tuple[str, ...]]]:
        """Read the header and at most ``MAX_DATA_READ`` rows of a CSV file.

        Commas in the first two fields are replaced by dots.
        NOTE(review): presumably this turns a decimal-comma fractional second
        into a timestamp text PostgreSQL accepts - confirm against the data.

        Returns:
            ``(column_names, rows)`` where every row is a tuple of strings.
        """
        MAX_DATA_READ = 1000
        with open(file_path, "r") as f:
            reader = csv.reader(f, delimiter="|")
            column_names = next(reader)
            data = []
            for row in itertools.islice(reader, MAX_DATA_READ):
                row[0] = row[0].replace(",", ".")
                row[1] = row[1].replace(",", ".")
                data.append(tuple(row))

        return column_names, data

    def create(self, rows_created: int = 1, transactions: int = 1):
        """Insert ``rows_created`` rows per transaction into every table.

        Transaction ``t`` inserts rows ``t*rows_created`` up to (but not
        including) ``(t+1)*rows_created`` of the loaded CSV data and commits.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name, data in self.csv_data.items():
            statement = f"INSERT INTO {table_name} VALUES (%s, %s, %s, %s)"
            for transaction in range(transactions):
                offset = transaction * rows_created
                # The context manager closes the cursor once the batch is sent.
                with self.connection.cursor() as cursor:
                    for i in range(rows_created):
                        cursor.execute(statement, data[offset + i])
                self.connection.commit()

    def read(self, rows_read: int = 1, transactions: int = 1):
        """Select and fetch up to ``rows_read`` rows from every table.

        The query is repeated ``transactions`` times per table. The fetched
        rows are discarded; nothing is returned.
        """
        for table_name in self.table_names:
            statement = f"SELECT * FROM {table_name} LIMIT {rows_read}"
            for transaction in range(transactions):
                with self.connection.cursor() as cursor:
                    cursor.execute(statement)
                    # Retrieve every selected row, not just the first one.
                    cursor.fetchall()

    def update(self, rows_updated: int = 1, transactions: int = 1):
        """Set the third column to 0 for rows matched by their first column.

        Uses the same row windowing per transaction as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            rows = self.csv_data[table_name]
            statement = f"UPDATE {table_name} SET {columns[2]} = 0 WHERE {columns[0]} = %s"
            for transaction in range(transactions):
                offset = transaction * rows_updated
                with self.connection.cursor() as cursor:
                    for i in range(rows_updated):
                        cursor.execute(statement, (rows[offset + i][0],))
                self.connection.commit()

    def delete(self, rows_deleted: int = 1, transactions: int = 1):
        """Delete rows matched by their first column value.

        Uses the same row windowing per transaction as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            rows = self.csv_data[table_name]
            statement = f"DELETE FROM {table_name} WHERE {columns[0]} = %s"
            for transaction in range(transactions):
                offset = transaction * rows_deleted
                with self.connection.cursor() as cursor:
                    for i in range(rows_deleted):
                        cursor.execute(statement, (rows[offset + i][0],))
                self.connection.commit()

    def reset(self):
        """Drop the benchmark tables and recreate them empty."""
        with self.connection.cursor() as cursor:
            for table_name in self.table_names:
                # IF EXISTS: a table that is already gone must not abort the
                # whole transaction and leave the connection unusable.
                cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        self.connection.commit()
        self.__create_tables()

    def getName(self) -> str:
        """Return the display name of this backend."""
        return "PostgreSQL"

    def __del__(self):
        """Close the connection, if one was ever opened."""
        # __del__ also runs when __init__ failed before `connection` was set.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()

In [16]:
import cassandra
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, BatchStatement
from typing import List, Tuple

class CassandraDatabaseTesting(DatabaseTestingInterface):
    """Cassandra adapter for the CRUD benchmark.

    All tables live in the keyspace ``KEYSPACE_NAME``. Each CSV file becomes
    one table named after the file (extension stripped); its first two
    columns form the primary key. Up to 1000 rows per file are kept in memory
    as the benchmark rows.
    """

    def __init__(self, connection_string: str, csv_file_paths: List[str]):
        """Connect, load the CSV fixtures and recreate keyspace and tables.

        Args:
            connection_string: Contact point passed to ``Cluster``.
            csv_file_paths: Pipe-delimited CSV files, one per table.
        """
        self.cluster = Cluster([connection_string])
        self.session = self.cluster.connect()
        self.csv_data = {}
        self.table_names = []
        self.table_column_names = {}
        self.KEYSPACE_NAME = "databaseTesting"

        for file_path in csv_file_paths:
            table_name = os.path.splitext(os.path.basename(file_path))[0]
            self.table_names.append(table_name)
            self.table_column_names[table_name], self.csv_data[table_name] = self.__parse_csv_data(file_path)

        self.__normalize_table_column_names()
        self.__execute_simple_statement(f"DROP KEYSPACE IF EXISTS {self.KEYSPACE_NAME}")
        self.__create_tables()

    def __execute_simple_statement(self, query: str, parameters: Tuple = ()):
        """Run a single CQL statement at consistency level ONE."""
        statement = SimpleStatement(query, consistency_level=cassandra.ConsistencyLevel.ONE)
        self.session.execute(statement, parameters)

    def __execute_batch_statement(self, statements: List[str], parameters_list: List[Tuple]):
        """Run the given statements as one batch at consistency level ONE.

        ``statements[k]`` is bound to ``parameters_list[k]``.
        """
        batch = BatchStatement(consistency_level=cassandra.ConsistencyLevel.ONE)
        for statement, parameters in zip(statements, parameters_list):
            batch.add(statement, parameters)
        self.session.execute(batch)

    def __create_tables(self):
        """Create the keyspace, switch to it and create one table per CSV file."""
        self.__execute_simple_statement(f"CREATE KEYSPACE {self.KEYSPACE_NAME} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}}")
        self.__execute_simple_statement(f"USE {self.KEYSPACE_NAME}")
        for table_name in self.table_names:
            columns = self.table_column_names[table_name]
            self.__execute_simple_statement(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns[0]} timestamp, {columns[1]} timestamp, {columns[2]} float, {columns[3]} int, PRIMARY KEY ({columns[0]}, {columns[1]}))")

    def __normalize_table_column_names(self):
        """Strip every column name down to alphanumerics and underscores."""
        for table_name, column_names in self.table_column_names.items():
            self.table_column_names[table_name] = [
                ''.join(ch for ch in name.strip() if ch.isalnum() or ch == '_')
                for name in column_names
            ]

    def __parse_csv_data(self, file_path: str) -> Tuple[List[str], List[Tuple[str, ...]]]:
        """Read the header and at most ``MAX_DATA_READ`` rows of a CSV file.

        Commas in the first two fields are replaced by dots.
        NOTE(review): presumably this normalizes a decimal-comma fractional
        second in the timestamps - confirm against the data.

        Returns:
            ``(column_names, rows)`` where every row is a tuple of strings.
        """
        MAX_DATA_READ = 1000
        with open(file_path, "r") as f:
            reader = csv.reader(f, delimiter="|")
            column_names = next(reader)
            data = []
            for row in itertools.islice(reader, MAX_DATA_READ):
                row[0] = row[0].replace(",", ".")
                row[1] = row[1].replace(",", ".")
                data.append(tuple(row))

        return column_names, data

    def create(self, rows_created: int = 1, transactions: int = 1):
        """Insert ``rows_created`` rows per batch into every table.

        Batch ``t`` inserts rows ``t*rows_created`` up to (but not including)
        ``(t+1)*rows_created`` of the loaded CSV data, so successive batches
        write different primary keys instead of overwriting the same ones.

        Raises:
            IndexError: If more rows are requested than were loaded.
            ValueError: If the third/fourth field is not a valid float/int.
        """
        for table_name, data in self.csv_data.items():
            columns = self.table_column_names[table_name]
            insert_statement = f"INSERT INTO {table_name} ({columns[0]}, {columns[1]}, {columns[2]}, {columns[3]}) VALUES (%s, %s, %s, %s)"
            for transaction in range(transactions):
                offset = transaction * rows_created
                statements = []
                parameters_list = []
                for i in range(rows_created):
                    row = data[offset + i]
                    statements.append(insert_statement)
                    parameters_list.append((row[0], row[1], float(row[2]), int(row[3])))

                self.__execute_batch_statement(statements=statements, parameters_list=parameters_list)

    def read(self, rows_read: int = 1, transactions: int = 1):
        """Select up to ``rows_read`` rows from every table, ``transactions`` times.

        The result is discarded; nothing is returned.
        """
        for table_name in self.table_names:
            # Keyword must be LIMIT; the former "LIMT" made every read fail
            # with a CQL syntax error.
            statement = f"SELECT * FROM {table_name} LIMIT {rows_read}"
            for transaction in range(transactions):
                self.session.execute(statement)

    def update(self, rows_updated: int = 1, transactions: int = 1):
        """Rewrite the third column of rows addressed by their primary key.

        The value written is the one from the CSV data. Uses the same row
        windowing per batch as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
            ValueError: If the third field is not a valid float.
        """
        for table_name, data in self.csv_data.items():
            columns = self.table_column_names[table_name]
            update_statement = f"UPDATE {table_name} SET {columns[2]} = %s WHERE {columns[0]} = %s AND {columns[1]} = %s"
            for transaction in range(transactions):
                offset = transaction * rows_updated
                statements = []
                parameters_list = []
                for i in range(rows_updated):
                    row = data[offset + i]
                    statements.append(update_statement)
                    parameters_list.append((float(row[2]), row[0], row[1]))

                self.__execute_batch_statement(statements=statements, parameters_list=parameters_list)

    def delete(self, rows_deleted: int = 1, transactions: int = 1):
        """Delete rows addressed by their primary key.

        Uses the same row windowing per batch as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for table_name, data in self.csv_data.items():
            columns = self.table_column_names[table_name]
            delete_statement = f"DELETE FROM {table_name} WHERE {columns[0]} = %s AND {columns[1]} = %s"
            for transaction in range(transactions):
                offset = transaction * rows_deleted
                statements = []
                parameters_list = []
                for i in range(rows_deleted):
                    row = data[offset + i]
                    statements.append(delete_statement)
                    parameters_list.append((row[0], row[1]))

                self.__execute_batch_statement(statements=statements, parameters_list=parameters_list)

    def reset(self):
        """Drop the benchmark keyspace (if present) and recreate it with empty tables."""
        self.__execute_simple_statement(f"DROP KEYSPACE IF EXISTS {self.KEYSPACE_NAME}")
        self.__create_tables()

    def getName(self) -> str:
        """Return the display name of this backend."""
        return "Cassandra"

    def __del__(self):
        """Shut the cluster connection down, if one was ever opened."""
        # Shutting down the Cluster also tears down the sessions it created.
        # __del__ also runs when __init__ failed before `cluster` was set.
        cluster = getattr(self, "cluster", None)
        if cluster is not None:
            cluster.shutdown()

In [6]:
from pymongo import MongoClient
from pymongo.operations import InsertOne, UpdateOne, DeleteOne

class MongoDatabaseTesting(DatabaseTestingInterface):
    """MongoDB adapter for the CRUD benchmark.

    All collections live in the database ``DATABASE_NAME``. Each CSV file maps
    to one collection named after the file (extension stripped); the header
    line provides the four field names and up to 1000 following lines are kept
    in memory as the benchmark rows.
    """

    def __init__(self, connection_string: str, csv_file_paths: List[str]):
        """Create the client and load the CSV fixtures.

        Args:
            connection_string: URI passed to ``MongoClient``.
            csv_file_paths: Pipe-delimited CSV files, one per collection.
        """
        self.client = MongoClient(connection_string)
        self.csv_data = {}
        self.collection_names = []
        self.collection_column_names = {}
        self.DATABASE_NAME = "databasetesting"

        for file_path in csv_file_paths:
            collection_name = os.path.splitext(os.path.basename(file_path))[0]
            self.collection_names.append(collection_name)
            self.collection_column_names[collection_name], self.csv_data[collection_name] = self.__parse_csv_data(file_path)

        self.__normalize_collection_column_names()

    def __normalize_collection_column_names(self):
        """Strip every field name down to alphanumerics and underscores."""
        for collection_name, column_names in self.collection_column_names.items():
            self.collection_column_names[collection_name] = [
                ''.join(ch for ch in name.strip() if ch.isalnum() or ch == '_')
                for name in column_names
            ]

    def __parse_csv_data(self, file_path: str) -> Tuple[List[str], List[Tuple[str, ...]]]:
        """Read the header and at most ``MAX_DATA_READ`` rows of a CSV file.

        Commas in the first two fields are replaced by dots so that they match
        the ``%f`` fractional-second format used by ``__convert_timestamp``.

        Returns:
            ``(column_names, rows)`` where every row is a tuple of strings.
        """
        MAX_DATA_READ = 1000

        with open(file_path, "r") as f:
            reader = csv.reader(f, delimiter="|")
            column_names = next(reader)
            data = []
            for row in itertools.islice(reader, MAX_DATA_READ):
                row[0] = row[0].replace(",", ".")
                row[1] = row[1].replace(",", ".")
                data.append(tuple(row))

        return column_names, data

    def __convert_timestamp(self, timestamp_string: str):
        """Parse ``YYYY-MM-DD HH:MM:SS.ffffff`` text into a POSIX timestamp (float).

        The parsed datetime is naive, so ``timestamp()`` interprets it in the
        local time zone of the machine running the notebook.
        """
        return datetime.strptime(timestamp_string, '%Y-%m-%d %H:%M:%S.%f').timestamp()

    def create(self, rows_created: int = 1, transactions: int = 1):
        """Bulk-insert ``rows_created`` documents per batch into each collection.

        Every collection receives only the rows of its own CSV file. Batch
        ``t`` inserts rows ``t*rows_created`` up to (but not including)
        ``(t+1)*rows_created``. The third and fourth fields are stored as the
        raw CSV strings.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        db = self.client[self.DATABASE_NAME]
        for collection_name, data in self.csv_data.items():
            columns = self.collection_column_names[collection_name]
            collection = db[collection_name]
            for transaction in range(transactions):
                offset = transaction * rows_created
                operations = []
                for i in range(rows_created):
                    row = data[offset + i]
                    operations.append(InsertOne({
                        columns[0]: self.__convert_timestamp(row[0]),
                        columns[1]: self.__convert_timestamp(row[1]),
                        columns[2]: row[2],
                        columns[3]: row[3]
                    }))
                # bulk_write rejects an empty operation list, so skip it.
                if operations:
                    collection.bulk_write(operations, ordered=False)

    def read(self, rows_read: int = 1, transactions: int = 1):
        """Fetch up to ``rows_read`` documents from each collection.

        The query is repeated ``transactions`` times per collection. The
        documents are discarded; an empty collection simply yields nothing.
        """
        db = self.client[self.DATABASE_NAME]
        for collection_name in self.collection_names:
            collection = db[collection_name]
            for transaction in range(transactions):
                # Exhaust the cursor so all selected documents are retrieved.
                list(collection.find().limit(rows_read))

    def update(self, rows_updated: int = 1, transactions: int = 1):
        """Set the third field to 0 on documents matched by their two timestamps.

        Every collection is updated using only the rows of its own CSV file,
        with the same row windowing per batch as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        db = self.client[self.DATABASE_NAME]
        for collection_name, data in self.csv_data.items():
            columns = self.collection_column_names[collection_name]
            collection = db[collection_name]
            for transaction in range(transactions):
                offset = transaction * rows_updated
                operations = []
                for i in range(rows_updated):
                    row = data[offset + i]
                    document_filter = {
                        columns[0]: self.__convert_timestamp(row[0]),
                        columns[1]: self.__convert_timestamp(row[1]),
                    }
                    operations.append(UpdateOne(document_filter, {'$set': {columns[2]: 0}}))
                if operations:
                    collection.bulk_write(operations, ordered=False)

    def delete(self, rows_deleted: int = 1, transactions: int = 1):
        """Delete documents matched by their two timestamps.

        Every collection is processed using only the rows of its own CSV
        file, with the same row windowing per batch as ``create``.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        db = self.client[self.DATABASE_NAME]
        for collection_name, data in self.csv_data.items():
            columns = self.collection_column_names[collection_name]
            collection = db[collection_name]
            for transaction in range(transactions):
                offset = transaction * rows_deleted
                operations = []
                for i in range(rows_deleted):
                    row = data[offset + i]
                    operations.append(DeleteOne({
                        columns[0]: self.__convert_timestamp(row[0]),
                        columns[1]: self.__convert_timestamp(row[1]),
                    }))
                if operations:
                    collection.bulk_write(operations, ordered=False)

    def reset(self):
        """Drop every benchmark collection."""
        db = self.client[self.DATABASE_NAME]
        for collection_name in self.collection_names:
            db[collection_name].drop()

    def getName(self) -> str:
        """Return the display name of this backend."""
        return "MongoDB"

    def __del__(self):
        """Close the client, if one was ever created."""
        # __del__ also runs when __init__ failed before `client` was set.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

In [7]:
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import requests

class PrometheusDatabaseTesting(DatabaseTestingInterface):
    """Prometheus adapter for the CRUD benchmark.

    Values are written by setting a gauge per CSV file and pushing the
    registry to a Pushgateway; they are read back through the Prometheus HTTP
    query API. ``update``, ``delete`` and ``reset`` are no-ops.
    """

    # Upper bound, in seconds, for a single HTTP query so an unreachable
    # server cannot hang the notebook indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, connection_string: str, pushgateway_connection_string: str, csv_file_paths: List[str]):
        """Load the CSV fixtures and register one gauge per file.

        Args:
            connection_string: Base URL of the Prometheus server; the query
                path ``/api/v1/query`` is appended to it.
            pushgateway_connection_string: Address passed to ``push_to_gateway``.
            csv_file_paths: Pipe-delimited CSV files, one per gauge.
        """
        self.connection_string = connection_string
        self.pushgateway_connection_string = pushgateway_connection_string
        self.registry = CollectorRegistry()
        self.csv_data = {}
        self.gauge_names = []
        self.gauges = {}

        for file_path in csv_file_paths:
            gauge_name = os.path.splitext(os.path.basename(file_path))[0]
            self.gauge_names.append(gauge_name)
            self.csv_data[gauge_name] = self.__parse_csv_data(file_path)

        self.__create_gauges()

    def __create_gauges(self):
        """Register one gauge per loaded CSV file in this instance's registry."""
        for gauge_name in self.gauge_names:
            self.gauges[gauge_name] = Gauge(f"{gauge_name}", f"Metric for monitoring {gauge_name}", registry=self.registry)

    def __parse_csv_data(self, file_path: str) -> List[str]:
        """Return the third field of at most ``MAX_DATA_READ`` rows, as raw strings."""
        MAX_DATA_READ = 1000
        with open(file_path, "r") as f:
            reader = csv.reader(f, delimiter="|")
            next(reader)    # skip the header line with the column names
            data = [row[2] for row in itertools.islice(reader, MAX_DATA_READ)]
        return data

    def create(self, rows_created: int = 1, transactions: int = None):
        """Set each gauge to ``rows_created`` successive values, pushing after each one.

        ``transactions`` is accepted for interface compatibility and ignored.

        Raises:
            IndexError: If more rows are requested than were loaded.
        """
        for gauge_name, data in self.csv_data.items():
            gauge = self.gauges[gauge_name]
            for i in range(rows_created):
                gauge.set(data[i])
                push_to_gateway(self.pushgateway_connection_string, job="prometheus", registry=self.registry)

    def read(self, rows_read: int = 1, transactions: int = None):
        """Query the current value of every gauge once.

        ``rows_read`` and ``transactions`` are accepted for interface
        compatibility and ignored. The value is parsed but not returned.
        """
        query_url = f"{self.connection_string}/api/v1/query"
        for gauge_name in self.gauge_names:
            response = requests.get(
                query_url,
                params={"query": gauge_name},
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code == 200:
                result = response.json()["data"]["result"]
                if result:
                    # Extract the sample value so the read covers parsing too.
                    _latest_value = result[0]["value"][1]
                else:
                    # An empty result list means no matching series came back;
                    # report it instead of failing with IndexError.
                    print(f"No samples returned for {gauge_name}")
            else:
                print("Error querying Prometheus database")

    def update(self, rows_updated: int = 1, transactions: int = None):
        """No-op: this adapter does not implement updates."""
        pass

    def delete(self, rows_deleted: int = 1, transactions: int = None):
        """No-op: this adapter does not implement deletes."""
        pass

    def reset(self):
        """No-op: this adapter keeps no state that it resets."""
        pass

    def getName(self) -> str:
        """Return the display name of this backend."""
        return "Prometheus"

In [8]:
#prometheusDatabaseTesting = PrometheusDatabaseTesting("localhost:9090", csvFiles)

In [17]:
# Collect the CSV fixtures from ./csvData and build the adapter under test.
# NOTE(review): the variable is called `sqlite` but it holds a
# CassandraDatabaseTesting instance connected to "localhost".
csvFiles = get_csv_file_paths("./csvData")
sqlite = CassandraDatabaseTesting("localhost", csvFiles)

In [14]:
# Return the adapter to a clean state before running an operation.
# NOTE(review): this cell's execution count (14) is lower than that of the
# cell that defines `sqlite` (17); confirm the notebook runs top to bottom.
sqlite.reset()

In [18]:
# Insert 1000 rows per table in a single transaction (the default); 1000 is
# also the maximum number of rows the adapters load from each CSV file.
sqlite.create(1000)