# SynaDB Playground

**Don't trust our claims. Verify them yourself.**

This notebook runs real benchmarks with the actual SynaDB library. We use **relative comparisons** that work on any hardware.

| Benchmark | Comparison |
|-----------|------------|
| Read vs Write | Random reads faster than sequential writes |
| Mmap vs VectorStore | MmapVectorStore batch insert at least 2x faster |
| GWI vs HNSW | GWI builds at least 5x faster |
| HNSW vs Brute Force | HNSW search faster |
| Schema-Free | 4 data types stored correctly |
| Crash Recovery | Data integrity after reopen |
| Tensor Extraction | Direct NumPy from history |

---
**Links:** [GitHub](https://github.com/gtava5813/SynaDB) | [Docs](https://github.com/gtava5813/SynaDB/wiki) | [PyPI](https://pypi.org/project/synadb/)

## Setup

In [None]:
# Install SynaDB
!pip install -q synadb numpy

import synadb
print(f"SynaDB {synadb.__version__} installed")

In [None]:
#@title Test Scale Configuration { run: "auto" }
# NOTE: the "#@title" / "#@param" comments are Colab form annotations; keep
# them on these exact lines so the SCALE dropdown keeps working.
SCALE = "medium" #@param ["small", "medium", "large"]

# Workload size for each scale:
#   records - number of key/value entries written by the core database benchmark
#   vectors - number of random embeddings inserted by the vector benchmarks
#   dims    - dimensionality of each generated embedding
SCALES = {
    "small":  {"records": 10_000,    "vectors": 1_000,   "dims": 384},
    "medium": {"records": 100_000,   "vectors": 10_000,  "dims": 768},
    "large":  {"records": 1_000_000, "vectors": 50_000,  "dims": 768},
}

# Active configuration, looked up by the selected scale name.
cfg = SCALES[SCALE]
print(f"Scale: {SCALE.upper()} | Records: {cfg['records']:,} | Vectors: {cfg['vectors']:,} | Dims: {cfg['dims']}")

# Results collector: benchmark name -> dict of measurements. Each benchmark
# cell stores its entry here and the summary cell reads them back.
RESULTS = {}

In [None]:
# Common imports
import time
import os
import numpy as np
from synadb import SynaDB, VectorStore

def cleanup(*paths):
    """Delete each of the given files, ignoring paths that do not exist.

    Args:
        *paths: File paths to remove. May be empty.

    Raises:
        OSError: For any removal failure other than the file being absent
            (for example a permission error, or the path being a directory).
    """
    for p in paths:
        # Try the removal directly instead of checking os.path.exists() first:
        # the check-then-remove form can race if the file disappears in between.
        try:
            os.remove(p)
        except FileNotFoundError:
            # Nothing to clean up for this path; a missing file is the desired
            # end state, so this is deliberately not an error.
            pass

---
## Benchmark 1: Core Database Performance
**Relative: Read faster than write (in-memory index)**

In [None]:
# Benchmark 1: time sequential float writes, then random point reads, against
# a single database file, and compare the two throughputs.
n = cfg["records"]
db_path = "bench_write.db"
# Remove any file left over from a previous run so the benchmark starts clean.
cleanup(db_path)

# NOTE(review): sync_on_write=False presumably disables a per-write flush to
# disk - confirm against the SynaDB documentation.
db = SynaDB(db_path, sync_on_write=False)

print(f"Writing {n:,} records...")
start = time.perf_counter()
for i in range(n):
    # One distinct key per record; values cycle through 20.0 .. 29.9.
    db.put_float(f"sensor/temp/{i}", 20.0 + (i % 100) * 0.1)
write_time = time.perf_counter() - start
write_rate = n / write_time

print(f"Write rate: {write_rate:,.0f} ops/sec")
print(f"File: {os.path.getsize(db_path) / 1e6:.1f} MB")

# Now test read performance
# At most 100,000 reads. The random indices (unseeded, drawn from [0, n)) are
# generated before the timer starts so sampling cost is not measured.
read_count = min(n, 100_000)
indices = np.random.randint(0, n, size=read_count)

print(f"\nReading {read_count:,} random records...")
start = time.perf_counter()
for i in indices:
    # The value is discarded; only the lookup time matters here.
    _ = db.get_float(f"sensor/temp/{i}")
read_time = time.perf_counter() - start
read_rate = read_count / read_time

# Pass criterion: read throughput strictly greater than write throughput.
read_vs_write = read_rate / write_rate if write_rate > 0 else 0
core_pass = read_rate > write_rate

print(f"Read rate: {read_rate:,.0f} ops/sec")
print(f"\nRead vs Write: {read_vs_write:.1f}x | {'PASS' if core_pass else 'FAIL'}")
# The file is closed but intentionally not deleted: the Benchmark 5 cell
# reopens db_path and reads n again.
db.close()

RESULTS['core_db'] = {'write_rate': write_rate, 'read_rate': read_rate, 'speedup': read_vs_write, 'pass': core_pass}

---
## Benchmark 2: Vector Store + HNSW
**Relative: HNSW search faster than brute force**

In [None]:
# Benchmark 2: insert random vectors one at a time, then time one k=10 search
# before and one after building the HNSW index.
n_vec = cfg["vectors"]
dims = cfg["dims"]
vec_path = "bench_vectors.db"
cleanup(vec_path)

store = VectorStore(vec_path, dimensions=dims)
# Unseeded standard-normal data of shape (n_vec, dims), cast to float32.
# The MmapVectorStore and GWI cells insert this same array.
vectors = np.random.randn(n_vec, dims).astype(np.float32)

print(f"Inserting {n_vec:,} vectors ({dims} dims)...")
start = time.perf_counter()
for i, vec in enumerate(vectors):
    store.insert(f"doc_{i}", vec)
insert_time = time.perf_counter() - start
# Per-item insert throughput; the MmapVectorStore cell uses it as its baseline.
insert_rate = n_vec / insert_time

# The same random query is used for both searches so the timings are comparable.
query = np.random.randn(dims).astype(np.float32)

# Brute force search (before index)
# NOTE(review): assumes search() scans every vector while no index has been
# built - confirm against the VectorStore documentation.
print("Brute force search...")
start = time.perf_counter()
brute_results = store.search(query, k=10)
brute_ms = (time.perf_counter() - start) * 1000

print(f"Building HNSW index...")
start = time.perf_counter()
store.build_index()
# Index build time; the GWI cell uses it as the HNSW baseline.
build_time = time.perf_counter() - start

# HNSW search (after index)
# NOTE(review): each search is timed exactly once, so the comparison rests on
# a single sample per method and may be noisy.
print("HNSW search...")
start = time.perf_counter()
hnsw_results = store.search(query, k=10)
hnsw_ms = (time.perf_counter() - start) * 1000

# Pass criterion: the indexed search took strictly less time than the first one.
speedup = brute_ms / hnsw_ms if hnsw_ms > 0 else 0
vector_pass = hnsw_ms < brute_ms

print(f"\nInsert: {insert_rate:,.0f}/sec | Build: {build_time:.1f}s")
print(f"Brute force: {brute_ms:.2f}ms | HNSW: {hnsw_ms:.2f}ms | Speedup: {speedup:.1f}x")
print(f"Result: {'PASS' if vector_pass else 'FAIL'}")
# Assumes the indexed search returned at least one hit.
print(f"Top result: {hnsw_results[0].key} (score: {hnsw_results[0].score:.4f})")
store.close()

RESULTS['hnsw_vs_brute'] = {'brute_ms': brute_ms, 'hnsw_ms': hnsw_ms, 'speedup': speedup, 'pass': vector_pass}
# Informational entry only: it carries no 'pass' verdict.
RESULTS['hnsw_build'] = {'value': build_time, 'unit': 's'}

---
## Benchmark 3: MmapVectorStore vs VectorStore
**Relative: MmapVectorStore batch insert at least 2x faster than VectorStore per-item insert**

In [None]:
# Benchmark 3 (optional): batch-insert the same vectors into a MmapVectorStore
# and compare the rate with the per-item VectorStore rate measured earlier.
# Relies on dims, n_vec, vectors and insert_rate from the Benchmark 2 cell.
try:
    from synadb import MmapVectorStore
    
    mmap_path = "bench_mmap.mmap"
    cleanup(mmap_path)
    
    # Capacity is reserved for twice the number of vectors being inserted.
    store = MmapVectorStore(mmap_path, dimensions=dims, initial_capacity=n_vec * 2)
    # One string key per row of `vectors`; built before the timer starts.
    keys = [f"vec_{i}" for i in range(n_vec)]
    
    print(f"MmapVectorStore batch inserting {n_vec:,} vectors...")
    start = time.perf_counter()
    store.insert_batch(keys, vectors)
    mmap_time = time.perf_counter() - start
    mmap_rate = n_vec / mmap_time
    store.close()
    
    # Compare with VectorStore insert rate from earlier
    speedup = mmap_rate / insert_rate if insert_rate > 0 else 0
    mmap_pass = speedup >= 2  # At least 2x faster
    
    print(f"MmapVectorStore: {mmap_rate:,.0f}/sec")
    print(f"VectorStore: {insert_rate:,.0f}/sec")
    print(f"Speedup: {speedup:.1f}x | {'PASS' if mmap_pass else 'FAIL'}")
    
    RESULTS['mmap_vs_vector'] = {'mmap_rate': mmap_rate, 'vector_rate': insert_rate, 'speedup': speedup, 'pass': mmap_pass}
except ImportError:
    # Feature missing from the installed synadb: record a skip (pass=None plus
    # a note) instead of a failure.
    print("MmapVectorStore not available in this version")
    RESULTS['mmap_vs_vector'] = {'speedup': 0, 'pass': None, 'note': 'Not available'}

---
## Benchmark 4: GWI vs HNSW Build Time
**Relative: GWI builds at least 5x faster than HNSW**

In [None]:
# Benchmark 4 (optional): batch-insert the vectors into a GravityWellIndex and
# compare the elapsed time with the HNSW index build time from Benchmark 2.
# Relies on dims, n_vec, vectors and build_time from the Benchmark 2 cell.
try:
    from synadb import GravityWellIndex

    gwi_path = "bench_gwi.gwi"
    cleanup(gwi_path)

    # Build the key list here instead of reusing `keys` from the
    # MmapVectorStore cell: that name is only assigned when the
    # MmapVectorStore import succeeds, so relying on it raised NameError
    # whenever that benchmark was skipped.
    gwi_keys = [f"vec_{i}" for i in range(n_vec)]

    gwi = GravityWellIndex(gwi_path, dimensions=dims)
    # initialize() receives the first 1,000 vectors and runs before the timer
    # starts, so its cost is not part of gwi_time.
    gwi.initialize(vectors[:1000])

    print(f"GWI: Inserting {n_vec:,} vectors...")
    start = time.perf_counter()
    gwi.insert_batch(gwi_keys, vectors)
    gwi_time = time.perf_counter() - start
    gwi.close()

    speedup = build_time / gwi_time if gwi_time > 0 else 0
    gwi_pass = speedup >= 5  # At least 5x faster

    print(f"GWI build: {gwi_time:.2f}s | HNSW build: {build_time:.2f}s")
    print(f"Speedup: {speedup:.1f}x | {'PASS' if gwi_pass else 'FAIL'}")

    RESULTS['gwi_vs_hnsw'] = {'gwi_time': gwi_time, 'hnsw_time': build_time, 'speedup': speedup, 'pass': gwi_pass}
except ImportError:
    # Feature missing from the installed synadb: record a skip (pass=None plus
    # a note) instead of a failure.
    print("GravityWellIndex not available in this version")
    RESULTS['gwi_vs_hnsw'] = {'speedup': 0, 'pass': None, 'note': 'Not available'}

---
## Benchmark 5: Crash Recovery
**Functional: Data reads back intact after closing and reopening the database (spot check)**

In [None]:
# Benchmark 5: reopen the file written by Benchmark 1 (db_path, n entries),
# time the open call, and spot-check that stored values read back correctly.
print(f"Reopening database with {n:,} entries...")
start = time.perf_counter()
db2 = SynaDB(db_path)
recovery_time = time.perf_counter() - start

# Guard the division the same way the other ratio computations do, so a
# zero-length timer reading cannot raise ZeroDivisionError.
recovery_rate = n / recovery_time if recovery_time > 0 else 0

# Verify integrity
# Spot-check the first, middle and last keys against the values Benchmark 1
# wrote (20.0 + (i % 100) * 0.1). The set removes duplicates for tiny n.
integrity_ok = True
for idx in sorted({0, n // 2, n - 1}):
    sample = db2.get_float(f"sensor/temp/{idx}")
    expected = 20.0 + (idx % 100) * 0.1
    # NOTE(review): presumably get_float returns None for a missing key; the
    # None guard turns that into a FAIL rather than a TypeError - confirm.
    if sample is None or not (abs(sample - expected) < 0.001):
        integrity_ok = False
        break

print(f"Recovery: {recovery_time:.3f}s | Rate: {recovery_rate:,.0f}/sec")
print(f"Integrity: {'PASS' if integrity_ok else 'FAIL'}")
db2.close()

RESULTS['recovery'] = {'rate': recovery_rate, 'time': recovery_time, 'pass': integrity_ok}

---
## Benchmark 6: Schema-Free Storage
**Functional: Store any type without migrations**

In [None]:
# Benchmark 6: write one float, int, text and bytes value into the same
# database and confirm each one reads back equal to what was written.
schema_path = "bench_schema.db"
cleanup(schema_path)

db = SynaDB(schema_path)

# Reference payloads, named once so the write and the comparison cannot drift.
model_name = "bert-base-uncased"
blob = b"\x00\x01\x02\x03\xff"

# All four writes happen before any read.
db.put_float("metrics/accuracy", 0.95)
db.put_int("metrics/epoch", 100)
db.put_text("config/model", model_name)
db.put_bytes("data/binary", blob)

# Round-trip comparison, one flag per type.
float_ok = db.get_float('metrics/accuracy') == 0.95
int_ok = db.get_int('metrics/epoch') == 100
text_ok = db.get_text('config/model') == model_name
bytes_ok = db.get_bytes('data/binary') == blob

checks = (("Float", float_ok), ("Int", int_ok), ("Text", text_ok), ("Bytes", bytes_ok))
types_passed = sum(ok for _, ok in checks)
schema_pass = types_passed == 4

print("Schema-free storage test:")
print("  " + " | ".join(f"{label}: {'OK' if ok else 'FAIL'}" for label, ok in checks))
verdict = 'PASS' if schema_pass else 'FAIL'
print(f"Result: {types_passed}/4 types | {verdict}")
db.close()

RESULTS['schema_free'] = {
    'types': types_passed,
    'target': 4,
    'pass': schema_pass,
}

---
## Benchmark 7: Tensor Extraction
**Functional: Direct NumPy tensor extraction**

In [None]:
# Benchmark 7: append many values under one key, then time pulling that key's
# history out as a single NumPy array. Relies on n from the Benchmark 1 cell.
tensor_path = "bench_tensor.db"
cleanup(tensor_path)

db = SynaDB(tensor_path, sync_on_write=False)

# At most 100,000 values, all written to the same key.
ts_count = min(n, 100_000)
print(f"Writing {ts_count:,} time-series values...")
for i in range(ts_count):
    db.put_float("sensor/temperature", 20.0 + np.sin(i / 100) * 5)

print("Extracting as NumPy tensor...")
start = time.perf_counter()
tensor = db.get_history_tensor("sensor/temperature")
extract_time = time.perf_counter() - start

# The pass check has always allowed for a None result, but the reporting
# below used to dereference it unconditionally and crash instead of reporting
# FAIL. Compute the length once, None-safely, and branch the printout.
tensor_len = len(tensor) if tensor is not None else 0
tensor_pass = tensor is not None and tensor_len == ts_count

if tensor is not None:
    print(f"Shape: {tensor.shape} | Dtype: {tensor.dtype}")
    print(f"Size: {tensor.nbytes / 1e6:.2f} MB | Time: {extract_time:.3f}s")
else:
    print("Shape: N/A | Dtype: N/A")
    print(f"Size: N/A | Time: {extract_time:.3f}s")
print(f"Result: {'PASS' if tensor_pass else 'FAIL'}")
db.close()

RESULTS['tensor'] = {'count': tensor_len, 'time': extract_time, 'pass': tensor_pass}

---
## Summary

In [None]:
# Cleanup all test files
# Summary cell: delete the benchmark files, print one table row per benchmark
# from RESULTS, then print pass/fail/skip totals.

# Cleanup all test files
cleanup("bench_write.db", "bench_vectors.db", "bench_mmap.mmap",
        "bench_gwi.gwi", "bench_schema.db", "bench_tensor.db")

print("=" * 70)
print("SYNADB RELATIVE BENCHMARK RESULTS")
print("=" * 70)
print(f"Scale: {SCALE.upper()} | Records: {cfg['records']:,} | Vectors: {cfg['vectors']:,}")
print("-" * 70)
print(f"{'Benchmark':<25} {'Result':<25} {'Status':<10}")
print("-" * 70)

# Relative (speedup) benchmarks all share one row format. An entry carrying a
# 'note' was skipped because the feature is unavailable; only the two optional
# benchmarks ever set one, so the check is a no-op for the others.
for label, key in (
    ("Read vs Write", "core_db"),
    ("HNSW vs Brute Force", "hnsw_vs_brute"),
    ("Mmap vs VectorStore", "mmap_vs_vector"),
    ("GWI vs HNSW", "gwi_vs_hnsw"),
):
    r = RESULTS.get(key, {})
    if r.get('note'):
        print(f"{label:<25} {'N/A':>25} {'SKIP':<10}")
    else:
        print(f"{label:<25} {r.get('speedup', 0):>20.1f}x faster {'PASS' if r.get('pass') else 'FAIL':<10}")

# Recovery
r = RESULTS.get('recovery', {})
# Report the actual outcome; previously this column read "Integrity OK" even
# on a FAIL row.
integrity_text = 'Integrity OK' if r.get('pass') else 'Integrity FAILED'
print(f"{'Crash Recovery':<25} {integrity_text:>25} {'PASS' if r.get('pass') else 'FAIL':<10}")

# Schema-Free
r = RESULTS.get('schema_free', {})
print(f"{'Schema-Free Storage':<25} {str(r.get('types', 0)) + '/4 types':>25} {'PASS' if r.get('pass') else 'FAIL':<10}")

# Tensor
r = RESULTS.get('tensor', {})
print(f"{'Tensor Extraction':<25} {str(r.get('count', 0)) + ' values':>25} {'PASS' if r.get('pass') else 'FAIL':<10}")

print("-" * 70)

# Count results
# Only entries that actually carry a 'pass' verdict are counted. Informational
# entries such as 'hnsw_build' have no 'pass' key; they used to be counted as
# skipped, so the "tests skipped" line printed on every run.
verdicts = [entry['pass'] for entry in RESULTS.values() if 'pass' in entry]
skipped = sum(1 for v in verdicts if v is None)
# Use truthiness rather than `is True` / `is False` so a non-bool truthy
# verdict (e.g. a NumPy boolean) is still counted.
passed = sum(1 for v in verdicts if v is not None and v)
failed = sum(1 for v in verdicts if v is not None and not v)
total = passed + failed

print(f"TOTAL: {passed}/{total} benchmarks passed")
if skipped > 0:
    print(f"       {skipped} tests skipped (features not available)")
print("=" * 70)
print("\nTip: Change SCALE to 'large' for more rigorous testing")
print("\nLinks:")
print("  GitHub: https://github.com/gtava5813/SynaDB")
print("  PyPI:   pip install synadb")