In [None]:
import json
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import pandas as pd
from openai import OpenAI
from tqdm.auto import tqdm

In [2]:
OPENAI_MODEL = "gpt-5-mini"

In [3]:
# create directory for storing LLM responses
responses_dir = Path("../.llm_responses")
responses_dir.mkdir(exist_ok=True)

# initialize OpenAI client
openai_client = OpenAI()

In [4]:
def response_file_exists(paper_id, question_id, responses_dir):
    """Check if response file already exists for given paper and question"""
    filename = f"{paper_id}_{question_id}.json"
    filepath = responses_dir / filename
    return filepath.exists()


def save_response(paper_id, question_id, prompt, response, responses_dir):
    """Save LLM response to JSON file"""
    filename = f"{paper_id}_{question_id}.json"
    filepath = responses_dir / filename

    data = {
        "paper_id": paper_id,
        "question_id": question_id,
        "prompt": prompt,
        "response": response.choices[0].message.content,
        "model": response.model,
        "input_tokens": response.usage.prompt_tokens,
        "output_tokens": response.usage.completion_tokens,
    }

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def call_llm_with_caching(paper_title, paper_abstract, question_text, paper_id, question_id):
    """
    Call OpenAI API with prompt caching.
    Structure:
    - System message (cached): Instructions
    - User message (cached): Title + Abstract
    - User message (not cached): Specific question
    """

    messages = [
        {
            "role": "system",
            "content": "You are a systematic review screening assistant. Your task is to determine if a paper meets specific inclusion or exclusion criteria based on its title and abstract. You must answer with ONLY one word: YES, NO, or UNSURE (only if you are really uncertain). Do not provide any explanation or additional text.",
        },
        {"role": "user", "content": f"Title: {paper_title}\n\nAbstract: {paper_abstract}"},
        {
            "role": "user",
            "content": f"Task: according to the paper's title and abstract, answer the following question. Only answer with YES, NO or UNSURE (just if you are really unsure). Nothing else.\n\nQuestion: {question_text}",
        },
    ]

    try:
        response = openai_client.chat.completions.create(model=OPENAI_MODEL, messages=messages)
        return response
    except Exception as e:
        print(f"Error processing paper {paper_id}, question {question_id}: {e}")
        return None

# Import data

In [None]:
llm_questions = pd.read_csv("../data/5_llm_questions.csv")
llm_questions.head()

Unnamed: 0,id,question,category,include_exclude,notes
0,I1,Does the paper describe a randomized controlle...,study_design,include,"This inclusion criterion is like the ""I3""; how..."
1,I2,Does the paper include or reference a trial re...,study_design,include,"This is a repetition of ""I1""; we can optimize"
2,I3,Is this an interventional study where particip...,study_design,include,"This tends to narrow the scope of the ""trial w..."
3,I4,Does the study include participants with schiz...,population,include,"This is like ""I4"", in the sense that this is u..."
4,I5,Does the study include participants with non-a...,population,include,"OK, this goes into detail on the diagnoses/sym..."


In [6]:
papers_df = pd.read_csv("../data/full_dataset.csv")
papers_df.head()

Unnamed: 0,id,MK_IN,MK_IN_source,label,ti_ab,FH_Err,DC_Notes,title,abstract,journal_title,authors,Publication_Year,DOI,Volume,Pages,Reviewed_Item,DB,DP
0,0,False,auto,False,False,,,"""Cognitive Assessment in Psychiatric Patients ...",Background: Electroconvulsive treatment (ECT) ...,European Journal of Cardiovascular Medicine,"Sharma, V.; Gade, V.; Rajeev, A.; Saudam, M.; ...",2023,,13(4),924-935,,Embase,Ovid Technologies
1,1,True,auto,True,True,,,"""Comparison of the impact of peer education an...","Inclusion criteria: ""All patients with psychia...",https://irct.behdasht.gov.ir/trial/80261,IRCT20241108063635N1,2024,,,,,IRCT,
2,2,True,auto,True,True,,,"""Comparison of two Anaesthesia Drugs Thiopenta...",Inclusion criteria: ASA 1 and 2 patients sched...,http://www.ctri.nic.in/Clinicaltrials/pmaindet...,CTRI/2024/09/073405,2024,,,,,CTRI,
3,3,True,auto,False,False,,,"""Efficacy of intensive bilateral Temporo-Parie...",Transcranial magnetic stimulation (TMS) is a n...,Asian Journal of Psychiatry,"Tyagi, Priya; Dhyani, Mohan; Khattri, Sumit; T...",2022,,74,1-7,,APA PsycInfo,Ovid Technologies
4,4,False,auto,False,False,,,"""Empowering Hope: Non-Pharmacological Interven...",<b>Aim</b>: Borderline personality disorder (B...,Journal of multidisciplinary healthcare,"Maulana, I.; Suryani, S.; Sriati, A.; Yosep, I...",2024,,17,4603-4609,,MEDLINE,Ovid Technologies


In [7]:
# parallelized pipeline with prompt caching preservation

def process_single_paper(paper_row: pd.Series, questions_df: pd.DataFrame, responses_dir: Path, num_questions=-1):
    """
    Process all questions for a single paper sequentially.
    This preserves prompt caching for the paper's title+abstract.

    Returns: (paper_id, num_processed, num_skipped)
    """
    paper_id = paper_row["id"]
    title = paper_row["title"]
    abstract = paper_row["abstract"]

    questions_to_process = questions_df.head(num_questions) if num_questions != -1 else questions_df

    processed = 0
    skipped = 0

    for _, question_row in questions_to_process.iterrows():
        question_id = question_row["id"]
        question_text = question_row["question"]

        # skip if already processed
        if response_file_exists(paper_id, question_id, responses_dir):
            skipped += 1
            continue

        try:
            # call LLM with caching
            response = call_llm_with_caching(title, abstract, question_text, paper_id, question_id)

            if response is None:
                continue

            # create prompt for saving
            prompt = f"Title: {title}\n\nAbstract: {abstract}\n\nQuestion: {question_text}"

            # save response
            save_response(paper_id=paper_id, question_id=question_id, prompt=prompt, response=response, responses_dir=responses_dir)
            processed += 1

        except Exception as e:
            print(f"Error processing paper {paper_id}, question {question_id}: {e}")
            continue

    return paper_id, processed, skipped

In [8]:
def process_papers_parallel(papers_df: pd.DataFrame, questions_df: pd.DataFrame, num_papers: int = -1, num_questions: int = -1, max_workers: int = 5):
    """
    Process papers in parallel while keeping questions for each paper sequential.
    This maximizes throughput while preserving prompt caching benefits.

    Args:
        papers_df: DataFrame with papers
        questions_df: DataFrame with questions
        num_papers: Number of papers to process (-1 = all)
        num_questions: Number of questions per paper (-1 = all)
        max_workers: Number of parallel workers (papers processed simultaneously)
    """
    papers_to_process = papers_df.head(num_papers) if num_papers != -1 else papers_df

    total_papers = len(papers_to_process)
    total_questions_per_paper = len(questions_df.head(num_questions) if num_questions != -1 else questions_df)
    total_tasks = total_papers * total_questions_per_paper

    print(f"Processing {total_papers} papers x {total_questions_per_paper} questions = {total_tasks} total tasks")
    print(f"Using {max_workers} parallel workers\n")

    # thread-safe counter for progress
    completed_papers = 0
    lock = threading.Lock()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit all papers for parallel processing
        future_to_paper = {
            executor.submit(process_single_paper, paper_row, questions_df, responses_dir, num_questions): paper_row["id"]
            for _, paper_row in papers_to_process.iterrows()
        }

        # progress bar for papers
        with tqdm(total=total_papers, desc="Processing papers", position=0) as pbar:
            for future in as_completed(future_to_paper):
                paper_id = future_to_paper[future]
                try:
                    paper_id, processed, skipped = future.result()
                    with lock:
                        completed_papers += 1
                    pbar.set_postfix({"paper_id": paper_id, "n_processed": processed, "n_skipped": skipped})
                    pbar.update(1)
                except Exception as e:
                    print(f"\nFailed to process paper {paper_id}: {e}")
                    pbar.update(1)

    print(f"Completed processing {completed_papers} papers ({processed + skipped} tasks)")
    print(f"N. requests to the LLM: {processed}")
    print(f"N. requests skipped (already processed): {skipped}")
    

In [16]:
# run pipeline
process_papers_parallel(papers_df=papers_df, questions_df=llm_questions, num_papers=-1, num_questions=-1, max_workers=20)

Processing 5747 papers x 29 questions = 166663 total tasks
Using 20 parallel workers



Processing papers:   0%|          | 0/5747 [00:00<?, ?it/s]

Completed processing 5747 papers (29 tasks)
N. requests to the LLM: 0
N. requests skipped (already processed): 29


# Analyze responses

In [17]:
def load_responses_to_dataframe(responses_dir) -> pd.DataFrame:
    """
    Load all JSON response files into a pandas DataFrame.
    """
    json_files = list(responses_dir.glob("*.json"))

    if not json_files:
        print("No response files found.")
        return pd.DataFrame()

    data_records = []

    for file in json_files:
        with open(file, "r", encoding="utf-8") as f:
            data = json.load(f)
            data_records.append(
                {
                    "paper_id": data.get("paper_id"),
                    "question_id": data.get("question_id"),
                    "response": data.get("response"),
                    "model": data.get("model"),
                    "input_tokens": data.get("input_tokens", 0),
                    "output_tokens": data.get("output_tokens", 0),
                }
            )

    df = pd.DataFrame(data_records)

    # sort by paper_id and question_id
    df = df.sort_values(["paper_id", "question_id"]).reset_index(drop=True)
    return df


# cost calculation function (use only input_tokens and output_tokens)
def add_cost_column(df, costs_path="../src/llm/models_costs.json", model=OPENAI_MODEL):
    with open(costs_path, "r", encoding="utf-8") as f:
        model_costs = json.load(f)

    def calculate_cost(row):
        input_tokens = row.get("input_tokens")
        output_tokens = row.get("output_tokens")

        costs = model_costs[model]
        input_cost_per_m = float(costs.get("COST_PER_1M_TOKENS_INPUT"))
        output_cost_per_m = float(costs.get("COST_PER_1M_TOKENS_OUTPUT"))

        input_cost = (input_tokens / 1_000_000) * input_cost_per_m
        output_cost = (output_tokens / 1_000_000) * output_cost_per_m

        return input_cost + output_cost

    df_with_cost = df.copy()
    df_with_cost["cost_usd"] = df_with_cost.apply(calculate_cost, axis=1).round(6)
    return df_with_cost

In [None]:
# # load all responses into a DataFrame
# responses_df = load_responses_to_dataframe(responses_dir)
# responses_df = add_cost_column(responses_df)
# responses_df.to_csv("../results/5/emanuele/llm_responses.csv", index=False)
# responses_df

Unnamed: 0,paper_id,question_id,response,model,input_tokens,output_tokens,cost_usd
0,0,E1,NO,gpt-5-mini-2025-08-07,805,10,0.000221
1,0,E10,UNSURE,gpt-5-mini-2025-08-07,808,332,0.000866
2,0,E11,NO,gpt-5-mini-2025-08-07,814,138,0.000480
3,0,E12,NO,gpt-5-mini-2025-08-07,806,74,0.000350
4,0,E2,NO,gpt-5-mini-2025-08-07,811,74,0.000351
...,...,...,...,...,...,...,...
166658,5746,I5,NO,gpt-5-mini-2025-08-07,917,74,0.000377
166659,5746,I6,NO,gpt-5-mini-2025-08-07,897,10,0.000244
166660,5746,I7,NO,gpt-5-mini-2025-08-07,896,74,0.000372
166661,5746,I8,NO,gpt-5-mini-2025-08-07,900,10,0.000245


In [None]:
responses_df = pd.read_csv("../results/5/emanuele/llm_responses.csv")
responses_df

In [22]:
def compute_cost_summary(responses_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute cost summary statistics from responses DataFrame.
    Returns a DataFrame with total cost, average cost per response,
    average cost per paper, and average cost per 1000 papers.
    """

    cost_summary = {
        "total_cost": responses_df["cost_usd"].sum().round(3),
        "avg_cost_per_response": responses_df["cost_usd"].mean().round(4),
        "avg_cost_per_paper": responses_df.groupby("paper_id")["cost_usd"].mean().mean().round(4),
        "avg_cost_per_1000_papers": (
            responses_df.groupby("paper_id")["cost_usd"].mean().sum() / (len(responses_df["paper_id"].unique()) / 1000)
        ).round(2),
    }
    return pd.DataFrame([cost_summary])

In [23]:
cost_summary_df = compute_cost_summary(responses_df)
cost_summary_df

Unnamed: 0,total_cost,avg_cost_per_response,avg_cost_per_paper,avg_cost_per_1000_papers
0,54.753,0.0003,0.0003,0.33


In [None]:
# summary statistics per paper
responses_df.groupby("paper_id").agg(
    n_questions=("question_id", "count"),
    total_cost_usd=("cost_usd", "sum"),
    avg_cost_usd=("cost_usd", "mean"),
).reset_index()

In [None]:
# convert it to wide format
responses_df_wide = responses_df.pivot(index="paper_id", columns="question_id", values="response").reset_index()
responses_df_wide