# Test scenarios for the database performance comparison

### 1. CREATE operation

- Adding a new teacher
- Creating a new class
- Adding a new subject
- Registering a new student
- Assigning the student to a class (**Added: enrolling the student in a class (enrolment)**)
- Creating a class schedule
- Recording a grade
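The CREATE steps have to respect foreign keys: a class needs its teacher, and an enrolment, schedule entry or grade needs the student, class or subject it points to. Below is a minimal sketch of deriving a valid insertion order (and, reversed, a deletion order) with the standard-library `graphlib`. The `DEPENDENCIES` map is an assumption inferred from the CSV columns the loaders below use (`teacher_id`, `student_id`, `class_id`, `subject_id`). It is not read from the schema files.

```python
from graphlib import TopologicalSorter

# Assumed foreign-key dependencies (table -> tables it references), inferred
# from the CSV columns used later in this notebook; adjust to the real schema.
DEPENDENCIES = {
    "teachers": set(),
    "subjects": set(),
    "students": set(),
    "classes": {"teachers"},
    "enrollments": {"students", "classes"},
    "schedules": {"classes", "subjects"},
    "grades": {"students", "subjects"},
}


def creation_order(deps):
    """Order tables so that every table comes after the tables it references."""
    return list(TopologicalSorter(deps).static_order())


def deletion_order(deps):
    """Reverse of the creation order: child rows are removed before their parents."""
    return creation_order(deps)[::-1]


print(creation_order(DEPENDENCIES))
print(deletion_order(DEPENDENCIES))
```

Tables with no mutual dependency (e.g. `teachers` and `students`) may come out in any relative order. Only the parent-before-child constraint is guaranteed.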

### 2. READ operation

Retrieving a comprehensive report containing:
- The student's personal data
- Class information (**Added: class enrolment information**)
- Details of the teacher in charge of the class
- The list of grades with subject descriptions
- The detailed class schedule
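The report spans every table of the schema. The sketch below expresses it as SQL and runs it against an in-memory SQLite database so that it stays self-contained. The minimal schema is hypothetical and only mirrors the CSV columns loaded later in this notebook. Grades are fetched in a second query, because joining grades and schedule slots in one statement would multiply the rows: every grade would repeat for every slot.

```python
import sqlite3

# Hypothetical minimal schema mirroring the CSV columns used by the loaders;
# SQLite stands in for the benchmarked engines.
SCHEMA = """
CREATE TABLE teachers (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT);
CREATE TABLE subjects (id INTEGER PRIMARY KEY, name TEXT, description TEXT);
CREATE TABLE classes (id INTEGER PRIMARY KEY, name TEXT, teacher_id INTEGER REFERENCES teachers(id));
CREATE TABLE students (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, birth_date TEXT);
CREATE TABLE enrollments (student_id INTEGER, class_id INTEGER, enrolled_at TEXT,
                          PRIMARY KEY (student_id, class_id));
CREATE TABLE grades (student_id INTEGER, subject_id INTEGER, grade INTEGER, created_at TEXT);
CREATE TABLE schedules (class_id INTEGER, subject_id INTEGER, day_of_week INTEGER,
                        time_start TEXT, time_end TEXT);
"""

# Student, class, teacher and one row per schedule slot of each enrolled class.
CLASS_QUERY = """
SELECT s.first_name, s.last_name, c.name, t.first_name || ' ' || t.last_name,
       sch.day_of_week, sch.time_start, sch.time_end, subj.name
FROM students s
JOIN enrollments e ON e.student_id = s.id
JOIN classes c ON c.id = e.class_id
JOIN teachers t ON t.id = c.teacher_id
LEFT JOIN schedules sch ON sch.class_id = c.id
LEFT JOIN subjects subj ON subj.id = sch.subject_id
WHERE s.id = ?
ORDER BY sch.day_of_week, sch.time_start
"""

# Grades with subject descriptions, queried separately to avoid a cross product.
GRADES_QUERY = """
SELECT subj.name, subj.description, g.grade
FROM grades g JOIN subjects subj ON subj.id = g.subject_id
WHERE g.student_id = ?
ORDER BY g.created_at
"""


def student_report(conn, student_id):
    """Return the report for one student as two lists of tuples."""
    return {
        "classes": conn.execute(CLASS_QUERY, (student_id,)).fetchall(),
        "grades": conn.execute(GRADES_QUERY, (student_id,)).fetchall(),
    }


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executescript("""
INSERT INTO teachers VALUES (1, 'Anna', 'Nowak');
INSERT INTO subjects VALUES (1, 'Math', 'Algebra basics');
INSERT INTO classes VALUES (1, '1A', 1);
INSERT INTO students VALUES (1, 'Jan', 'Kowalski', '2010-05-01');
INSERT INTO enrollments VALUES (1, 1, '2024-09-01 08:00:00');
INSERT INTO grades VALUES (1, 1, 5, '2024-10-01');
INSERT INTO schedules VALUES (1, 1, 1, '08:00', '08:45');
""")
report = student_report(conn, 1)
print(report)
```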

### 3. UPDATE operation

- Updating student data
- Changing the class assignment (**Added: updating a class enrolment**)
- Renaming a class
- Updating teacher data
- Changing a grade
- Updating a subject description
- Modifying the class schedule
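Changing a class assignment means updating part of the composite key `(student_id, class_id)` of an enrolment. A benchmark should therefore check that every UPDATE actually matched a row. Otherwise a wrong key produces an artificially fast "update" of nothing. The sketch below uses `sqlite3` and hypothetical two-table data. It runs the steps in one transaction and checks `rowcount` after each statement. `run_update_scenario` is an illustrative name, not part of this notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal tables containing only the columns the UPDATE steps touch.
conn.executescript("""
CREATE TABLE enrollments (student_id INTEGER, class_id INTEGER, enrolled_at TEXT,
                          PRIMARY KEY (student_id, class_id));
CREATE TABLE classes (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO classes VALUES (1, '1A'), (2, '1B');
INSERT INTO enrollments VALUES (7, 1, '2024-09-01');
""")


def run_update_scenario(conn, student_id, old_class, new_class, new_name):
    """Apply the updates in one transaction; fail if any step touches != 1 row."""
    steps = [
        ("UPDATE enrollments SET class_id = ? WHERE student_id = ? AND class_id = ?",
         (new_class, student_id, old_class)),
        ("UPDATE classes SET name = ? WHERE id = ?", (new_name, new_class)),
    ]
    with conn:  # commits on success, rolls back if an exception escapes
        for sql, params in steps:
            cur = conn.execute(sql, params)
            if cur.rowcount != 1:
                raise RuntimeError(f"expected 1 row, got {cur.rowcount}: {sql}")


run_update_scenario(conn, student_id=7, old_class=1, new_class=2, new_name="1B-extended")
print(conn.execute("SELECT student_id, class_id FROM enrollments").fetchall())
```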

### 4. DELETE operation

- Deleting the student's grades
- Unenrolling the student from a class (**Added: deleting a class enrolment**)
- Deleting the class schedule
- Deleting the class
- Optionally deleting subjects
- Optionally deleting the teacher
- Deleting the student record
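The DELETE order above removes dependent rows (grades, enrolments, schedules) before the rows they reference. The sketch below uses `sqlite3` on a hypothetical two-table schema to show why: when foreign keys are enforced, deleting a student who still has grades is rejected. The same holds for PostgreSQL and MariaDB only if their schemas declare foreign keys. MongoDB and Cassandra do not enforce referential integrity, so there the order affects data consistency but not whether the statements succeed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
CREATE TABLE students (id INTEGER PRIMARY KEY, last_name TEXT);
CREATE TABLE grades (student_id INTEGER REFERENCES students(id), grade INTEGER);
INSERT INTO students VALUES (1, 'Kowalski');
INSERT INTO grades VALUES (1, 5), (1, 4);
""")


def delete_student(conn, student_id, children_first=True):
    """Delete a student in one transaction, optionally removing their grades first."""
    with conn:
        if children_first:
            conn.execute("DELETE FROM grades WHERE student_id = ?", (student_id,))
        conn.execute("DELETE FROM students WHERE id = ?", (student_id,))


try:
    delete_student(conn, 1, children_first=False)  # parent first: violates the FK
except sqlite3.IntegrityError as exc:
    print("parent-first delete rejected:", exc)

delete_student(conn, 1)  # children first: succeeds
print(conn.execute("SELECT COUNT(*) FROM students").fetchone())
```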

## Number of records in the tests

The tests will be run for the following record counts:

1. 10,000 records
2. 100,000 records
3. 1,000,000 records
4. 10,000,000 records

## Performance metrics

For each scenario and record count we will measure:

1. Execution time of the whole scenario
2. Average time of a single operation
3. Number of operations per second (throughput)
4. System resource usage (CPU, RAM, disk I/O)
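The first three metrics are linked. The mean is the summed operation time divided by the number of operations. Throughput is the number of operations divided by the elapsed wall-clock time. The two coincide (throughput = 1 / mean) only when operations run sequentially with no gaps between them. The sketch below computes these metrics from per-operation durations. `summarize` is a hypothetical helper. Resource usage (point 4) would have to come from a separate source, for example `docker stats` for the containers.

```python
import statistics


def summarize(durations, wall_time=None):
    """Compute the benchmark metrics from per-operation durations in seconds.

    wall_time is the elapsed time of the whole scenario; if omitted, the sum of
    the durations is used, which is only valid for sequential execution.
    """
    if not durations:
        raise ValueError("no operations recorded")
    total = wall_time if wall_time is not None else sum(durations)
    return {
        "total_s": total,
        "mean_op_s": statistics.fmean(durations),
        "throughput_ops_s": len(durations) / total,
    }


print(summarize([0.002, 0.003, 0.005]))
# Two operations overlapping in 1 s of wall time: throughput 2/s, mean 1 s.
print(summarize([1.0, 1.0], wall_time=1.0))
```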

# Testing tools and technologies

### Native database tools

Each system has a dedicated benchmarking tool. Some are bundled with the database; others, like sysbench, are third-party tools commonly used with it:

| System | Tool | Capabilities |
| :-- | :-- | :-- |
| PostgreSQL | pgbench | TPC-B-like tests, custom SQL scripts |
| MariaDB | sysbench | OLTP tests, vertical scaling |
| MongoDB | mongoperf | Disk I/O performance (tests storage, not document operations) |
| Cassandra | cassandra-stress | Data distribution tests |
| Redis | redis-benchmark | Latency of key-value operations |

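The native tools make it possible to examine each engine's specific storage mechanisms precisely. Because every tool runs its own workload, their results are not directly comparable across systems. They complement the common scenarios above rather than replace them.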
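The sketch below shows how the native tools could be launched from Python via `subprocess`. The function only builds the argument lists. The workload parameters are chosen for illustration: the database name `benchmark`, the client counts and the durations are assumptions. Connection credentials are omitted, sysbench additionally needs prepared tables, and MongoDB is left out of the sketch.

```python
import shlex


def benchmark_commands(host="localhost", clients=8, duration_s=60, requests=100_000):
    """Build argv lists for the native benchmark tools (not executed here)."""
    return {
        # pgbench: -c clients, -j worker threads, -T run time in seconds, then the database name
        "postgresql": ["pgbench", "-h", host, "-c", str(clients), "-j", str(clients),
                       "-T", str(duration_s), "benchmark"],
        # sysbench OLTP read/write workload against MariaDB (tables prepared beforehand)
        "mariadb": ["sysbench", "oltp_read_write", "--db-driver=mysql",
                    f"--mysql-host={host}", f"--threads={clients}",
                    f"--time={duration_s}", "run"],
        # cassandra-stress: n= total operations, -rate threads= concurrency
        "cassandra": ["cassandra-stress", "write", f"n={requests}",
                      "-rate", f"threads={clients}", "-node", host],
        # redis-benchmark: -n requests, -c parallel clients, -t selected commands, -q quiet
        "redis": ["redis-benchmark", "-h", host, "-n", str(requests),
                  "-c", str(clients), "-t", "set,get", "-q"],
    }


for system, argv in benchmark_commands().items():
    print(f"{system:<10} {shlex.join(argv)}")
```

Each list can be passed to `subprocess.run(argv, capture_output=True, text=True)`. Building lists rather than shell strings avoids quoting problems.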

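### Automation in Python

Key libraries supporting the tests:

- **psycopg2** (PostgreSQL) and **mysql-connector-python** (MariaDB) for the relational databases; **SQLAlchemy** could serve as a common layer over both
- **PyMongo** for MongoDB
- **cassandra-driver** for Cassandra
- **redis-py** for Redis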
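The drivers above expose different APIs, but timing can stay driver-agnostic if each call is wrapped. The sketch below uses `time.perf_counter()`, a monotonic, high-resolution clock intended for measuring intervals. By contrast, `time.time()`, which the loaders below use, can jump if the system clock is adjusted. `timed` and `run_benchmark` are hypothetical helpers.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(durations):
    """Append the elapsed time of the with-block (in seconds) to durations."""
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    try:
        yield
    finally:
        durations.append(time.perf_counter() - start)


def run_benchmark(operation, repetitions):
    """Call operation() repeatedly and return the per-call durations."""
    durations = []
    for _ in range(repetitions):
        with timed(durations):
            operation()
    return durations


# Any driver call fits here, e.g. lambda: redis_client.get("key");
# a cheap stand-in keeps the example self-contained.
durations = run_benchmark(lambda: sum(range(1000)), repetitions=5)
print(len(durations), min(durations) >= 0)
```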

In [None]:
# Import required libraries
import psycopg2
import psycopg2.errors
from pymongo import MongoClient
from cassandra.cluster import Cluster
import redis
import mysql.connector
import yaml
import pandas as pd
import os
import time
import sys
from pathlib import Path

# Load database configuration
print("Setting up database connections...")
with open('docker-compose.yml', 'r') as file:
    docker_config = yaml.safe_load(file)

# PostgreSQL connection
postgres_config = docker_config['services']['postgresql']
postgres_client = psycopg2.connect(
    host='localhost',
    database=postgres_config['environment']['POSTGRES_DB'],
    user=postgres_config['environment']['POSTGRES_USER'],
    password=postgres_config['environment']['POSTGRES_PASSWORD'],
    port=int(postgres_config['ports'][0].split(':')[0])
)

# MariaDB connection
mariadb_config = docker_config['services']['mariadb']
mariadb_client = mysql.connector.connect(
    host='localhost',
    database=mariadb_config['environment']['MYSQL_DATABASE'],
    user=mariadb_config['environment']['MYSQL_USER'],
    password=mariadb_config['environment']['MYSQL_PASSWORD'],
    port=int(mariadb_config['ports'][0].split(':')[0]),
    allow_local_infile=True
)

# MongoDB connection
mongo_config = docker_config['services']['mongodb']
mongo_client = MongoClient(
    host='localhost',
    port=int(mongo_config['ports'][0].split(':')[0])
)

# Cassandra connection
cassandra_config = docker_config['services']['cassandra']
cassandra_client = Cluster(['localhost'], port=int(cassandra_config['ports'][0].split(':')[0]))
cassandra_session = cassandra_client.connect()

# Redis connection
redis_config = docker_config['services']['redis']
redis_client = redis.Redis(
    host='localhost',
    port=int(redis_config['ports'][0].split(':')[0])
)

# Test connections (each one separately, so one failure does not hide the others)
connection_checks = {
    "PostgreSQL": lambda: postgres_client.cursor().execute("SELECT 1"),
    "MariaDB": lambda: mariadb_client.cursor(buffered=True).execute("SELECT 1"),
    "Cassandra": lambda: cassandra_session.execute("SELECT release_version FROM system.local"),
    "MongoDB": lambda: mongo_client.admin.command('ping'),
    "Redis": lambda: redis_client.ping(),
}
for name, check in connection_checks.items():
    try:
        check()
        print(f"INFO: {name} connection successful")
    except Exception as e:
        print(f"ERROR: {name} connection test failed: {e}")
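
The connection code reads the host port with `ports[0].split(':')[0]`. That works for `"5432:5432"`, but for a mapping bound to an address, such as `"127.0.0.1:5432:5432"`, it returns the IP address instead. The sketch below is a more tolerant helper. `host_port` is a hypothetical name, and the supported forms are the Docker Compose short syntax (`HOST:CONTAINER`, `IP:HOST:CONTAINER`, optional `/tcp`) and the long syntax with a `published` key.

```python
def host_port(port_entry):
    """Return the published (host-side) port of a docker-compose `ports` entry."""
    if isinstance(port_entry, dict):
        # Long syntax: {"target": 5432, "published": 15432, ...}
        return int(port_entry["published"])
    parts = str(port_entry).split("/")[0].split(":")
    if len(parts) < 2:
        # "5432" alone publishes the container port on a random host port
        raise ValueError(f"no fixed host port in {port_entry!r}")
    # HOST:CONTAINER -> HOST, IP:HOST:CONTAINER -> HOST
    return int(parts[-2])


print(host_port("127.0.0.1:6380:6379/tcp"))
```

With it, the connection code would read `port=host_port(postgres_config['ports'][0])`, and likewise for the other services.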

In [None]:
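# Ending a cell with the bare name CELL_END makes Jupyter display '' instead of the
# return value of the cell's last call (e.g. the dict returned by generate_files)
CELL_END = ''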

In [None]:
# Data generation functions
sys.path.append(str(Path.cwd()))
from generator import generate_school_data

def generate_files(output_dir='./data', scale=1000, batch_size=10000, **kwargs):
    """
    Generate synthetic school data files for benchmarking.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    print(f"INFO: Generating data with scale {scale} and batch size {batch_size}...")

    result = generate_school_data(
        output_dir=output_dir,
        scale=scale,
        batch_size=batch_size,
        **kwargs
    )

    print(f"INFO: Generated {len(result['students'])} students, {len(result['teachers'])} teachers, " + 
          f"{len(result['classes'])} classes, {len(result['subjects'])} subjects")
    print("="*50)
    return result

# Generate test data sets
scale_100_dir = './data/scale_100'
scale_1000_dir = './data/scale_1000'

generate_files(output_dir=scale_100_dir, scale=100, batch_size=5000)
generate_files(output_dir=scale_1000_dir, scale=1000, batch_size=5000)
CELL_END


# PostgreSQL Operations

In [None]:
# PostgreSQL Methods

def initialize_postgres_schema(conn, schema_sql):
    """
    Initializes the PostgreSQL database schema using the provided SQL script.
    """
    if not schema_sql:
        print("ERROR: Schema SQL content is empty.")
        return

    try:
        with conn.cursor() as cur:
            cur.execute(schema_sql)
        conn.commit()
        print("INFO: PostgreSQL schema initialized.")
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error initializing PostgreSQL schema: {e}")

def verify_postgres_tables(conn, expected_tables):
    """
    Verifies if the expected tables exist in PostgreSQL.
    """
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public' AND table_name = ANY(%s);
            """, (expected_tables,))
            existing_tables = {row[0] for row in cur.fetchall()}

        missing_tables = set(expected_tables) - existing_tables
        if not missing_tables:
            print(f"INFO: All PostgreSQL tables exist: {', '.join(expected_tables)}")
            return True
        else:
            print(f"WARNING: Missing PostgreSQL tables: {', '.join(missing_tables)}")
            return False
    except Exception as e:
        print(f"ERROR: Error verifying PostgreSQL tables: {e}")
        return False

def insert_postgres_table_from_csv(conn, table_name, csv_file) -> tuple[float, float, float]:
    # Inserts data from a CSV file into a PostgreSQL table, one INSERT per row.
    # Rows whose key already exists are skipped with ON CONFLICT DO NOTHING; rolling back
    # on each duplicate instead would discard every row inserted so far in the transaction.
    # Assumes the table already exists and its columns follow the CSV column order.
    # Fields are split on ',' naively, so quoted values containing commas are not supported.

    operation_start_time = time.time()  # start of the whole operation
    file_opened_start_time = 0  # set once the file is open and the header skipped
    try:
        with conn.cursor() as cur:
            with open(csv_file, 'r') as f:
                next(f)  # skip header
                file_opened_start_time = time.time()
                for line in f:
                    values = line.strip().split(',')
                    insert_sql = (
                        f"INSERT INTO {table_name} VALUES ({', '.join(['%s'] * len(values))}) "
                        "ON CONFLICT DO NOTHING"
                    )
                    cur.execute(insert_sql, values)
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into {table_name}: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def copy_postgres_table_from_csv(conn, table_name, csv_file) -> tuple[float, float, float]:
    # Loads data from a CSV file into a PostgreSQL table using COPY.
    # COPY is all-or-nothing: a duplicate key aborts the whole load, which is then rolled back
    # (see copy_postgres_enrollments_from_csv for staging through a temporary table).
    # Assumes the table already exists and has the same structure as the CSV file.
    operation_start_time = time.time() # Initialize start_time
    file_opened_start_time = 0 # Initialize file_opened_start_time
    try:
        with conn.cursor() as cur:
            copy_sql = f"COPY {table_name} FROM STDIN WITH (FORMAT CSV, HEADER)"
            with open(csv_file, 'r') as f:
                file_opened_start_time = time.time() # Initialize start_time when file is opened
                cur.copy_expert(sql=copy_sql, file=f)
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into {table_name}: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def copy_postgres_enrollments_from_csv(conn, csv_file):
    # This function is a specialized version for the enrollments table, because it has a composite primary key.
    # Inserts data from a CSV file into the enrollments table using COPY.
    # Does not fail on duplicate key errors.
    # Uses a temporary table to handle duplicates.

    operation_start_time = time.time() # Initialize start_time
    file_opened_start_time = 0 # Initialize file_opened_start_time

    try:
        with conn.cursor() as cur:
            # Create a temporary table for the COPY operation
            temp_table_name = "temp_enrollments"
            cur.execute(f"""
                CREATE TEMP TABLE {temp_table_name} (
                    student_id INT,
                    class_id INT,
                    enrolled_at TIMESTAMP
                ) ON COMMIT DROP;
            """)
            copy_sql = f"COPY {temp_table_name} FROM STDIN WITH (FORMAT CSV, HEADER)"
            with open(csv_file, 'r') as f:
                file_opened_start_time = time.time()
                cur.copy_expert(sql=copy_sql, file=f)

            # Insert into the main table, ignoring duplicates
            insert_sql = f"""
                INSERT INTO enrollments (student_id, class_id, enrolled_at)
                SELECT student_id, class_id, enrolled_at FROM {temp_table_name}
                ON CONFLICT (student_id, class_id) DO NOTHING;
            """
            cur.execute(insert_sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into enrollments: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def load_postgres_data(conn, data_dir):
    """
    Loads data from CSV files into PostgreSQL tables.
    """
    data_path = Path(data_dir)
    table_csv_map = {
        'teachers': 'teachers.csv',
        'subjects': 'subjects.csv',
        'classes': 'classes.csv',
        'students': 'students.csv',
        'grades': 'grades.csv',
        'schedules': 'schedules.csv',
        # 'enrollments': 'enrollments.csv' # Handled separately
    }
    for table_name, csv_file in table_csv_map.items():
        op_time, f_op_time, end_time = copy_postgres_table_from_csv(conn, table_name, data_path / csv_file)
        print(f"INFO: Inserted {table_name} in {end_time - op_time:.2f} seconds ({end_time - f_op_time:.2f} seconds after opening the file)")

    # Handle enrollments separately due to composite primary key
    op_time, f_op_time, end_time = copy_postgres_enrollments_from_csv(conn, data_path / 'enrollments.csv')
    print(f"INFO: Inserted enrollments in {end_time - op_time:.2f} seconds ({end_time - f_op_time:.2f} seconds after opening the file)")

def verify_postgres_counts(conn, tables):
    """
    Counts rows in PostgreSQL tables.
    """
    counts = {}
    max_len = max(len(t) for t in tables) if tables else 0
    print(f"INFO: Counting rows in PostgreSQL tables")
    try:
        with conn.cursor() as cur:
            for table_name in tables:
                try:
                    cur.execute(f"SELECT COUNT(*) FROM {table_name};")
                    count = cur.fetchone()[0]
                    counts[table_name] = count
                except Exception as count_error:
                    print(f"ERROR: {count_error}")
                    counts[table_name] = 'Error'

        print("--- PostgreSQL Table Row Counts ---")
        for table, count in counts.items():
            print(f"{table:<{max_len}} : {count}")
        print("-----------------------------------")
        return counts

    except Exception as e:
        print(f"ERROR: {e}")
        return None
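
The row-by-row loaders split each line with `line.strip().split(',')`. This breaks as soon as a field contains a quoted comma, for example a free-text column such as `subjects.description` if it ever contains one. The COPY path is not affected, because PostgreSQL parses the CSV itself. The sketch below uses the standard `csv` module instead. `csv_rows` is a hypothetical helper. When reading real files, open them with `newline=''`, as the `csv` documentation recommends.

```python
import csv
import io


def csv_rows(f):
    """Yield data rows from an open CSV file, skipping the header line."""
    reader = csv.reader(f)
    next(reader, None)  # header
    yield from reader


sample = io.StringIO(
    "id,name,description\n"
    '3,Mathematics,"Algebra, geometry and statistics"\n'
)
rows = list(csv_rows(sample))
naive = '3,Mathematics,"Algebra, geometry and statistics"'.split(",")
print(rows[0], len(naive))
```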

In [None]:
# PostgreSQL Operations Execution

# Schema initialization
with open('schemas/postgres_schema.sql', 'r') as f:
    sql_schema = f.read()

initialize_postgres_schema(postgres_client, sql_schema)

# Table verification 
required_tables = ['teachers', 'subjects', 'classes', 'students', 'enrollments', 'grades', 'schedules']
verify_postgres_tables(postgres_client, required_tables)

# Data loading
load_postgres_data(postgres_client, scale_100_dir)

# Count verification
verify_postgres_counts(postgres_client, required_tables)
CELL_END

# MariaDB Operations

In [None]:
# MariaDB Methods

def initialize_mariadb_schema(conn, schema_sql):
    """
    Initializes the MariaDB database schema using the provided SQL script.
    """
    if not schema_sql:
        print("ERROR: Schema SQL content is empty.")
        return
    try:
        with conn.cursor() as cur:
            for statement in schema_sql.split(';'):
                stmt = statement.strip()
                if stmt:
                    cur.execute(stmt)
        conn.commit()
        print("INFO: MariaDB schema initialized.")
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error initializing MariaDB schema: {e}")

def verify_mariadb_tables(conn, expected_tables):
    """
    Verifies if the expected tables exist in MariaDB.
    """
    try:
        with conn.cursor() as cur:
            format_strings = ','.join(['%s'] * len(expected_tables))
            cur.execute(f"""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = DATABASE() AND table_name IN ({format_strings});
            """, tuple(expected_tables))
            existing_tables = {row[0] for row in cur.fetchall()}

        missing_tables = set(expected_tables) - existing_tables
        if not missing_tables:
            print(f"INFO: All MariaDB tables exist: {', '.join(expected_tables)}")
            return True
        else:
            print(f"WARNING: Missing MariaDB tables: {', '.join(missing_tables)}")
            return False
    except Exception as e:
        print(f"ERROR: Error verifying MariaDB tables: {e}")
        return False

def insert_mariadb_table_from_csv(conn, table_name, csv_file) -> tuple[float, float, float]:
    """Inserts data from a CSV file into a MariaDB table by reading the header for columns."""
    operation_start_time = time.time()
    file_opened_start_time = 0
    try:
        with conn.cursor() as cur:
            with open(csv_file, 'r') as f:
                # read header for column names
                header = next(f).strip().split(',')
                cols = header
                placeholders = ','.join(['%s'] * len(cols))
                insert_sql = f"INSERT INTO {table_name} ({','.join(cols)}) VALUES ({placeholders})"
                file_opened_start_time = time.time()
                for line in f:
                    values = line.strip().split(',')
                    # ensure values length matches columns
                    if len(values) != len(cols):
                        if len(values) > len(cols):
                            values = values[:len(cols)]
                        else:
                            print(f"WARNING: Skipping {table_name} row with {len(values)} values (expected {len(cols)}, values: {values})")
                            continue
                    try:
                        cur.execute(insert_sql, values)
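                    except mysql.connector.errors.IntegrityError:
                        # Skip rows that violate a key constraint. InnoDB rolls back only the
                        # failing statement, so rolling back the connection here would also
                        # discard every row inserted before it.
                        continue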
                    except Exception as e:
                        print(f"ERROR: Error inserting into {table_name}: {e}")
                        conn.rollback()
                        break
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into {table_name}: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)


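def copy_mariadb_table_from_csv(conn, table_name, csv_file) -> tuple[float, float, float]:
    # Loads data from a CSV file into a MariaDB table using LOAD DATA LOCAL INFILE
    # (the MariaDB counterpart of PostgreSQL's COPY).
    # With LOCAL, rows with duplicate keys are skipped as if IGNORE were given,
    # so the load does not fail on duplicate key errors.
    # Assumes the table already exists and has the same structure as the CSV file.

    operation_start_time = time.time() # Initialize start_time
    file_opened_start_time = 0 # Initialize file_opened_start_time
    try:
        with conn.cursor() as cur:
            # as_posix() keeps backslashes (Windows paths) out of the SQL string literal
            copy_sql = f"""
            LOAD DATA LOCAL INFILE '{Path(csv_file).as_posix()}'
            INTO TABLE {table_name}
            FIELDS TERMINATED BY ','
            OPTIONALLY ENCLOSED BY '"'
            LINES TERMINATED BY '\\n'
            IGNORE 1 LINES;
            """
            # The file is opened here only to take the same timestamp as the other loaders;
            # the connector reads and sends the file itself (requires allow_local_infile=True).
            with open(csv_file, 'r') as f:
                file_opened_start_time = time.time() # Initialize start_time when file is opened
                cur.execute(copy_sql)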
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into {table_name}: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def copy_mariadb_enrollments_from_csv(conn, csv_file) -> tuple[float, float, float]:
    # Specialized loader for the enrollments table, which has a composite primary key.
    # Inserts rows one by one with INSERT IGNORE, so duplicate (student_id, class_id)
    # pairs are skipped instead of raising errors.

    operation_start_time = time.time()  # start of the whole operation
    file_opened_start_time = 0  # set once the file is open and the header skipped

    try:
        with conn.cursor() as cur:
            print("INFO: Loading enrollments with duplicate handling...")
            with open(csv_file, 'r') as f:
                next(f)  # skip header
                file_opened_start_time = time.time()
                for line in f:
                    student_id, class_id, enrolled_at = line.strip().split(',')
                    cur.execute(
                        """
                        INSERT IGNORE INTO enrollments (student_id, class_id, enrolled_at)
                        VALUES (%s, %s, %s)
                        """,
                        (student_id, class_id, enrolled_at)
                    )
            conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"ERROR: Error inserting data from {csv_file} into enrollments: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def load_mariadb_data(conn, data_dir):
    """
    Loads data from CSV files into MariaDB tables using row-by-row INSERTs.
    """
    table_csv_map = {
        'teachers': 'teachers.csv',
        'subjects': 'subjects.csv',
        'classes': 'classes.csv',
        'students': 'students.csv',
        'grades': 'grades.csv',
        'schedules': 'schedules.csv',
        # 'enrollments': 'enrollments.csv'  # Handled separately
    }
    data_path = Path(data_dir)
    for table_name, csv_file in table_csv_map.items():
        op_time, f_op_time, end_time = insert_mariadb_table_from_csv(conn, table_name, data_path / csv_file)
        print(f"INFO: Inserted {table_name} in {end_time - op_time:.2f} seconds ({end_time - f_op_time:.2f} seconds after opening the file)")

    # Handle enrollments separately due to composite primary key
    op_time, f_op_time, end_time = copy_mariadb_enrollments_from_csv(conn, data_path / 'enrollments.csv')
    print(f"INFO: Inserted enrollments in {end_time - op_time:.2f} seconds ({end_time - f_op_time:.2f} seconds after opening the file)")

def verify_mariadb_counts(conn, tables):
    """
    Counts rows in MariaDB tables.
    """
    counts = {}
    max_len = max(len(t) for t in tables) if tables else 0
    print(f"INFO: Counting rows in MariaDB tables")
    
    try:
        with conn.cursor() as cur:
            for table_name in tables:
                try:
                    cur.execute(f"SELECT COUNT(*) FROM {table_name};")
                    count = cur.fetchone()[0]
                    counts[table_name] = count
                except Exception as count_error:
                    print(f"ERROR: {count_error}")
                    counts[table_name] = 'Error'

        print("--- MariaDB Table Row Counts ---")
        for table, count in counts.items():
            print(f"{table:<{max_len}} : {count}")
        print("---------------------------------")
        return counts

    except Exception as e:
        print(f"ERROR: {e}")
        return None
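
`load_mariadb_data` goes through `insert_mariadb_table_from_csv`, which issues one `execute()` per row, while the PostgreSQL loader uses COPY. The two load times therefore measure different things: per-row round trips versus a bulk load. For a like-for-like bulk comparison, a common alternative is to group rows into batches and pass each batch to `cursor.executemany()`, which MySQL Connector/Python can send as multi-row INSERTs. The sketch below shows only the batching part. `batches` is a hypothetical helper.

```python
from itertools import islice


def batches(rows, size):
    """Group an iterable of rows into lists of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


# Intended use (not executed here): one executemany() call per batch instead of
# one execute() per row, with a single commit at the end, e.g.
#   for batch in batches(rows, 1000):
#       cur.executemany(insert_sql, batch)
#   conn.commit()
print([len(b) for b in batches(range(2500), 1000)])
```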

In [None]:
# MariaDB Operations Execution

# Schema initialization
with open('schemas/mariadb_schema.sql', 'r') as f:
    mariadb_schema = f.read()

initialize_mariadb_schema(mariadb_client, mariadb_schema)

# Table verification
required_tables = ['teachers', 'subjects', 'classes', 'students', 'enrollments', 'grades', 'schedules']
verify_mariadb_tables(mariadb_client, required_tables)

# Data loading
load_mariadb_data(mariadb_client, scale_100_dir)

# Count verification
verify_mariadb_counts(mariadb_client, required_tables)
CELL_END

# MongoDB Operations

In [None]:
# MongoDB Methods
def initialize_mongo_schema(client, db_name='benchmark'):
    """
    Initializes the MongoDB schema by creating necessary collections.
    """
    try:
        db = client[db_name]
        
        # List of collections to create based on no_sql_design.txt
        collections = ['students', 'teachers', 'classes', 'subjects']
        
        # Drop existing collections if they exist
        for collection in collections:
            if collection in db.list_collection_names():
                db[collection].drop()
                print(f"INFO: Dropped MongoDB collection: {collection}")
        
        # Create collections with indexes
        for collection in collections:
            db.create_collection(collection)
            print(f"INFO: Created MongoDB collection: {collection}")
            
            # Create indexes for performance
            if collection == 'students':
                db[collection].create_index([("last_name", 1), ("first_name", 1)])
            elif collection == 'classes':
                db[collection].create_index([("name", 1)])
                
        print("INFO: MongoDB schema initialized.")
    except Exception as e:
        print(f"ERROR: {e}")

def verify_mongo_collections(client, db_name='benchmark', expected_collections=None):
    """
    Verifies if the expected collections exist in MongoDB.
    """
    if expected_collections is None:
        expected_collections = ['students', 'teachers', 'classes', 'subjects']
    
    try:
        db = client[db_name]
        existing_collections = db.list_collection_names()
        
        missing_collections = set(expected_collections) - set(existing_collections)
        if not missing_collections:
            print(f"INFO: All MongoDB collections exist: {', '.join(expected_collections)}")
            return True
        else:
            print(f"WARNING: Missing MongoDB collections: {', '.join(missing_collections)}")
            return False
    except Exception as e:
        print(f"ERROR: {e}")
        return False

def insert_mongo_data_from_csv(client, collection_name, csv_file) -> tuple[float, float, float]:
    """Inserts each CSV row as a separate document; the `id` column becomes `_id`."""
    operation_start_time = time.time() # Initialize start_time
    file_opened_start_time = 0 # Initialize file_opened_start_time
    try:
        db = client['benchmark']
        collection = db[collection_name]
        
        with open(csv_file, 'r') as f:
            reader = pd.read_csv(f)
            # rename id to _id for MongoDB
            if 'id' in reader.columns:
                reader.rename(columns={'id': '_id'}, inplace=True)

            file_opened_start_time = time.time() # Initialize start_time just before starting to insert
            for _, row in reader.iterrows():
                doc = row.to_dict()
                collection.insert_one(doc)
    except Exception as e:
        print(f"ERROR: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, file_opened_start_time, end_time)

def insert_mongo_students_from_csv(client, data_path) -> tuple[float, float]:
    # load all grades and enrollments into students from csv files
    # create a student object with embedded enrollments and grades
    students_file = data_path / 'students.csv'
    enrollments_file = data_path / 'enrollments.csv'
    grades_file = data_path / 'grades.csv'
    operation_start_time = time.time() # Initialize start_time

    try:
        db = client['benchmark']
        collection = db['students']
        
        with open(students_file, 'r') as f:
            reader = pd.read_csv(f)
            for _, row in reader.iterrows():
                student_doc = {
                    "_id": row['id'],
                    "first_name": row['first_name'],
                    "last_name": row['last_name'],
                    "birth_date": row['birth_date'],
                    "enrollments": [],
                    "grades": []
                }
                collection.insert_one(student_doc)

        with open(enrollments_file, 'r') as f:
            reader = pd.read_csv(f)
            for _, row in reader.iterrows():
                student_id = row['student_id']
                enrollment_doc = {
                    "class_id": row['class_id'],
                    "enrolled_at": row['enrolled_at']
                }
                collection.update_one(
                    {"_id": student_id},
                    {"$push": {"enrollments": enrollment_doc}}
                )

        with open(grades_file, 'r') as f:
            reader = pd.read_csv(f)
            for _, row in reader.iterrows():
                student_id = row['student_id']
                grade_doc = {
                    "subject_id": row['subject_id'],
                    "grade": row['grade'],
                    "created_at": row['created_at']
                }
                collection.update_one(
                    {"_id": student_id},
                    {"$push": {"grades": grade_doc}}
                )

    except Exception as e:
        print(f"ERROR: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, end_time)


def insert_mongo_classes_from_csv(client, data_path) -> tuple[float, float]:
    # Loads classes from CSV files and embeds each class's schedule entries.
    # The teacher is referenced by teacher_id rather than embedded.
    classes_file = data_path / 'classes.csv'
    schedules_file = data_path / 'schedules.csv'
    operation_start_time = time.time() # Initialize start_time

    try:
        db = client['benchmark']
        collection = db['classes']

        with open(classes_file, 'r') as f:
            reader = pd.read_csv(f)
            for _, row in reader.iterrows():
                class_doc = {
                    "_id": row['id'],
                    "name": row['name'],
                    "teacher_id": row['teacher_id'],
                    "schedule": []
                }
                collection.insert_one(class_doc)

        with open(schedules_file, 'r') as f:
            reader = pd.read_csv(f)
            for _, row in reader.iterrows():
                class_id = row['class_id']
                schedule_doc = {
                    "subject_id": row['subject_id'],
                    "day_of_week": row['day_of_week'],
                    "time_start": row['time_start'],
                    "time_end": row['time_end']
                }
                collection.update_one(
                    {"_id": class_id},
                    {"$push": {"schedule": schedule_doc}}
                )

    except Exception as e:
        print(f"ERROR: {e}")
    finally:
        end_time = time.time()
        return (operation_start_time, end_time)

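def load_mongo_data(client, data_dir):
    """
    Loads CSV data into MongoDB: teachers and subjects as flat collections,
    students with embedded enrollments and grades, classes with embedded schedules.
    """
    data_path = Path(data_dir)
    for collection_name in ('teachers', 'subjects'):
        op_time, f_op_time, end_time = insert_mongo_data_from_csv(client, collection_name, data_path / f'{collection_name}.csv')
        print(f"INFO: Inserted {collection_name} in {end_time - op_time:.2f} seconds ({end_time - f_op_time:.2f} seconds after reading the CSV)")

    op_time, end_time = insert_mongo_students_from_csv(client, data_path)
    print(f"INFO: Inserted students (with enrollments and grades) in {end_time - op_time:.2f} seconds")

    op_time, end_time = insert_mongo_classes_from_csv(client, data_path)
    print(f"INFO: Inserted classes (with schedules) in {end_time - op_time:.2f} seconds")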

def verify_mongo_counts(client, db_name='benchmark'):
    """
    Counts documents in MongoDB collections.
    """
    collections = ['students', 'teachers', 'classes', 'subjects']
    max_len = max(len(c) for c in collections)
    
    try:
        db = client[db_name]
        counts = {}
        
        for collection in collections:
            try:
                count = db[collection].count_documents({})
                counts[collection] = count
            except Exception as e:
                print(f"ERROR: {e}")
                counts[collection] = 'Error'
                
        print("--- MongoDB Collection Document Counts ---")
        for collection, count in counts.items():
            print(f"{collection:<{max_len}} : {count}")
        print("-----------------------------------------")

        # Additional checks for embedded documents
        try:
            students_with_enrollments = db.students.count_documents({"enrollments": {"$exists": True, "$ne": []}})
            students_with_grades = db.students.count_documents({"grades": {"$exists": True, "$ne": []}})
            classes_with_schedules = db.classes.count_documents({"schedule": {"$exists": True, "$ne": []}})
            
            print("\n--- MongoDB Embedded Document Counts ---")
            print(f"Students with enrollments : {students_with_enrollments}")
            print(f"Students with grades      : {students_with_grades}")
            print(f"Classes with schedules    : {classes_with_schedules}")
            print("-----------------------------------------")
        except Exception as e:
            print(f"ERROR: {e}")
        
        return counts
    except Exception as e:
        print(f"ERROR: {e}")
        return None
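
`insert_mongo_students_from_csv` first inserts every student. It then issues one `update_one` with `$push` per enrolment and per grade, so its load time is dominated by round trips. If the goal is to measure loading the embedded design, rather than incremental updates, the documents can be assembled in memory first. The sketch below assumes the three CSV files fit in RAM, which may not hold at 10 million records. `build_student_docs` is a hypothetical helper. The result could then be written with a single `client['benchmark']['students'].insert_many(docs)`.

```python
import csv
import io
from collections import defaultdict


def build_student_docs(students_csv, enrollments_csv, grades_csv):
    """Group enrollments and grades per student and return one document per student."""
    enrollments = defaultdict(list)
    for row in csv.DictReader(enrollments_csv):
        enrollments[int(row["student_id"])].append(
            {"class_id": int(row["class_id"]), "enrolled_at": row["enrolled_at"]})
    grades = defaultdict(list)
    for row in csv.DictReader(grades_csv):
        grades[int(row["student_id"])].append(
            # grade kept as text here; convert to the type the benchmark expects
            {"subject_id": int(row["subject_id"]), "grade": row["grade"],
             "created_at": row["created_at"]})
    docs = []
    for row in csv.DictReader(students_csv):
        sid = int(row["id"])
        docs.append({"_id": sid, "first_name": row["first_name"],
                     "last_name": row["last_name"], "birth_date": row["birth_date"],
                     "enrollments": enrollments.get(sid, []),
                     "grades": grades.get(sid, [])})
    return docs


students = io.StringIO("id,first_name,last_name,birth_date\n"
                       "1,Jan,Kowalski,2010-05-01\n2,Ewa,Nowak,2011-01-02\n")
enr = io.StringIO("student_id,class_id,enrolled_at\n1,10,2024-09-01\n1,11,2024-09-02\n")
gr = io.StringIO("student_id,subject_id,grade,created_at\n2,3,5,2024-10-01\n")
docs = build_student_docs(students, enr, gr)
print(docs)
```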

In [None]:
# MongoDB Operations Execution

# Schema initialization
initialize_mongo_schema(mongo_client)

# Collection verification
verify_mongo_collections(mongo_client)

# Data loading
load_mongo_data(mongo_client, scale_100_dir)

# Document count verification
verify_mongo_counts(mongo_client)
CELL_END

# Cassandra Operations

In [None]:
# Cassandra data loading functions with minimal memory usage

def initialize_cassandra_schema(session, keyspace='benchmark'):
    """Initializes the Cassandra schema by creating necessary keyspace and tables."""
    try:
        # Create keyspace if not exists
        session.execute(f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace} 
            WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};
        """)
        
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Drop existing tables if they exist
        tables = ['teachers', 'subjects', 'classes', 'students', 
                    'enrollments', 'grades', 'schedules', 
                    'student_enrollments', 'student_grades']
        
        for table in tables:
            session.execute(f"DROP TABLE IF EXISTS {table};")
            print(f"INFO: Dropped Cassandra table: {table}")
        
        # Create tables with appropriate data types
        session.execute("""
            CREATE TABLE teachers (
                id INT PRIMARY KEY,
                first_name TEXT,
                last_name TEXT
            );
        """)
        
        session.execute("""
            CREATE TABLE subjects (
                id INT PRIMARY KEY,
                name TEXT,
                description TEXT
            );
        """)
        
        session.execute("""
            CREATE TABLE classes (
                id INT PRIMARY KEY,
                name TEXT,
                teacher_id INT
            );
        """)
        
        session.execute("""
            CREATE TABLE students (
                id INT PRIMARY KEY,
                first_name TEXT,
                last_name TEXT,
                birth_date TEXT
            );
        """)
        
        session.execute("""
            CREATE TABLE enrollments (
                student_id INT,
                class_id INT,
                enrolled_at TIMESTAMP,
                PRIMARY KEY (student_id, class_id)
            );
        """)
        
        session.execute("""
            CREATE TABLE grades (
                id INT PRIMARY KEY,
                student_id INT,
                subject_id INT,
                grade FLOAT,
                created_at TIMESTAMP
            );
        """)
        
        session.execute("""
            CREATE TABLE schedules (
                id INT PRIMARY KEY,
                class_id INT,
                subject_id INT,
                day_of_week INT,
                time_start TEXT,
                time_end TEXT
            );
        """)
        
        session.execute("""
            CREATE TABLE student_enrollments (
                student_id INT,
                class_id INT,
                class_name TEXT,
                teacher_id INT,
                enrolled_at TIMESTAMP,
                PRIMARY KEY (student_id, class_id)
            );
        """)
        
        session.execute("""
            CREATE TABLE student_grades (
                student_id INT,
                subject_id INT,
                subject_name TEXT,
                grade FLOAT,
                created_at TIMESTAMP,
                PRIMARY KEY (student_id, subject_id, created_at)
            ) WITH CLUSTERING ORDER BY (subject_id ASC, created_at DESC);
        """)
        
        print("INFO: Cassandra schema initialized.")
    except Exception as e:
        print(f"ERROR: {e}")
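
# --- Illustrative sketch (hypothetical helper, not used by the benchmark) ---
# Cassandra treats INSERT as an upsert. A row whose full PRIMARY KEY already
# exists overwrites the old row; with sequential writes from one client, the
# later write wins. Rows inside a partition are stored sorted by the
# clustering columns. This pure-Python emulation of student_grades, with
# PRIMARY KEY (student_id, subject_id, created_at) WITH CLUSTERING ORDER BY
# (subject_id ASC, created_at DESC), can help predict the row counts reported
# by verify_cassandra_counts(). Assumption: rows are
# (student_id, subject_id, subject_name, grade, created_at) tuples.
def emulate_student_grades(rows):
    """Return {student_id: rows sorted by subject_id ASC, created_at DESC}."""
    by_key = {}
    for row in rows:
        student_id, subject_id, _name, _grade, created_at = row
        # Same primary key -> the later row replaces the earlier one (upsert)
        by_key[(student_id, subject_id, created_at)] = row

    partitions = {}
    for (student_id, _subject_id, _created_at), row in by_key.items():
        partitions.setdefault(student_id, []).append(row)

    for partition in partitions.values():
        # Two stable sorts: secondary key first (created_at DESC),
        # then the primary clustering column (subject_id ASC)
        partition.sort(key=lambda r: r[4], reverse=True)
        partition.sort(key=lambda r: r[1])
    return partitions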

def verify_cassandra_tables(session, keyspace='benchmark', expected_tables=None):
    """Verifies if the expected tables exist in Cassandra."""
    if expected_tables is None:
        expected_tables = ['teachers', 'subjects', 'classes', 'students', 
                            'enrollments', 'grades', 'schedules']
    
    try:
        # Get existing tables; the keyspace name is bound as a parameter
        # instead of being interpolated into the CQL string
        query = """
            SELECT table_name FROM system_schema.tables
            WHERE keyspace_name = %s;
        """
        rows = session.execute(query, (keyspace,))
        existing_tables = {row.table_name for row in rows}
        
        missing_tables = set(expected_tables) - existing_tables
        if not missing_tables:
            print(f"INFO: All Cassandra tables exist: {', '.join(expected_tables)}")
            return True
        else:
            print(f"WARNING: Missing Cassandra tables: {', '.join(missing_tables)}")
            return False
    except Exception as e:
        print(f"ERROR: {e}")
        return False

def insert_cassandra_teachers(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert teacher data from CSV, line by line."""
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO teachers (id, first_name, last_name) VALUES (?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 3:  # Ensure we have enough columns
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        values[1],            # first_name
                        values[2]             # last_name
                    ])
        
        print("INFO: Inserted teachers successfully")
    except Exception as e:
        print(f"ERROR: Failed to load teachers: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)
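
# --- Illustrative sketch (hypothetical helper) ---
# The loaders time themselves with time.time(), which follows the system clock
# and can jump if that clock is adjusted. time.perf_counter() is the standard
# library's high-resolution clock for measuring durations. This wrapper also
# derives the per-operation metrics from the test plan: average time per
# operation and throughput in operations per second. Assumption: `operation`
# returns the number of rows it processed. The existing insert_cassandra_*
# functions return timestamps instead, so they (or a thin wrapper) would need
# to return a row count.
import time

def measure(operation, *args, **kwargs):
    """Run operation(*args, **kwargs) and report duration and throughput."""
    start = time.perf_counter()
    rows = operation(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return {
        'rows': rows,
        'seconds': elapsed,
        'avg_ms_per_op': (elapsed / rows * 1000) if rows else None,
        'ops_per_second': (rows / elapsed) if elapsed > 0 else None,
    }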

def insert_cassandra_subjects(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert subject data from CSV, line by line."""
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO subjects (id, name, description) VALUES (?, ?, ?)"
        )
        
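        # Process CSV file row by row. csv.reader keeps quoted fields intact;
        # str.split(',') would break a description that contains commas.
        import csv
        with open(csv_file, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader)  # skip header
            file_opened_start_time = time.time()
            
            for values in reader: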
                if len(values) >= 3:  # Ensure we have enough columns
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        values[1],            # name
                        values[2]             # description
                    ])
        
        print("INFO: Inserted subjects successfully")
    except Exception as e:
        print(f"ERROR: Failed to load subjects: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)
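
# --- Illustrative sketch (hypothetical helper) ---
# Most loaders in this cell split each line on ','. That breaks any quoted
# field that itself contains a comma; pandas.DataFrame.to_csv, for example,
# writes "Algebra, geometry" in quotes. A generator over the standard csv
# module returns such fields intact and could replace the manual split:
#     with open(csv_file, 'r', newline='') as f:
#         for values in iter_csv_rows(f): ...
import csv

def iter_csv_rows(f, skip_header=True):
    """Yield each CSV record from an open text file as a list of strings."""
    reader = csv.reader(f)
    if skip_header:
        next(reader, None)  # discard the header row, if present
    for row in reader:
        yield row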

def insert_cassandra_classes(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert class data from CSV, line by line."""
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO classes (id, name, teacher_id) VALUES (?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 3:  # Ensure we have enough columns
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        values[1],            # name
                        int(values[2])        # teacher_id
                    ])
        
        print("INFO: Inserted classes successfully")
    except Exception as e:
        print(f"ERROR: Failed to load classes: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)

def insert_cassandra_students(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert student data from CSV, line by line."""
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO students (id, first_name, last_name, birth_date) VALUES (?, ?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 4:  # Ensure we have enough columns
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        values[1],            # first_name
                        values[2],            # last_name
                        values[3]             # birth_date
                    ])
        
        print("INFO: Inserted students successfully")
    except Exception as e:
        print(f"ERROR: Failed to load students: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)

def insert_cassandra_enrollments(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert enrollment data from CSV, line by line, with timestamp handling."""
    from datetime import datetime
    
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO enrollments (student_id, class_id, enrolled_at) VALUES (?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 3:  # Ensure we have enough columns
                    # Convert timestamp string to datetime object
                    enrolled_at = datetime.fromisoformat(values[2].replace('Z', '+00:00'))
                    
                    session.execute(prepared_stmt, [
                        int(values[0]),       # student_id
                        int(values[1]),       # class_id
                        enrolled_at           # enrolled_at as datetime
                    ])
        
        print("INFO: Inserted enrollments successfully")
    except Exception as e:
        print(f"ERROR: Failed to load enrollments: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)
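
# --- Illustrative sketch (hypothetical helper) ---
# The enrollment and grade loaders replace 'Z' with '+00:00' before calling
# datetime.fromisoformat, because fromisoformat accepts a trailing 'Z' only
# from Python 3.11 onward. Centralising the conversion keeps every loader
# consistent. Assumption: timestamps without an offset in the generated CSVs
# are already UTC.
from datetime import datetime, timezone

def parse_iso_timestamp(value):
    """Parse an ISO 8601 string into a timezone-aware UTC datetime."""
    value = value.strip()
    if value.endswith('Z'):
        value = value[:-1] + '+00:00'
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        # Assumption: naive timestamps are already in UTC
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)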

def insert_cassandra_grades(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert grades data from CSV, line by line, with timestamp handling."""
    from datetime import datetime
    
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO grades (id, student_id, subject_id, grade, created_at) VALUES (?, ?, ?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 5:  # id, student_id, subject_id, grade, created_at
                    # Convert timestamp string to datetime object
                    created_at = datetime.fromisoformat(values[4].replace('Z', '+00:00'))
                    
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        int(values[1]),       # student_id
                        int(values[2]),       # subject_id
                        float(values[3]),     # grade
                        created_at            # created_at as datetime
                    ])
        
        print("INFO: Inserted grades successfully")
    except Exception as e:
        print(f"ERROR: Failed to load grades: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)
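
# --- Illustrative sketch (hypothetical helper) ---
# Every loader silently skips rows with too few columns. A malformed CSV
# therefore shows up only as a lower count in verify_cassandra_counts().
# Tallying skipped rows makes that visible. The min_columns value follows the
# highest index each loader reads, e.g. 5 for grades.csv
# (id, student_id, subject_id, grade, created_at).
#     stats = collections.Counter()
#     for values in iter_complete_rows(csv.reader(f), 5, stats): ...
#     print(f"INFO: loaded={stats['loaded']} skipped={stats['skipped']}")
def iter_complete_rows(rows, min_columns, stats):
    """Yield rows with at least min_columns fields; tally results in stats (a Counter)."""
    for row in rows:
        if len(row) >= min_columns:
            stats['loaded'] += 1
            yield row
        else:
            stats['skipped'] += 1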

def insert_cassandra_schedules(session, csv_file, keyspace='benchmark') -> tuple:
    """Insert schedule data from CSV, line by line, with day-name mapping."""
    
    operation_start_time = time.time()
    file_opened_start_time = 0
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # Day name to integer mapping
        day_map = {
            'Monday': 1, 'Tuesday': 2, 'Wednesday': 3, 
            'Thursday': 4, 'Friday': 5, 'Saturday': 6, 'Sunday': 7
        }
        
        # Prepare the insert statement
        prepared_stmt = session.prepare(
            "INSERT INTO schedules (id, class_id, subject_id, day_of_week, time_start, time_end) VALUES (?, ?, ?, ?, ?, ?)"
        )
        
        # Process CSV file line by line
        with open(csv_file, 'r') as f:
            # Skip header
            header = next(f)
            file_opened_start_time = time.time()
            
            for line in f:
                values = line.strip().split(',')
                if len(values) >= 6:  # Ensure we have enough columns
                    # Convert day name to integer
                    day_num = day_map.get(values[3], 0)
                    
                    session.execute(prepared_stmt, [
                        int(values[0]),       # id
                        int(values[1]),       # class_id
                        int(values[2]),       # subject_id
                        day_num,              # day_of_week as int
                        values[4],            # time_start
                        values[5]             # time_end
                    ])
        
        print("INFO: Inserted schedules successfully")
    except Exception as e:
        print(f"ERROR: Failed to load schedules: {e}")
    
    end_time = time.time()
    return (operation_start_time, file_opened_start_time, end_time)
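
# --- Illustrative sketch (hypothetical helper) ---
# insert_cassandra_schedules maps unknown day names to 0 via
# day_map.get(values[3], 0). A typo or a differently cased value ('monday')
# is therefore stored silently as day 0. This stricter variant normalises
# case and whitespace and raises on anything it cannot map. It uses
# 1 = Monday ... 7 = Sunday, the same numbering as day_map and
# date.isoweekday().
DAY_NUMBERS = {
    'monday': 1, 'tuesday': 2, 'wednesday': 3, 'thursday': 4,
    'friday': 5, 'saturday': 6, 'sunday': 7,
}

def day_to_int(name):
    """Return the ISO weekday number for an English day name."""
    try:
        return DAY_NUMBERS[name.strip().lower()]
    except KeyError:
        raise ValueError(f"Unknown day of week: {name!r}") from None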

def populate_cassandra_denormalized_tables(session, data_dir, keyspace='benchmark'):
    """
    Populate denormalized tables for efficient queries.
    This requires more memory as we need to join data in Python.
    """
    from datetime import datetime
    import pandas as pd
    
    data_path = Path(data_dir)
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        
        # --- Populate student_enrollments table ---
        # This requires joining enrollments with classes
        enrollments_df = pd.read_csv(data_path / 'enrollments.csv')
        classes_df = pd.read_csv(data_path / 'classes.csv')
        
        # Prepare statement
        stmt = session.prepare(
            "INSERT INTO student_enrollments (student_id, class_id, class_name, teacher_id, enrolled_at) VALUES (?, ?, ?, ?, ?)"
        )
        
        # Join and process
        merged = pd.merge(enrollments_df, classes_df, left_on='class_id', right_on='id')
        for _, row in merged.iterrows():
            enrolled_at = datetime.fromisoformat(row['enrolled_at'].replace('Z', '+00:00'))
            
            session.execute(stmt, [
                int(row['student_id']),
                int(row['class_id']),
                row['name'],
                int(row['teacher_id']),
                enrolled_at
            ])
        
        print("INFO: Populated student_enrollments denormalized table")
        
        # --- Populate student_grades table ---
        # This requires joining grades with subjects
        grades_df = pd.read_csv(data_path / 'grades.csv')
        subjects_df = pd.read_csv(data_path / 'subjects.csv')
        
        # Prepare statement
        stmt = session.prepare(
            "INSERT INTO student_grades (student_id, subject_id, subject_name, grade, created_at) VALUES (?, ?, ?, ?, ?)"
        )
        
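        # Join and process. suffixes=('', '_subject') keeps the grades columns
        # (id, created_at) unsuffixed whether or not subjects.csv has the same names.
        merged = pd.merge(grades_df, subjects_df, left_on='subject_id', right_on='id',
                          suffixes=('', '_subject'))
        for _, row in merged.iterrows():
            created_at = datetime.fromisoformat(row['created_at'].replace('Z', '+00:00'))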
            
            session.execute(stmt, [
                int(row['student_id']),
                int(row['subject_id']),
                row['name'],
                float(row['grade']),
                created_at
            ])
        
        print("INFO: Populated student_grades denormalized table")
        
    except Exception as e:
        print(f"ERROR: Failed to populate denormalized tables: {e}")
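
# --- Illustrative sketch (hypothetical helper) ---
# populate_cassandra_denormalized_tables reads all of grades.csv into memory,
# which may not fit at the 10,000,000-record scale. subjects.csv is small, so
# only grades needs streaming. pandas.read_csv with chunksize returns an
# iterator of DataFrames, and each chunk is merged with the in-memory subjects
# table. itertuples() is typically much faster than iterrows().
# Assumption: subjects.csv has 'id' and 'name' columns, as the existing code
# implies.
import pandas as pd

def iter_denormalized_grades(grades_csv, subjects_csv, chunksize=100_000):
    """Yield (student_id, subject_id, subject_name, grade, created_at_str) tuples."""
    subjects_df = pd.read_csv(subjects_csv, usecols=['id', 'name'])
    for chunk in pd.read_csv(grades_csv, chunksize=chunksize):
        # suffixes keep the grades 'id' unsuffixed; the subjects 'id' becomes 'id_subject'
        merged = chunk.merge(subjects_df, left_on='subject_id', right_on='id',
                             suffixes=('', '_subject'))
        for row in merged.itertuples(index=False):
            yield (int(row.student_id), int(row.subject_id), row.name,
                   float(row.grade), row.created_at)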

def load_cassandra_data(session, data_dir, keyspace='benchmark'):
    """Load all data into Cassandra tables."""
    data_path = Path(data_dir)
    
    # Use the keyspace
    session.execute(f"USE {keyspace};")
    
    # Basic entities first, then relationships and complex data.
    # The keyspace is passed through so a non-default keyspace is used everywhere.
    loaders = [
        ('teachers', insert_cassandra_teachers),
        ('subjects', insert_cassandra_subjects),
        ('classes', insert_cassandra_classes),
        ('students', insert_cassandra_students),
        ('enrollments', insert_cassandra_enrollments),
        ('grades', insert_cassandra_grades),
        ('schedules', insert_cassandra_schedules),
    ]
    for name, loader in loaders:
        op_time, f_op_time, end_time = loader(session, data_path / f'{name}.csv', keyspace)
        # f_op_time is taken after the header is read, so end_time - f_op_time is the
        # time spent inserting rows, excluding USE, statement preparation and file opening
        print(f"INFO: Inserted {name} in {end_time - op_time:.2f} seconds "
              f"(row inserts: {end_time - f_op_time:.2f} seconds)")
    
    # Populate denormalized tables
    populate_cassandra_denormalized_tables(session, data_path, keyspace)

def verify_cassandra_counts(session, keyspace='benchmark'):
    """Count rows in all Cassandra tables."""
    tables = ['teachers', 'subjects', 'classes', 'students', 
                'enrollments', 'grades', 'schedules', 
                'student_enrollments', 'student_grades']
    max_len = max(len(t) for t in tables)
    
    try:
        # Use the keyspace
        session.execute(f"USE {keyspace};")
        counts = {}
        
        for table in tables:
            try:
                rows = session.execute(f"SELECT COUNT(*) FROM {table}")
                count = rows.one()[0]
                counts[table] = count
            except Exception as e:
                print(f"ERROR: {e}")
                counts[table] = 'Error'
        
        print("--- Cassandra Table Row Counts ---")
        for table, count in counts.items():
            print(f"{table:<{max_len}} : {count}")
        print("----------------------------------")
        
        return counts
    except Exception as e:
        print(f"ERROR: {e}")
        return None
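
# --- Illustrative sketch (hypothetical helpers) ---
# verify_cassandra_counts reports what is in the tables, not what should be
# there. Counting the data records of each source CSV (header excluded) gives
# an expected value to compare against. Treat the comparison as a sanity
# check only: tables keyed on a subset of columns, such as enrollments
# (student_id, class_id), collapse duplicate keys, and rows skipped by the
# loaders' column-count checks also lower the table count. Note also that
# COUNT(*) without a partition key scans the whole table, so at the larger
# test scales it may hit a read timeout.
import csv
from pathlib import Path

def expected_counts_from_csv(data_dir, names):
    """Return {name: number of data records in data_dir/<name>.csv}."""
    counts = {}
    for name in names:
        with open(Path(data_dir) / f'{name}.csv', 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader, None)  # skip header
            counts[name] = sum(1 for _ in reader)
    return counts

def compare_counts(expected, actual):
    """Return {name: (expected, actual)} for every table whose counts differ."""
    return {name: (exp, actual.get(name))
            for name, exp in expected.items() if actual.get(name) != exp}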

In [None]:
# Cassandra Operations Execution

# Schema initialization
initialize_cassandra_schema(cassandra_session)

# Table verification
required_tables = ['teachers', 'subjects', 'classes', 'students', 'enrollments', 'grades', 'schedules']
verify_cassandra_tables(cassandra_session, expected_tables=required_tables)

# Data loading
load_cassandra_data(cassandra_session, scale_100_dir)

# Row count verification
verify_cassandra_counts(cassandra_session)
CELL_END