#### OPRO Implementation for MMLU Computer Science Questions
### Modified version of Google DeepMind's OPRO framework optimizing for both accuracy and token count efficiency
#### paper link : https://arxiv.org/abs/2309.03409


#### To understand the underlying concept in layman's terms, see this short presentation I prepared: [Presentation](https://docs.google.com/presentation/d/1aTT6bXf9I1mFAhU5kmMMWRcUwsygnGRaXZ65U29SpWM/edit?usp=sharing)

In [5]:
!pip install numpy pandas openai tqdm tiktoken





[notice] A new release of pip is available: 23.1.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip





In [6]:
import os
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
import openai
from tqdm import tqdm
import tiktoken
import random
from dataclasses import dataclass
from datetime import datetime

In [7]:
# Take the API key from the OPENAI_API_KEY environment variable so it is not
# hard-coded in the notebook; os.getenv returns None when the variable is unset.
openai.api_key = os.getenv("OPENAI_API_KEY")


In [8]:
@dataclass
class OptimizationConfig:
    """Tunable settings for one prompt-optimization run.

    The field order is part of the interface (instances may be built
    positionally), so fields must not be rearranged.
    """

    # Upper bound on the number of optimization steps.
    max_steps: int = 150
    # Number of candidate solutions generated in each step.
    solutions_per_step: int = 8
    # Maximum number of previous solutions to keep.
    max_history: int = 20
    # Sampling temperature.
    temperature: float = 1.0
    # Weight given to token length when computing the combined score.
    token_weight: float = 0.3
    # Token limit.
    max_tokens: int = 150

@dataclass
class Solution:
    """A candidate instruction together with its evaluation results."""

    instruction: str  # the instruction text
    accuracy: float  # accuracy measured for this instruction
    token_count: int  # token length of the instruction
    combined_score: float = 0.0  # filled in by calculate_score()

    def calculate_score(self, token_weight: float, max_tokens: int) -> float:
        """Compute, store and return the combined accuracy/brevity score.

        The score is ``(1 - token_weight) * accuracy + token_weight * token_score``
        where ``token_score = 1 - token_count / max_tokens``, floored at 0 so
        an instruction longer than ``max_tokens`` earns no brevity credit
        instead of a negative one.

        Args:
            token_weight: Share of the score attributed to token efficiency.
            max_tokens: Token budget the instruction length is measured against.

        Returns:
            The combined score, which is also stored in ``combined_score``.

        Raises:
            ValueError: If ``max_tokens`` is not positive (the ratio would be
                undefined or would reward longer instructions).
        """
        if max_tokens <= 0:
            raise ValueError(f"max_tokens must be positive, got {max_tokens}")

        token_score = max(0.0, 1 - (self.token_count / max_tokens))
        self.combined_score = (
            (1 - token_weight) * self.accuracy + token_weight * token_score
        )
        return self.combined_score

In [11]:
#### Section : Token Management
##### Implement functions for token counting and management


class TokenManager:
    """Token-counting helper backed by a tiktoken encoding."""

    def __init__(self):
        # Encoding that tiktoken associates with the gpt-3.5-turbo model.
        self.encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens the encoder produces for ``text``."""
        encoded = self.encoder.encode(text)
        return len(encoded)

    def is_within_limit(self, text: str, max_tokens: int) -> bool:
        """Return True when ``text`` encodes to at most ``max_tokens`` tokens."""
        n_tokens = self.count_tokens(text)
        return n_tokens <= max_tokens

In [10]:
## MMLU dataset handler

class MMluDataHandler:
    """Load MMLU CS questions from a CSV file and serve random samples."""

    def __init__(self, data_path: str):
        """Read the CSV at ``data_path``; the splits stay unset until
        ``prepare_data`` is called."""
        self.data = pd.read_csv(data_path)
        self.train_data = None
        self.test_data = None

    def prepare_data(self, train_ratio: float = 0.2):
        """Randomly split the data into train and test sets.

        Every row is assigned to the train set independently with
        probability ``train_ratio``, so the split sizes are only
        approximately proportional and differ between calls.
        """
        mask = np.random.rand(len(self.data)) < train_ratio
        self.train_data = self.data[mask]
        self.test_data = self.data[~mask]

    def get_sample_questions(self, n: int, from_train: bool = True) -> pd.DataFrame:
        """Return ``n`` random questions from the train or test split.

        Args:
            n: Number of rows to draw (without replacement).
            from_train: Draw from the train split when True, otherwise from
                the test split.

        Raises:
            RuntimeError: If ``prepare_data`` has not been called yet.
            ValueError: If the chosen split holds fewer than ``n`` rows.
        """
        source = self.train_data if from_train else self.test_data
        split_name = "train" if from_train else "test"

        # Fail with an actionable message instead of an AttributeError on None.
        if source is None:
            raise RuntimeError(
                "prepare_data() must be called before sampling questions"
            )

        # The random split can leave a split very small; say so explicitly.
        if n > len(source):
            raise ValueError(
                f"Cannot sample {n} questions: the {split_name} split only "
                f"has {len(source)} rows"
            )
        return source.sample(n)

In [12]:
### Scorer component

class Scorer:
    """Score an instruction by how many multiple-choice questions the model
    answers correctly when the instruction is prepended to each question."""

    def __init__(self, model: str = "gpt-3.5-turbo"):
        self.model = model
        self.token_manager = TokenManager()

    def evaluate_solution(self, instruction: str, questions: pd.DataFrame) -> Tuple[float, int]:
        """Evaluate an instruction's accuracy and token count.

        Args:
            instruction: Instruction text placed before every question.
            questions: Rows providing the 'question', 'A', 'B', 'C', 'D' and
                'answer' columns.

        Returns:
            ``(accuracy, token_count)`` where accuracy is the fraction of
            rows answered correctly (0.0 when ``questions`` is empty) and
            token_count is the token length of ``instruction``.
        """
        token_count = self.token_manager.count_tokens(instruction)

        total = len(questions)
        if total == 0:
            # Nothing to evaluate; avoid dividing by zero.
            return 0.0, token_count

        correct = 0
        for _, row in questions.iterrows():
            prompt = self._create_evaluation_prompt(instruction, row)
            response = self._get_model_response(prompt)
            if self._is_correct_answer(response, row['answer']):
                correct += 1

        return correct / total, token_count

    def _create_evaluation_prompt(self, instruction: str, question_data: pd.Series) -> str:
        """Build the prompt: instruction, blank line, question, four options."""
        return f"{instruction}\n\nQuestion: {question_data['question']}\nA) {question_data['A']}\nB) {question_data['B']}\nC) {question_data['C']}\nD) {question_data['D']}"

    def _get_model_response(self, prompt: str) -> str:
        """Return the model's reply to ``prompt``, or "" if the call fails.

        Failures are deliberately best-effort: the error is printed and the
        question then simply counts as answered incorrectly.
        """
        request = dict(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        try:
            # openai>=1.0 exposes chat.completions.create; earlier releases
            # expose ChatCompletion.create. Support whichever is installed.
            if hasattr(openai, "chat"):
                response = openai.chat.completions.create(**request)
            else:
                response = openai.ChatCompletion.create(**request)
            # The message content may be None; treat that as an empty reply.
            return (response.choices[0].message.content or "").strip()
        except Exception as e:
            print(f"API Error: {e}")
            return ""

    def _is_correct_answer(self, response: str, correct_answer: str) -> bool:
        """Check whether the model's reply selects the correct option.

        When ``correct_answer`` is an option letter (A-D) the option chosen
        in ``response`` is extracted and compared, rather than testing for a
        substring (the letter "A" occurs inside the word "answer", so a
        substring test accepts almost any reply). The extraction is a
        heuristic, tried in order:

        1. a capital letter following the word "answer" ("The answer is B");
        2. the first stand-alone capital A-D ("B) Stack");
        3. a reply consisting of just one letter in any case ("(c)").

        For any other kind of ``correct_answer`` the original
        case-insensitive substring comparison is used.
        """
        import re  # local import: the file's import block does not provide it

        response = response or ""
        expected = str(correct_answer).strip().upper()

        if expected not in ("A", "B", "C", "D"):
            # Free-text answer key: keep the substring comparison.
            return expected in response.upper()

        chosen = None
        explicit = re.search(
            r"(?i:answer)\W*(?:(?i:is)\W*)?([A-D])\b", response
        )
        if explicit:
            chosen = explicit.group(1)
        else:
            # Case-sensitive so the article "a" is not read as option A.
            standalone = re.search(r"\b([A-D])\b", response)
            if standalone:
                chosen = standalone.group(1)
            else:
                bare = response.strip().strip("().:").upper()
                if bare in ("A", "B", "C", "D"):
                    chosen = bare

        return chosen == expected


In [13]:
## optimization component - core logic behind optimization
class OptimizerEngine:
    """OPRO-style optimization loop.

    Each step asks an LLM for new candidate instructions (showing it the
    best earlier instructions with their scores), scores the candidates,
    and records them in ``solutions_history``.
    """

    def __init__(self, config: OptimizationConfig, optimizer_model: str = "gpt-3.5-turbo"):
        """
        Args:
            config: Optimization settings.
            optimizer_model: Chat model used to propose new instructions.
        """
        self.config = config
        self.optimizer_model = optimizer_model
        self.scorer = Scorer()
        self.token_manager = TokenManager()
        self.solutions_history: List[Solution] = []

    def create_meta_prompt(self, exemplars: pd.DataFrame) -> str:
        """Create the meta-prompt that asks for a new instruction.

        The prompt lists the ``config.max_history`` best-scoring solutions
        found so far (highest score first), followed by the exemplar
        questions with their correct answers.
        """
        # Rank the whole history first, then truncate: this keeps the best
        # solutions in the prompt rather than merely the latest ones, and
        # behaves correctly for max_history == 0 (a [-0:] slice would return
        # the entire list).
        ranked = sorted(
            self.solutions_history,
            key=lambda x: x.combined_score,
            reverse=True
        )
        sorted_solutions = ranked[:max(self.config.max_history, 0)]

        # Create prompt with previous solutions
        solutions_text = "\n".join([
            f"instruction: {sol.instruction}\naccuracy: {sol.accuracy:.2f}\ntokens: {sol.token_count}\nscore: {sol.combined_score:.2f}"
            for sol in sorted_solutions
        ])

        # Add exemplars
        exemplars_text = "\n\n".join([
            f"Example {i+1}:\n{row['question']}\nA) {row['A']}\nB) {row['B']}\nC) {row['C']}\nD) {row['D']}\nCorrect: {row['answer']}"
            for i, (_, row) in enumerate(exemplars.iterrows())
        ])

        return f"""You are an AI instruction optimizer. Create a new instruction for answering computer science questions that:
1. Maximizes accuracy in answering questions
2. Uses minimal number of tokens (be concise but effective)
3. Is different from previous instructions

Previous solutions (sorted by combined score):
{solutions_text}

Example questions:
{exemplars_text}

Generate a new instruction that should perform better than previous ones while being concise.
Instruction should be specific to computer science domain and help in answering multiple-choice questions.
"""

    def generate_solutions(self, meta_prompt: str) -> List[str]:
        """Generate new candidate solutions.

        Returns:
            Up to ``config.solutions_per_step`` candidate instructions, or an
            empty list if the API call fails (the error is printed).
        """
        request = dict(
            model=self.optimizer_model,
            messages=[{"role": "user", "content": meta_prompt}],
            temperature=self.config.temperature,
            n=self.config.solutions_per_step,
        )
        try:
            # openai>=1.0 exposes chat.completions.create; earlier releases
            # expose ChatCompletion.create. Support whichever is installed.
            if hasattr(openai, "chat"):
                response = openai.chat.completions.create(**request)
            else:
                response = openai.ChatCompletion.create(**request)
            # The message content may be None; treat that as an empty reply.
            return [(choice.message.content or "").strip() for choice in response.choices]
        except Exception as e:
            print(f"Generation Error: {e}")
            return []

    def optimize(self, data_handler: MMluDataHandler, num_steps: int, questions_per_step: int = 3) -> Dict:
        """Run the optimization process.

        Args:
            data_handler: Source of sample questions.
            num_steps: Number of optimization steps to run.
            questions_per_step: Questions sampled per step; they serve both
                as meta-prompt exemplars and as the evaluation set.

        Returns:
            A dict with:
              * "steps": one entry per step holding "step", "best_score"
                (best so far) and "avg_score" (mean combined score of the
                solutions accepted in that step, or None if there were none);
              * "best_solution": the best Solution found, or None;
              * "best_score": its combined score (0 if none was found).
        """
        optimization_results = {
            "steps": [],
            "best_solution": None,
            "best_score": 0
        }

        for step in tqdm(range(num_steps)):
            # get sample questions for evaluation
            eval_questions = data_handler.get_sample_questions(questions_per_step)

            # meta-prompt to generate solutions
            meta_prompt = self.create_meta_prompt(eval_questions)
            new_solutions = self.generate_solutions(meta_prompt)

            # Scores of the solutions accepted in this step only, so the
            # step average never mixes in results from earlier steps.
            step_scores = []

            # Evaluation of new solutions
            for instruction in new_solutions:
                # Candidates over the token budget are discarded unevaluated.
                if not self.token_manager.is_within_limit(instruction, self.config.max_tokens):
                    continue

                accuracy, token_count = self.scorer.evaluate_solution(instruction, eval_questions)
                solution = Solution(instruction, accuracy, token_count)
                solution.calculate_score(self.config.token_weight, self.config.max_tokens)

                self.solutions_history.append(solution)
                step_scores.append(solution.combined_score)

                # The None check lets a first solution scoring exactly 0
                # still be recorded as the best one seen.
                if (optimization_results["best_solution"] is None
                        or solution.combined_score > optimization_results["best_score"]):
                    optimization_results["best_score"] = solution.combined_score
                    optimization_results["best_solution"] = solution

            # step results
            step_results = {
                "step": step,
                "best_score": optimization_results["best_score"],
                "avg_score": float(np.mean(step_scores)) if step_scores else None
            }
            optimization_results["steps"].append(step_results)

        return optimization_results

In [14]:
def main(data_path: str = "path_to_mmlu_cs_data.csv"):
    """Run the full optimization, save the results as JSON and print the best
    solution.

    Args:
        data_path: CSV file with the MMLU CS questions. The default is a
            placeholder and must be replaced with (or overridden by) a real
            path, otherwise loading fails with FileNotFoundError.
    """
    from dataclasses import asdict, is_dataclass

    def _to_jsonable(obj):
        # Write dataclass instances (the best Solution) as objects with named
        # fields instead of an opaque repr string; stringify anything else.
        if is_dataclass(obj) and not isinstance(obj, type):
            return asdict(obj)
        return str(obj)

    # Init configuration
    config = OptimizationConfig()

    # Setup data handler
    data_handler = MMluDataHandler(data_path)
    data_handler.prepare_data()

    # optimizer
    optimizer = OptimizerEngine(config)
    results = optimizer.optimize(data_handler, config.max_steps)

    # Timestamped file name so successive runs do not overwrite each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with open(f"optimization_results_{timestamp}.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=_to_jsonable)

    # display results
    best_solution = results["best_solution"]
    if best_solution is None:
        # Happens when every step produced no usable candidate.
        print("\nNo valid solution was found.")
        return

    print("\nBest Solution Found:")
    print(f"Instruction: {best_solution.instruction}")
    print(f"Accuracy: {best_solution.accuracy:.2f}")
    print(f"Token Count: {best_solution.token_count}")
    print(f"Combined Score: {best_solution.combined_score:.2f}")

# Call main() only when this module is the entry point, not when it is imported.
if __name__ == "__main__":
    main()

FileNotFoundError: [Errno 2] No such file or directory: 'path_to_mmlu_cs_data.csv'