# PostgreSQL benchmark notebook

This notebook:
1. Starts a PostgreSQL Docker container and mounts your CSV folder.
2. Creates a database named 'bench'.
3. Loads each CSV as a table with inferred types.
4. Builds basic indexes and inferred foreign keys.
5. Defines a read-heavy and write-heavy workload.
6. Runs a concurrent benchmark and samples CPU and memory via `docker stats`.
7. Reports throughput, p95 latency, average CPU and memory, and storage footprint.
8. Writes results to `/mnt/data/pg_results.json` and `/mnt/data/pg_latency_detail.csv`.


In [1]:

# Utility imports and setup
import os
import sys
import time
import json
import math
import random
import string
import re
import socket
from pathlib import Path
from datetime import datetime, timedelta
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd

# install packages if missing
def _pip_install(pkgs):
    import importlib
    to_install = []
    for mod, pip_name in pkgs:
        try:
            importlib.import_module(mod)
        except Exception:
            to_install.append(pip_name)
    if to_install:
        import sys
        cmd = [sys.executable, "-m", "pip", "install", "-q"] + to_install
        print("Installing:", " ".join(to_install))
        subprocess.run(cmd, check=True)

_pip_install([
    ("sqlalchemy", "sqlalchemy>=2.0.30"),
    ("psycopg2", "psycopg2-binary>=2.9.9"),
    ("pyodbc", "pyodbc>=5.1.0"),
    ("pandas", "pandas>=2.2.2"),
    ("pyarrow", "pyarrow>=16.1.0"),
])

import sqlalchemy as sa
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.types import Integer, BigInteger, Float, Boolean, DateTime, Text, String, DECIMAL

def run_cmd(cmd_list, check=True, capture=True):
    if capture:
        res = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
        if check and res.returncode != 0:
            print(res.stdout)
            print(res.stderr)
            raise RuntimeError(f"Command failed: {' '.join(cmd_list)}")
        return res.stdout.strip()
    else:
        res = subprocess.run(cmd_list, check=check)
        return ""

def assert_docker():
    try:
        out = run_cmd(["docker", "--version"])
    except Exception as e:
        raise RuntimeError("Docker CLI not found. Install Docker Desktop and ensure 'docker' is on PATH.") from e

def safe_name(name):
    base = re.sub(r"[^0-9a-zA-Z_]+", "_", name).strip("_").lower()
    if not base:
        base = "tbl_" + ''.join(random.choices(string.ascii_lowercase, k=6))
    if re.match(r"^[0-9]", base):
        base = "t_" + base
    return base

def list_csvs(csv_dir):
    p = Path(csv_dir)
    if not p.exists():
        raise FileNotFoundError(f"CSV_DIR not found: {csv_dir}")
    files = [str(x) for x in p.glob("*.csv")]
    if not files:
        raise FileNotFoundError(f"No CSV files found in: {csv_dir}")
    return files

def infer_sqlalchemy_dtypes(sample_df, engine_kind):
    dtype_map = {}
    for col in sample_df.columns:
        s = sample_df[col].dropna()
        if s.empty:
            dtype_map[col] = Text()
            continue
        if pd.api.types.is_integer_dtype(s):
            try:
                mx = int(s.max())
                mn = int(s.min())
                if mn < -2147483648 or mx > 2147483647:
                    dtype_map[col] = BigInteger()
                else:
                    dtype_map[col] = Integer()
            except Exception:
                dtype_map[col] = BigInteger()
        elif pd.api.types.is_float_dtype(s):
            if engine_kind == "mssql":
                dtype_map[col] = DECIMAL(38, 10)
            else:
                dtype_map[col] = Float()
        elif pd.api.types.is_bool_dtype(s):
            dtype_map[col] = Boolean()
        elif pd.api.types.is_datetime64_any_dtype(s):
            dtype_map[col] = DateTime()
        else:
            if engine_kind == "mssql":
                dtype_map[col] = String(length=None)
            else:
                dtype_map[col] = Text()
    return dtype_map

def load_csv_to_sql(engine: Engine, table_name: str, csv_path: str, engine_kind: str, chunksize: int = 100_000):
    it = pd.read_csv(csv_path, nrows=5000)
    dt_cols = [c for c in it.columns if re.search(r"(date|time|timestamp)$", c, flags=re.I)]
    if dt_cols:
        it = pd.read_csv(csv_path, nrows=5000, parse_dates=dt_cols, infer_datetime_format=True, dayfirst=False, keep_date_col=True, encoding_errors="ignore")
    dtype_map = infer_sqlalchemy_dtypes(it, engine_kind)

    reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
    created = False
    total_rows = 0
    for i, chunk in enumerate(reader):
        chunk.columns = [safe_name(c) for c in chunk.columns]
        if not created:
            chunk.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtype_map, method=None)
            created = True
        else:
            chunk.to_sql(table_name, engine, if_exists="append", index=False, method=None)
        total_rows += len(chunk)
        if (i + 1) % 10 == 0:
            print(f"Loaded {total_rows} rows into {table_name} ...")
    if not created:
        df = pd.read_csv(csv_path, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
        df.columns = [safe_name(c) for c in df.columns]
        df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=infer_sqlalchemy_dtypes(df, engine_kind))
        total_rows = len(df)
    return total_rows

def classify_tables(table_names):
    fact = None
    dims = []
    for t in table_names:
        if re.search(r"(^|_)fact($|_)", t):
            fact = t
        else:
            dims.append(t)
    if fact is None and table_names:
        fact = sorted(table_names)[0]
    return fact, dims

def inspect_schema(engine: Engine, schema=None):
    insp = sa.inspect(engine)
    tables = insp.get_table_names(schema=schema)
    info = {}
    for t in tables:
        cols = insp.get_columns(t, schema=schema)
        info[t] = {
            "columns": cols,
            "pk": [c["name"] for c in cols if c.get("primary_key", False)]
        }
    return info

def guess_roles(engine: Engine, table: str):
    insp = sa.inspect(engine)
    cols = [c["name"] for c in insp.get_columns(table)]
    id_cols = [c for c in cols if c == "id" or c.endswith("_id")]
    dt_cols = []
    for c in cols:
        try:
            typ = insp.get_columns(table, c)[0]["type"]
        except Exception:
            typ = None
        if re.search(r"(date|time|timestamp)$", c):
            dt_cols.append(c)
    if not dt_cols:
        dt_cols = [c for c in cols if re.search(r"(date|time)", c)]
    measure_cols = []
    for c in cols:
        if c in id_cols:
            continue
        try:
            typ = insp.get_columns(table, c)[0]["type"]
            text_typ = str(typ).lower()
            if any(k in text_typ for k in ["int", "bigint", "float", "double", "numeric", "decimal"]):
                measure_cols.append(c)
        except Exception:
            pass
    id_cols = id_cols[:5]
    dt_col = dt_cols[0] if dt_cols else None
    measure_col = measure_cols[0] if measure_cols else None
    return {"id_cols": id_cols, "dt_col": dt_col, "measure_col": measure_col, "all_cols": cols}

def create_indexes_and_fks(engine: Engine, fact: str, dims: list, dialect: str):
    insp = sa.inspect(engine)
    with engine.begin() as conn:
        for d in dims:
            dcols = [c["name"] for c in insp.get_columns(d)]
            if "id" in dcols:
                fk_col = f"{d}_id"
                fcols = [c["name"] for c in insp.get_columns(fact)]
                if fk_col in fcols:
                    idx_name = f"ix_{fact}_{fk_col}"
                    try:
                        conn.execute(text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {fact} ({fk_col})"))
                    except Exception:
                        try:
                            conn.execute(text(f"IF NOT EXISTS (SELECT name FROM sys.indexes WHERE name = '{idx_name}') CREATE INDEX {idx_name} ON {fact} ({fk_col})"))
                        except Exception as e:
                            print(f"Index creation skipped for {fk_col}: {e}")
                    fk_name = f"fk_{fact}_{fk_col}_{d}_id"
                    try:
                        conn.execute(text(f"ALTER TABLE {fact} ADD CONSTRAINT {fk_name} FOREIGN KEY ({fk_col}) REFERENCES {d}(id)"))
                    except Exception as e:
                        pass
        roles = guess_roles(engine, fact)
        if roles["dt_col"]:
            idxdt = f"ix_{fact}_{roles['dt_col']}"
            try:
                conn.execute(text(f"CREATE INDEX IF NOT EXISTS {idxdt} ON {fact} ({roles['dt_col']})"))
            except Exception:
                try:
                    conn.execute(text(f"IF NOT EXISTS (SELECT name FROM sys.indexes WHERE name = '{idxdt}') CREATE INDEX {idxdt} ON {fact} ({roles['dt_col']})"))
                except Exception:
                    pass

def monitor_docker_stats(container_name, stop_event, interval=1.0):
    samples = []
    while not stop_event.is_set():
        try:
            out = run_cmd(["docker", "stats", container_name, "--no-stream", "--format", "{{json .}}"], check=False)
            if out.strip():
                try:
                    rec = json.loads(out.strip())
                    cpu = float(str(rec.get("CPUPerc","0")).strip("%"))
                    mem_perc = float(str(rec.get("MemPerc","0")).strip("%"))
                    mem_usage = rec.get("MemUsage","0 / 0")
                    samples.append({"ts": time.time(), "cpu_percent": cpu, "mem_percent": mem_perc, "mem_usage_raw": mem_usage})
                except Exception:
                    pass
        except Exception:
            pass
        time.sleep(interval)
    return samples

def p95(values):
    if not values:
        return None
    s = sorted(values)
    k = int(math.ceil(0.95 * len(s))) - 1
    return s[max(0, min(k, len(s)-1))]

def summarise_stats(samples):
    if not samples:
        return {"avg_cpu_percent": None, "avg_mem_percent": None, "p95_cpu_percent": None, "p95_mem_percent": None}
    cpus = [x["cpu_percent"] for x in samples if x.get("cpu_percent") is not None]
    mems = [x["mem_percent"] for x in samples if x.get("mem_percent") is not None]
    return {
        "avg_cpu_percent": sum(cpus)/len(cpus) if cpus else None,
        "avg_mem_percent": sum(mems)/len(mems) if mems else None,
        "p95_cpu_percent": p95(cpus) if cpus else None,
        "p95_mem_percent": p95(mems) if mems else None,
        "num_samples": len(samples)
    }

class WorkloadRunner:
    def __init__(self, engine: Engine, dialect: str, fact: str, dims: list, duration_sec: int = 60, concurrency: int = 8, read_ratio: float = 0.8):
        self.engine = engine
        self.dialect = dialect
        self.fact = fact
        self.dims = dims
        self.duration_sec = duration_sec
        self.concurrency = concurrency
        self.read_ratio = read_ratio
        self.roles = guess_roles(engine, fact)
        self._stop_time = time.time() + duration_sec
        self.latencies = []
        self.errors = 0
        self.completed = 0

    def _sql_now(self):
        return "CURRENT_TIMESTAMP" if self.dialect == "postgresql" else "SYSDATETIME()"

    def _month_trunc(self, col):
        if self.dialect == "postgresql":
            return f"date_trunc('month', {col})"
        else:
            return f"DATETRUNC(month, {col})"

    def _random_read_query(self):
        m = self.roles["measure_col"]
        dt = self.roles["dt_col"]
        id_cols = self.roles["id_cols"]
        if m and dt:
            return f"SELECT {self._month_trunc(dt)} AS m, SUM({m}) AS s FROM {self.fact} GROUP BY {self._month_trunc(dt)} ORDER BY m DESC OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY" if self.dialect == "mssql" else f"SELECT {self._month_trunc(dt)} AS m, SUM({m}) AS s FROM {self.fact} GROUP BY {self._month_trunc(dt)} ORDER BY m DESC LIMIT 50"
        if id_cols:
            fk = id_cols[0]
            match_dim = None
            for d in self.dims:
                if fk == f"{d}_id":
                    match_dim = d
                    break
            if match_dim:
                return f"SELECT d.id, COUNT(*) AS c FROM {self.fact} f JOIN {match_dim} d ON f.{fk} = d.id GROUP BY d.id ORDER BY c DESC"
        return f"SELECT COUNT(*) FROM {self.fact}"

    def _random_write_query(self):
        id_cols = self.roles["id_cols"]
        dt = self.roles["dt_col"]
        m = self.roles["measure_col"]
        cols = self.roles["all_cols"]
        if id_cols and m:
            with self.engine.begin() as conn:
                fk = id_cols[0]
                try:
                    keys = conn.execute(text(f"SELECT {fk} FROM {self.fact} WHERE {fk} IS NOT NULL ORDER BY {self._dialect_random()} LIMIT 1000" if self.dialect=="postgresql" else f"SELECT TOP 1000 {fk} FROM {self.fact} WHERE {fk} IS NOT NULL ORDER BY NEWID()")).fetchall()
                    key = keys[random.randrange(len(keys))][0] if keys else None
                except Exception:
                    key = None
            val_m = round(random.random() * 1000, 6)
            cols_in = []
            vals_in = []
            if id_cols and key is not None:
                cols_in.append(fk)
                vals_in.append(str(key))
            if dt:
                cols_in.append(dt)
                vals_in.append(self._now_literal())
            if m:
                cols_in.append(m)
                vals_in.append(str(val_m))
            if not cols_in:
                cols_in = [m]
                vals_in = [str(val_m)]
            return f"INSERT INTO {self.fact} ({', '.join(cols_in)}) VALUES ({', '.join(vals_in)})"
        target_col = None
        with self.engine.begin() as conn:
            for c in cols:
                if c not in id_cols and c != dt:
                    target_col = c
                    break
        if target_col:
            return f"UPDATE {self.fact} SET {target_col} = {target_col} WHERE 1=0"
        return f"SELECT 1"

    def _now_literal(self):
        return self._sql_now()

    def _dialect_random(self):
        return "random()" if self.dialect == "postgresql" else "NEWID()"

    def _exec(self, sql: str):
        t0 = time.perf_counter()
        ok = True
        err = ""
        try:
            with self.engine.begin() as conn:
                conn.execute(text(sql))
        except Exception as e:
            ok = False
            err = str(e)[:200]
        dt = time.perf_counter() - t0
        self.latencies.append({"ts": time.time(), "sql": sql, "ok": ok, "latency_ms": dt * 1000.0, "dialect": self.dialect})
        if ok:
            self.completed += 1
        else:
            self.errors += 1

    def _worker(self):
        rng = random.Random()
        while time.time() < self._stop_time:
            q = self._random_read_query() if rng.random() < self.read_ratio else self._random_write_query()
            self._exec(q)

    def run(self):
        stop_event = threading.Event()
        self._mon_samples = []
        def mon_wrapper():
            self._mon_samples.extend(monitor_docker_stats(self._container_name, stop_event, 1.0))
        mon_thr = threading.Thread(target=mon_wrapper, daemon=True)
        mon_thr.start()
        t_start = time.time()
        with ThreadPoolExecutor(max_workers=self.concurrency) as ex:
            futs = [ex.submit(self._worker) for _ in range(self.concurrency)]
            for f in as_completed(futs):
                pass
        stop_event.set()
        mon_thr.join(timeout=2.0)
        t_end = time.time()
        self.elapsed = t_end - t_start
        return self

    @property
    def _container_name(self):
        return "pg_bench" if self.dialect == "postgresql" else "sqlserver_bench"

    def summary(self):
        lats_ok = [x["latency_ms"] for x in self.latencies if x["ok"]]
        return {
            "transactions": self.completed,
            "errors": self.errors,
            "elapsed_sec": self.elapsed,
            "throughput_tps": (self.completed / self.elapsed) if self.elapsed > 0 else None,
            "p95_latency_ms": p95(lats_ok) if lats_ok else None,
            "cpu_mem_summary": summarise_stats(self._mon_samples)
        }


In [2]:

# Configuration
CSV_DIR = r"C:\Users\uhati\Desktop\Project phase 2\CSV"
PG_USER = "postgres"
PG_PASSWORD = "postgres"
PG_DB = "bench"
PG_PORT = 55432
PG_CONTAINER = "pg_bench"
PG_IMAGE = "postgres:16"
DATA_VOLUME = "pg_bench_data"

assert_docker()


In [3]:

# Start PostgreSQL container
mount_src = os.path.abspath(CSV_DIR)
print("CSV_DIR:", mount_src)
existing = run_cmd(["docker", "ps", "-a", "--format", "{{.Names}}"])
if PG_CONTAINER in existing.splitlines():
    state = run_cmd(["docker", "inspect", "-f", "{{.State.Status}}", PG_CONTAINER])
    if state != "running":
        print("Removing existing container:", PG_CONTAINER)
        run_cmd(["docker", "rm", "-f", PG_CONTAINER], check=False)

run_cmd(["docker", "pull", PG_IMAGE])
cmd = [
    "docker", "run", "-d",
    "--name", PG_CONTAINER,
    "-e", f"POSTGRES_USER={PG_USER}",
    "-e", f"POSTGRES_PASSWORD={PG_PASSWORD}",
    "-e", "POSTGRES_DB=postgres",
    "-p", f"{PG_PORT}:5432",
    "-v", f"{DATA_VOLUME}:/var/lib/postgresql/data",
    "-v", f"{mount_src}:/csv",
    PG_IMAGE
]
print("Running:", " ".join(cmd))
run_cmd(cmd)
time.sleep(2)

def wait_port(host, port, timeout=90):
    t0 = time.time()
    while time.time() - t0 < timeout:
        s = socket.socket()
        s.settimeout(2.0)
        try:
            s.connect((host, port))
            s.close()
            return True
        except Exception:
            time.sleep(1.0)
    return False

ok = wait_port("127.0.0.1", PG_PORT, 90)
print("Port ready:", ok)


CSV_DIR: C:\Users\uhati\Desktop\Project phase 2\CSV
Running: docker run -d --name pg_bench -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres -p 55432:5432 -v pg_bench_data:/var/lib/postgresql/data -v C:\Users\uhati\Desktop\Project phase 2\CSV:/csv postgres:16
Port ready: True


In [6]:
# Connect to Postgres and create bench database (autocommit-safe)
from sqlalchemy.engine.url import URL

admin_url = URL.create(
    drivername="postgresql+psycopg2",
    username=PG_USER,
    password=PG_PASSWORD,
    host="127.0.0.1",
    port=PG_PORT,
    database="postgres",
)
admin_engine = sa.create_engine(admin_url, pool_pre_ping=True, future=True)

# Check existence using a normal transaction
with admin_engine.begin() as conn:
    exists = conn.execute(
        text("SELECT 1 FROM pg_database WHERE datname = :d"),
        {"d": PG_DB},
    ).scalar()

# Create the database outside a transaction
if not exists:
    with admin_engine.connect() as conn:
        conn = conn.execution_options(isolation_level="AUTOCOMMIT")
        conn.execute(text(f'CREATE DATABASE "{PG_DB}"'))
        print("Created database:", PG_DB)
else:
    print("Database exists:", PG_DB)

# Connect to the target database
bench_url = URL.create(
    drivername="postgresql+psycopg2",
    username=PG_USER,
    password=PG_PASSWORD,
    host="127.0.0.1",
    port=PG_PORT,
    database=PG_DB,
)
engine = sa.create_engine(bench_url, pool_pre_ping=True, future=True)

with engine.begin() as conn:
    conn.execute(text("SELECT current_database()"))
print("Connected to:", bench_url)


Created database: bench
Connected to: postgresql+psycopg2://postgres:***@127.0.0.1:55432/bench


In [7]:
# Ensure a live SQLAlchemy engine to the 'bench' DB exists
try:
    engine  # noqa: F821
except NameError:
    from sqlalchemy.engine.url import URL
    bench_url = URL.create(
        drivername="postgresql+psycopg2",
        username=PG_USER,
        password=PG_PASSWORD,
        host="127.0.0.1",
        port=PG_PORT,
        database=PG_DB,
    )
    engine = sa.create_engine(bench_url, pool_pre_ping=True, future=True)

# Quick sanity check
with engine.begin() as conn:
    conn.execute(text("SELECT 1"))
print("Engine ready.")


Engine ready.


In [8]:

# Load all CSVs into tables
csvs = list_csvs(CSV_DIR)
table_map = {}
rows_loaded = {}
t0 = time.time()
for f in csvs:
    name = safe_name(Path(f).stem)
    table_map[name] = f
    print(f"Loading {f} -> table {name}")
    n = load_csv_to_sql(engine, name, f, "postgresql", chunksize=200_000)
    rows_loaded[name] = n
t_load = time.time() - t0
print("Loaded tables:", rows_loaded)
print(f"Load time seconds: {t_load:.2f}")

fact, dims = classify_tables(list(table_map.keys()))
print("Fact table:", fact)
print("Dim tables:", dims)

create_indexes_and_fks(engine, fact, dims, "postgresql")


Loading C:\Users\uhati\Desktop\Project phase 2\CSV\campaign_desc.csv -> table campaign_desc
Loading C:\Users\uhati\Desktop\Project phase 2\CSV\campaign_table.csv -> table campaign_table
Loading C:\Users\uhati\Desktop\Project phase 2\CSV\causal_data.csv -> table causal_data


  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")


Loaded 2000000 rows into causal_data ...
Loaded 4000000 rows into causal_data ...
Loaded 6000000 rows into causal_data ...
Loaded 8000000 rows into causal_data ...
Loaded 10000000 rows into causal_data ...
Loaded 12000000 rows into causal_data ...
Loaded 14000000 rows into causal_data ...
Loaded 16000000 rows into causal_data ...
Loaded 18000000 rows into causal_data ...
Loaded 20000000 rows into causal_data ...
Loaded 22000000 rows into causal_data ...
Loaded 24000000 rows into causal_data ...
Loaded 26000000 rows into causal_data ...
Loaded 28000000 rows into causal_data ...
Loaded 30000000 rows into causal_data ...
Loaded 32000000 rows into causal_data ...
Loaded 34000000 rows into causal_data ...
Loaded 36000000 rows into causal_data ...
Loading C:\Users\uhati\Desktop\Project phase 2\CSV\coupon.csv -> table coupon


  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")


Loading C:\Users\uhati\Desktop\Project phase 2\CSV\coupon_redempt.csv -> table coupon_redempt
Loading C:\Users\uhati\Desktop\Project phase 2\CSV\hh_demographic.csv -> table hh_demographic
Loading C:\Users\uhati\Desktop\Project phase 2\CSV\product.csv -> table product


  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")


Loading C:\Users\uhati\Desktop\Project phase 2\CSV\transaction_data.csv -> table transaction_data


  it = pd.read_csv(csv_path, nrows=5000, parse_dates=dt_cols, infer_datetime_format=True, dayfirst=False, keep_date_col=True, encoding_errors="ignore")
  it = pd.read_csv(csv_path, nrows=5000, parse_dates=dt_cols, infer_datetime_format=True, dayfirst=False, keep_date_col=True, encoding_errors="ignore")
  it = pd.read_csv(csv_path, nrows=5000, parse_dates=dt_cols, infer_datetime_format=True, dayfirst=False, keep_date_col=True, encoding_errors="ignore")
  reader = pd.read_csv(csv_path, chunksize=chunksize, parse_dates=dt_cols if dt_cols else None, infer_datetime_format=True, encoding_errors="ignore")
  for i, chunk in enumerate(reader):
  for i, chunk in enumerate(reader):
  for i, chunk in enumerate(reader):
  for i, chunk in enumerate(reader):


Loaded 2000000 rows into transaction_data ...


  for i, chunk in enumerate(reader):
  for i, chunk in enumerate(reader):


Loaded tables: {'campaign_desc': 30, 'campaign_table': 7208, 'causal_data': 36786524, 'coupon': 124548, 'coupon_redempt': 2318, 'hh_demographic': 801, 'product': 92353, 'transaction_data': 2595732}
Load time seconds: 644.26
Fact table: campaign_desc
Dim tables: ['campaign_desc', 'campaign_table', 'causal_data', 'coupon', 'coupon_redempt', 'hh_demographic', 'product', 'transaction_data']


In [10]:
from pathlib import Path

OUTPUT_DIR = Path(r"C:\Users\uhati\Desktop\Project phase 2\results")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

PG_LAT_PATH = OUTPUT_DIR / "pg_latency_detail.csv"
PG_JSON_PATH = OUTPUT_DIR / "pg_results.json"
print("Results will be saved to:", OUTPUT_DIR)

Results will be saved to: C:\Users\uhati\Desktop\Project phase 2\results


In [11]:
# Define and run the workload
# Inspect schema and roles
info = inspect_schema(engine)
roles = guess_roles(engine, fact)
print("Schema roles before:", roles)

# If no timestamp or numeric measure detected, promote common date-like text columns
if roles["dt_col"] is None:
    with engine.begin() as conn:
        insp = sa.inspect(engine)
        for t in insp.get_table_names():
            cols = {c["name"]: c for c in insp.get_columns(t)}
            candidates = [
                "start_day","end_day","start_date","end_date",
                "created_at","updated_at","timestamp","event_time","date","datetime"
            ]
            for col in candidates:
                if col in cols and "text" in str(cols[col]["type"]).lower():
                    try:
                        conn.execute(text(
                            f"ALTER TABLE {t} ALTER COLUMN {col} TYPE timestamp USING NULLIF({col}, '')::timestamp"
                        ))
                        print(f"Converted {t}.{col} to timestamp")
                    except Exception:
                        # If conversion fails, skip silently
                        pass
    # Refresh roles after attempted promotion
    roles = guess_roles(engine, fact)

print("Schema roles now:", roles)

# Workload configuration
DURATION_SEC = 60
CONCURRENCY = 8
READ_RATIO = 0.8

# Run the benchmark
runner = WorkloadRunner(engine, "postgresql", fact, dims,
                        duration_sec=DURATION_SEC,
                        concurrency=CONCURRENCY,
                        read_ratio=READ_RATIO)
runner.run()
summary = runner.summary()
print("Benchmark summary:", json.dumps(summary, indent=2))

# Save latency details
lat_df = pd.DataFrame(runner.latencies)
lat_df.to_csv(PG_LAT_PATH, index=False)
print("Latency details:", PG_LAT_PATH)

# Storage footprint
with engine.begin() as conn:
    db_bytes = conn.execute(text("SELECT pg_database_size(:d)"), {"d": PG_DB}).scalar()
    per_table = conn.execute(text("""
        SELECT relname AS table_name, pg_total_relation_size(relid) AS bytes
        FROM pg_catalog.pg_statio_user_tables
        ORDER BY pg_total_relation_size(relid) DESC
    """)).mappings().all()
store = {
    "database_bytes": int(db_bytes) if db_bytes is not None else None,
    "tables": [{"table": r["table_name"], "bytes": int(r["bytes"])} for r in per_table]
}

# Results package
results = {
    "engine": "postgresql",
    "version": run_cmd(["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-t", "-c", "SHOW server_version;"], check=False).strip(),
    "rows_loaded": rows_loaded,
    "load_time_sec": t_load,
    "workload": {"duration_sec": DURATION_SEC, "concurrency": CONCURRENCY, "read_ratio": READ_RATIO},
    "bench_summary": summary,
    "storage": store,
    "admin_time_sec": t_load,
    "timestamp": datetime.utcnow().isoformat() + "Z"
}

with open(PG_JSON_PATH, "w") as f:
    json.dump(results, f, indent=2)
print("Saved results:", PG_JSON_PATH)

Schema roles before: {'id_cols': [], 'dt_col': None, 'measure_col': None, 'all_cols': ['description', 'campaign', 'start_day', 'end_day']}
Schema roles now: {'id_cols': [], 'dt_col': None, 'measure_col': None, 'all_cols': ['description', 'campaign', 'start_day', 'end_day']}
Benchmark summary: {
  "transactions": 172258,
  "errors": 0,
  "elapsed_sec": 61.97706627845764,
  "throughput_tps": 2779.3829289379332,
  "p95_latency_ms": 3.173099976265803,
  "cpu_mem_summary": {
    "avg_cpu_percent": null,
    "avg_mem_percent": null,
    "p95_cpu_percent": null,
    "p95_mem_percent": null
  }
}
Latency details: C:\Users\uhati\Desktop\Project phase 2\results\pg_latency_detail.csv
Saved results: C:\Users\uhati\Desktop\Project phase 2\results\pg_results.json


  "timestamp": datetime.utcnow().isoformat() + "Z"
