In [13]:
%pip install psycopg2-binary requests


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [14]:
# %% Cell 2: Imports & cleanup unused
import os
import math
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine, text
import pandas as pd
import psycopg2
import requests
from tqdm import tqdm


In [None]:
# %% Cell 3: Configuration
# Tunables for the embedding back-fill. DATABASE_URI and REMOTE_EMBED_URL may be
# overridden through environment variables of the same name, so credentials and
# endpoints do not have to be edited into the notebook.
BATCH_SIZE      = 128   # rows fetched / embedded / updated per batch
NUM_WORKERS     = 4     # worker threads used when batches run in parallel

# NOTE(review): the fallback literal still carries a username and password.
# Export DATABASE_URI in the environment, then delete the literal and rotate
# the password.
DATABASE_URI    = os.environ.get(
    "DATABASE_URI",
    'postgresql+psycopg2://rg5073:rg5073pass@129.114.27.112:5432/cleaned_meta_data_db',
)
REMOTE_EMBED_URL = os.environ.get(
    "REMOTE_EMBED_URL",
    'http://localhost:8000/embed',   # your hosted FastAPI batch-embed URL
)

# One entry per model variant: the table column that stores its embeddings and
# the ONNX file it was exported to. Only "column" is read by the visible cells;
# "model_path" is presumably consumed by the embedding service -- TODO confirm.
MODEL_DETAILS = [
    {
        "column": "chunk_embedding_768",
        "model_path": "/home/pb/projects/course/sem2/mlops/project/mlops/models/distilbert.onnx",
    },
    {
        "column": "chunk_embedding_768_dyn",
        "model_path": "/home/pb/projects/course/sem2/mlops/project/mlops/models/distilbert_dyn.onnx",
    },
    {
        "column": "chunk_embedding_768_graph",
        "model_path": "/home/pb/projects/course/sem2/mlops/project/mlops/models/distilbert_opt.onnx",
    },
    {
        "column": "chunk_embedding_768_static_h",
        "model_path": "/home/pb/projects/course/sem2/mlops/project/mlops/models/distilbert_static_heavy.onnx",
    },
    {
        "column": "chunk_embedding_768_static_m",
        "model_path": "/home/pb/projects/course/sem2/mlops/project/mlops/models/distilbert_static_moderate.onnx",
    },
]

# Index into MODEL_DETAILS selecting which embedding column this run fills.
MODEL_INDEX = 3
column = MODEL_DETAILS[MODEL_INDEX]["column"]

In [16]:
# %% Cell 4: Ensure the vector column exists
# Build the pooled engine once; the later cells reuse it for every query.
engine = create_engine(DATABASE_URI, pool_size=8, max_overflow=0)

# DDL that adds the target embedding column only when it is not already there.
add_column_ddl = text(f"""
      ALTER TABLE arxiv_chunks_eval_5
      ADD COLUMN IF NOT EXISTS {column} vector(768)
    """)

# engine.begin() commits on success and rolls back if the statement raises.
with engine.begin() as conn:
    conn.execute(add_column_ddl)


In [17]:
# %% Cell 5: Count total rows
# Ask the database how many chunk rows exist; `total` drives the batch plan.
row_count_query = text("SELECT COUNT(*) FROM arxiv_chunks_eval_5")
with engine.connect() as conn:
    count_result = conn.execute(row_count_query)
    total = count_result.scalar_one()
print(f"Total rows to embed: {total}")


Total rows to embed: 52554


In [18]:
# %% Cell 6: Remote embedding via batch-embed endpoint
def embed_texts(texts: list[str], timeout: float = 120.0) -> list[list[float]]:
    """
    Embed a batch of texts through the remote batch-embed endpoint.

    POSTs ``{"texts": texts}`` as JSON to ``REMOTE_EMBED_URL`` and returns the
    ``"embeddings"`` field of the JSON reply.

    Args:
        texts: Strings to embed. An empty list returns ``[]`` immediately,
            without contacting the endpoint.
        timeout: Seconds to wait for the endpoint before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall a
            worker thread forever if the service hangs.

    Returns:
        The ``"embeddings"`` value from the response; expected to hold one
        vector per input text, in input order (not verified here).

    Raises:
        requests.HTTPError: the endpoint answered with an error status.
        requests.Timeout: no answer arrived within ``timeout`` seconds.
        KeyError: the JSON reply has no ``"embeddings"`` field.
    """
    if not texts:
        return []
    resp = requests.post(REMOTE_EMBED_URL, json={"texts": texts}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()["embeddings"]


In [19]:
# %% Cell 7: Fetch, embed, and update one batch
def process_batch(offset: int) -> int:
    """
    Fetch one page of chunks, embed them remotely, and store the vectors.

    Reads up to ``BATCH_SIZE`` rows of ``arxiv_chunks_eval_5`` starting at
    ``offset`` (ordered by ``paper_id, chunk_id``), sends their ``chunk_data``
    to ``embed_texts``, and writes each returned vector into the configured
    embedding ``column`` of the matching row.

    Paging by OFFSET only visits every row exactly once if the ordering is
    stable -- assumes (paper_id, chunk_id) is unique; TODO confirm.

    Args:
        offset: Zero-based row offset of the first row in this batch.

    Returns:
        Number of rows fetched (and updated); ``0`` when the offset is past
        the end of the table.

    Raises:
        ValueError: the endpoint returned a different number of vectors than
            texts sent. Without this check ``zip`` would silently drop the
            surplus rows and they would never be updated.
    """
    # 1) fetch batch
    with engine.connect() as conn:
        rows = conn.execute(
            text("""
              SELECT paper_id, chunk_id, chunk_data
                FROM arxiv_chunks_eval_5
               ORDER BY paper_id, chunk_id
               LIMIT :limit OFFSET :offset
            """),
            {"limit": BATCH_SIZE, "offset": offset}
        ).fetchall()
    if not rows:
        return 0

    # 2) compute embeddings remotely
    texts = [r.chunk_data for r in rows]
    embs  = embed_texts(texts)  # calls your FastAPI
    if len(embs) != len(rows):
        raise ValueError(
            f"embedding endpoint returned {len(embs)} vectors "
            f"for {len(rows)} texts (offset={offset})"
        )

    # 3) bulk update back into Postgres (one parameter set per row)
    params = [
        {"pid": r.paper_id, "cid": r.chunk_id, "vec": vec}
        for r, vec in zip(rows, embs)
    ]
    with engine.begin() as conn:
        conn.execute(
            text(f"""
              UPDATE arxiv_chunks_eval_5
                 SET {column} = :vec
               WHERE paper_id = :pid
                 AND chunk_id   = :cid
            """),
            params
        )

    return len(rows)


In [20]:
# %% Cell 8: Compute all offsets
# One starting offset per batch: 0, BATCH_SIZE, 2 * BATCH_SIZE, ...
n_batches = math.ceil(total / BATCH_SIZE)
offsets   = list(range(0, n_batches * BATCH_SIZE, BATCH_SIZE))
print(f"{n_batches} batches, offsets: {offsets[:5]}…")


822 batches, offsets: [0, 64, 128, 192, 256]…


In [21]:
import logging
import time
import numpy as np

# Root-logger setup: INFO and above, each line prefixed with an HH:MM:SS stamp.
log_settings = {
    "level": logging.INFO,
    "format": "%(asctime)s %(levelname)s %(message)s",
    "datefmt": "%H:%M:%S",
}
logging.basicConfig(**log_settings)

In [22]:
# %% Cell: Sample Test with Detailed Logging
# Smoke test: embed three short sentences through the endpoint, time the round
# trip, and log each vector's leading values and L2 norm.
sample_texts = [
    "The quick brown fox jumps over the lazy dog.",
    "OpenAI's GPT models are powerful for NLP tasks.",
    "FastAPI + ONNX Runtime is great for serving ML models!"
]

logging.info("Prepared %d sample texts for embedding test", len(sample_texts))

# Measure round-trip time for the remote call. perf_counter() is a monotonic
# clock, so the result is not distorted by system clock adjustments.
t0 = time.perf_counter()
logging.info("Sending request to remote /embed endpoint")
embs = embed_texts(sample_texts)
t1 = time.perf_counter()

elapsed = t1 - t0
logging.info("Received embeddings in %.3f seconds", elapsed)

# Validate and inspect embeddings: one vector per input text is expected.
assert isinstance(embs, list) and len(embs) == len(sample_texts), "Unexpected response format"
dim = len(embs[0])
logging.info("Each embedding has dimension: %d", dim)

for i, vec in enumerate(embs):
    norm = np.linalg.norm(vec)
    logging.info(" Sample %d: first 5 dims = %s, L2 norm = %.4f", i, vec[:5], norm)


14:15:01 INFO Prepared 3 sample texts for embedding test
14:15:01 INFO Sending request to remote /embed endpoint
14:15:02 INFO Received embeddings in 1.467 seconds
14:15:02 INFO Each embedding has dimension: 768
14:15:02 INFO  Sample 0: first 5 dims = [0.12146452069282532, 0.29932883381843567, 0.10192757844924927, 0.24048078060150146, -0.17713797092437744], L2 norm = 9.5468
14:15:02 INFO  Sample 1: first 5 dims = [-0.16278865933418274, 0.1646127998828888, 0.6065582036972046, 0.05404533073306084, 0.0028536012396216393], L2 norm = 10.0426
14:15:02 INFO  Sample 2: first 5 dims = [-0.21152034401893616, -0.046315282583236694, 0.8236836791038513, -0.011421268805861473, -0.44301533699035645], L2 norm = 10.6453


In [23]:
# import time
# import random
# from concurrent.futures import ThreadPoolExecutor, as_completed

# # Suppose `offsets` is your full list of chunk-offsets.
# # We’ll sample up to MAX_QUERIES of them for each benchmark.
# MAX_QUERIES = 20

# def benchmark(batch_size, num_workers, offsets):
#     # pick up to MAX_QUERIES at random, or the first MAX_QUERIES if you prefer determinism
#     sample = random.sample(offsets, min(MAX_QUERIES, len(offsets)))
#     logging.info(f"Benchmarking with batch_size={batch_size}, num_workers={num_workers}")
#     logging.info(f"{len(sample)} offsets to process")
#     start = time.perf_counter()
#     processed = 0

#     with ThreadPoolExecutor(max_workers=num_workers) as exe:
#         futures = [exe.submit(process_batch, off) for off in sample]
#         for f in as_completed(futures):
#             processed += f.result()
#             logging.info(f"Processed {processed} batches")

#     elapsed = time.perf_counter() - start
#     tput = processed / elapsed
#     return tput

# # Grid‐searching batch_size × num_workers
# best = (0, None, None)
# for bs in [16, 32]:
#     for nw in [8]:
#         tput = benchmark(bs, nw, offsets)
#         logging.info(f"batch={bs:<3} workers={nw:<2} → {tput:.1f} items/sec")
#         if tput > best[0]:
#             best = (tput, bs, nw)

# logging.info(f"\n🏆 Best throughput: {best[0]:.1f} items/sec "
#       f"with batch_size={best[1]} and num_workers={best[2]}")


In [None]:
# %% Cell 9: Run batches in parallel and report progress
# Fan the batches out over a thread pool. A failing batch is recorded under its
# offset instead of aborting the progress loop, so the remaining batches are
# still counted and the failed offsets are available for a retry afterwards.
processed = 0
failed_batches = {}   # offset -> exception raised by process_batch
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = {executor.submit(process_batch, off): off for off in offsets}
    for fut in as_completed(futures):
        batch_offset = futures[fut]
        try:
            done = fut.result()   # re-raises whatever process_batch raised
        except Exception as exc:
            failed_batches[batch_offset] = exc
            print(f"\n⚠️ batch at offset {batch_offset} failed: {exc!r}")
            continue
        processed += done
        print(f"… done {processed}/{total} rows", end="\r")

# Fail loudly once everything has been attempted, so a partial run is never
# reported as a success.
if failed_batches:
    raise RuntimeError(
        f"{len(failed_batches)} of {len(offsets)} batches failed after "
        f"{processed} rows; retry offsets: {sorted(failed_batches)}"
    )

print(f"\n✅ Finished embedding & updating {processed} rows")


… done 256/52554 rows