We annotate the data with the judges' scores once we have the 'human' (simulated) annotations.

In [1]:
from martian_apart_hack_sdk import martian_client, utils
from martian_apart_hack_sdk.models import llm_models
import pickle
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple
import time

from openai.types.chat import (
    chat_completion,
    chat_completion_message,
)

In [13]:
# Build the Martian API client from the locally stored configuration;
# the API URL and key come from the config object rather than being inlined.
config = utils.load_config()
client = martian_client.MartianClient(api_url=config.api_url, api_key=config.api_key)

In [14]:
# IDs of the judges that score every (question, answer) pair.
# Order matters: each row's score list is laid out in exactly this order.
JUDGE_IDS = [
    "harmlessness-judge", "privacy-judge",
    "factual-accuracy-judge", "prompt-faithfulness-relevance-judge",
    "calibration-uncertainty-judge", "bias-fairness-judge",
    "reasoning-consistency-judge", "discourse-coherence-judge",
    "conciseness-redundancy-judge", "style-formatting-judge",
]

In [15]:
# Fetch every judge object once, keyed by its ID, so the worker threads share them.
JUDGES = {judge_id: client.judges.get(judge_id=judge_id) for judge_id in JUDGE_IDS}

In [16]:
def get_score(query, answer, judge):
    """Score a single (query, answer) pair with one Martian judge.

    The pair is wrapped in OpenAI chat-completion objects. ``query`` becomes the
    user message of the request, and ``answer`` becomes the assistant message of
    a synthetic response. Both are submitted to ``client.judges.evaluate``.

    Args:
        query: Text of the user prompt.
        answer: Text of the assistant reply being judged.
        judge: Judge object, as returned by ``client.judges.get``.

    Returns:
        The ``score`` attribute of the evaluation result.
    """
    request_payload = {
        "model": llm_models.GPT_4O_MINI,
        "messages": [{"role": "user", "content": query}],
    }

    assistant_message = chat_completion_message.ChatCompletionMessage(
        role="assistant",
        content=answer,
    )
    only_choice = chat_completion.Choice(
        finish_reason="stop",
        index=0,
        message=assistant_message,
    )
    # The response is synthetic (not from a real API call), so id/created are placeholders.
    # NOTE(review): model "gpt-4o" differs from the request's GPT_4O_MINI — confirm intended.
    synthetic_response = chat_completion.ChatCompletion(
        id="123",
        choices=[only_choice],
        created=0,
        model="gpt-4o",
        object="chat.completion",
        service_tier=None,
    )

    result = client.judges.evaluate(
        judge,
        completion_request=request_payload,
        completion_response=synthetic_response,
    )
    return result.score

## Parallel judge evaluation

In [41]:
def evaluate_single_judge(args: Tuple[str, str, str, dict]) -> Tuple[int, float]:
    """
    Score one (question, answer) pair with one judge.

    All inputs are packed into a single tuple so the function can be mapped
    over a list of argument tuples by an executor.

    Args:
        args: Tuple of (question, answer, judge_id, judges_dict)
    Returns:
        Tuple of (judge_index, score), where judge_index is the position of
        judge_id in JUDGE_IDS.
    """
    question, answer, judge_id, judges = args
    position = JUDGE_IDS.index(judge_id)
    return position, get_score(question, answer, judges[judge_id])

In [42]:
def evaluate_with_backoff(args, max_retries: int = 5, initial_delay: float = 1.0,
                          backoff_factor: float = 2.0,
                          retry_on: Tuple[type, ...] = (Exception,),
                          func=None):
    """
    Call ``func(args)`` and retry it with exponential back-off when it raises.

    Args:
        args: Tuple of (question, answer, judge_id, JUDGES), passed unchanged to ``func``.
        max_retries: Total number of attempts. Values below 1 are treated as a
            single attempt with no retries.
        initial_delay: Seconds to sleep after the first failed attempt.
        backoff_factor: Multiplier applied to the delay after each failure.
        retry_on: Exception types that trigger a retry. Any other exception
            propagates immediately, so deterministic bugs (e.g. a KeyError for
            an unknown judge ID) are not retried pointlessly.
        func: Callable taking ``args``. Defaults to ``evaluate_single_judge``.

    Returns:
        Whatever ``func`` returns; by default a (judge_index, score) tuple.

    Raises:
        The exception from the final attempt once all attempts are exhausted,
        or any exception not listed in ``retry_on``.
    """
    if func is None:
        func = evaluate_single_judge

    # Always make at least one attempt; this matches the old fallback call
    # that ran when max_retries <= 0.
    attempts = max(1, max_retries)
    delay = initial_delay
    for attempt in range(attempts):
        try:
            return func(args)
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            time.sleep(delay)
            delay *= backoff_factor

In [43]:
def evaluate_pair_parallel(question: str, answer: str, max_workers: int = None) -> List[float]:
    """
    Score one (question, answer) pair with every judge in JUDGE_IDS concurrently.

    Each judge call goes through evaluate_with_backoff, so a failing call is
    retried with exponential back-off before the error propagates.

    Args:
        question: The query text
        answer: The response text
        max_workers: Thread-pool size; None lets ThreadPoolExecutor pick its default.

    Returns:
        List of scores, one per judge, in JUDGE_IDS order.
    """
    jobs = [(question, answer, judge_id, JUDGES) for judge_id in JUDGE_IDS]
    ordered = [0.0 for _ in JUDGE_IDS]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for position, value in pool.map(evaluate_with_backoff, jobs):
            # Place each score by the judge index reported by the worker.
            ordered[position] = value

    return ordered

In [44]:
def eval_row(args):
    """Score one dataframe row with all judges.

    ``args`` is (row_position, question, answer). The position is returned
    alongside the scores so parallel results can be placed back by row.
    """
    row_position, question, answer = args
    row_scores = evaluate_pair_parallel(question, answer)
    return row_position, row_scores

In [18]:
def label_rows(df, checkpoint_every: int = 100, checkpoint_prefix: str = 'progress',
               max_workers: int = None):
    """
    Score every row of ``df`` with all judges in parallel, checkpointing progress.

    Rows are processed concurrently, one ``eval_row`` task per row, and each
    task fans out to all judges. Every ``checkpoint_every`` completed rows, the
    whole positional ``results`` list is pickled to
    ``{checkpoint_prefix}_{batch}.pkl``. The list holds scores for finished rows
    and ``None`` for the rest, and ``batch`` counts up from 0. When the row
    count is not a multiple of ``checkpoint_every``, a final checkpoint is also
    written. After a complete run, the last file therefore holds every row's
    scores.

    Args:
        df: DataFrame with ``instruction`` and ``answer`` columns.
        checkpoint_every: Completed rows between checkpoints (must be >= 1).
        checkpoint_prefix: File-name prefix for checkpoints. Use a different
            prefix when re-running on a subset so earlier files are not
            overwritten.
        max_workers: Size of the row-level thread pool. None uses
            ThreadPoolExecutor's default.

    Raises:
        ValueError: If ``checkpoint_every`` is less than 1.
    """
    if checkpoint_every < 1:
        raise ValueError(f"checkpoint_every must be >= 1, got {checkpoint_every}")

    # Results are indexed by row *position* (not df index). This lets a
    # checkpoint be matched back to the exact dataframe it was computed from
    # and supports resuming on a subset.
    results = [None] * len(df)
    tasks = [
        (i, row["instruction"], row["answer"])
        for i, (_, row) in enumerate(df.iterrows())
    ]

    def save_checkpoint(batch_number):
        # Dump the full list so each checkpoint is self-contained.
        with open(f'{checkpoint_prefix}_{batch_number}.pkl', 'wb') as f:
            pickle.dump(results, f)

    completed = 0
    batch = 0
    # Parallelize across rows; executor.map yields results in task order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, scores in executor.map(eval_row, tasks):
            results[i] = scores
            completed += 1

            if completed % checkpoint_every == 0:
                print(f"Completed {completed} rows! Saving progress...")
                save_checkpoint(batch)
                batch += 1

    # The periodic save above misses a trailing partial batch; persist it too.
    if completed % checkpoint_every != 0:
        print(f"Completed {completed} rows! Saving progress...")
        save_checkpoint(batch)

In [None]:
# Expected to be a DataFrame with 'instruction' and 'answer' columns (read by label_rows).
# NOTE: pickle.load can execute arbitrary code, so only load this file from a trusted source.
with open('questions_and_answers.pkl', 'rb') as qa_file:
    questions_and_answers = pickle.load(qa_file)

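In principle, the cell below runs until every row has been scored. If it stops early (for example, because a judge call keeps failing after all retries), you can use the progress files to find the rows that have not been scored yet and call `label_rows` again on that subset of the dataframe. Note that a new run numbers its progress files from 0 again, so move or rename the existing files first to avoid overwriting them. Scores in the new files are indexed by position within the subset, not within the full dataframe.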

In [None]:
# Long-running: scores every row and writes progress_<n>.pkl checkpoints as it goes.
label_rows(df=questions_and_answers)

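The progress file to load depends on the size of your dataset. `label_rows` writes a checkpoint every 100 completed rows and numbers the files from 0. For $N$ rows ($N$ a multiple of 100), the complete results are therefore in `progress_{N/100 - 1}.pkl`. Since we loaded 10K human-simulated annotated records, we load progress file **#99**.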

In [None]:
# Last checkpoint for 10K rows at one checkpoint per 100 rows (files numbered from 0).
PROGRESS_FILE = 'progress_99.pkl'

with open(PROGRESS_FILE, 'rb') as f:
    results = pickle.load(f)

# The checkpoint must cover every row, and every row must actually be scored:
# label_rows leaves unfinished rows as None, which would otherwise be stored silently.
assert len(results) == len(questions_and_answers), (
    f"{len(results)} results for {len(questions_and_answers)} rows"
)
missing = [pos for pos, row_scores in enumerate(results) if row_scores is None]
assert not missing, f"{len(missing)} rows have no scores yet, e.g. positions {missing[:5]}"

questions_and_answers['scores'] = results

We can then save!

In [None]:
# Persist the dataframe together with its 'scores' column.
with open('qas_with_scores.pkl', 'wb') as out_file:
    pickle.dump(questions_and_answers, out_file)