In [1]:
from dotenv import load_dotenv
import os
load_dotenv()
neon_url = os.getenv("NEONDB_DATABASE_URL")
render_url = os.getenv("RENDER_DATABASE_URL")
xata_url = os.getenv("XATA_DATABASE_URL")

In [6]:
import psycopg
import time
import statistics
import os

# Avg Connection Latency of NeonDB: 494.19 ms

In [7]:
conn_str = os.getenv("NEONDB_DATABASE_URL")
latencies = []

for _ in range(100):
    start = time.perf_counter()
    conn = psycopg.connect(conn_str)
    conn.close()
    end = time.perf_counter()
    latencies.append((end - start) * 1000)  # ms

avg_latency = statistics.mean(latencies)
print(f"Avg Connection Latency of NeonDB: {avg_latency:.2f} ms")

Avg Connection Latency of NeonDB: 494.19 ms


# Avg Connection Latency of Render: 948.76 ms

In [8]:
conn_str = os.getenv("RENDER_DATABASE_URL")
latencies = []

for _ in range(100):
    start = time.perf_counter()
    conn = psycopg.connect(conn_str)
    conn.close()
    end = time.perf_counter()
    latencies.append((end - start) * 1000)  # ms

avg_latency = statistics.mean(latencies)
print(f"Avg Connection Latency of Render: {avg_latency:.2f} ms")

Avg Connection Latency of Render: 948.76 ms


# Avg Connection Latency of Xata: 984.92 ms

In [11]:
conn_str = os.getenv("XATA_DATABASE_URL")
latencies = []

for _ in range(100):
    start = time.perf_counter()
    conn = psycopg.connect(conn_str)
    conn.close()
    end = time.perf_counter()
    latencies.append((end - start) * 1000)  # ms

avg_latency = statistics.mean(latencies)
print(f"Avg Connection Latency of Xata: {avg_latency:.2f} ms")

Avg Connection Latency of Xata: 984.92 ms


# NeonDB (2)

In [14]:
# %% [markdown]
# Benchmark 2: Query Latency (SELECT) - students table
# %%
import psycopg
import time
import statistics
import os
import random

db_url = os.getenv("NEONDB_DATABASE_URL")  # Neon/Render/Xata URL
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Warm up connection
cur.execute("SELECT 1")
conn.commit()

# Indexed SELECT on uni_roll (PK, indexed)
indexed_latencies = []
for _ in range(100):
    uni_roll = f"UNI{random.randint(1, 50000):06d}"  # Format matching UNI000001 - UNI050000
    start = time.perf_counter()
    cur.execute("SELECT name, branch FROM students WHERE uni_roll = %s", (uni_roll,))
    result = cur.fetchone()
    end = time.perf_counter()
    if result:  # Only count successful hits
        indexed_latencies.append((end - start) * 1000)

# Non-indexed SELECT on name (partial match, full scan)
non_indexed_latencies = []
for _ in range(100):
    name_pattern = f"%{random.choice(['Andrea', 'Vicki', 'Steven', 'Christopher', 'Tyrone', 'Linda', 'Vanessa', 'Megan', 'Paige', 'James', 'Nicholas', 'Thomas', 'Tracy'])}%"
    start = time.perf_counter()
    cur.execute("SELECT uni_roll, branch FROM students WHERE name LIKE %s", (name_pattern,))
    results = cur.fetchall()
    end = time.perf_counter()
    non_indexed_latencies.append((end - start) * 1000)

# Branch filter (non-indexed, selective)
branch_latencies = []
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]
for _ in range(100):
    branch = random.choice(branches)
    start = time.perf_counter()
    cur.execute("SELECT count(*) FROM students WHERE branch = %s", (branch,))
    cur.fetchone()
    end = time.perf_counter()
    branch_latencies.append((end - start) * 1000)

print(f"Indexed Query (uni_roll) Avg: {statistics.mean(indexed_latencies):.2f} ms (n={len(indexed_latencies)})")
print(f"Non-Indexed Query (name LIKE) Avg: {statistics.mean(non_indexed_latencies):.2f} ms")
print(f"Branch Filter Avg: {statistics.mean(branch_latencies):.2f} ms")

# Store results
query_results = {
    'indexed_avg_ms': statistics.mean(indexed_latencies) if indexed_latencies else 0,
    'non_indexed_avg_ms': statistics.mean(non_indexed_latencies),
    'branch_avg_ms': statistics.mean(branch_latencies)
}

conn.close()

Indexed Query (uni_roll) Avg: 85.92 ms (n=100)
Non-Indexed Query (name LIKE) Avg: 90.95 ms
Branch Filter Avg: 86.97 ms


# Xata (2)

In [15]:
# %% [markdown]
# Benchmark 2: Query Latency (SELECT) - students table
# %%
import psycopg
import time
import statistics
import os
import random

db_url = os.getenv("XATA_DATABASE_URL")  # Neon/Render/Xata URL
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Warm up connection
cur.execute("SELECT 1")
conn.commit()

# Indexed SELECT on uni_roll (PK, indexed)
indexed_latencies = []
for _ in range(100):
    uni_roll = f"UNI{random.randint(1, 50000):06d}"  # Format matching UNI000001 - UNI050000
    start = time.perf_counter()
    cur.execute("SELECT name, branch FROM students WHERE uni_roll = %s", (uni_roll,))
    result = cur.fetchone()
    end = time.perf_counter()
    if result:  # Only count successful hits
        indexed_latencies.append((end - start) * 1000)

# Non-indexed SELECT on name (partial match, full scan)
non_indexed_latencies = []
for _ in range(100):
    name_pattern = f"%{random.choice(['Andrea', 'Vicki', 'Steven', 'Christopher', 'Tyrone', 'Linda', 'Vanessa', 'Megan', 'Paige', 'James', 'Nicholas', 'Thomas', 'Tracy'])}%"
    start = time.perf_counter()
    cur.execute("SELECT uni_roll, branch FROM students WHERE name LIKE %s", (name_pattern,))
    results = cur.fetchall()
    end = time.perf_counter()
    non_indexed_latencies.append((end - start) * 1000)

# Branch filter (non-indexed, selective)
branch_latencies = []
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]
for _ in range(100):
    branch = random.choice(branches)
    start = time.perf_counter()
    cur.execute("SELECT count(*) FROM students WHERE branch = %s", (branch,))
    cur.fetchone()
    end = time.perf_counter()
    branch_latencies.append((end - start) * 1000)

print(f"Indexed Query (uni_roll) Avg: {statistics.mean(indexed_latencies):.2f} ms (n={len(indexed_latencies)})")
print(f"Non-Indexed Query (name LIKE) Avg: {statistics.mean(non_indexed_latencies):.2f} ms")
print(f"Branch Filter Avg: {statistics.mean(branch_latencies):.2f} ms")

# Store results
query_results = {
    'indexed_avg_ms': statistics.mean(indexed_latencies) if indexed_latencies else 0,
    'non_indexed_avg_ms': statistics.mean(non_indexed_latencies),
    'branch_avg_ms': statistics.mean(branch_latencies)
}

conn.close()

Indexed Query (uni_roll) Avg: 162.58 ms (n=100)
Non-Indexed Query (name LIKE) Avg: 177.77 ms
Branch Filter Avg: 170.61 ms


# Render (2)

In [16]:
# %% [markdown]
# Benchmark 2: Query Latency (SELECT) - students table
# %%
import psycopg
import time
import statistics
import os
import random

db_url = os.getenv("RENDER_DATABASE_URL")  # Neon/Render/Xata URL
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Warm up connection
cur.execute("SELECT 1")
conn.commit()

# Indexed SELECT on uni_roll (PK, indexed)
indexed_latencies = []
for _ in range(100):
    uni_roll = f"UNI{random.randint(1, 50000):06d}"  # Format matching UNI000001 - UNI050000
    start = time.perf_counter()
    cur.execute("SELECT name, branch FROM students WHERE uni_roll = %s", (uni_roll,))
    result = cur.fetchone()
    end = time.perf_counter()
    if result:  # Only count successful hits
        indexed_latencies.append((end - start) * 1000)

# Non-indexed SELECT on name (partial match, full scan)
non_indexed_latencies = []
for _ in range(100):
    name_pattern = f"%{random.choice(['Andrea', 'Vicki', 'Steven', 'Christopher', 'Tyrone', 'Linda', 'Vanessa', 'Megan', 'Paige', 'James', 'Nicholas', 'Thomas', 'Tracy'])}%"
    start = time.perf_counter()
    cur.execute("SELECT uni_roll, branch FROM students WHERE name LIKE %s", (name_pattern,))
    results = cur.fetchall()
    end = time.perf_counter()
    non_indexed_latencies.append((end - start) * 1000)

# Branch filter (non-indexed, selective)
branch_latencies = []
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]
for _ in range(100):
    branch = random.choice(branches)
    start = time.perf_counter()
    cur.execute("SELECT count(*) FROM students WHERE branch = %s", (branch,))
    cur.fetchone()
    end = time.perf_counter()
    branch_latencies.append((end - start) * 1000)

print(f"Indexed Query (uni_roll) Avg: {statistics.mean(indexed_latencies):.2f} ms (n={len(indexed_latencies)})")
print(f"Non-Indexed Query (name LIKE) Avg: {statistics.mean(non_indexed_latencies):.2f} ms")
print(f"Branch Filter Avg: {statistics.mean(branch_latencies):.2f} ms")

# Store results
query_results = {
    'indexed_avg_ms': statistics.mean(indexed_latencies) if indexed_latencies else 0,
    'non_indexed_avg_ms': statistics.mean(non_indexed_latencies),
    'branch_avg_ms': statistics.mean(branch_latencies)
}

conn.close()

Indexed Query (uni_roll) Avg: 76.06 ms (n=100)
Non-Indexed Query (name LIKE) Avg: 106.81 ms
Branch Filter Avg: 92.22 ms


# Neon (3)

In [18]:
# %% [markdown]
# Benchmark 3: Write Throughput (INSERT/UPDATE) - students table
# %%
import psycopg
import time
import os
import random
from faker import Faker

fake = Faker()
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]

db_url = os.getenv("NEONDB_DATABASE_URL")
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Bulk INSERT 1K new students
start = time.perf_counter()
insert_data = []
for i in range(1000):
    roll = f"TEMP{random.randint(100000, 999999):06d}"  # Temporary rolls to avoid conflicts, matching 6-digit format
    name = fake.name() + random.choice(["", " DVM", " PhD", " MD"])  # Optional titles like in sample
    insert_data.append((name, roll, random.choice(branches)))

cur.executemany("""
    INSERT INTO students (name, uni_roll, branch) 
    VALUES (%s, %s, %s)
    ON CONFLICT (uni_roll) DO NOTHING
""", insert_data)
conn.commit()
insert_time = time.perf_counter() - start
insert_tps = 1000 / insert_time if insert_time > 0 else 0

# Bulk UPDATE 1K existing students
existing_rolls = [f"UNI{random.randint(1, 50000):06d}" for _ in range(1000)]

start = time.perf_counter()
for i in range(0, 1000, 100):
    batch = existing_rolls[i:i+100]
    updates = [(random.choice(branches), roll) for roll in batch]
    cur.executemany("UPDATE students SET branch = %s WHERE uni_roll = %s", updates)
    conn.commit()
update_time = time.perf_counter() - start
update_tps = 1000 / update_time if update_time > 0 else 0

# Cleanup temp inserts
cur.execute("DELETE FROM students WHERE uni_roll LIKE 'TEMP%'")
conn.commit()

print(f"INSERT Throughput: {insert_tps:.0f} rows/sec")
print(f"UPDATE Throughput: {update_tps:.0f} rows/sec")

write_results = {
    'insert_tps': insert_tps,
    'update_tps': update_tps
}

conn.close()

INSERT Throughput: 1530 rows/sec
UPDATE Throughput: 343 rows/sec


# Xata (3)

In [None]:
# %% [markdown]
# Benchmark 3: Write Throughput (INSERT/UPDATE) - students table
# %%
import psycopg
import time
import os
import random
from faker import Faker

fake = Faker()
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]

db_url = os.getenv("XATA_DATABASE_URL")
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Bulk INSERT 1K new students
start = time.perf_counter()
insert_data = []
for i in range(1000):
    roll = f"TEMP{random.randint(100000, 999999):06d}"  # Temporary rolls to avoid conflicts, matching 6-digit format
    name = fake.name() + random.choice(["", " DVM", " PhD", " MD"])  # Optional titles like in sample
    insert_data.append((name, roll, random.choice(branches)))

cur.executemany("""
    INSERT INTO students (name, uni_roll, branch) 
    VALUES (%s, %s, %s)
    ON CONFLICT (uni_roll) DO NOTHING
""", insert_data)
conn.commit()
insert_time = time.perf_counter() - start
insert_tps = 1000 / insert_time if insert_time > 0 else 0

# Bulk UPDATE 1K existing students
existing_rolls = [f"UNI{random.randint(1, 50000):06d}" for _ in range(1000)]

start = time.perf_counter()
for i in range(0, 1000, 100):
    batch = existing_rolls[i:i+100]
    updates = [(random.choice(branches), roll) for roll in batch]
    cur.executemany("UPDATE students SET branch = %s WHERE uni_roll = %s", updates)
    conn.commit()
update_time = time.perf_counter() - start
update_tps = 1000 / update_time if update_time > 0 else 0

# Cleanup temp inserts
cur.execute("DELETE FROM students WHERE uni_roll LIKE 'TEMP%'")
conn.commit()

print(f"INSERT Throughput: {insert_tps:.0f} rows/sec")
print(f"UPDATE Throughput: {update_tps:.0f} rows/sec")

write_results = {
    'insert_tps': insert_tps,
    'update_tps': update_tps
}

conn.close()

KeyboardInterrupt: 

# Render (3)

In [21]:
# %% [markdown]
# Benchmark 3: Write Throughput (INSERT/UPDATE) - students table
# %%
import psycopg
import time
import os
import random
from faker import Faker

fake = Faker()
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]

db_url = os.getenv("RENDER_DATABASE_URL")
conn = psycopg.connect(db_url)
cur = conn.cursor()

# Bulk INSERT 1K new students
start = time.perf_counter()
insert_data = []
for i in range(1000):
    roll = f"TEMP{random.randint(100000, 999999):06d}"  # Temporary rolls to avoid conflicts, matching 6-digit format
    name = fake.name() + random.choice(["", " DVM", " PhD", " MD"])  # Optional titles like in sample
    insert_data.append((name, roll, random.choice(branches)))

cur.executemany("""
    INSERT INTO students (name, uni_roll, branch) 
    VALUES (%s, %s, %s)
    ON CONFLICT (uni_roll) DO NOTHING
""", insert_data)
conn.commit()
insert_time = time.perf_counter() - start
insert_tps = 1000 / insert_time if insert_time > 0 else 0

# Bulk UPDATE 1K existing students
existing_rolls = [f"UNI{random.randint(1, 50000):06d}" for _ in range(1000)]

start = time.perf_counter()
for i in range(0, 1000, 100):
    batch = existing_rolls[i:i+100]
    updates = [(random.choice(branches), roll) for roll in batch]
    cur.executemany("UPDATE students SET branch = %s WHERE uni_roll = %s", updates)
    conn.commit()
update_time = time.perf_counter() - start
update_tps = 1000 / update_time if update_time > 0 else 0

# Cleanup temp inserts
cur.execute("DELETE FROM students WHERE uni_roll LIKE 'TEMP%'")
conn.commit()

print(f"INSERT Throughput: {insert_tps:.0f} rows/sec")
print(f"UPDATE Throughput: {update_tps:.0f} rows/sec")

write_results = {
    'insert_tps': insert_tps,
    'update_tps': update_tps
}

conn.close()

INSERT Throughput: 1842 rows/sec
UPDATE Throughput: 470 rows/sec


# Neon (4)

In [24]:
# %% [markdown]
# Benchmark 4: Concurrency Scaling - students table
# %%
import asyncio
import asyncpg
import statistics
import os
import random
from faker import Faker

fake = Faker()
branches = ["EE", "CHE", "ECE", "ME", "CSE", "CE", "PHM", "BT"]

async def client_task(conn_str, client_id, num_queries=100):
    conn = await asyncpg.connect(conn_str)
    query_latencies = []
    
    for i in range(num_queries):
        start = asyncio.get_event_loop().time()
        
        if random.random() < 0.7:  # 70% SELECT
            uni_roll = f"UNI{random.randint(1, 50000):06d}"
            try:
                result = await conn.fetchval(
                    "SELECT name FROM students WHERE uni_roll = $1", uni_roll
                )
                if result:
                    query_latencies.append((asyncio.get_event_loop().time() - start) * 1000)
            except:
                pass
        else:  # 30% INSERT
            temp_roll = f"TEMP{client_id}_{i:06d}"
            name = fake.name() + random.choice(["", " DVM", " PhD"])
            try:
                await conn.execute(
                    "INSERT INTO students (name, uni_roll, branch) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
                    name, temp_roll, random.choice(branches)
                )
                query_latencies.append((asyncio.get_event_loop().time() - start) * 1000)
            except:
                pass
        
        await asyncio.sleep(0.001)
    
    await conn.close()
    return query_latencies

async def run_concurrency(num_clients, num_queries=100):
    conn_str = os.getenv("NEONDB_DATABASE_URL")
    tasks = [client_task(conn_str, i, num_queries) for i in range(num_clients)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    all_latencies = []
    for result in results:
        if isinstance(result, list):
            all_latencies.extend(result)
    
    if all_latencies:
        return {
            'avg_latency_ms': statistics.mean(all_latencies),
            'p95_latency_ms': statistics.percentile(all_latencies, 95),
            'total_queries': len(all_latencies),
            'clients': num_clients
        }
    return None

# Run tests
concurrency_levels = [5, 20, 50]
concurrency_results = {}

print("Running concurrency tests...")
for n_clients in concurrency_levels:
    print(f"Testing {n_clients} concurrent clients...")
    result = await run_concurrency(n_clients)
    if result:
        concurrency_results[n_clients] = result
        print(f"  {n_clients} clients: {result['avg_latency_ms']:.2f}ms avg, "
              f"{result['p95_latency_ms']:.2f}ms P95, {result['total_queries']} queries")
    await asyncio.sleep(2)

# Cleanup
conn = await asyncpg.connect(os.getenv("DATABASE_URL"))
await conn.execute("DELETE FROM students WHERE uni_roll LIKE 'TEMP%'")
await conn.close()

print("\nConcurrency Results:", concurrency_results)

Running concurrency tests...
Testing 5 concurrent clients...
Testing 20 concurrent clients...
Testing 50 concurrent clients...


AttributeError: module 'statistics' has no attribute 'percentile'

# Neon (5)

In [23]:
# %% [markdown]
# Benchmark 5: Cold Start / Idle Resume Latency - students table
# %%
import psycopg
import time
import os
import random
import statistics

db_url = os.getenv("NEONDB_DATABASE_URL")
idle_duration = 900  # 15 minutes

def measure_query_latency(conn_str, query_type="warm"):
    conn = psycopg.connect(conn_str, connect_timeout=30)
    cur = conn.cursor()
    
    start = time.perf_counter()
    if query_type == "warm":
        uni_roll = f"UNI{random.randint(1, 15):06d}"  # Sample from small range for warm
        cur.execute("SELECT name FROM students WHERE uni_roll = %s LIMIT 1", (uni_roll,))
    else:
        branch = random.choice(["EE", "CHE", "ECE"])
        cur.execute("SELECT count(*) FROM students WHERE branch = %s", (branch,))
    result = cur.fetchone()
    end = time.perf_counter()
    
    latency_ms = (end - start) * 1000
    conn.close()
    return latency_ms, bool(result)

# Warm query
print("Running warm query...")
warm_latency, warm_success = measure_query_latency(db_url, "warm")
print(f"Warm query: {warm_latency:.2f} ms, Success: {warm_success}")

# Idle
print(f"Idling for {idle_duration//60} minutes...")
time.sleep(idle_duration)

# Cold query
print("Running cold query...")
cold_latency, cold_success = measure_query_latency(db_url, "cold")
print(f"Cold query: {cold_latency:.2f} ms, Success: {cold_success}")

# Connection latencies (5 iterations for avg)
conn_latencies = []
for _ in range(5):
    start = time.perf_counter()
    try:
        conn = psycopg.connect(db_url, connect_timeout=30)
        conn.close()
        conn_latencies.append((time.perf_counter() - start) * 1000)
    except Exception as e:
        print(f"Connection failed: {e}")

cold_start_results = {
    'warm_query_ms': warm_latency,
    'cold_query_ms': cold_latency,
    'cold_delta_ms': cold_latency - warm_latency,
    'avg_conn_latency_ms': statistics.mean(conn_latencies) if conn_latencies else 0,
    'cold_start_factor': (cold_latency / warm_latency) if warm_latency > 0 else 0
}

print(f"\nCold Start Analysis:")
print(f"Query degradation: {cold_start_results['cold_delta_ms']:.2f} ms")
print(f"Connection latency: {cold_start_results['avg_conn_latency_ms']:.2f} ms")
print(f"Performance factor: {cold_start_results['cold_start_factor']:.2f}x slower")

Running warm query...
Warm query: 131.36 ms, Success: True
Idling for 15 minutes...


KeyboardInterrupt: 