# Benchmark

Microbenchmark various ways of storing tabular data in PostgreSQL or MongoDB.

In [1]:
from multiprocessing import Pool, cpu_count
import tqdm

import adbc_driver_postgresql.dbapi
import numpy as np
import pandas as pd
import pymongo
import pyarrow
import toolz
from typing import Any

Containers are running using `podman-compose up`.

In [2]:
MONGO_URI = "mongodb://root:example@localhost:27017"
PG_URI = "postgresql://postgres:example@localhost:5432"

In [3]:
mongo_client = pymongo.MongoClient(MONGO_URI)

Define data generation utilities.

In [4]:
def random_numbers(dtype: np.dtype, size: int):
    """
    Generate 1D array of random numbers based on the provided numpy dtype and size.
    """
    if np.issubdtype(dtype, np.floating):
        return np.random.random(size).astype(dtype)
        
    elif np.issubdtype(dtype, np.integer):
        info = np.iinfo(dtype)
        return np.random.randint(info.min, info.max, size=size, dtype=dtype)
        
    elif np.issubdtype(dtype, np.bool_):
        return np.random.choice([True, False], size=size)
        
    else:
        raise ValueError(f"Unsupported dtype: {dtype}")
        

def generate_data(n_rows: int, column_types: np.dtype):
    return pd.DataFrame({f"column_{i}": random_numbers(dtype, n_rows) for i, dtype in enumerate(column_types)})


def numpy_to_python_builtin(val: Any) -> Any:
    """Convert a single value from NumPy type to Python builtin type."""
    
    # Handle None
    if val is None:
        return None
        
    # Handle NumPy arrays
    if isinstance(val, np.ndarray):
        return val.tolist()
        
    # Handle NumPy scalars
    if isinstance(val, np.generic):
        if np.issubdtype(val.dtype, np.integer):
            return int(val)
        elif np.issubdtype(val.dtype, np.floating):
            return float(val)
        elif np.issubdtype(val.dtype, np.complexfloating):
            return complex(val)
        elif np.issubdtype(val.dtype, np.bool_):
            return bool(val)
        elif np.issubdtype(val.dtype, np.datetime64):
            return val.astype('datetime64[us]').item()
        elif np.issubdtype(val.dtype, np.str_):
            return str(val)
        else:
            return val.item()
            
    # Handle lists/tuples (recursively)
    if isinstance(val, (list, tuple)):
        return type(val)(convert_value(x) for x in val)
        
    # Handle dictionaries (recursively)
    if isinstance(val, dict):
        return {k: numpy_to_python_builtin(v) for k, v in val.items()}
        
    return val

In [5]:
generate_data(5, [np.float64])  # one column of 5 float64 values

Unnamed: 0,column_0
0,0.5628
1,0.485525
2,0.997986
3,0.387006
4,0.104592


In [6]:
generate_data(3, [np.float64, np.uint8, np.bool_, np.uint8])  # multiple columns with various types

Unnamed: 0,column_0,column_1,column_2,column_3
0,0.940758,80,True,231
1,0.945223,116,False,233
2,0.970032,221,True,113


# MongoDB document per row (legacy bluesky approach)

Construct rows as dicts of Python builtins.

In [7]:
def rows_as_dict_of_builtins(table: pd.DataFrame):
    for _, row_as_series in table.iterrows():
        yield numpy_to_python_builtin(dict(row_as_series))

In [8]:
list(rows_as_dict_of_builtins(generate_data(3, [np.float64, np.uint8, np.bool_])))

[{'column_0': 0.4286139016021725, 'column_1': 205, 'column_2': True},
 {'column_0': 0.43381465683788, 'column_1': 127, 'column_2': False},
 {'column_0': 0.1905587143499351, 'column_1': 71, 'column_2': True}]

In [9]:
def add_extra_keys(rows, extra_keys):
    "Include additional fixed values in each row."
    for row in rows:
        row.update(extra_keys)
        yield row

def insert_document_per_row(mongo_collection, table, extra_keys=None):
    "Insert table as document-per-row into Mongo collection, optionally added constant columns."
    BATCH_SIZE = 1_000  # Mongo-defined limit for insert_many
    extra_keys = extra_keys or {}
    for batch in toolz.partition_all(BATCH_SIZE, add_extra_keys(rows_as_dict_of_builtins(table), extra_keys)):
        mongo_collection.insert_many(batch)

def get_columns_from_document_per_row(collection, column_names=None, filter=None):
    projection = {"_id": False}
    if column_names is not None:
        projection.update({name: True for name in column_names})
    records = collection.find(filter, projection)
    return pd.DataFrame.from_records(records)

Insert and retrieve a little test data.

In [10]:
mongo_client["test"].drop_collection("x")  # ensure clean
collection = mongo_client["test"]["x"]
for scan in [1, 2]:
    insert_document_per_row(collection, generate_data(3, [np.float64, np.uint8, np.bool_]), extra_keys={"scan": scan})

In [11]:
list(collection.find())

[{'_id': ObjectId('67a191227a6776919b21a86f'),
  'column_0': 0.28477074723976104,
  'column_1': 236,
  'column_2': True,
  'scan': 1},
 {'_id': ObjectId('67a191227a6776919b21a870'),
  'column_0': 0.33697362839425504,
  'column_1': 85,
  'column_2': True,
  'scan': 1},
 {'_id': ObjectId('67a191227a6776919b21a871'),
  'column_0': 0.10568567093653436,
  'column_1': 128,
  'column_2': True,
  'scan': 1},
 {'_id': ObjectId('67a191227a6776919b21a872'),
  'column_0': 0.8136133370551494,
  'column_1': 93,
  'column_2': True,
  'scan': 2},
 {'_id': ObjectId('67a191227a6776919b21a873'),
  'column_0': 0.9657357664975449,
  'column_1': 98,
  'column_2': False,
  'scan': 2},
 {'_id': ObjectId('67a191227a6776919b21a874'),
  'column_0': 0.6510745911265755,
  'column_1': 208,
  'column_2': False,
  'scan': 2}]

In [12]:
get_columns_from_document_per_row(collection)

Unnamed: 0,column_0,column_1,column_2,scan
0,0.284771,236,True,1
1,0.336974,85,True,1
2,0.105686,128,True,1
3,0.813613,93,True,2
4,0.965736,98,False,2
5,0.651075,208,False,2


In [13]:
get_columns_from_document_per_row(collection, ["column_0"], {"scan": 1})

Unnamed: 0,column_0
0,0.284771
1,0.336974
2,0.105686


In [15]:
n_rows = 100_000
n_scans = 1000

collection_name = f"benchmark_{n_rows}_{n_scans}"
mongo_client["benchmark"].drop_collection(collection_name)  # ensure clean
collection = mongo_client["benchmark"][collection_name]
collection.create_index([("scan", 1)])

def insert_scan(scan_num):
    mongo_client = pymongo.MongoClient(MONGO_URI)
    collection = mongo_client["benchmark"][collection_name]
    insert_document_per_row(collection, generate_data(n_rows, [np.float64, np.uint8, np.bool_]), extra_keys={"scan": scan_num})


with Pool(cpu_count() - 2) as pool:
    list(
        tqdm.tqdm(pool.imap(insert_scan, range(1, 1 + n_scans)), total=n_scans)
    )

100%|██████████████████████████████████████████████| 1000/1000 [02:58<00:00,  5.59it/s]


In [16]:
collection.count_documents({})

100000000

In [17]:
# one signal from one scan
get_columns_from_document_per_row(collection, ["column_0"], {"scan": 952})

Unnamed: 0,column_0
0,0.139513
1,0.565555
2,0.316444
3,0.667147
4,0.233003
...,...
99995,0.080839
99996,0.712898
99997,0.980649
99998,0.114787


In [18]:
# data type expands
get_columns_from_document_per_row(collection, ["column_1"], {"scan": 952}).dtypes

column_1    int64
dtype: object

In [19]:
%%timeit

# one signal from one scan
get_columns_from_document_per_row(collection, ["column_0"], {"scan": 952})

169 ms ± 1.41 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [21]:
%%timeit

# all signals from one scan
get_columns_from_document_per_row(collection, None, {"scan": 952})

238 ms ± 2.02 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# PostgreSQL with ADBC

In [22]:
def insert_adbc(conn, table_name, table, extra_keys=None):
    extra_keys = extra_keys or {}
    with conn.cursor() as cursor:
        table = table.copy()
        for key, value in extra_keys.items():
            table[key] = value
        cursor.adbc_ingest(table_name, table, mode="create_append")
    conn.commit()


def get_columns_from_adbc(conn, table_name, column_names=None, filter=None):
    if column_names:
        select = ", ".join(column_names)
    else:
        select = "*"
    filter = filter or {}
    query = f"SELECT {select} FROM {table_name}"
    query += " AND ".join(f" WHERE {key}={value}" for key, value in filter.items())
    with conn.cursor() as cursor:
        df = pd.read_sql_query(query, pg_conn)
    return df

In [23]:
# ensure clean
with adbc_driver_postgresql.dbapi.connect(PG_URI) as pg_conn:
    with pg_conn.cursor() as cursor:
        cursor.execute(f"DROP TABLE IF EXISTS test")
        
    for scan in [1, 2]:
        insert_adbc(pg_conn, "test", generate_data(3, [np.float64, np.uint8, np.bool_]), extra_keys={"scan": scan})

In [24]:
with adbc_driver_postgresql.dbapi.connect(PG_URI) as pg_conn:
    get_columns_from_adbc(pg_conn, "test")

In [28]:
n_rows = 100_000
n_scans = 1000

table_name = f"benchmark_{n_rows}_{n_scans}"
        
with adbc_driver_postgresql.dbapi.connect(PG_URI) as pg_conn:
    # ensure clean
    with pg_conn.cursor() as cursor:
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")

    for scan_num in tqdm.tqdm(range(1, 1 + n_scans), total=n_scans):
        insert_adbc(pg_conn, table_name, generate_data(n_rows, [np.float64, np.uint8, np.bool_]), extra_keys={"scan": scan_num})


100%|██████████████████████████████████████████████| 1000/1000 [00:45<00:00, 22.12it/s]


In [29]:
with adbc_driver_postgresql.dbapi.connect(PG_URI) as pg_conn:
    with pg_conn.cursor() as cursor:
        cursor.execute(f"CREATE INDEX scan ON {table_name}(scan)")
    pg_conn.commit()

In [30]:
pg_conn = adbc_driver_postgresql.dbapi.connect(PG_URI)

In [31]:
get_columns_from_adbc(pg_conn, table_name, ["column_0"], {"scan": 972})

Unnamed: 0,column_0
0,0.447192
1,0.824779
2,0.994049
3,0.859056
4,0.549764
...,...
99995,0.377405
99996,0.219068
99997,0.946752
99998,0.760984


In [43]:
%%timeit

get_columns_from_adbc(pg_conn, table_name, ["column_0"], {"scan": 972})

20 ms ± 173 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [42]:
%%timeit

get_columns_from_adbc(pg_conn, table_name, None, {"scan": 972})

32 ms ± 69.3 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)
