In [6]:
# read the csv file into a pandas dataframe
import pandas as pd

df = pd.read_csv("filtered_llmops_database.csv")
df.head()
df = df.sample(2, random_state=42)

In [7]:
# pip install pandas pyarrow openai>=1.40.0
import os, json, time, uuid, pathlib
import pandas as pd
from openai import OpenAI

# ---------------------------
# Config
# ---------------------------
MODEL = "gpt-4o-mini"      # good price/quality for routing
BATCH_WINDOW = "24h"       # required by Batch API
INPUT_JSONL = "news_taxonomy_requests.jsonl"
OUTPUT_JSONL = "news_taxonomy_results.jsonl"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    raise RuntimeError("Set OPENAI_API_KEY in your environment.")

client = OpenAI()


In [8]:

# ---------------------------
# Taxonomy (your catalog)
# ---------------------------
CATALOG = {
    "categories": [
        {
            "key": "research_highlights",
            "name": "Research Highlights",
            "definition": "New papers, preprints, benchmarks, datasets, SOTA claims, academic results.",
            "example_signals": ["arXiv", "preprint", "benchmark", "dataset", "NeurIPS", "ICLR", "Nature", "SOTA"],
        },
        {
            "key": "industry_news",
            "name": "Industry News",
            "definition": "Funding, acquisitions, partnerships, regs/policy, leadership changes, earnings, market moves.",
            "example_signals": ["raises", "acquired", "partnership", "announces partnership", "regulation", "EU AI Act", "SEC filing"],
        },
        {
            "key": "cool_use_cases",
            "name": "Cool Use Cases",
            "definition": "Real deployments, case studies, measurable impact, pilots, customer rollouts.",
            "example_signals": ["in production", "case study", "pilot", "rollout", "customers", "ROI", "impact"],
        },
        {
            "key": "engineering_deep_dives",
            "name": "Engineering Deep Dives",
            "definition": "Concrete system/infra write-ups on scaling, latency, reliability, inference, distillation, orchestration.",
            "example_signals": ["scaling", "latency", "throughput", "GPU", "distillation", "load balancing", "traffic management", "observability"],
        },
        {
            "key": "product_launches_tools",
            "name": "Product Launches & Tools",
            "definition": "New products, features, SDKs, APIs, GA/preview, open-source releases.",
            "example_signals": ["launch", "introducing", "announces", "SDK", "API", "general availability", "open source", "release notes"],
        },
    ]
}

# ---------------------------
# Prompts
# ---------------------------
SYSTEM_PROMPT = (
    "You are an editor routing tech news into newsletter sections. "
    "Be decisive, concise, and consistent with the provided category definitions. "
    "When unsure, still score all categories in [1, 5]. Favor the most specific category. "
    "Return STRICT JSON that matches the JSON schema and nothing else."
)


In [9]:

def build_user_prompt(article: dict) -> str:
    """
    article = {
      title, summaries, tags, url, company, created_at
    }
    """
    schema = (
        "{\n"
        '  "primary_category": "string",\n'
        '  "secondary_categories": ["string", ...],\n'
        '  "scores": {"category_key": <score>, "...": <score>},\n'
        '  "newsletter_score": <score>\n'
        "}"
    )
    return (
        "You will receive:\n"
        "1) Article fields (title, summaries, tags, url, company, created_at).\n"
        "2) Category catalog (key, definition, example_signals).\n\n"
        "Tasks:\n"
        "A) Score every category from 0 to 5 based on evidence in the article.\n"
        "B) Pick the best category. Provide up to 2 secondaries if close.\n"
        "JSON schema (return exactly this):\n"
        + schema + "\n\n"
        "Article:\n" + json.dumps(article, ensure_ascii=False) + "\n\n"
        "Categories:\n" + json.dumps(CATALOG, ensure_ascii=False)
    )



In [10]:

# ---------------------------
# Build batch requests JSONL
# ---------------------------
df = df.reset_index(drop=False).rename(columns={"index": "_row_index"})
tasks = []

def _split_tags(s):
    if pd.isna(s) or not str(s).strip():
        return []
    return [t.strip() for t in str(s).split(",") if t.strip()]

for _, row in df.iterrows():
    # Build the article payload the model will see
    article = {
        "article": row.get("full_summary"),
        "tags": list(dict.fromkeys(  # de-duplicate while preserving order
            _split_tags(row.get("application_tags", "")) +
            _split_tags(row.get("tools_tags", "")) +
            _split_tags(row.get("techniques_tags", "")) +
            _split_tags(row.get("extra_tags", ""))
        )),
        "url": row.get("source_url"),
        "company": row.get("company"),
        "created_at": row.get("created_at"),
    }

    custom_id = f"row-{row['_row_index']}"
    user_prompt = build_user_prompt(article)

    # One line per request for the Batch API (Chat Completions flavor)
    # Format per OpenAI cookbook: custom_id/method/url/body
    # https://cookbook.openai.com/examples/batch_processing
    task = {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": MODEL,
            "temperature": 0.0,
            "response_format": {"type": "json_object"},  # JSON Mode
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
        },
    }
    tasks.append(task)

with open(INPUT_JSONL, "w", encoding="utf-8") as f:
    for obj in tasks:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

print(f"Wrote {len(tasks)} tasks to {INPUT_JSONL}")


Wrote 2 tasks to news_taxonomy_requests.jsonl


In [12]:
# ---------------------------
# Submit batch
# ---------------------------
batch_file = client.files.create(file=open(INPUT_JSONL, "rb"), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window=BATCH_WINDOW,
)
print("Batch ID:", batch.id, "Status:", batch.status)

Batch ID: batch_68c40c24abdc81908d43d204d5e75cea Status: validating


In [13]:

# Optional: poll until done (you can also retrieve later by ID)
def wait_for_batch(batch_id: str, poll_seconds: int = 15):
    while True:
        b = client.batches.retrieve(batch_id)
        print(f"[{time.strftime('%H:%M:%S')}] status={b.status}")
        if b.status in ("completed", "failed", "expired", "cancelling", "cancelled"):
            return b
        time.sleep(poll_seconds)

batch = wait_for_batch(batch.id)
if batch.status != "completed":
    raise RuntimeError(f"Batch ended with status={batch.status}")


[17:33:56] status=in_progress
[17:34:12] status=in_progress
[17:34:27] status=in_progress
[17:34:42] status=in_progress
[17:34:58] status=in_progress
[17:35:14] status=in_progress
[17:35:30] status=completed


In [14]:
# ---------------------------
# Download results JSONL
# ---------------------------
result_bytes = client.files.content(batch.output_file_id).content
with open(OUTPUT_JSONL, "wb") as f:
    f.write(result_bytes)
print(f"Saved results to {OUTPUT_JSONL}")

Saved results to news_taxonomy_results.jsonl


In [None]:
# ---------------------------
# Simple parser: read JSONL -> normalize -> merge
# ---------------------------
import pandas as pd, json, re

def parse_batch_results(path: str):
    raw = pd.read_json(path, lines=True)

    def ok(resp):
        try:
            return resp.get("status_code", 0) == 200
        except Exception:
            return False

    # Successful rows
    ok_rows = raw[raw["response"].apply(ok)].copy()

    def to_parsed(resp):
        try:
            content = resp["body"]["choices"][0]["message"]["content"]
            return json.loads(content)
        except Exception as e:
            return {"_error": f"parse_error: {e}"}

    ok_rows["parsed"] = ok_rows["response"].apply(to_parsed)
    ok_rows["row_index"] = (
        ok_rows["custom_id"].str.extract(r"row-(\d+)", expand=False).astype(int)
    )

    # Flatten the parsed JSON
    parsed_flat = pd.json_normalize(ok_rows["parsed"])
    scores_flat  = pd.json_normalize(ok_rows["parsed"].apply(lambda d: d.get("scores", {}))).add_prefix("score__")

    out = pd.concat(
        [ok_rows[["row_index"]], parsed_flat, scores_flat],
        axis=1
    ).rename(columns={
        "primary_category": "primary_category",
        "secondary_categories": "secondary_categories",
        "newsletter_score": "newsletter_score",
        "scores": "scores_raw"
    })

    # Errors (non-200 or explicit error field)
    err_mask = ~raw["response"].apply(ok) | raw.get("error", pd.Series([None]*len(raw))).notna()
    errors = raw[err_mask][["custom_id", "response"]].copy()
    if "error" in raw.columns:
        errors["error"] = raw["error"]

    return out, errors

# Use it
parsed_df, errors_df = parse_batch_results(OUTPUT_JSONL)

# Merge back on your original df (built earlier in the script)
df = df.merge(parsed_df, left_on="_row_index", right_on="row_index", how="left").drop(columns=["row_index"])

# Optional: keep just the columns you care about
# df = df[
#   list(df.columns[:df.columns.get_loc("_row_index")]) +
#   ["primary_category", "secondary_categories", "newsletter_score"] +
#   [c for c in df.columns if c.startswith("score__")]
# ]

# Save
# df.drop(columns=["_row_index"]).to_parquet("news_taxonomy_labeled.parquet", index=False)
df.drop(columns=["_row_index"]).to_csv("news_taxonomy_labeled.csv", index=False)

In [16]:
df

Unnamed: 0,_row_index,created_at,title,industry,year,source_url,company,application_tags,tools_tags,extra_tags,...,scores.research_highlights,scores.industry_news,scores.cool_use_cases,scores.engineering_deep_dives,scores.product_launches_tools,score__research_highlights,score__industry_news,score__cool_use_cases,score__engineering_deep_dives,score__product_launches_tools
0,362,2025-05-07 07:52:00+00:00,Using GenAI to Automatically Fix Java Resource...,Tech,2024.0,https://www.uber.com/en-NL/blog/fixrleak-fixin...,uber,"code_generation,code_interpretation","cicd,documentation,guardrails","genai,static analysis,ast,java,code generation...",...,1,1,5,4,1,1,1,5,4,1
1,73,2024-11-18 08:56:00+00:00,Scaling and Optimizing Self-Hosted LLMs for De...,Tech,2023.0,https://www.youtube.com/watch?v=886hZl5Qp7g,various,"question_answering,document_processing","fastapi,vllm,mistral,scaling,monitoring,docume...","rag,inference optimization,vllm,ray serve,gpu ...",...,1,1,5,4,1,1,1,5,4,1


In [17]:
# print the first row in a json format
print(df.iloc[0].to_json(indent=2))

{
  "_row_index":362,
  "created_at":"2025-05-07 07:52:00+00:00",
  "title":"Using GenAI to Automatically Fix Java Resource Leaks",
  "industry":"Tech",
  "year":2024.0,
  "source_url":"https:\/\/www.uber.com\/en-NL\/blog\/fixrleak-fixing-java-resource-leaks-with-genai\/",
  "company":"uber",
  "application_tags":"code_generation,code_interpretation",
  "tools_tags":"cicd,documentation,guardrails",
  "extra_tags":"genai,static analysis,ast,java,code generation,prompt engineering,testing,sonarqube,gpt-4,automation,code review",
  "techniques_tags":"prompt_engineering,error_handling",
  "short_summary":"Uber developed FixrLeak, a framework combining generative AI and Abstract Syntax Tree (AST) analysis to automatically detect and fix resource leaks in Java code. The system processes resource leaks identified by SonarQube, analyzes code safety through AST, and uses GPT-4 to generate appropriate fixes. When tested on 124 resource leaks in Uber's codebase, FixrLeak successfully automated fi