In [1]:
import functools
import time
import timeit

import duckdb
import pandas as pd
import polars as plr

In [2]:
def measure_time(func):
    """Decorator that times one call of ``func`` and reports the duration.

    The decorated callable prints the elapsed time and returns that duration
    in seconds. The value returned by ``func`` itself is discarded, so
    callers receive only the timing.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that accepts the same arguments as ``func`` and returns
        the elapsed time as a float number of seconds.
    """
    @functools.wraps(func)  # keep func's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so the measured
        # interval is not affected by system clock adjustments.
        start = time.perf_counter()
        func(*args, **kwargs)
        end = time.perf_counter()

        time_taken = end - start
        print(f"Time elapased is {time_taken} seconds")
        return time_taken
    return wrapper


Concurrency

In [3]:
import duckdb
from threading import Thread, current_thread
import random

# Connect to a file-backed database that all threads below will share.
# Calling duckdb.connect() with no arguments gives an in-memory database instead.
persistent_db_path = 'my_peristent_db.duckdb'
duckdb_con = duckdb.connect(persistent_db_path)

# CREATE OR REPLACE means every run starts from an empty table;
# insert_time is populated by its DEFAULT when a row is inserted.
create_inserts_table_sql = """
    CREATE OR REPLACE TABLE my_inserts (
        thread_name VARCHAR,
        insert_time TIMESTAMP DEFAULT current_timestamp
    )
"""
duckdb_con.execute(create_inserts_table_sql)

<duckdb.duckdb.DuckDBPyConnection at 0x266d3193170>

In [4]:
def write_from_thread(duckdb_con):
    """Insert one row tagged with the calling thread's name into ``my_inserts``.

    Args:
        duckdb_con: The shared DuckDB connection. A cursor is derived from it
            for this call and closed again before returning.
    """
    insert_sql = """
        INSERT INTO my_inserts (thread_name)
        VALUES (?)
    """
    # Create a DuckDB connection specifically for this thread
    local_con = duckdb_con.cursor()
    try:
        # Insert a row with the name of the thread. insert_time is auto-generated.
        thread_name = str(current_thread().name)
        local_con.execute(insert_sql, (thread_name,)).fetchall()
    finally:
        # Release the per-thread cursor even if the insert raised.
        local_con.close()

def read_from_thread(duckdb_con):
    """Print the current row count of ``my_inserts`` as seen by this thread.

    The printed value is the list returned by ``fetchall()`` for a query that
    selects the thread name, ``count(*)`` and ``current_timestamp``.

    Args:
        duckdb_con: The shared DuckDB connection. A cursor is derived from it
            for this call and closed again before returning.
    """
    count_sql = """
        SELECT
            ? AS thread_name,
            count(*) AS row_counter,
            current_timestamp
        FROM my_inserts
    """
    # Create a DuckDB connection specifically for this thread
    local_con = duckdb_con.cursor()
    try:
        # Query the current row count
        thread_name = str(current_thread().name)
        results = local_con.execute(count_sql, (thread_name,)).fetchall()
    finally:
        # Release the per-thread cursor even if the query raised.
        local_con.close()
    print(results)

In [5]:
write_thread_count = 50
read_thread_count = 5

# Build all writer and reader threads up front. They all run in this process
# and each receives the same connection object as its only argument.
writer_threads = [
    Thread(target=write_from_thread, args=(duckdb_con,), name=f'write_thread_{n}')
    for n in range(write_thread_count)
]
reader_threads = [
    Thread(target=read_from_thread, args=(duckdb_con,), name=f'read_thread_{n}')
    for n in range(read_thread_count)
]
threads = writer_threads + reader_threads

# Interleave readers and writers; the fixed seed keeps the order reproducible.
random.seed(6)
random.shuffle(threads)

In [6]:
# Launch every thread first so readers and writers overlap...
for worker in threads:
    worker.start()

# ...then wait for all of them, so the final query sees every insert.
for worker in threads:
    worker.join()

# Show the complete table, ordered by insertion time.
all_inserts = duckdb_con.execute("""
    SELECT *
    FROM my_inserts
    ORDER BY
        insert_time
""").df()
print(all_inserts)

[('read_thread_4', 23, datetime.datetime(2024, 9, 26, 17, 37, 21, 929000, tzinfo=<DstTzInfo 'Asia/Calcutta' IST+5:30:00 STD>))]
[('read_thread_0', 37, datetime.datetime(2024, 9, 26, 17, 37, 21, 939000, tzinfo=<DstTzInfo 'Asia/Calcutta' IST+5:30:00 STD>))]
[('read_thread_2', 36, datetime.datetime(2024, 9, 26, 17, 37, 21, 938000, tzinfo=<DstTzInfo 'Asia/Calcutta' IST+5:30:00 STD>))]
[('read_thread_3', 19, datetime.datetime(2024, 9, 26, 17, 37, 21, 927000, tzinfo=<DstTzInfo 'Asia/Calcutta' IST+5:30:00 STD>))]
[('read_thread_1', 1, datetime.datetime(2024, 9, 26, 17, 37, 21, 915000, tzinfo=<DstTzInfo 'Asia/Calcutta' IST+5:30:00 STD>))]
        thread_name             insert_time
0   write_thread_39 2024-09-26 17:37:21.905
1   write_thread_40 2024-09-26 17:37:21.905
2   write_thread_27 2024-09-26 17:37:21.905
3    write_thread_4 2024-09-26 17:37:21.906
4   write_thread_38 2024-09-26 17:37:21.906
5   write_thread_41 2024-09-26 17:37:21.907
6   write_thread_22 2024-09-26 17:37:21.908
7   write

In the example above, a set of write threads and a set of read threads were combined, shuffled, and executed.

Because pandas is single-threaded, such an operation is not possible with it.

But with DuckDB, multiple read and write operations are possible simultaneously.
DuckDB not only allows faster execution of queries but also parallel execution of queries.
 


Persistence Testing

In [8]:
create_users_sql = '''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER,
        name VARCHAR,
        age INTEGER
    )
'''

# Write to the file-backed database, closing the connection even if one of
# the statements raises.
con = duckdb.connect('my_database.duckdb')
try:
    con.execute(create_users_sql)
    # NOTE: the table is only created when missing, but this INSERT always
    # runs, so re-running the cell appends the same two rows again.
    con.execute("INSERT INTO users VALUES (1, 'Alice', 25), (2, 'Bob', 30)")
    result = con.execute('SELECT * FROM users').fetchall()
finally:
    con.close()

# The rows were fully fetched above, so they can be printed after closing.
for row in result:
    print(row)


(1, 'Alice', 25)
(2, 'Bob', 30)


In [9]:
# Reopen the same database file with a fresh connection to confirm that the
# rows written earlier are still there; close it even if the query raises.
con = duckdb.connect('my_database.duckdb')
try:
    result = con.execute('SELECT * FROM users').fetchall()
finally:
    con.close()

# The rows were fully fetched above, so they can be printed after closing.
for row in result:
    print(row)


(1, 'Alice', 25)
(2, 'Bob', 30)
