In [2]:
import wandb
import tqdm
import json
import numpy as np
import pandas as pd
import concurrent.futures

from wandb.apis.public import Run
from wandb.sdk.internal.internal_api import gql
from typing import Union, List, Dict

import multiprocessing as mp
from functools import partial

# Shared W&B public-API client reused by the cells below.
# NOTE(review): `timeout=60` is presumably in seconds — confirm against the wandb docs.
api = wandb.Api(timeout=60)

# Opt in to pandas' future behaviour where object columns are no longer
# silently downcast (affects calls such as DataFrame.replace used later on).
pd.set_option('future.no_silent_downcasting', True)

In [9]:
# GraphQL document that pages through the project's runs (256 per page) and
# returns only each run's id and name.
query = gql("""
    query Runs($project: String!, $entity: String!, $cursor: String, $filters: JSONString) {
        project(name: $project, entityName: $entity) {
            runs(first: 256, after: $cursor, filters: $filters) {
                edges {
                    node {
                        id
                        name
                    }
                    cursor
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
            }
        }
    }
""")

# Only finished matrix-factorization runs are listed.
filters = json.dumps({"config.model": "matrix_factorization", "state": "finished"})

all_runs = []
cursor = None  # None requests the first page

while True:
    variables = dict(
        project="peppermint-matrix",
        entity=api.default_entity,
        cursor=cursor,
        filters=filters,
    )

    result = api.client.execute(query, variables)
    runs_data = result["project"]["runs"]

    # Collect the `name` field of every run on this page.
    all_runs.extend([edge["node"]["name"] for edge in runs_data["edges"]])
    print(f"Fetched {len(all_runs)} runs...")

    # Stop after the last page; otherwise resume from where this page ended.
    if not runs_data["pageInfo"]["hasNextPage"]:
        break
    cursor = runs_data["pageInfo"]["endCursor"]

Fetched 256 runs...
Fetched 512 runs...
Fetched 768 runs...
Fetched 1024 runs...
Fetched 1276 runs...


In [4]:
def fetch_run_metadata(
    api: wandb.Api,
    run_id: str,
    considered_metrics: Union[str, Dict[str, float]] = "epoch/epoch",
    project: str = "peppermint-matrix",
) -> Dict:
    """Fetch config, logged history and a best-step summary for a single run.

    Args:
        api: W&B public API client; its ``default_entity`` is used as the entity.
        run_id: Identifier of the run inside ``project``.
        considered_metrics: Either the name of one history column to use as the
            score, or a ``{column: weight}`` mapping whose weighted sum is the score.
        project: W&B project that contains the run.

    Returns:
        A flat dict with run identifiers, the run config (nested lists/dicts are
        stringified), every history column as a list (including the derived
        ``score`` column), the history row with the highest score under
        ``best:<column>`` keys, and ``gpu_type`` / ``cpu_count`` from the run
        metadata (``None`` when the metadata is unavailable).

    Raises:
        ValueError: If ``considered_metrics`` is neither a string nor a dict, or
            if the run has no numeric score value to rank its history rows by.
        KeyError: If a requested metric is missing from the run history.
    """
    # Validate before touching the network so bad arguments fail fast.
    if not isinstance(considered_metrics, (str, dict)):
        raise ValueError("considered_metrics must be either a string or a dictionary")

    run: Run = api.run(f"{api.default_entity}/{project}/{run_id}")

    # Stringify nested config values so each record stays flat.
    run_config = {
        key: str(value) if isinstance(value, (list, dict)) else value
        for key, value in run.config.items()
    }

    run_history = run.history()
    # Non-finite values can come back as strings; map them to real floats.
    # NOTE(review): "-Infinity" is assumed to be the spelling used for -inf — confirm.
    run_history = run_history.replace({"Infinity": np.inf, "-Infinity": -np.inf, "NaN": np.nan})

    if isinstance(considered_metrics, str):
        run_history["score"] = run_history[considered_metrics]
    else:
        # Coerce each column first: after `replace` a column may still have
        # object dtype, which would break the arithmetic or the argmax below.
        run_history["score"] = sum(
            pd.to_numeric(run_history[metric], errors="coerce") * weight
            for metric, weight in considered_metrics.items()
        )

    run_history["score"] = pd.to_numeric(run_history["score"], errors="coerce")
    score = run_history["score"]
    if not score.notna().any():
        # Empty history or all-NaN score: there is no meaningful "best" row.
        raise ValueError(f"Run {run_id} has no numeric score to select a best step from")

    # argmax skips NaN by default and returns a position, hence `iloc`.
    best_row = run_history.iloc[int(score.argmax())]
    best_summary = {f"best:{key}": val for key, val in best_row.items()}

    sweep = run.sweep
    # `run.metadata` can be None when the metadata file is unavailable.
    run_metadata = run.metadata or {}

    return {
        "run_id": run.id,
        "run_name": run.name,
        "sweep_id": sweep.id if sweep else None,
        "model": run.config.get("model"),
        "created_at": run.created_at,
        **run_config,
        **{metric: run_history[metric].to_list() for metric in run_history},
        **best_summary,
        "gpu_type": run_metadata.get("gpu"),
        "cpu_count": run_metadata.get("cpu_count"),
    }

In [5]:
def process_chunk(chunk: List[str], considered_metrics: Union[str, Dict[str, float]], threads_per_process: int = 16) -> Dict[str, List]:
    """Fetch metadata for every run in ``chunk`` using a pool of threads.

    A new ``wandb.Api`` client is created on each call and shared by all worker
    threads of that call.

    Args:
        chunk: Run identifiers to fetch.
        considered_metrics: Forwarded unchanged to ``fetch_run_metadata``.
        threads_per_process: Maximum number of worker threads.

    Returns:
        ``{"records": [...], "errors": [...]}`` where ``records`` holds the dicts
        returned by ``fetch_run_metadata`` (in completion order) and ``errors``
        holds ``(run_id, message)`` tuples for the runs that raised.
    """
    # Each call (and therefore each worker process) creates its own API instance.
    api = wandb.Api(timeout=60)

    records = []
    errors = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads_per_process) as executor:
        futures = {executor.submit(fetch_run_metadata, api, run_id, considered_metrics): run_id for run_id in chunk}

        for future in concurrent.futures.as_completed(futures):
            run_id = futures[future]
            try:
                records.append(future.result())
            except Exception as e:
                # Keep the exception type: str(KeyError("x")) alone is just "'x'".
                errors.append((run_id, f"{type(e).__name__}: {e}"))

    return {"records": records, "errors": errors}

def chunk_list(data: List, chunk_size: int = 128) -> List[List]:
    """Split ``data`` into consecutive slices of at most ``chunk_size`` items.

    Args:
        data: Sequence to split; it is sliced, not modified.
        chunk_size: Maximum length of each chunk. Must be positive.

    Returns:
        The chunks in their original order. The last chunk may be shorter, and
        an empty ``data`` yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive. A negative step would
            otherwise produce no chunks at all and silently drop every item.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [data[start:start + chunk_size] for start in range(0, len(data), chunk_size)]

In [6]:
from concurrent.futures import ProcessPoolExecutor

n_processes = 8
threads_per_process = 32
# Weights of the history columns that are summed into each run's score.
sorting_criterion = {
    "epoch/test_hitrate@50": 0.5,
    "epoch/test_ndcg@50": 0.25,
}

chunks = chunk_list(all_runs)
print(f"Split {len(all_runs)} runs into {len(chunks)} chunks of sizes: {[len(c) for c in chunks]}")

# Process chunks in parallel using multiprocessing where each process uses multi-threading
records = []
errors = []
# NOTE(review): 'fork' is presumably used so workers inherit the functions
# defined in this notebook; it is not available on every platform.
ctx = mp.get_context('fork')
with ProcessPoolExecutor(max_workers=n_processes, mp_context=ctx) as executor:
    process_kernel = partial(process_chunk, considered_metrics=sorting_criterion, threads_per_process=threads_per_process)

    # Map each future to its chunk index so a failed chunk can be attributed.
    futures = {executor.submit(process_kernel, chunk): i for i, chunk in enumerate(chunks)}

    for future in tqdm.tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing chunks"):
        chunk_index = futures[future]
        try:
            result = future.result()
        except Exception as e:
            # A crashed worker must not discard what the other chunks returned:
            # record every run of the failed chunk as an error and carry on.
            reason = f"chunk {chunk_index} failed: {type(e).__name__}: {e}"
            errors.extend((run_id, reason) for run_id in chunks[chunk_index])
            continue
        records.extend(result["records"])
        errors.extend(result["errors"])

print(f"\nProcessed {len(records)} runs successfully")
if errors:
    print(f"Errors: {len(errors)}")
    # Only the first few errors are shown to keep the cell output short.
    for run_id, err in errors[:5]:
        print(f"  - {run_id}: {err}")

Split 1275 runs into 10 chunks of sizes: [128, 128, 128, 128, 128, 128, 128, 128, 128, 123]


Processing chunks:   0%|                                                                                                                                                  | 0/10 [00:00<?, ?it/s][34m[1mwandb[0m: [32m[41mERROR[0m Failed to detect the name of this notebook. You can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: [32m[41mERROR[0m Failed to detect the name of this notebook. You can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: [32m[41mERROR[0m Failed to detect the name of this notebook. You can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: [32m[41mERROR[0m Failed to detect the name of this notebook. You can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: [32m[41mERROR[0m Failed to detect the name of this notebook.


Processed 1275 runs successfully





In [10]:
# Display the first collected record to check its structure.
records[0]

{'run_id': 'ub2tpf77',
 'run_name': 'amber-hill-1402',
 'sweep_id': None,
 'model': 'matrix_factorization',
 'created_at': '2026-01-26T17:17:37Z',
 'config': 'configs/single_runs/baseline_matrix_factorization.yaml',
 'shuffle': True,
 'log_freq': 'epoch',
 'max_epoch': 64,
 'batch_size': 16384,
 'random_seed': 4461,
 'store_model': False,
 'learning_rate': 0.01,
 'early_stopping': False,
 'l1_regularization': 0,
 'l2_regularization': 1e-10,
 'evaluation_cutoffs': '[2, 10, 20, 50]',
 'early_stopping_mode': 'max',
 'embedding_dimension': 512,
 'early_stopping_monitor': 'test_recall@10',
 'embedding_dropout_rate': 0,
 'early_stopping_patience': 0,
 'epoch/train_mrr@20': [0.36724090576171875,
  0.2990785837173462,
  0.4848828911781311,
  0.6066086292266846,
  0.7094389200210571,
  0.7866910099983215,
  0.8401764631271362,
  0.8736442923545837,
  0.90346759557724,
  0.9209550023078918,
  0.9301645159721375,
  0.9444308876991272,
  0.9503794312477112,
  0.9599955081939697,
  0.96525335311889

In [6]:
# Single-process variant: one thread pool fetches every run through the shared `api` client.
batch_size = 8 * 32  # number of worker threads
sorting_criterion = {
    "epoch/test_hitrate@50": 0.5,
    "epoch/test_ndcg@50": 0.25,
}

records = []
with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
    # Remember which run each future belongs to so failures can be reported by id.
    futures = {
        executor.submit(fetch_run_metadata, api, run_id, sorting_criterion): run_id
        for run_id in all_runs
    }

    completed = concurrent.futures.as_completed(futures)
    for future in tqdm.tqdm(completed, total=len(futures), ncols=128):
        try:
            record = future.result()
        except Exception as e:
            # A failed run is reported and skipped; the others are still collected.
            run_id = futures[future]
            print(f"Error processing run {run_id}: {e}")
        else:
            records.append(record)

print(f"Processed {len(records)} runs")

100%|███████████████████████████████████████████████████████████████████████████████████████| 1124/1124 [00:57<00:00, 19.44it/s]


Processed 1124 runs


In [None]:
# Same fetch as the other thread-pool cells, with 64 worker threads.
batch_size = 64
sorting_criterion = {
    "epoch/test_hitrate@50": 0.5,
    "epoch/test_ndcg@50": 0.25,
}

records = []
with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
    # fetch_run_metadata takes the API client as its first argument; without it
    # the run id would be bound to `api` and every future would fail.
    futures = {executor.submit(fetch_run_metadata, api, run_id, sorting_criterion): run_id for run_id in all_runs}

    for future in tqdm.tqdm(concurrent.futures.as_completed(futures), total=len(futures), ncols=128):
        try:
            record = future.result()
            records.append(record)
        except Exception as e:
            # Report the failing run and keep collecting the rest.
            run_id = futures[future]
            print(f"Error processing run {run_id}: {e}")

print(f"Processed {len(records)} runs")

100%|███████████████████████████████████████████████████████████████████████████████████████| 1124/1124 [01:21<00:00, 13.76it/s]

Processed 1124 runs





In [None]:
# Same fetch as the other thread-pool cells, with 16 worker threads.
batch_size = 16
sorting_criterion = {
    "epoch/test_hitrate@50": 0.5,
    "epoch/test_ndcg@50": 0.25,
}

records = []
with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
    # fetch_run_metadata takes the API client as its first argument; without it
    # the run id would be bound to `api` and every future would fail.
    futures = {executor.submit(fetch_run_metadata, api, run_id, sorting_criterion): run_id for run_id in all_runs}

    for future in tqdm.tqdm(concurrent.futures.as_completed(futures), total=len(futures), ncols=128):
        try:
            record = future.result()
            records.append(record)
        except Exception as e:
            # Report the failing run and keep collecting the rest.
            run_id = futures[future]
            print(f"Error processing run {run_id}: {e}")

print(f"Processed {len(records)} runs")

  0%|                                                                                                  | 0/1124 [00:00<?, ?it/s]

100%|███████████████████████████████████████████████████████████████████████████████████████| 1124/1124 [04:11<00:00,  4.47it/s]

Processed 1124 runs





In [None]:
# Display the second collected record to check its structure.
records[1]

{'run_id': '0i0wt2hk',
 'run_name': 'usual-sweep-251',
 'sweep_id': 'fcxm7zni',
 'model': 'matrix_factorization',
 'created_at': '2026-01-29T10:28:56Z',
 'shuffle': False,
 'log_freq': 'epoch',
 'max_epoch': 64,
 'batch_size': 16384,
 'random_seed': 6883,
 'store_model': False,
 'learning_rate': 0.01,
 'early_stopping': False,
 'l1_regularization': 1e-08,
 'l2_regularization': 0,
 'evaluation_cutoffs': '[2, 10, 20, 50]',
 'early_stopping_mode': 'max',
 'embedding_dimension': 4,
 'early_stopping_monitor': 'test_recall@10',
 'embedding_dropout_rate': 0,
 'early_stopping_patience': 0,
 'epoch/train_hitrate@20': [0.1661614179611206,
  0.4894530773162842,
  0.5588606595993042,
  0.5749337077140808,
  0.5791966915130615,
  0.5853227376937866,
  0.5880699753761292,
  0.5891120433807373,
  0.5910382866859436,
  0.5913540720939636,
  0.5977011322975159,
  0.6009852290153503,
  0.6012062430381775,
  0.6033219695091248,
  0.6086270213127136,
  0.6153530478477478,
  0.6185107827186584,
  0.6251736

# Bulk fetch: all run data in one paginated GraphQL query

In [None]:
# GraphQL document that pages through the project's runs (256 per page) and
# pulls config, summary, system metrics, sweep id and sampled history per run.
query = gql("""
    query Runs($project: String!, $entity: String!, $cursor: String, $filters: JSONString, $historySpecs: [JSONString!]!) {
        project(name: $project, entityName: $entity) {
            runs(first: 256, after: $cursor, filters: $filters) {
                edges {
                    node {
                        id
                        name
                        state
                        config
                        summaryMetrics
                        systemMetrics
                        createdAt
                        heartbeatAt
                        historyKeys
                        sweep {
                            id
                        }
                        sampledHistory(specs: $historySpecs)
                    }
                    cursor
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
            }
        }
    }
""")

# Unlike the name-listing cell, this filter does not restrict on run state.
filters = json.dumps({"config.model": "matrix_factorization"})

# One sampledHistory spec per history key, each asking for 100 samples.
history_keys = [
    "epoch/epoch",
    "epoch/train_loss",
    "epoch/test_loss",
    "epoch/test_recall@20",
    "epoch/test_ndcg@20",
    "epoch/test_hitrate@50",
    "epoch/test_ndcg@50",
    "_runtime",
    "_timestamp",
]
history_specs = [json.dumps({"key": key, "samples": 100}) for key in history_keys]

all_runs = []
cursor = None  # None requests the first page

while True:
    variables = dict(
        project="peppermint-matrix",
        entity="feedr",
        cursor=cursor,
        filters=filters,
        historySpecs=history_specs,
    )

    result = api.client.execute(query, variables)
    runs_data = result["project"]["runs"]

    # Here the whole node dict is kept, not just the run name.
    all_runs.extend([edge["node"] for edge in runs_data["edges"]])
    print(f"Fetched {len(all_runs)} runs...")

    # Stop after the last page; otherwise resume from where this page ended.
    if not runs_data["pageInfo"]["hasNextPage"]:
        break
    cursor = runs_data["pageInfo"]["endCursor"]

Fetched 256 runs...
Fetched 512 runs...
Fetched 768 runs...
Fetched 1024 runs...
Fetched 1124 runs...


In [None]:
# Display one raw run node; note that `config` arrives as a JSON-encoded string.
all_runs[10]

{'id': 'UnVuOnYxOnRmem1zc2sxOnBlcHBlcm1pbnQtbWF0cml4OmZlZWRy',
 'name': 'tfzmssk1',
 'state': 'finished',
 'config': '{"model": {"value": "matrix_factorization"}, "_wandb": {"value": {"e": {"9ftad0wd8f0ttfso9w51wiui656g5uhf": {"os": "Linux-6.14.0-1018-aws-x86_64-with-glibc2.39", "git": {"commit": "6cec2f2d652033165f0e7ff4b4179aee311cc8fd", "remote": "git@github.com:chaald/peppermint-matrix.git"}, "gpu": "NVIDIA A10G", "args": ["--sweep_id=fcxm7zni", "--nworker=4", "--nruns=32"], "disk": {"/": {"used": "6309182631936", "total": "10400076980224"}}, "host": "ip-10-36-3-206", "root": "/home/hafidh_rendyanto/documents/peppermint-matrix", "email": "hafidh.rendyanto@gmail.com", "memory": {"total": "66681044992"}, "python": "CPython 3.12.3", "program": "/home/hafidh_rendyanto/documents/peppermint-matrix/hyperparameter_search.py", "codePath": "hyperparameter_search.py", "writerId": "9ftad0wd8f0ttfso9w51wiui656g5uhf", "cpu_count": 8, "gpu_count": 1, "startedAt": "2026-01-29T10:11:53.859188Z", "e

In [None]:
# Probe: request sampledHistory for one run only, to check whether the field
# returns any data at all.
single_run_query = gql("""
    query RunHistory($project: String!, $entity: String!, $runName: String!, $historySpecs: [JSONString!]!) {
        project(name: $project, entityName: $entity) {
            run(name: $runName) {
                id
                name
                sampledHistory(specs: $historySpecs)
            }
        }
    }
""")

# One spec per history key, each asking for up to 10000 samples.
history_specs = [
    json.dumps({"key": key, "samples": 10000})
    for key in (
        "epoch/epoch",
        "epoch/train_loss",
        "epoch/test_hitrate@50",
        "epoch/test_ndcg@50",
    )
]

probe_variables = {
    "project": "peppermint-matrix",
    "entity": "feedr",
    "runName": "5tvb3woc",  # a run name taken from the earlier listing output
    "historySpecs": history_specs,
}
result = api.client.execute(single_run_query, variable_values=probe_variables)

print("Single run sampledHistory:")
print(result["project"]["run"]["sampledHistory"])

Single run sampledHistory:
[[], [], [], []]
