# SDB Benchmark

## Initialise database

If the following cells raise a `ConnexionRefuseError`, it may be because of the Cassandra database didn't finished to initialize.

In [3]:
import csv, gc, time
import numpy as np
from matplotlib import pyplot as plt 
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

#time.sleep(120)

cluster = Cluster(['cassandra'])
session = cluster.connect()

# Create a keyspace and table
session.execute("""CREATE KEYSPACE IF NOT EXISTS benchmarking WITH REPLICATION = 
{ 'class' : 'SimpleStrategy', 'replication_factor' : '1' }""")
session.execute("""CREATE TABLE IF NOT EXISTS benchmarking.bitcoin_addresses (BITCOIN_ADDRESS text, ACCOUNT text, 
IP_ADDRESS text, COUNTY text, COUNTRY_CODE text, DATABASE_COLUMN_TYPE text, PRIMARY KEY 
(BITCOIN_ADDRESS))""")


<cassandra.cluster.ResultSet at 0x786c3c281840>

## Import Data

In [4]:
# Import data
with open("/home/data.csv", 'r') as file:
    reader = csv.DictReader(file)
    insert_query = session.prepare("""
INSERT INTO benchmarking.bitcoin_addresses (BITCOIN_ADDRESS, ACCOUNT, IP_ADDRESS, COUNTY, COUNTRY_CODE, DATABASE_COLUMN_TYPE)
VALUES (?, ?, ?, ?, ?, ?)
""")
    cassandra_times_import = []

    for sample in range(100):
        batch = BatchStatement()

        for i in range(1000):
            row = next(reader)
            session.execute(insert_query, (
                row['BITCOIN_ADDRESS'],
                row['ACCOUNT'],
                row['IP_ADDRESS'],
                row['COUNTY'],
                row['COUNTRY_CODE'],
                row['DATABASE_COLUMN_TYPE']
            ))

        gc.collect()
        start_time = time.perf_counter()

        session.execute(batch)

        end_time = time.perf_counter()
        cassandra_times_import.append(end_time - start_time)

## Requests Generation

Generation of randomized queries

In [4]:
import random
# Generate random requests
simulated_values = {
    'BITCOIN_ADDRESS': lambda: f"'{''.join(random.choices('ABCDEFGHJKLMNPQRSTUVWXYZ123456789', k=32))}'",
    'ACCOUNT': lambda: f"'{random.randint(10000000, 99999999)}'",
    'IP_ADDRESS': lambda: f"'{random.randint(1, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 255)}'",
    'COUNTY': lambda: f"'{random.choice(['Buckinghamshire', 'Avon', 'Cambridgeshire', 'Bedfordshire', 'Borders'])}'",
    'COUNTRY_CODE': lambda: f"'{random.choice('ABCDEFGHJKLMNPQRSTUVWXYZ')}{random.choice('ABCDEFGHJKLMNPQRSTUVWXYZ')}'",
    'DATABASE_COLUMN_TYPE': lambda: f"'{random.choice(['float', 'point', 'int', 'serial', 'varchar', 'blob', 'timestamp'])}'"
}
def generate_random_select_primary_key(table_name, primary_key, set_of_primary_key_values):
    primary_key_value = random.choice(set_of_primary_key_values)

    select_query = f"SELECT * FROM {table_name} WHERE {primary_key} = '{primary_key_value}'"
    return select_query

def generate_random_select_conditions(table, column_names):
    num_columns = random.randint(1, len(column_names))
    selected_columns = random.sample(column_names, num_columns)

    num_conditions = random.randint(1,3)
    conditions = []
    for _ in range(num_conditions):
        column = random.choice(column_names)
        operator = random.choice(['=', '!='])
        value = simulated_values[column]()  # Générer une valeur simulée pour la colonne
        conditions.append(f"{column} {operator} {value}")

    if num_conditions :
        where_clause = f"WHERE {' AND '.join(conditions)}"
    else:
        where_clause = ""

    select_query = f"SELECT {', '.join(selected_columns)} FROM {table} {where_clause}"
    return select_query

def generate_random_update_query(table_name, column_names, primary_key, set_of_primary_key_values):
    num_updates = random.randint(1,3)
    columns_to_update = random.sample(column_names, num_updates)
    update_values = [f"{col} = {simulated_values[col]()}" for col in columns_to_update]

    primary_key_value = random.choice(set_of_primary_key_values)

    update_query = f"UPDATE {table_name} SET {', '.join(update_values)} WHERE {primary_key} = '{primary_key_value}'"
    return update_query

def generate_random_delete_query(table_name, primary_key, set_of_primary_key_values):
    primary_key_value = random.choice(set_of_primary_key_values)

    delete_query = f"DELETE FROM {table_name} WHERE {primary_key} = '{primary_key_value}'"
    return delete_query

table = "bitcoin_addresses"
conditions_column = ['COUNTY', 'COUNTRY_CODE','DATABASE_COLUMN_TYPE']
update_column = ['ACCOUNT', 'IP_ADDRESS', 'COUNTY', 'COUNTRY_CODE', 'DATABASE_COLUMN_TYPE']
primary_key = 'BITCOIN_ADDRESS'
set_of_key = []
rows = session.execute("SELECT BITCOIN_ADDRESS FROM benchmarking.bitcoin_addresses")
for a in rows:
    set_of_key.append(a[0])

# Generate Select queries to match primary key
with open("select_primary_key_0.sql", "w") as f:
    for i in range(10000):
        f.write(generate_random_select_primary_key(table, primary_key, set_of_key))
        f.write("\n")
    f.close()
    
# Generate Select with conditions queries
with  open("select_conditions_0.sql", "w") as f:
    for i in range(10000):
        f.write(generate_random_select_conditions(table, conditions_column))
        f.write("\n")
    f.close()

# Generate Update queries
with open("update_0.sql", "w") as f:
    for i in range(10000):
        f.write(generate_random_update_query(table, update_column, primary_key, set_of_key))
        f.write("\n")
    f.close()

# Generate Delete queries
with open("delete_0.sql", "w") as f:
    for i in range(10000):
        f.write(generate_random_delete_query(table, primary_key, set_of_key))
        f.write("\n")
    f.close()

## Queries Benchmarking

### Select

In [None]:
# Benchmark SELECT
session.set_keyspace("benchmarking")

select_path = "select_primary_key_0.sql"
with open(select_path, "r") as file:
    lines = file.readlines()
    n_iter = int(len(lines)/100)
    cassandra_times_select = np.empty(n_iter)
    
    for i in range(n_iter):
        gc.collect()
        start_time = time.perf_counter()

        for j in range(100):
            session.execute(lines[i*100 + j])

        end_time = time.perf_counter()
        cassandra_times_select[i] = (end_time - start_time)

### Update

In [9]:
# Benchmark SELECT
session.set_keyspace("benchmarking")

select_path = "update_0.sql"
with open(select_path, "r") as file:
    lines = file.readlines()
    n_iter = int(len(lines)/100)
    cassandra_times_select = np.empty(n_iter)
    
    for i in range(n_iter):
        gc.collect()
        start_time = time.perf_counter()

        for j in range(100):
            session.execute(lines[i*100 + j])

        end_time = time.perf_counter()
        cassandra_times_select[i] = (end_time - start_time)
    

### Delete

In [10]:
# Benchmark SELECT
session.set_keyspace("benchmarking")

select_path = "delete_0.sql"
with open(select_path, "r") as file:
    lines = file.readlines()
    n_iter = int(len(lines)/100)
    cassandra_times_select = np.empty(n_iter)
    
    for i in range(n_iter):
        gc.collect()
        start_time = time.perf_counter()

        for j in range(100):
            session.execute(lines[i*100 + j])

        end_time = time.perf_counter()
        cassandra_times_select[i] = (end_time - start_time)

## Index utilisation

**Creation of an index**

`COUNTY` colum is selected to be indexed because it have a moderate cardinality. The indexing must have a significant effect on queries performances. 

In [None]:
# Create two tables to compare 
session.execute("""CREATE TABLE IF NOT EXISTS benchmarking.indexedtable (BITCOIN_ADDRESS text, ACCOUNT text, 
IP_ADDRESS text, COUNTY text, COUNTRY_CODE text, DATABASE_COLUMN_TYPE text, PRIMARY KEY 
(BITCOIN_ADDRESS))""")
session.execute("""CREATE TABLE IF NOT EXISTS benchmarking.notindexedtable (BITCOIN_ADDRESS text, ACCOUNT text, 
IP_ADDRESS text, COUNTY text, COUNTRY_CODE text, DATABASE_COLUMN_TYPE text, PRIMARY KEY 
(BITCOIN_ADDRESS))""")

# Import data
with open("/home/data.csv", 'r') as file:
    reader = csv.DictReader(file)
    indexed_insert_query = session.prepare("""
INSERT INTO benchmarking.indexedtable (BITCOIN_ADDRESS, ACCOUNT, IP_ADDRESS, COUNTY, COUNTRY_CODE, DATABASE_COLUMN_TYPE)
VALUES (?, ?, ?, ?, ?, ?)
""")
    notindexed_insert_query = session.prepare("""
INSERT INTO benchmarking.notindexedtable (BITCOIN_ADDRESS, ACCOUNT, IP_ADDRESS, COUNTY, COUNTRY_CODE, DATABASE_COLUMN_TYPE)
VALUES (?, ?, ?, ?, ?, ?)
""")

    batch = BatchStatement()

    for row in reader:
        session.execute(indexed_insert_query, (
            row['BITCOIN_ADDRESS'],
            row['ACCOUNT'],
            row['IP_ADDRESS'],
            row['COUNTY'],
            row['COUNTRY_CODE'],
            row['DATABASE_COLUMN_TYPE']
        ))
        session.execute(notindexed_insert_query, (
            row['BITCOIN_ADDRESS'],
            row['ACCOUNT'],
            row['IP_ADDRESS'],
            row['COUNTY'],
            row['COUNTRY_CODE'],
            row['DATABASE_COLUMN_TYPE']
        ))

    session.execute(batch)

# Create index
create_index_query = """CREATE INDEX contryindex on benchmarking.indexedtable (COUNTY)"""
session.execute(create_index_query)

**Sample of query**

In [None]:
# With index
gc.collect()
start_time = time.perf_counter()
    
session.execute("SELECT * FROM benchmarking.indexedtable WHERE COUNTY = 'Bedfo'")

end_time = time.perf_counter()
print("With index : ", end_time - start_time)

# Without index
gc.collect()
start_time = time.perf_counter()
    
session.execute("SELECT * FROM benchmarking.notindexedtable WHERE COUNTY = 'Bedfo' ALLOW FILTERING")

end_time = time.perf_counter()
print("Without index : ", end_time - start_time)

## Replication performances benchmarking

**Setup replication**

In [None]:
# Create keyspace with replication
session.execute("""CREATE KEYSPACE IF NOT EXISTS benchmarkingreplication WITH REPLICATION = 
{ 'class' : 'SimpleStrategy', 'replication_factor' : '3' }""")

# Create table
session.execute("""CREATE TABLE IF NOT EXISTS benchmarkingreplication.bitcoin_addresses (BITCOIN_ADDRESS text, ACCOUNT text, 
IP_ADDRESS text, COUNTY text, COUNTRY_CODE text, DATABASE_COLUMN_TYPE text, PRIMARY KEY 
(BITCOIN_ADDRESS))""")

**Import Data**

In [None]:
# Import data
with open("/home/data.csv", 'r') as file:
    reader = csv.DictReader(file)
    insert_query = session.prepare("""
INSERT INTO benchmarkingreplication.tab (BITCOIN_ADDRESS, ACCOUNT, IP_ADDRESS, COUNTY, COUNTRY_CODE, DATABASE_COLUMN_TYPE)
VALUES (?, ?, ?, ?, ?, ?)
""")
    cassandra_times_replication_import = []

    for sample in range(100):
        batch = BatchStatement()

        for i in range(1000):
            row = next(reader)
            session.execute(insert_query, (
                row['BITCOIN_ADDRESS'],
                row['ACCOUNT'],
                row['IP_ADDRESS'],
                row['COUNTY'],
                row['COUNTRY_CODE'],
                row['DATABASE_COLUMN_TYPE']
            ))

        gc.collect()
        start_time = time.perf_counter()

        session.execute(batch)

        end_time = time.perf_counter()
        cassandra_times_replication_import.append(end_time - start_time)

**SELECT**

In [None]:
# Benchmark SELECT
session.set_keyspace('benchmarkingreplication')
cassandra_times_replication_select = []
for i in range(100):
    gc.collect()
    start_time = time.perf_counter()

    session.execute(f"{select_queries[i]}")

    end_time = time.perf_counter()
    cassandra_times_replication_select.append(end_time - start_time)

**UPDATE**

In [None]:
# Benchmark UPDATE
session.set_keyspace('benchmarkingreplication')
cassandra_times_update = []
for i in range(100):
    gc.collect()
    start_time = time.perf_counter()

    session.execute(f"{update_queries[i]}")

    end_time = time.perf_counter()
    cassandra_times_update.append(end_time - start_time)

**Delete**

In [None]:
# Benchmark UPDATE
session.set_keyspacex('benchmarkingreplication')
cassandra_times_delete = []
for i in range(100):
    gc.collect()
    start_time = time.perf_counter()

    session.execute(f"{delete_queries[i]}")

    end_time = time.perf_counter()
    cassandra_times_delete.append(end_time - start_time)