In [3]:
import importlib
import os
import time

import pandas as pd
from dotenv import load_dotenv

import database
importlib.reload(database)
from database import Database

# Populate os.environ before anything below reads connection settings.
# NOTE(review): python-dotenv's load_dotenv() looks for a .env file by default — confirm one exists.
load_dotenv()

# CSV file that the loader cells further down read from.
puenkt_csv = 'data/puenkt.csv'

# Sanity check: a value of "None" here means the corresponding variable is not set.
source_db_name = os.getenv('QDABABAV_POSTGRES_DBNAME')
target_db_name = os.getenv('POSTGRES_DBNAME')
print(f"Source DB: {source_db_name}")
print(f"Target DB: {target_db_name}")

Source DB: qdababav
Target DB: qdaba


### Load Data into my-postgres

In [4]:
import csv
# Copy every row of the source table into the target Postgres database.
#
# Flow: connect to both databases, run the DDL file against the target
# (best effort), then stream the source table in batches and insert each
# batch into the target, committing per batch.
# NOTE(review): the inserts are plain INSERTs, so re-running this cell appends
# the same rows again unless the DDL file resets the table — confirm.

# source
QDABABAV_POSTGRES_CONFIG = {
    'dbname': os.getenv('QDABABAV_POSTGRES_DBNAME'),
    'user': os.getenv('QDABABAV_POSTGRES_USER'),
    'password': os.getenv('QDABABAV_POSTGRES_PASSWORD'),
    'host': os.getenv('QDABABAV_POSTGRES_HOST'),
    'port': int(os.getenv('QDABABAV_POSTGRES_PORT'))
}

# target
POSTGRES_CONFIG = {
    'dbname': os.getenv('POSTGRES_DBNAME'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD'),
    'host': os.getenv('POSTGRES_HOST'),
    'port': int(os.getenv('POSTGRES_PORT'))
}

source_table = 'qdababav.puenkt'
target_table = 'qdaba.puenkt'
ddl_path = './data/puenkt_ddl.sql'
BATCH_SIZE = 10000  # rows fetched from the source and inserted per commit

# Pre-bind every handle so the cleanup in `finally` can tell what was actually
# opened; otherwise a failed connect() would surface as a NameError there and
# hide the real error.
qdaba = None
qdaba_cursor = None
postgres = None
pg_cursor = None

try:
    # source
    qdaba = Database(QDABABAV_POSTGRES_CONFIG, 'postgres')
    qdaba.connect()
    # NOTE(review): with psycopg2 a named cursor is server-side, so fetchmany()
    # streams instead of loading the whole table — confirm Database wraps psycopg2.
    qdaba_cursor = qdaba.connection.cursor(name='source_cursor')
    # target
    postgres = Database(POSTGRES_CONFIG, 'postgres')
    postgres.connect()
    pg_cursor = postgres.connection.cursor()

    # DDL (best effort: a failure is reported but does not stop the load)
    try:
        with open(ddl_path, 'r') as ddl_file:
            ddl = ddl_file.read()
        print("Executing DDL")
        pg_cursor.execute(ddl)
        postgres.connection.commit()
        print("Schema and table created successfully.")
    except Exception as e:
        print(f"Error executing DDL: {e}")
        # A failed statement leaves the transaction aborted; without this
        # rollback every following INSERT on the connection would be rejected.
        postgres.connection.rollback()

    qdaba_cursor.execute(f"SELECT * FROM {source_table};")
    rows_fetched = 0
    insert_query = None

    while True:
        rows = qdaba_cursor.fetchmany(BATCH_SIZE)
        if not rows:
            break  # No more data

        if insert_query is None:
            # The column count is fixed for the whole result set, so the
            # statement is built once from the first batch.
            placeholders = ', '.join(['%s'] * len(rows[0]))
            insert_query = f"INSERT INTO {target_table} VALUES ({placeholders})"

        pg_cursor.executemany(insert_query, rows)
        postgres.connection.commit()

        # Count only after the commit so the progress line reflects stored rows.
        rows_fetched += len(rows)
        print(f"Loaded {rows_fetched} rows so far...")

    print(f"Loaded {rows_fetched} total rows.")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close whatever was opened; one failing close must not prevent the others.
    for label, resource in (
        ('source cursor', qdaba_cursor),
        ('source connection', getattr(qdaba, 'connection', None)),
        ('target cursor', pg_cursor),
        ('target connection', getattr(postgres, 'connection', None)),
    ):
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as close_error:
            print(f"Error closing {label}: {close_error}")

Connected to Postgres database: qdababav
Connected to Postgres database: qdaba
Executing DDL
Schema and table created successfully.
Loaded 10000 rows so far...
Loaded 20000 rows so far...


KeyboardInterrupt: 

### Postgres

#### Connect to Postgres Database

In [90]:
# Connection settings for the target Postgres database, read from the environment.
# The port is converted to an integer; the other values are passed through as read.
POSTGRES_CONFIG = dict(
    dbname=os.getenv('POSTGRES_DBNAME'),
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD'),
    host=os.getenv('POSTGRES_HOST'),
    port=int(os.getenv('POSTGRES_PORT')),
)

# Open the connection and keep a cursor around for the cells that follow.
postgres = Database(POSTGRES_CONFIG, 'postgres')
postgres.connect()
pg_cursor = postgres.connection.cursor()

Connected to Postgres


#### Load Data into Postgres DB

In [None]:
# Create the qdaba schema and table, then bulk-load the CSV via COPY ... FROM STDIN.
create_schema = "CREATE SCHEMA IF NOT EXISTS qdaba;"

# Read the DDL up front and close the file right away.
with open('./data/puenkt_ddl.sql', 'r') as ddl_file:
    create_table = ddl_file.read()

# Take a fresh cursor (same pattern as the query cell below): the `with` block
# closes the cursor on exit, so reusing an earlier one would break on re-run.
pg_cursor = postgres.connection.cursor()
with pg_cursor as conn, open(f"./{puenkt_csv}", 'r', encoding='utf-8') as csv_file:
    try:
        conn.execute(create_schema)
        conn.execute(create_table)
        # NOTE(review): copy_expert is psycopg2's cursor API for COPY with a
        # file-like object — confirm Database hands out psycopg2 connections.
        conn.copy_expert("COPY qdaba.puenkt FROM STDIN WITH (FORMAT CSV, HEADER TRUE);", csv_file)
        postgres.connection.commit()
    except Exception:
        # Leave the connection usable for later cells, then surface the error.
        postgres.connection.rollback()
        raise

In [85]:
# Run the query stored in the SQL file against Postgres and report how long
# execute + fetchall took, plus the number of rows returned.
sql_file_path = './data/puenktlichkeit.sql'

# NOTE(review): this reconnects on every run; whether Database.connect() closes
# a previously opened connection is not visible here — confirm in database.py.
postgres.connect()
pg_cursor = postgres.connection.cursor()
with pg_cursor as conn:
    try:
        with open(sql_file_path, 'r') as file:
            query = file.read()

        # perf_counter is a monotonic, high-resolution clock intended for
        # measuring durations; time.time() can jump when the system clock
        # is adjusted.
        start_time = time.perf_counter()
        conn.execute(query)
        results = conn.fetchall()
        end_time = time.perf_counter()

        # Elapsed time covers both executing the query and fetching all rows.
        execution_time = end_time - start_time
        print(f"Query executed in {execution_time:.6f} seconds.")

        # Only the row count is printed; the rows stay available in `results`.
        print(f"Number of results: {len(results)}")
    except Exception as e:
        print(f"Error executing query: {e}")
        # A failed statement leaves the transaction aborted; roll back so the
        # connection can be used again by later cells.
        postgres.connection.rollback()


Connected to Postgres
Query executed in 0.074382 seconds.
Number of results: 3068


### DuckDB

#### Connect to DuckDB Database

In [4]:
# DuckDB only needs the path of its database file.
DUCKDB_CONFIG = dict(filepath="./data/duck.db")

duck = Database(DUCKDB_CONFIG, 'duckdb')
duck.connect()

# Short alias for the underlying connection, used by the DuckDB cells below.
ddbconn = duck.connection

Connected to DuckDB at ./data/duck.db


#### Load Data into DuckDB

In [84]:
# Load the CSV into qdaba.puenkt exactly once.
#
# CREATE TABLE ... AS FROM read_csv_auto(...) already copies the CSV rows into
# the new table, so a following COPY of the same file would store every row a
# second time (and again on each re-run). CREATE OR REPLACE rebuilds the table
# from the CSV instead, which keeps the cell idempotent.
# NOTE(review): DuckDB's connection context manager closes the connection on
# exit — re-run the connect cell before issuing further DuckDB queries.
with ddbconn as conn:
    create_schema = "CREATE SCHEMA IF NOT EXISTS qdaba;"
    create_table = f"CREATE OR REPLACE TABLE qdaba.puenkt AS FROM read_csv_auto('{puenkt_csv}');"
    count_rows = "SELECT count(*) FROM qdaba.puenkt;"
    conn.execute(create_schema)
    conn.execute(create_table)
    # Report the number of rows now in the table, e.g. [(50000,)].
    result = conn.execute(count_rows)
    print(result.fetchall())

[(50000,)]


### Cassandra

#### Connect to Cassandra Database

In [68]:
# Connection settings for Cassandra, read from the environment.
# Only the port is converted (to int); everything else is used as read.
CASSANDRA_CONFIG = dict(
    host=os.getenv('CASSANDRA_HOST'),
    port=int(os.getenv('CASSANDRA_PORT')),
    keyspace=os.getenv('CASSANDRA_KEYSPACE'),
    username=os.getenv('CASSANDRA_USERNAME'),
    password=os.getenv('CASSANDRA_PASSWORD'),
)

cassandra = Database(CASSANDRA_CONFIG, 'cassandra')
cassandra.connect()

# Session handle that the Cassandra cells below work with.
ccconn = cassandra.session

Connected to Cassandra


#### Load Data into Cassandra

In [69]:
# Load the CSV into the Cassandra table `puenkt`.
#
# The earlier version executed one blocking INSERT per row and needed
# 12m 57s for 50.000 rows. Inserts are now submitted with execute_async and
# awaited in windows of MAX_IN_FLIGHT, so the round trips overlap.
# NOTE(review): assumes `ccconn` is a cassandra-driver Session (execute_async
# returning a future with .result()) — confirm in database.py.
ddl_file_path = './data/cassandra_puenkt_ddl.sql'

# Single source of truth for the column order: the INSERT column list, the
# placeholder list and the per-row parameters are all derived from it.
PUENKT_COLUMNS = [
    'id_t_puenkt', 'id_linie_tu', 'id_fahrt', 'betriebstag', 'fahrt_halt_lauf',
    'go_nr', 'id_an_ab', 'id_anwendungsfall', 'id_fahrt_tu', 'id_messpunkt',
    'id_mvu', 'id_t_import', 'id_t_linie_bav', 'id_t_puenkt_fahrt',
    'id_verspcode', 'insert_time', 'ist_tu', 'manipulation_time', 'qrelevant',
    'soll_bav', 'soll_tu',
]
# Columns whose missing values (NaN after read_csv) are sent as None.
NULLABLE_COLUMNS = ('id_verspcode', 'manipulation_time')
# Upper bound on inserts submitted before their results are awaited.
MAX_IN_FLIGHT = 100


def _wait_for_inserts(pending):
    """Block until every submitted insert has finished; re-raises the first failure."""
    for future in pending:
        future.result()
    pending.clear()


with ccconn as conn:
    with open(ddl_file_path, 'r') as file:
        ddl_statements = file.read()
    # NOTE(review): the file content is sent as one statement — confirm the
    # DDL file holds a single statement.
    conn.execute(ddl_statements)

    data = pd.read_csv(puenkt_csv, dtype={'id_anwendungsfall': str, 'id_fahrt_tu': str, 'id_verspcode': str, 'id_fahrt': str})
    insert_query = (f"INSERT INTO puenkt({', '.join(PUENKT_COLUMNS)}) "
                    f"VALUES ({', '.join(['%s'] * len(PUENKT_COLUMNS))})")

    nullable_positions = [PUENKT_COLUMNS.index(column) for column in NULLABLE_COLUMNS]
    pending = []

    # itertuples yields plain tuples in PUENKT_COLUMNS order, so nothing that
    # pandas hands out during iteration is mutated.
    for values in data[PUENKT_COLUMNS].itertuples(index=False, name=None):
        params = list(values)
        # Replace NaN with None for Cassandra
        for position in nullable_positions:
            if pd.isna(params[position]):
                params[position] = None

        pending.append(conn.execute_async(insert_query, tuple(params)))
        if len(pending) >= MAX_IN_FLIGHT:
            _wait_for_inserts(pending)

    # Wait for the last, partially filled window before reporting success.
    _wait_for_inserts(pending)

    print("Data loaded into Cassandra")


Data loaded into Cassandra
