In [1]:
# installing Libraries
%pip install anthropic openai python-dotenv



# Importing Libaries

In [2]:
import math
import re
import json
import os
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass

import openai
from openai import AsyncOpenAI
import anthropic
import queue

from dotenv import load_dotenv

load_dotenv()


True

In [3]:
class BaseJudge(ABC):
    """Abstract base class for instruction-following judges."""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def evaluate(self, instructions: List[str], original_paragraph: str, completion: str) -> int:
        """
        Evaluate how many instructions were followed exactly.

        Args:
            instructions: List of editing instructions to evaluate
            original_paragraph: The original text before editing
            completion: The edited text to evaluate

        Returns:
            Number of instructions followed exactly (0 to len(instructions))
        """
        pass

    def _create_evaluation_prompt(self, instructions: List[str], original_paragraph: str, completion: str) -> Tuple[str, str]:
        """Create system and user prompts for evaluation."""
        system_prompt = (
            "You are an expert evaluator of instruction-following in text editing tasks. "
            "You will be given a set of editing instructions, the original paragraph, and a modified paragraph. "
            "You must return ONLY the number of instructions that were followed exactly. "
            "Do not explain or justify. Just return a single number (0 to N)."
        )

        user_prompt = f"""
            Instructions:
            {chr(10).join(f"{i+1}. {instr}" for i, instr in enumerate(instructions))}

            Original Paragraph:
            {original_paragraph}

            Edited Output:
            {completion}

            How many of the above {len(instructions)} instructions were followed exactly?
            Return just the number, nothing else.
        """
        return system_prompt, user_prompt

    def _extract_score(self, response_text: str, max_instructions: int) -> int:
        """Extract numerical score from response text."""
        matched_str = re.search(r'\d+', response_text.strip())
        if matched_str:
            value = int(matched_str.group(0))
            return max(0, min(value, max_instructions))
        else:
            print(f"[{self.name}] Unexpected format: {response_text}")
            return 0

In [4]:

class OpenAIJudge(BaseJudge):
    """OpenAI-based instruction-following judge."""

    def __init__(self, model: str = "gpt-4"):
        super().__init__("OpenAI")

        api_key = os.getenv('OPENAI_API_KEY')
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def evaluate(self, instructions: List[str], original_paragraph: str, completion: str) -> int:
        """Evaluate using OpenAI GPT model."""
        system_prompt, user_prompt = self._create_evaluation_prompt(instructions, original_paragraph, completion)

        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0,
                max_tokens=50
            )
            reply = response.choices[0].message.content
            return self._extract_score(reply, len(instructions))

        except Exception as e:
            print(f"[{self.name}] Error: {e}")
            return 0

In [5]:
class ClaudeJudge(BaseJudge):
    """Claude-based instruction-following judge."""

    def __init__(self, model: str = "claude-3-5-sonnet-20240620"):
        super().__init__("Claude")
        api_key = os.getenv('ANTHROPIC_API_KEY')
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self.model = model

    async def evaluate(self, instructions: List[str], original_paragraph: str, completion: str) -> int:
        """Evaluate using Claude model."""
        system_prompt, user_prompt = self._create_evaluation_prompt(instructions, original_paragraph, completion)

        try:
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=50,
                temperature=0,
                system=system_prompt,
                messages=[
                    {"role": "user", "content": user_prompt}
                ]
            )
            reply = response.content[0].text
            return self._extract_score(reply, len(instructions))

        except Exception as e:
            print(f"[{self.name}] Error: {e}")
            return 0

In [6]:
class InstructionProcessor:
    """Utility class for processing instructions and prompts."""

    @staticmethod
    def extract_instructions_and_paragraph(prompt: str) -> Tuple[List[str], str]:
        """
        Extract instructions and paragraph from a formatted prompt.

        Args:
            prompt: The formatted prompt containing instructions and paragraph

        Returns:
            Tuple of (instructions_list, paragraph)

        Raises:
            ValueError: If markers are not found or content is empty
        """
        start_marker = "Instructions:"
        end_marker = "Return the final output as plain text with line breaks between paragraphs."

        if start_marker not in prompt or end_marker not in prompt:
            raise ValueError("Start or end marker not found in prompt.")

        # Find start and end positions
        start_index = prompt.index(start_marker) + len(start_marker)
        end_index = prompt.index(end_marker) + len(end_marker)

        # Extract instruction block
        instructions_block = prompt[start_index:prompt.index(end_marker)].strip()

        # Split instructions into list by lines and remove empty ones
        instructions_list = [line.strip() for line in instructions_block.splitlines() if line.strip()]

        # Extract paragraph after end marker
        paragraph = prompt[end_index:].strip()

        if not instructions_list:
            raise ValueError("No instructions found between markers.")
        if not paragraph:
            raise ValueError("No paragraph found after the instructions block.")

        return instructions_list, paragraph

    @staticmethod
    def chunk_instructions(instructions: List[str], n_parts: int = 2) -> List[List[str]]:
        """
        Split instructions into approximately equal chunks.

        Args:
            instructions: List of instructions to split
            n_parts: Number of parts to split into

        Returns:
            List of instruction chunks
        """
        total = len(instructions)
        base_size = total // n_parts
        remainder = total % n_parts

        chunks = []
        start = 0

        for i in range(n_parts):
            # First 'remainder' chunks get one extra item
            chunk_size = base_size + (1 if i < remainder else 0)
            end = start + chunk_size
            chunks.append(instructions[start:end])
            start = end

        return chunks

In [7]:
class InstructionFollowingEvaluator:

    """
    Main evaluator class that coordinates multiple judges to score instruction-following with batch processing.
    """

    def __init__(self, judges: List[BaseJudge] = None, n_chunks: int = 2,
                 batch_size: int = 10, max_concurrent_batches: int = 3):
        """
        Initialize the evaluator.

        Args:
            judges: List of judge instances to use for evaluation. If None, auto-creates based on available API keys.
            n_chunks: Number of chunks to split instructions into for parallel processing
            batch_size: Number of completions to process in each batch
            max_concurrent_batches: Maximum number of batches to process concurrently
        """
        self.n_chunks = n_chunks
        self.batch_size = batch_size
        self.max_concurrent_batches = max_concurrent_batches
        self.processor = InstructionProcessor()
        self.judges = judges

        # output queue
        self.q = queue.Queue()

    async def _evaluate_single_completion(
        self,
        instructions: List[str],
        paragraph: str,
        completion: str,
        model_id:str
    ) -> List[float]:
        """
        Evaluate a single completion using all judges.

        Args:
            instructions: List of instructions to evaluate against
            paragraph: Original paragraph
            completion: Completion to evaluate

        Returns:
            List of scores from each judge (normalized 0.0 to 1.0)
        """
        # Split instructions into chunks for parallel processing
        instruction_chunks = self.processor.chunk_instructions(instructions, self.n_chunks)
        total_instructions = len(instructions)

        judge_scores = []

        # Evaluate with each judge
        for judge in self.judges:
            # Process chunks in parallel for this judge
            chunk_tasks = [
                judge.evaluate(chunk, paragraph, completion)
                for chunk in instruction_chunks
            ]

            chunk_results = await asyncio.gather(*chunk_tasks, return_exceptions=True)

            # Sum up followed instructions across chunks
            total_followed = 0
            for result in chunk_results:
                if isinstance(result, Exception):
                    print(f"[{judge.name}] Chunk evaluation failed: {result}")
                    continue
                total_followed += result

            # Normalize score
            normalized_score = total_followed / total_instructions if total_instructions > 0 else 0.0
            self.q.put([judge.name, model_id, normalized_score])
            judge_scores.append(normalized_score)

        return judge_scores

    async def _process_batch(
        self,
        batch_data: List[Tuple[str, str, str]],
    ) -> List[Tuple[str, float]]:
        """
        Process a batch of completions asynchronously.

        Args:
            batch_data: List of (prompt, completion, model_id) tuples
            instructions: List of instructions to evaluate against
            paragraph: Original paragraph

        Returns:
            List of (model_id, score) tuples
        """
        batch_results = []

        # Create tasks for all completions in this batch
        evaluation_tasks = []
        for obj in batch_data:
            instructions, paragraph = self.processor.extract_instructions_and_paragraph(obj[0])
            task = self._evaluate_single_completion(instructions, paragraph, obj[1], obj[2])
            evaluation_tasks.append((task, obj[2]))

        print(f"Processing batch of {len(evaluation_tasks)} completions...")

        # Execute all evaluations in the batch concurrently
        for task, model_id in evaluation_tasks:
            try:
                judge_scores = await task

                # Average across judges for this completion
                if judge_scores:
                    final_score = sum(judge_scores) / len(judge_scores)
                    batch_results.append((model_id, final_score))
                else:
                    print(f"No valid scores for model {model_id}")
                    batch_results.append((model_id, 0.0))

            except Exception as e:
                print(f"Evaluation failed for model {model_id}: {e}")
                batch_results.append((model_id, 0.0))

        return batch_results

    def _create_batches(self, data: List[Tuple[str, str, str]]) -> List[List[Tuple[str, str, str]]]:
        """Create batches from the input data."""
        batches = []
        for i in range(0, len(data), self.batch_size):
            batch = data[i:i + self.batch_size]
            batches.append(batch)
        return batches

    async def score(
        self,
        prompts: List[str],
        completions: List[str],
        models: List[str]
    ) -> Dict[str, float]:
        """
        Score prompt-completion pairs using multiple judges with batch processing.

        Args:
            prompts: List of full prompts (including instructions + paragraph)
            completions: List of completions corresponding to the prompts
            models: List of model identifiers for each completion

        Returns:
            Dictionary: model_id → average compliance score (0.0 to 1.0)
        """
        if not (len(prompts) == len(completions) == len(models)):
            raise ValueError("Input lists must be equal length")

        if not prompts:
            return {}

        # Prepare data for batch processing
        data = list(zip(prompts, completions, models))
        batches = self._create_batches(data)

        print(f"Processing {len(data)} completions in {len(batches)} batches...")

        model_scores = defaultdict(list)

        # Process batches with concurrency control
        semaphore = asyncio.Semaphore(self.max_concurrent_batches)

        async def process_batch_with_semaphore(batch):
            async with semaphore:
                return await self._process_batch(batch)

        # Create batch processing tasks
        batch_tasks = [process_batch_with_semaphore(batch) for batch in batches]

        # Process all batches
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        # Collect results from all batches
        for batch_result in batch_results:
            if isinstance(batch_result, Exception):
                print(f"Batch processing failed: {batch_result}")
                continue

            for model_id, score in batch_result:
                model_scores[model_id].append(score)

        # Average scores per model
        avg_scores = {
            model_id: sum(scores) / len(scores) if scores else 0.0
            for model_id, scores in model_scores.items()
        }

        # Extract elements and create dictionary
        scored_list = []
        while not self.q.empty():
            judge_name, model_id, score = self.q.get()
            scored_list.append({"judge_name":judge_name, "model_id":model_id, "score":score})

        with open("./scored_output.json", "w") as f:
            json.dump(scored_list, f)

        return avg_scores

In [8]:
async def create_evaluator(
    batch_size: int = 10,
    max_concurrent_batches: int = 3
) -> InstructionFollowingEvaluator:
    """
    Create an evaluator with specified judges and flexible API key configuration.

    Args:
        openai_api_key: OpenAI API key (overrides other sources)
        claude_api_key: Claude API key (overrides other sources)
        use_openai: Whether to include OpenAI judge
        use_claude: Whether to include Claude judge
        config_file: Path to JSON config file with API keys
        api_config: Dictionary with API key configuration
        batch_size: Number of completions to process in each batch
        max_concurrent_batches: Maximum number of batches to process concurrently

    Returns:
        Configured InstructionFollowingEvaluator instance
    """

    # Create judges based on preferences and availability
    judges = []
    judges.append(OpenAIJudge())
    judges.append(ClaudeJudge())

    return InstructionFollowingEvaluator(judges, batch_size=batch_size,
                                       max_concurrent_batches=max_concurrent_batches)

In [9]:
async def evaluate_completions(
    prompts: List[str],
    completions: List[str],
    models: List[str],
    openai_api_key: Optional[str] = None,
    claude_api_key: Optional[str] = None,
    config_file: Optional[str] = None,
    batch_size: int = 10,
    max_concurrent_batches: int = 3
) -> Dict[str, float]:
    """
    Convenience function to evaluate completions with flexible API key configuration and batch processing.

    Args:
        prompts: List of prompts
        completions: List of completions
        models: List of model identifiers
        openai_api_key: OpenAI API key
        claude_api_key: Claude API key
        config_file: Path to config file with API keys
        batch_size: Number of completions to process in each batch
        max_concurrent_batches: Maximum number of batches to process concurrently

    Returns:
        Dictionary of model scores
    """
    evaluator = await create_evaluator(
        batch_size=batch_size,
        max_concurrent_batches=max_concurrent_batches
    )

    return await evaluator.score(prompts, completions, models)

In [10]:
# creating data
import re
from typing import List

def read_prompt_as_text(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()

def load_completions(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    completions = re.findall(r"Completion \d+:\s*(.*?)\s*(?=Completion \d+:|$)", text, re.DOTALL)
    return [c.strip() for c in completions]

def load_models(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        models = [line.strip() for line in f.readlines() if line.strip()]
    return models

In [11]:
async def main():
    # Sample data
    prompt = read_prompt_as_text("/content/Prompts.txt")
    completions = load_completions("/content/Completions.txt")
    models = load_models("/content/Models.txt")
    prompts = [prompt] * len(completions)

    # Create evaluator with batch processing configuration
    evaluator1 = await create_evaluator(
        batch_size=5,  # Process 5 completions per batch
        max_concurrent_batches=2  # Run maximum 2 batches concurrently
    )

    # Get scores using batch processing
    scores = await evaluator1.score(prompts, completions, models)

    print("Model Scores:")
    for model_id, score in scores.items():
        print(f"{model_id}: {score:.3f}")

In [12]:
 await main()

Processing 3 completions in 1 batches...
Processing batch of 3 completions...
Model Scores:
Gemini: 0.464
Mistral: 0.500
OpenAi: 0.464


Indicual testing

In [None]:
!pip install pytest pytest-asyncio


Collecting pytest-asyncio
  Downloading pytest_asyncio-1.1.0-py3-none-any.whl.metadata (4.1 kB)
Downloading pytest_asyncio-1.1.0-py3-none-any.whl (15 kB)
Installing collected packages: pytest-asyncio
Successfully installed pytest-asyncio-1.1.0


In [None]:
prompt_test1 = extend_list_by_repeating(prompt1, 90)
ompletion_test1 = extend_list_by_repeating(completions, 90)
model_test1 = extend_list_by_repeating(models, 90)



In [None]:
print(len(model_test1))

90


In [None]:
import asyncio
from typing import List, Dict
from collections import defaultdict

# -------- Dummy Mock Judge (returns fixed score) --------
class MockJudge:
    def __init__(self, fixed_score=3):
        self.fixed_score = fixed_score

    async def evaluate(self, prompt: str, completion: str, model_id: str) -> Dict[str, float]:
        await asyncio.sleep(0.01)  # simulate async delay
        return self.fixed_score


In [None]:
async def test_async_execution():
    print("🔄 Running async execution test...")

    evaluator = InstructionFollowingEvaluator(
        judges=[MockJudge(fixed_score=4)],
        n_chunks=10,
        batch_size=10,
        max_concurrent_batches=5,
    )

    scores = await evaluator.score(prompt_test1, ompletion_test1, model_test1)

    assert isinstance(scores, dict), "❌ Output should be a dictionary"
    assert len(scores) > 0, "❌ At least one model score should be computed"
    print("✅ Async execution test passed!")
    print(scores)




In [None]:
await test_async_execution()

🔄 Running async execution test...
Processing 90 completions in 9 batches...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
Processing batch of 10 completions...
✅ Async execution test passed!
{'Gemini': 2.8571428571428568, 'Mistral': 2.8571428571428568, 'OpenAi': 2.8571428571428568}
