Insight 1: In contrast to the scarcity of global optima, local optima are prevalent and often perform well, making them more valuable targets for query-efficient prompt optimization. \
Insight 2: The selection of the input domain, including both the generation and the representation of prompt candidates, influences the identification of high-performing prompts, especially the locally optimal ones.

In [1]:
import numpy as np
from typing import Callable
from functools import partial
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
from transformers import AutoTokenizer, AutoModel
from scipy.spatial.distance import cosine
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score
import random
import os
import yaml
from tqdm import tqdm
from abc import ABC, abstractmethod

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class ZOPOAlgorithm:
    """Simplified zeroth-order prompt optimizer working on prompt embeddings.

    A pool of prompt candidates is generated and embedded, then each pool
    member is repeatedly perturbed along a random direction whose length is
    scaled by a heuristic uncertainty estimate. The embedding with the best
    stored evaluation among the current pool members is mapped back to a
    prompt and returned.
    """

    # Width γ of the RBF kernel; kernel() and kernel_gradient_norm_squared()
    # must agree on it, so it lives in one place.
    _RBF_GAMMA = 0.1
    # A point is explored when its effective relevance or its number of
    # relevant historical queries falls below these thresholds.
    _MIN_BETA_THRESHOLD = 0.3
    _MIN_QUERIES_THRESHOLD = 5
    # Upper bound on generator calls per requested candidate, so a generator
    # that keeps repeating itself cannot stall run() forever.
    _MAX_ATTEMPTS_PER_CANDIDATE = 20

    def __init__(
        self,
        prompt_generation: Callable,
        nlp_embedding: Callable,
        inverse_embedding: Callable,
        projection_function: Callable,
        evaluation_function: Callable,
        m: int = 10,
        T: int = 5,
        delta: float = 0.1,  # δ parameter from Prop. 1
        kappa: float = 1.0,  # κ parameter bounding kernel function
        alpha: float = 0.01,  # Step size for gradient updates
    ):
        """
        Initialize the ZOPO (Zero-Order Prompt Optimization) Algorithm

        Args:
            prompt_generation: Callable g(generator, D_demo) that returns a prompt
            nlp_embedding: Callable h(prompt, model, tokenizer) that embeds a prompt
            inverse_embedding: Callable h^-1(z) that maps an embedding back to a prompt
            projection_function: Callable P_Z(z) projecting onto the embedding domain
            evaluation_function: Callable F(z) scoring an embedding (higher is better)
            m: Number of distinct prompt candidates to collect
            T: Number of iterations
            delta: Confidence parameter δ ∈ (0, 1)
            kappa: Upper bound κ on kernel function
            alpha: Step size for gradient updates
        """
        self.g = prompt_generation
        self.h = nlp_embedding
        self.h_inverse = inverse_embedding
        self.P_Z = projection_function
        self.F = evaluation_function
        self.m = m
        self.T = T
        self.delta = delta
        self.kappa = kappa
        self.alpha = alpha

        # Models handed to g(...) and h(...) on every call.
        self.generator = pipeline('text-generation', model='ehristoforu/coolqwen-3b-it', device_map='auto')
        model_name = "sentence-transformers/all-MiniLM-L6-v2"
        self.embedding_tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.embedding_model = AutoModel.from_pretrained(model_name)

        # For tracking historical queries and their relevance
        self.historical_queries = {}  # Dictionary mapping z to its historical queries
        self.query_evaluations = {}   # Store F(z) values for all queries

    @staticmethod
    def _to_key(z):
        """Return a hashable form of z (numpy arrays become flat tuples)."""
        if isinstance(z, np.ndarray):
            return tuple(z.flatten())
        return z

    @staticmethod
    def _to_array(z):
        """Return z as a numpy array when it is stored as a tuple."""
        return np.array(z) if isinstance(z, tuple) else z

    def run(self, D_demo):
        """
        Run the ZOPO algorithm

        Args:
            D_demo: Demonstration data

        Returns:
            Optimized prompt

        Raises:
            ValueError: If no prompt candidate could be collected (for
                example when m <= 0).
        """
        import warnings

        # Steps 2-6: collect up to m distinct prompts and their embeddings.
        V = set()        # Distinct prompts seen so far
        Z_list = []      # Embedding keys in insertion order (reproducible iteration)
        seen_keys = set()

        max_attempts = self.m * self._MAX_ATTEMPTS_PER_CANDIDATE
        attempts = 0
        while len(V) < self.m and attempts < max_attempts:
            attempts += 1

            # Generate a prompt using model g
            v = self.g(self.generator, D_demo)
            if v in V:
                # Duplicate prompt: nothing new to embed or evaluate.
                continue

            # Embed the prompt using model h
            z = self.h(v, self.embedding_model, self.embedding_tokenizer)
            z_key = self._to_key(z)

            V.add(v)
            if z_key not in seen_keys:
                seen_keys.add(z_key)
                Z_list.append(z_key)

            # Initialize historical queries for this z and store F(z)
            self.historical_queries[z_key] = []
            self.query_evaluations[z_key] = self.F(z)

        if not Z_list:
            raise ValueError("no prompt candidates could be generated")
        if len(V) < self.m:
            warnings.warn(
                f"only {len(V)} distinct prompts generated after {attempts} "
                f"attempts (requested {self.m}); continuing with a smaller pool"
            )

        # Steps 7-12: iterate T times over the pool
        for t in range(1, self.T + 1):
            for i, z_t in enumerate(Z_list):
                # Check if local exploration is needed based on uncertainty
                if not self.should_explore(z_t):
                    continue

                # Perform uncertainty-informed local exploration
                eta_t = self.uncertainty_informed_local_exploration(z_t)

                # z_{t+1} = P_Z(z_t + η_t)
                z_t_plus_1 = self.P_Z(self.add_vectors(z_t, eta_t))

                # Map back to prompt space
                v_t_plus_1 = self.h_inverse(z_t_plus_1)

                z_t_plus_1_key = self._to_key(z_t_plus_1)

                # Query to yield evaluation and record it
                self.query_evaluations[z_t_plus_1_key] = self.F(z_t_plus_1)

                # Add this query to historical queries for z_t
                self.historical_queries[z_t].append(z_t_plus_1_key)

                # Initialize historical queries for the new point if needed
                if z_t_plus_1_key not in self.historical_queries:
                    self.historical_queries[z_t_plus_1_key] = []

                # The pool slot now tracks the new point
                Z_list[i] = z_t_plus_1_key

        # Steps 13-14: return the prompt of the best current pool member
        z_star = max(Z_list, key=lambda z: self.query_evaluations[z])
        v_star = self.h_inverse(z_star)

        return v_star

    def add_vectors(self, z, eta):
        """Add two vectors; the result is a tuple whenever either input is one."""
        if isinstance(z, tuple) and isinstance(eta, tuple):
            return tuple(np.array(z) + np.array(eta))
        elif isinstance(z, tuple):
            return tuple(np.array(z) + eta)
        elif isinstance(eta, tuple):
            return tuple(z + np.array(eta))
        else:
            return z + eta

    def should_explore(self, z):
        """
        Determine if we should do local exploration for embedding z
        According to Section 4.3, this is defined by the indicator function I_A_t(z_t)
        which equals 1 if z_t ∈ A_t and 0 otherwise.

        Args:
            z: The embedding to check

        Returns:
            Boolean indicating whether to explore
        """
        # Effective relevance β and number of relevant queries |N_{z,β}|
        beta = self.calculate_effective_relevance(z)
        N_z_beta = self.get_relevant_queries_count(z, beta)

        # Explore when β or |N_{z,β}| is small
        return (
            beta < self._MIN_BETA_THRESHOLD
            or N_z_beta < self._MIN_QUERIES_THRESHOLD
        )

    def calculate_effective_relevance(self, z):
        """
        Calculate effective relevance β for a given input z

        Placeholder implementation: β is the mean kernel similarity between
        z and the queries recorded for z in historical_queries.

        Args:
            z: The input embedding

        Returns:
            The effective relevance β (0.0 when z has no historical queries)
        """
        historical_queries = self.historical_queries.get(z, [])
        if not historical_queries:
            return 0.0  # No historical queries, so relevance is zero

        similarities = [self.kernel(z, z_prime) for z_prime in historical_queries]
        return sum(similarities) / len(similarities)

    def kernel(self, z, z_prime):
        """
        RBF kernel k(z, z') = exp(-γ||z - z'||²) measuring embedding similarity

        Args:
            z: First embedding
            z_prime: Second embedding

        Returns:
            Kernel similarity value in (0, 1]
        """
        difference = self._to_array(z) - self._to_array(z_prime)
        distance_squared = np.sum(difference ** 2)
        return np.exp(-self._RBF_GAMMA * distance_squared)

    def get_relevant_queries_count(self, z, beta):
        """
        Get the count of relevant queries N_{z,β} for a given input z and relevance threshold β
        N_{z,β} = {z' ∈ {z_τ}_{τ=1}^t : ||∂_z k(z', z)||² ≥ β}

        Args:
            z: The input embedding
            beta: The effective relevance threshold

        Returns:
            The count of relevant queries
        """
        historical_queries = self.historical_queries.get(z, [])
        return sum(
            1
            for z_prime in historical_queries
            if self.kernel_gradient_norm_squared(z_prime, z) >= beta
        )

    def kernel_gradient_norm_squared(self, z_prime, z):
        """
        Calculate ||∂_z k(z', z)||² - the squared norm of the gradient of the kernel

        For the RBF kernel, ∂_z k(z', z) = 2γ(z' - z)exp(-γ||z - z'||²), hence
        ||∂_z k(z', z)||² = 4γ²||z' - z||²exp(-2γ||z - z'||²).

        Args:
            z_prime: The first input embedding
            z: The second input embedding with respect to which we differentiate

        Returns:
            The squared norm of the kernel gradient
        """
        gamma = self._RBF_GAMMA
        diff = self._to_array(z_prime) - self._to_array(z)
        distance_squared = np.sum(diff ** 2)
        k_value = np.exp(-gamma * distance_squared)

        return 4 * gamma**2 * distance_squared * k_value**2

    def uncertainty_informed_local_exploration(self, z):
        """
        Perform uncertainty-informed local exploration to reduce gradient estimation error

        Args:
            z: The input embedding

        Returns:
            Exploration direction η_t as a tuple: a random unit vector scaled
            by alpha * sqrt(trace(Σ²_t(z)))
        """
        gradient_variance = self.calculate_gradient_variance(z)

        # Uniformly random direction on the unit sphere
        z_dim = len(self._to_array(z))
        random_direction = np.random.randn(z_dim)
        random_direction = random_direction / np.linalg.norm(random_direction)

        # Higher variance leads to larger exploration steps
        scale_factor = np.sqrt(np.trace(gradient_variance)) * self.alpha

        return tuple(scale_factor * random_direction)

    def calculate_gradient_variance(self, z):
        """
        Calculate the variance of the gradient estimator Σ²_t(z)

        Simplification: returns an identity matrix scaled by an uncertainty
        measure derived from β and |N_{z,β}| rather than a full GP posterior.

        Args:
            z: The input embedding

        Returns:
            Covariance matrix of gradient estimate (z_dim x z_dim)
        """
        z_dim = len(self._to_array(z))

        # Get effective relevance and number of relevant queries
        beta = self.calculate_effective_relevance(z)
        N_z_beta = self.get_relevant_queries_count(z, beta)

        # ω = d + 2(√d + 1)ln(1/δ), with d the embedding dimension
        d = z_dim
        omega = d + 2 * (np.sqrt(d) + 1) * np.log(1 / self.delta)

        # Simplified variance estimate
        if N_z_beta > 0:
            variance_scale = omega * self.kappa - (omega * beta / d) / (1 + 1 / N_z_beta)
        else:
            variance_scale = omega * self.kappa  # Maximum uncertainty when no relevant queries

        return np.eye(z_dim) * max(variance_scale, 0.01)  # Ensure positive variance

In [None]:
def prompt_generation(generator, demo_data, task_description=None):
    """
    Ask a text-generation callable for one instruction prompt for a task.

    Args:
        generator: Callable invoked as generator(text, **sampling_kwargs); its
            result is indexed as outputs[0]['generated_text']
        demo_data: Iterable of demonstration examples, rendered with str formatting
        task_description: Optional description of the target task
            (defaults to "sentiment analysis")

    Returns:
        The stripped generated prompt, or a template-based prompt from
        fallback_prompt_generation if anything in the generation step raises
    """
    if task_description is None:
        task_description = "sentiment analysis"

    # One "Example k: ..." entry per demonstration, separated by blank lines
    examples_str = "\n\n".join(
        f"Example {number}: {item}" for number, item in enumerate(demo_data, start=1)
    )

    # System part: role and quality criteria for the prompt to be written
    system_message = (
        "\n"
        "    You are an expert at creating effective prompts for language models. \n"
        f"    Your task is to generate a concise, clear instruction prompt for {task_description}.\n"
        "    \n"
        "    The prompt should:\n"
        "    1. Be concise but clear\n"
        "    2. Provide specific instructions about the format of the expected output\n"
        "    3. Be effective at getting language models to perform the target task\n"
        "    "
    )

    # User part: the task, its examples, and the actual request
    user_message = (
        "\n"
        f"    Task: {task_description}\n"
        "    \n"
        "    Here are some examples of the task:\n"
        f"    {examples_str}\n"
        "    \n"
        "    Please generate a single effective instruction prompt for this task. Do not include any explanations, \n"
        "    just provide the prompt itself. The prompt should work well when prepended to new examples.\n"
        "    "
    )

    sampling_options = dict(
        max_new_tokens=100,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        return_full_text=False,
    )

    try:
        # Llama-2 style chat wrapping of the system and user parts
        input_text = f"<s>[INST] <<SYS>>\n{system_message}\n<</SYS>>\n\n{user_message} [/INST]"
        outputs = generator(input_text, **sampling_options)
        return outputs[0]['generated_text'].strip()
    except Exception as e:
        # Best-effort: any failure in generation falls back to fixed templates
        print(f"Error using transformers model: {e}")
        print("Using fallback prompt generation method")
        return fallback_prompt_generation(task_description)
        
def fallback_prompt_generation(task_description):
    """
    Fallback method to generate prompts in case the model inference fails

    A template family is picked by substring match on the (lower-cased) task
    description, defaulting to sentiment analysis; one template is drawn at
    random with numpy's global RNG. A "{categories}" placeholder is filled
    from the text following "categories:" in the description, or with
    "positive, negative, neutral" when no category list can be parsed.

    Args:
        task_description: Description of the target task

    Returns:
        A prompt string
    """
    default_categories = "positive, negative, neutral"

    # Templates for different common tasks
    templates = {
        "sentiment analysis": [
            "Classify the sentiment of the following text as positive or negative:",
            "Determine if the following text expresses a positive or negative sentiment:",
            "Is the sentiment of the following text positive or negative?",
            "Analyze the sentiment of the text below and respond with only 'positive' or 'negative':",
        ],
        "text classification": [
            "Classify the following text into one of these categories: {categories}",
            "Which category best describes the following text? Categories: {categories}",
            "Read the following text and classify it as {categories}:",
        ],
        "summarization": [
            "Summarize the following text in one paragraph:",
            "Provide a concise summary of the text below:",
            "Create a brief summary of the following content:",
        ],
        "question answering": [
            "Answer the following question based on the given context:",
            "Using the information provided, answer this question:",
            "Read the context below and answer the question that follows:",
        ],
    }

    lowered = task_description.lower()

    # First family whose name occurs in the description; sentiment otherwise
    task_key = next((key for key in templates if key in lowered), "sentiment analysis")

    # np.random.choice yields a numpy string; hand back a plain str
    template = str(np.random.choice(templates[task_key]))

    if "{categories}" in template:
        # Parse "categories: a, b, c" without relying on exceptions; entries
        # are stripped so the rendered list has uniform ", " separators.
        _, marker, tail = lowered.partition("categories:")
        parsed = [part.strip() for part in tail.split(",")] if marker else []
        parsed = [part for part in parsed if part]
        categories_str = ", ".join(parsed) if parsed else default_categories

        template = template.replace("{categories}", categories_str)

    return template

def nlp_embedding(prompt, model, tokenizer):
    """
    Embed a prompt with a transformer encoder and return it as a tuple.

    Args:
        prompt: The prompt string to embed
        model: Callable encoder; invoked as model(**inputs) and expected to
            expose `last_hidden_state` on its result
        tokenizer: Callable tokenizer producing the keyword inputs for `model`

    Returns:
        The first-token ([CLS]) hidden state of the first sequence, as a
        tuple of Python floats so it can be used as a dict/set key
    """
    encoded = tokenizer(prompt, padding=True, truncation=True, return_tensors="pt")

    # Inference only: no autograd bookkeeping needed
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
        # Position 0 of every sequence is the [CLS] token
        cls_vectors = hidden_states[:, 0, :].numpy()

    first_sequence = cls_vectors[0]
    return tuple(first_sequence.tolist())

# Candidate prompts that an embedding can be mapped back to.
_INVERSE_EMBEDDING_TEMPLATES = (
    "Classify the sentiment of the following text as positive or negative:",
    "Determine if the following text expresses a positive or negative sentiment:",
    "Is the sentiment of the following text positive or negative?",
    "Analyze the following text and determine if it has a positive or negative sentiment:",
    "Rate the sentiment of the following text as either positive or negative:",
)

# model_name -> list of template embeddings, filled lazily on first use so the
# tokenizer/model are loaded once per model instead of once per call.
_INVERSE_EMBEDDING_CACHE = {}


def _inverse_template_embeddings(model_name):
    """Return (and cache) the [CLS] embedding of every template for model_name."""
    cached = _INVERSE_EMBEDDING_CACHE.get(model_name)
    if cached is None:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name)

        cached = []
        for template in _INVERSE_EMBEDDING_TEMPLATES:
            inputs = tokenizer(template, padding=True, truncation=True, return_tensors="pt")
            with torch.no_grad():
                outputs = model(**inputs)
                # Same [CLS] pooling as nlp_embedding so distances are comparable
                cached.append(outputs.last_hidden_state[:, 0, :].numpy()[0])

        _INVERSE_EMBEDDING_CACHE[model_name] = cached
    return cached


def inverse_embedding(embedding, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """
    Maps an embedding back to the closest matching prompt template

    The template embeddings are computed on the first call for a given
    model_name and reused afterwards.

    Args:
        embedding: The embedding vector (tuple or array)
        model_name: Name of the Hugging Face model used for embedding

    Returns:
        The template prompt with the smallest cosine distance to the embedding
    """
    # Convert tuple to array if needed
    if isinstance(embedding, tuple):
        embedding = np.array(embedding)

    template_embeddings = _inverse_template_embeddings(model_name)

    # Find the template with the smallest cosine distance to the input embedding
    distances = [cosine(embedding, template_emb) for template_emb in template_embeddings]
    closest_index = np.argmin(distances)

    return _INVERSE_EMBEDDING_TEMPLATES[closest_index]

# 4. Define the projection function P_Z
def projection_function(z):
    """
    Clamp an embedding so it lies inside the assumed valid region.

    All components are first floored at zero; the first four components then
    receive individual constraints.
    NOTE(review): the per-dimension meanings below are carried over from the
    original comments and look like hand-crafted features rather than
    sentence-encoder dimensions - confirm against the embedding in use.

    Args:
        z: The embedding vector to project (needs at least four components)

    Returns:
        The projected embedding as a tuple (hashable)
    """
    source = np.array(z) if isinstance(z, tuple) else z

    # Non-negativity for every component (produces a fresh array)
    projected = np.maximum(source, 0)

    word_count = np.clip(projected[0], 3, 15)        # number of words in [3, 15]
    avg_word_length = np.clip(projected[1], 3, 10)   # average word length in [3, 10]
    question_mark = round(projected[2])              # question-mark presence, rounded
    imperatives = np.clip(round(projected[3]), 0, 3)  # imperatives in [0, 3]

    projected[0] = word_count
    projected[1] = avg_word_length
    projected[2] = question_mark
    projected[3] = imperatives

    return tuple(projected)

def evaluation_function(z, task_type="instruction_induction", reference_embeddings=None, 
                        model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """
    Score a prompt embedding with a simulated, noisy performance profile.

    For each of seven fixed tasks a score is interpolated from reference
    embeddings, compared against a randomly drawn "best" performance, and the
    fractions of tasks within 1% and within 5% of that best are combined.

    Args:
        z: The embedding vector to evaluate (tuple or array)
        task_type: Accepted for interface compatibility; not read by the body
        reference_embeddings: Dict mapping task name to a list of
            {"embedding": ..., "score": ...} entries; built with
            generate_reference_embeddings(model_name) when None
        model_name: Name of the Hugging Face model used for embedding

    Returns:
        0.7 * rho(0) + 0.3 * rho(0.05) plus Gaussian noise (std 0.03);
        the result is random because it draws from numpy's global RNG
    """
    candidate = np.array(z) if isinstance(z, tuple) else z

    if reference_embeddings is None:
        # In practice, this would be pre-computed from a benchmark dataset
        reference_embeddings = generate_reference_embeddings(model_name)

    # Task name -> evaluation metric
    task_metrics = {
        "common_concept": "f1",
        "informal_to_formal": "f1",
        "orthography_starts_with": "exact_match",
        "taxonomy_animal": "exact_match",
        "synonyms": "set_containing",
        "instruction_induction": "exact_match",
        "arithmetic_reasoning": "accuracy",
    }

    # Exponent applied to the interpolated score; metrics absent here
    # (e.g. "f1") keep the score unchanged.
    metric_exponent = {
        "exact_match": 1.5,   # sharper distinction for exact matching
        "accuracy": 1.5,
        "set_containing": 0.8,
    }

    task_performances = {}
    for task_name, metric_name in task_metrics.items():
        # Tasks without references fall back to the candidate itself at 0.5
        references = reference_embeddings.get(task_name, [{"embedding": candidate, "score": 0.5}])

        similarity_values = []
        weighted_scores = []
        for reference in references:
            distance = np.linalg.norm(candidate - reference["embedding"])
            similarity = 1 - distance / np.sqrt(len(candidate))
            similarity_values.append(similarity)
            weighted_scores.append(similarity * reference["score"])

        # Similarity-weighted mean of the reference scores
        interpolated = sum(weighted_scores) / max(sum(similarity_values), 1e-10)

        if metric_name in metric_exponent:
            interpolated = interpolated ** metric_exponent[metric_name]

        task_performances[task_name] = interpolated

    # Performance profile in the spirit of Dolan & Moré (2002), with the
    # per-task best performance simulated in [0.95, 1.0)
    best_performances = {}
    for task_name in task_metrics:
        best_performances[task_name] = 0.95 + 0.05 * np.random.random()

    ratios = [
        task_performances[task_name] / best_performances[task_name]
        for task_name in task_performances
    ]
    task_count = len(task_metrics)

    # ρ(0): share of tasks at (≥ 99% of) the best performance
    rho_0 = sum(1 for ratio in ratios if ratio >= 0.99) / task_count
    # ρ(0.05): share of tasks within 5% of the best performance
    rho_5 = sum(1 for ratio in ratios if ratio >= 0.95) / task_count

    final_score = (0.7 * rho_0) + (0.3 * rho_5)

    # Small noise term for real-world variability
    noise = np.random.normal(0, 0.03)

    return final_score + noise

def generate_reference_embeddings(model_name):
    """
    Build per-task reference embeddings paired with effectiveness scores.

    The prompts and scores are hard-coded examples; in practice they would
    come from benchmark datasets.

    Args:
        model_name: Hugging Face model name used for both tokenizer and encoder

    Returns:
        Dict mapping task name to a list of {"embedding": array, "score": float}
        entries, where the embedding is the first-token hidden state of the prompt
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    # (prompt, score) pairs per task
    scored_prompts_by_task = {
        "common_concept": [
            ("What concept do these items share in common?", 0.92),
            ("Identify the common category for these objects.", 0.88),
        ],
        "informal_to_formal": [
            ("Convert this informal text to formal language.", 0.94),
            ("Rewrite the following in a professional tone.", 0.89),
        ],
        "orthography_starts_with": [
            ("List five words that start with the prefix:", 0.91),
            ("Generate words beginning with the following letters:", 0.87),
        ],
        "taxonomy_animal": [
            ("Classify this animal in the taxonomic hierarchy.", 0.93),
            ("Place this organism in its correct biological classification.", 0.90),
        ],
        "synonyms": [
            ("Provide synonyms for the following word:", 0.95),
            ("What are alternative words with the same meaning as:", 0.89),
        ],
        "instruction_induction": [
            ("Based on these examples, what's the underlying pattern?", 0.91),
            ("Identify the rule being demonstrated in these instances.", 0.88),
        ],
        "arithmetic_reasoning": [
            ("Solve this math problem step by step:", 0.94),
            ("Calculate the answer and show your work:", 0.90),
        ],
    }

    def embed(text):
        # First-token ([CLS]) hidden state of a single prompt
        encoded = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            hidden = model(**encoded).last_hidden_state
            return hidden[:, 0, :].numpy()[0]

    reference_embeddings = {}
    for task_name, scored_prompts in scored_prompts_by_task.items():
        reference_embeddings[task_name] = [
            {"embedding": embed(text), "score": score}
            for text, score in scored_prompts
        ]

    return reference_embeddings

In [2]:
# Module-level text-generation pipeline for this cell: `model_name` is the
# Hugging Face checkpoint id and `model` the pipeline built from it.
# Constructing the pipeline loads the checkpoint when the cell runs.
model_name = "ehristoforu/coolqwen-3b-it"
model = pipeline('text-generation', model=model_name, device_map='auto')

class GenerationTemplate:
    """
    Prompt template used when the LLM is asked to generate text.

    Placeholders understood by the template:
    [APE]       - position where the LLM-generated text goes (left untouched by fill)
    [full_DEMO] - replaced by the full demonstration text
    [INPUT]     - replaced by the input of the first demo
    [OUTPUT]    - replaced by the output of the first demo
    """

    def __init__(self, template):
        self.template = template
        # A valid generation template contains exactly one [APE] token
        ape_occurrences = self.template.count('[APE]')
        assert ape_occurrences == 1

    def fill(self, full_demo='', input='', output=''):
        """
        Return the template with [full_DEMO], [INPUT] and [OUTPUT] substituted.

        Substitutions are applied one after another in that order.
        """
        substitutions = (
            ('[full_DEMO]', full_demo),
            ('[INPUT]', input),
            ('[OUTPUT]', output),
        )
        filled = self.template
        for placeholder, value in substitutions:
            filled = filled.replace(placeholder, value)
        return filled


class EvalTemplate:
    """
    Prompt template used when a candidate prompt is evaluated.

    Placeholders understood by the template:
    [PROMPT]    - replaced by the prompt under evaluation
    [full_DEMO] - replaced by the full demonstration text
    [INPUT]     - replaced by the input of the first demo
    [OUTPUT]    - replaced by the output of the first demo
    """

    def __init__(self, template):
        self.template = template

    def fill(self, prompt='', full_demo='', input='', output=''):
        """
        Return the template with all four placeholders substituted.

        Substitutions are applied one after another in the order
        [PROMPT], [full_DEMO], [INPUT], [OUTPUT].
        """
        substitutions = (
            ('[PROMPT]', prompt),
            ('[full_DEMO]', full_demo),
            ('[INPUT]', input),
            ('[OUTPUT]', output),
        )
        filled = self.template
        for placeholder, value in substitutions:
            filled = filled.replace(placeholder, value)
        return filled

    def convert_to_generation_template(self):
        """
        Build a GenerationTemplate by turning every [PROMPT] into [APE].
        """
        generation_text = self.template.replace('[PROMPT]', '[APE]')
        return GenerationTemplate(generation_text)


class DemosTemplate:
    """
    Takes a template for the full demo and provides methods for filling in blanks.
    The format is as follows:
    [INPUT], [OUTPUT]

    """

    def __init__(self, template, delimiter='\n\n'):
        self.template = template
        self.delimiter = delimiter

    def fill(self, data):
        """
        Fills in the template once per (input, output) pair and joins the results.

        Data is a tuple of lists: (inputs, outputs). Pairs are formed with
        zip, so extra items in the longer list are ignored. The delimiter is
        placed only between rendered demos, never after the last one, even
        when the two lists differ in length.
        """
        rendered = (
            self.template.replace('[INPUT]', input_).replace('[OUTPUT]', output_)
            for input_, output_ in zip(*data)
        )
        return self.delimiter.join(rendered)

Loading checkpoint shards: 100%|██████████| 2/2 [00:04<00:00,  2.20s/it]
Device set to use cuda:0


In [3]:
class EvaluationResult(ABC):
    """Abstract interface for objects holding prompts and their scores."""

    @abstractmethod
    def sorted(self, method='default'):
        """Return the prompts and scores as sorted lists.
        The method argument selects the metric used for sorting."""

    @abstractmethod
    def in_place(self, method='default'):
        """Return the prompts and scores as lists, without sorting them."""

class BanditsEvaluationResult(EvaluationResult):
    """Holds prompts together with their bandit scores and per-prompt infos."""

    def __init__(self, prompts, scores, infos):
        self.prompts, self.scores, self.infos = prompts, scores, infos

    def _descending_order(self):
        """Indices that order self.scores from highest to lowest."""
        return np.argsort(self.scores)[::-1]

    def sorted(self, method='default'):
        """Return (prompts, scores) ordered by descending score.
        The method argument is accepted but not used."""
        order = self._descending_order()
        ranked_prompts = [self.prompts[i] for i in order]
        ranked_scores = [self.scores[i] for i in order]
        return ranked_prompts, ranked_scores

    def in_place(self, method='default'):
        """Return (prompts, scores) in their stored order.
        The method argument is accepted but not used."""
        return self.prompts, self.scores

    def sorted_infos(self):
        """Return the infos ordered by descending score."""
        return [self.infos[i] for i in self._descending_order()]

    def __str__(self):
        # Header followed by at most the ten best-scoring prompts.
        prompts, scores = self.sorted()
        lines = ['score: prompt\n', '----------------\n']
        for prompt, score in list(zip(prompts, scores))[:10]:
            lines.append(f'{score:.2f}: {prompt}\n')
        return ''.join(lines)


class BatchBanditAlgo(ABC):
    """Abstract interface for bandit algorithms that pick prompts in batches."""

    @abstractmethod
    def choose(self, n):
        """Select n prompts to evaluate next.
        Parameters:
            n: The number of prompts to choose.
        Returns:
            A list of indices of the chosen prompts.
        """

    @abstractmethod
    def update(self, chosen, scores):
        """Record new scores for the chosen prompts.
        Parameters:
            chosen: A list of indices of the chosen prompts.
            scores: A list with one score per chosen prompt.
        """

    @abstractmethod
    def reset(self):
        """Discard all accumulated state."""

    @abstractmethod
    def get_scores(self):
        """Return the current scores of all prompts.
        Returns:
            A list of scores.
        """

    @abstractmethod
    def get_infos(self):
        """Return the infos of all prompts.
        Returns:
            A list of infos.
        """


class CountAverageBanditAlgo(BatchBanditAlgo):
    """Tracks, per prompt, a sample count and a count-weighted score total."""

    def __init__(self, num_prompts, num_samples):
        self.num_prompts = num_prompts
        self.num_samples = num_samples
        self.reset()

    def update(self, chosen, scores):
        """Add num_samples observations of each score to its prompt's totals."""
        weight = self.num_samples
        for arm, score in zip(chosen, scores):
            self.counts[arm] += weight
            self.scores[arm] += score * weight

    def reset(self):
        """Zero the per-prompt counts and score totals."""
        self.counts = np.zeros(self.num_prompts)
        self.scores = np.zeros(self.num_prompts)

    def get_scores(self):
        """Return the average score per prompt; prompts with a zero count get 0."""
        averages = np.zeros_like(self.scores)
        seen = self.counts != 0
        # Division is only carried out where the count is non-zero.
        np.divide(self.scores, self.counts, out=averages, where=seen)
        return averages


class UCBBanditAlgo(CountAverageBanditAlgo):
    """Upper-confidence-bound selection on top of the count/average bookkeeping."""

    def __init__(self, num_prompts, num_samples, c):
        """
        Parameters:
            num_prompts: The number of prompts (arms).
            num_samples: The number of samples credited per update of a prompt.
            c: Multiplier applied to the exploration bonus.
        """
        super().__init__(num_prompts, num_samples)
        self.c = c

    def choose(self, n):
        """Choose up to n prompts with the highest UCB scores.
        Parameters:
            n: The number of prompts to choose; capped at num_prompts.
        Returns:
            A list of indices of the chosen prompts.
        """
        # Cap n so that the random first round and the later argsort rounds
        # behave the same way when more prompts are requested than exist.
        n = min(n, self.num_prompts)
        if np.sum(self.counts) == 0:
            # If all counts are 0, choose randomly.
            return random.sample(range(self.num_prompts), n)
        scores = self.get_scores()
        # The small offset keeps the bonus finite for prompts never sampled.
        counts = self.counts + 1e-3
        ucb_scores = scores + self.c * np.sqrt(np.log(np.sum(counts)) / counts)
        # Highest UCB first; converted to a plain list so both branches
        # return the same type.
        return np.argsort(ucb_scores)[::-1][:n].tolist()

    def get_infos(self):
        """Return the per-prompt sample counts."""
        return self.counts

def get_bandit_algo(bandit_method, num_prompts, config):
    """
    Builds the bandit algorithm object for the given method name.
    Parameters:
        bandit_method: The bandit method to use. Only 'ucb' is handled here.
        num_prompts: The number of prompts the algorithm chooses between.
        config: Configuration dictionary; read at
            ['base_eval_config']['num_samples'] and ['bandit_config']['c'].
    Returns:
        A bandit method object.
    Raises:
        ValueError: If bandit_method is not 'ucb'.
    """
    if bandit_method != 'ucb':
        raise ValueError('Invalid bandit method.')
    num_samples = config['base_eval_config']['num_samples']
    exploration = config['bandit_config']['c']
    return UCBBanditAlgo(num_prompts, num_samples, exploration)

In [4]:
def bandits_evaluator(prompts, eval_template, eval_data, demos_template, few_shot_data, config):
    """
    Scores prompts over several rounds, letting a bandit algorithm decide
    which prompts are evaluated in each round.
    Parameters:
        prompts: A list of prompts.
        eval_template: The template for the evaluation queries.
        eval_data: The data to use for evaluation.
        demos_template: The template for the demos.
        few_shot_data: The data to draw few-shot demos from.
        config: Configuration dictionary. Reads 'base_eval_method',
            'bandit_method', 'rounds', 'num_prompts_per_round' and
            'base_eval_config' (plus whatever get_bandit_algo reads).
    Returns:
        A BanditsEvaluationResult with the final scores and infos.
    """
    base_eval_method = get_eval_method(config['base_eval_method'])
    bandit_algo = get_bandit_algo(
        config['bandit_method'], len(prompts), config)
    rounds = config['rounds']

    num_prompts_per_round = config['num_prompts_per_round']
    if num_prompts_per_round < 1:
        # A value below 1 is a fraction of the pool. With a small pool the
        # product truncates to 0, which would make every round evaluate
        # nothing, so at least one prompt is kept.
        num_prompts_per_round = max(
            1, int(len(prompts) * num_prompts_per_round))
    # Never ask for more prompts than exist (this also yields 0 for an empty pool).
    num_prompts_per_round = min(num_prompts_per_round, len(prompts))

    for _ in tqdm(range(rounds), desc='Evaluating prompts'):
        # Sample the prompts
        sampled_prompts_idx = bandit_algo.choose(num_prompts_per_round)
        sampled_prompts = [prompts[i] for i in sampled_prompts_idx]
        # Evaluate the sampled prompts
        sampled_eval_results = base_eval_method(
            sampled_prompts, eval_template, eval_data, demos_template, few_shot_data, config['base_eval_config'])
        _, scores = sampled_eval_results.in_place(method='mean')
        # Update the bandit algorithm
        bandit_algo.update(sampled_prompts_idx, scores)

    return BanditsEvaluationResult(prompts, bandit_algo.get_scores(), bandit_algo.get_infos())

def subsample_data(data, subsample_size):
    """
    Draws subsample_size aligned (input, output) pairs at random, without
    replacement. Data is in the form of a tuple of lists.
    Returns:
        A tuple (inputs, outputs) of two lists holding the drawn pairs.
    """
    inputs, outputs = data
    assert len(inputs) == len(outputs)
    picked = random.sample(range(len(inputs)), subsample_size)
    pairs = [(inputs[i], outputs[i]) for i in picked]
    return [pair[0] for pair in pairs], [pair[1] for pair in pairs]

class LikelihoodEvaluationResult(EvaluationResult):
    """
    Stores the outcome of a likelihood evaluation and ranks prompts by a
    chosen statistic of their per-sample average log-probabilities.
    """

    def __init__(self, prompts, log_probs, num_samples):
        self.prompts = prompts
        self.log_probs = log_probs
        self.prompt_log_probs = self._compute_avg_likelihood(
            prompts, log_probs, num_samples)

    def _compute_avg_likelihood(self, prompts, log_probs, num_samples):
        """Group log_probs into consecutive runs of num_samples per prompt and
        reduce each entry to the mean of its values."""
        per_prompt = []
        for prompt_idx, _ in enumerate(prompts):
            base = prompt_idx * num_samples
            sample_lps = (log_probs[base + k] for k in range(num_samples))
            per_prompt.append([sum(lps) / len(lps) for lps in sample_lps])
        return per_prompt

    def _agg_likelihoods(self, method):
        """For each prompt, compute a statistic of the likelihoods (e.g., mean, median, etc.)"""
        reducers = (
            ('mean', np.mean),
            ('median', np.median),
            ('std', np.std),
            ('max', np.max),
            ('min', np.min),
            # 'iqm' here is the mean of the 25th and 75th percentiles.
            ('iqm', lambda lps: np.mean(np.percentile(lps, [25, 75]))),
        )
        for name, reduce_fn in reducers:
            if method == name:
                return [reduce_fn(lps) for lps in self.prompt_log_probs]
        raise ValueError(
            f'Unknown method {method} for aggregating likelihoods')

    def _scores_for(self, method):
        """Aggregate scores, treating 'default' as 'mean'."""
        return self._agg_likelihoods('mean' if method == 'default' else method)

    def sorted(self, method='default'):
        """Return (prompts, scores), both ordered from highest to lowest score."""
        scores = self._scores_for(method)
        ascending_prompts = [p for _, p in sorted(zip(scores, self.prompts))]
        ascending_scores = sorted(scores)
        return ascending_prompts[::-1], ascending_scores[::-1]

    def in_place(self, method='default'):
        """Return (prompts, scores) in the stored prompt order."""
        return self.prompts, self._scores_for(method)

    def __str__(self):
        # Header followed by at most the ten best-scoring prompts.
        prompts, scores = self.sorted()
        lines = ['log(p): prompt\n', '----------------\n']
        for prompt, score in list(zip(prompts, scores))[:10]:
            lines.append(f'{score:.2f}: {prompt}\n')
        return ''.join(lines)
    
# Marker substituted for the output so its position in the query can be located.
special_output_token = '[[[[OUTPUT]]]]'


def get_query(prompt, eval_template, input_, output_, demo_data, demos_template):
    """
    Builds the text sent to the LLM for likelihood evaluation.
    Parameters:
        prompt: The prompt.
        eval_template: The template for the evaluation queries.
        input_: The input.
        output_: The output.
        demo_data: The demo data rendered with demos_template.
        demos_template: The template used to render the demos.
    Returns:
        The query for the LLM and the range of the output text in the form of (start_idx, end_idx).
    """
    demos = demos_template.fill(demo_data)
    shared = dict(prompt=prompt, input=input_, full_demo=demos)

    query = eval_template.fill(output=output_, **shared)
    # Same query, but with the marker in place of the output, so that the
    # marker's offset gives the position where the output starts.
    marked_query = eval_template.fill(output=special_output_token, **shared)

    start = marked_query.find(special_output_token)
    return query, (start, start + len(output_))

def likelihood_evaluator(prompts, eval_template, eval_data, demos_template, few_shot_data, config):
    """
    Scores each prompt by the log-probabilities the model assigns to the
    expected outputs of sampled evaluation examples.
    Parameters:
        prompts: A list of prompts.
        eval_template: The template for the evaluation queries.
        eval_data: The data to use for evaluation.
        demos_template: The template for the demos.
        few_shot_data: The data to draw few-shot demos from.
        config: The configuration dictionary ('num_samples', 'num_few_shot').
    Returns:
        A LikelihoodEvaluationResult object.
    """
    queries, output_indices = [], []
    for prompt in prompts:
        # A fresh evaluation subsample is drawn for every prompt.
        eval_inputs, eval_outputs = subsample_data(
            eval_data, config['num_samples'])
        for input_, output_ in zip(eval_inputs, eval_outputs):
            # Fresh few-shot demos are drawn for every single query.
            few_shot = subsample_data(few_shot_data, config['num_few_shot'])
            query, output_span = get_query(
                prompt, eval_template, input_, output_, few_shot, demos_template)
            queries.append(query)
            output_indices.append(output_span)

    # Relies on the module-level `model` object.
    log_probs, _ = model.log_probs(queries, output_indices)

    return LikelihoodEvaluationResult(prompts, log_probs, config['num_samples'])

def get_eval_method(eval_method):
    """
    Resolves an evaluation method to the function implementing it.
    Parameters:
        eval_method: Either a callable, which is returned unchanged, or one
            of the names 'likelihood' or 'bandits'.
    Returns:
        An evaluation method object.
    Raises:
        ValueError: If eval_method is neither callable nor a known name.
    """
    if callable(eval_method):
        return eval_method
    if eval_method == 'likelihood':
        return likelihood_evaluator
    if eval_method == 'bandits':
        return bandits_evaluator
    raise ValueError('Invalid evaluation method.')


def evalute_prompts(prompts, eval_template, eval_data, demos_template, few_shot_data, eval_method, config):
    """
    Scores a list of prompts with the selected evaluation method.
    Parameters:
        prompts: A list of prompts.
        eval_template: The template for the evaluation queries.
        eval_data: The data to use for evaluation.
        demos_template: The template for the demos.
        few_shot_data: The data to draw few-shot demos from.
        eval_method: The evaluation method to use, as accepted by get_eval_method.
        config: The configuration dictionary.
    Returns:
        An evaluation result object.
    """
    evaluator = get_eval_method(eval_method)
    result = evaluator(
        prompts, eval_template, eval_data, demos_template, few_shot_data, config)
    return result


def demo_function(eval_template, config):
    """
    Builds a helper for trying out a chosen prompt on the LLM by hand.
    Parameters:
        eval_template: The template for the evaluation queries.
        config: The configuration dictionary (not read by this function).
    Returns:
        A function that takes a prompt and one input or a list of inputs and
        returns, per input, the first line of the stripped model output.
    """
    def fn(prompt, inputs):
        # A single input is treated as a one-element batch.
        batch = inputs if isinstance(inputs, list) else [inputs]
        queries = [eval_template.fill(prompt=prompt, input=item)
                   for item in batch]
        # Relies on the module-level `model` object.
        completions = model.generate_text(queries, n=1)
        return [text.strip().split('\n')[0] for text in completions]

    return fn

In [5]:
def create_split(data, split_size):
    """
    Split data into two parts. Data is in the form of a tuple of lists.
    Parameters:
        data: A tuple (inputs, outputs) of two equally long lists.
        split_size: The number of randomly drawn pairs in the first part.
    Returns:
        Two (inputs, outputs) tuples. The first holds the drawn pairs in draw
        order; the second holds the remaining pairs in their original order.
    """
    inputs, outputs = data
    assert len(inputs) == len(outputs)
    chosen = random.sample(range(len(inputs)), split_size)
    # Set membership keeps the complement computation linear rather than
    # scanning the list of chosen indices once per element.
    chosen_lookup = set(chosen)
    remaining = [i for i in range(len(inputs)) if i not in chosen_lookup]

    first = ([inputs[i] for i in chosen], [outputs[i] for i in chosen])
    second = ([inputs[i] for i in remaining], [outputs[i] for i in remaining])
    return first, second

In [6]:
def update_config(config, base_config='configs/default.yaml'):
    """
    Loads a base YAML config and overlays the user config on top of it.
    Parameters:
        config: The user configuration (a possibly nested dictionary).
        base_config: Path of the base YAML file, relative to this file's
            directory, or to the working directory when __file__ is undefined.
    Returns:
        The merged configuration dictionary.
    """
    # __file__ does not exist in interactive sessions and notebooks; an empty
    # base directory makes the path resolve against the working directory.
    base_dir = os.path.dirname(globals().get('__file__', ''))

    # Get default config from yaml
    with open(os.path.join(base_dir, base_config), encoding='utf-8') as f:
        # safe_load returns None for an empty document.
        default_config = yaml.safe_load(f) or {}

    # Update default config with user config
    # Note that the config is a nested dictionary, so we need to update it recursively
    def update(d, u):
        for k, v in u.items():
            if isinstance(v, dict):
                current = d.get(k)
                # A missing or non-dict default is replaced by the override
                # instead of being merged into.
                d[k] = update(current if isinstance(current, dict) else {}, v)
            else:
                d[k] = v
        return d

    return update(default_config, config)


def simple_config(eval_model, prompt_gen_model, prompt_gen_mode, num_prompts, eval_rounds, prompt_gen_batch_size, eval_batch_size):
    """Builds a config on top of 'configs/bandits.yaml' from a few simple settings."""
    conf = update_config({}, 'configs/bandits.yaml')

    # --- prompt generation ---
    generation = conf['generation']
    gen_model = generation['model']
    gen_model['gpt_config']['model'] = prompt_gen_model
    if prompt_gen_mode == 'insert':
        gen_model['name'] = 'GPT_insert'
        gen_model['batch_size'] = 1
    elif prompt_gen_mode == 'forward':
        gen_model['name'] = 'GPT_forward'
        gen_model['batch_size'] = prompt_gen_batch_size
    # Any other mode leaves the generation model name and batch size from the YAML untouched.
    generation['num_subsamples'] = num_prompts // 10
    generation['num_prompts_per_subsample'] = 10

    # --- evaluation ---
    evaluation = conf['evaluation']
    base_eval = evaluation['base_eval_config']
    base_eval['model']['gpt_config']['model'] = eval_model
    base_eval['model']['batch_size'] = eval_batch_size
    # total eval = rounds * num_prompts_per_round * num_samples
    # The number of samples is fixed to 5, each round covers roughly a third
    # of the prompts (0.334), and the number of rounds is supplied by the caller.
    evaluation['num_prompts_per_round'] = 0.334
    evaluation['rounds'] = eval_rounds
    base_eval['num_samples'] = 5
    # No dataset splitting is configured here.
    return conf

In [7]:
def get_simple_prompt_gen_template(prompt_gen_template, prompt_gen_mode):
    """
    Picks the prompt generation template.
    Parameters:
        prompt_gen_template: An explicit template; returned unchanged when not None.
        prompt_gen_mode: 'forward' or 'insert'; only consulted when no
            explicit template is given.
    Returns:
        The explicit template, the built-in forward template, or None for 'insert'.
    Raises:
        ValueError: If no template is given and the mode is unknown.
    """
    if prompt_gen_template is not None:
        return prompt_gen_template
    if prompt_gen_mode == 'forward':
        return "I gave a friend an instruction. Based on the instruction they produced the following input-output pairs:\n\n[full_DEMO]\n\nThe instruction was to [APE]"
    if prompt_gen_mode == 'insert':
        return None
    raise ValueError(
        'Invalid prompt_gen_mode: {}'.format(prompt_gen_mode))


def simple_ape(dataset,
               eval_template='Instruction: [PROMPT]\nInput: [INPUT]\nOutput: [OUTPUT]',
               prompt_gen_template=None,
               demos_template='Input: [INPUT]\nOutput: [OUTPUT]',
               eval_model='text-davinci-002',
               prompt_gen_model='text-davinci-002',
               prompt_gen_mode='forward',
               num_prompts=50,
               eval_rounds=20,
               prompt_gen_batch_size=200,
               eval_batch_size=500):
    """
    Convenience wrapper around find_prompts: fills in defaults for most
    parameters and assembles the config dictionary for the caller.

    Simplifications made by this wrapper:
    - The same dataset is passed for prompt generation and for evaluation
    - The config is built by simple_config (based on 'configs/bandits.yaml')
    - The share of prompts evaluated per round and the number of samples per
        prompt are fixed inside simple_config
    Parameters:
        dataset: The dataset to use for evaluation.
        eval_template: The template for the evaluation queries.
        prompt_gen_template: The template to use for prompt generation.
        demos_template: The template for the demos.
        eval_model: The model to use for evaluation.
        prompt_gen_model: The model to use for prompt generation.
        prompt_gen_mode: The mode to use for prompt generation.
        num_prompts: The number of prompts to generate during the search.
        eval_rounds: The number of evaluation rounds to run.
        prompt_gen_batch_size: Batch size for the prompt generation model.
        eval_batch_size: Batch size for the evaluation model.
    Returns:
        An evaluation result and a function to evaluate the prompts with new inputs.
    """
    generation_template = get_simple_prompt_gen_template(
        prompt_gen_template, prompt_gen_mode)
    conf = simple_config(
        eval_model=eval_model,
        prompt_gen_model=prompt_gen_model,
        prompt_gen_mode=prompt_gen_mode,
        num_prompts=num_prompts,
        eval_rounds=eval_rounds,
        prompt_gen_batch_size=prompt_gen_batch_size,
        eval_batch_size=eval_batch_size,
    )
    # The dataset serves both as prompt generation data and as evaluation data.
    return find_prompts(
        eval_template, demos_template, dataset, dataset, conf,
        prompt_gen_template=generation_template)


def simple_eval(dataset,
                prompts,
                eval_template='Instruction: [PROMPT]\nInput: [INPUT]\nOutput: [OUTPUT]',
                demos_template='Input: [INPUT]\nOutput: [OUTPUT]',
                eval_model='text-davinci-002',
                num_samples=50):
    """
    Convenience wrapper that scores given prompts using the default config.
    Parameters:
        dataset: The dataset to use for evaluation (also used for few-shot demos).
        prompts: The list of prompts to evaluate.
        eval_template: The template for the evaluation queries.
        demos_template: The template for the demos.
        eval_model: The model to use for evaluation.
        num_samples: Upper limit on samples per prompt; capped at the dataset size.
    Returns:
        An evaluation result.
    """
    filled_eval_template = EvalTemplate(eval_template)
    filled_demos_template = DemosTemplate(demos_template)

    conf = update_config({}, 'configs/default.yaml')
    eval_conf = conf['evaluation']
    eval_conf['model']['gpt_config']['model'] = eval_model
    eval_conf['num_samples'] = min(len(dataset[0]), num_samples)

    return evalute_prompts(
        prompts, filled_eval_template, dataset, filled_demos_template, dataset,
        eval_conf['method'], eval_conf)

def generate_prompts(prompt_gen_template, demos_template, prompt_gen_data, config):
    """
    Generates prompts using the prompt generator.
    Parameters:
        prompt_gen_template: The template for the prompt generator queries.
        demos_template: The template for the demonstrations.
        prompt_gen_data: The data to use for prompt generation.
        config: The configuration dictionary ('num_subsamples', 'num_demos',
            'num_prompts_per_subsample').
    Returns:
        A list of prompts.
    """
    queries = []
    for _ in range(config['num_subsamples']):
        inputs, outputs = subsample_data(
            prompt_gen_data, config['num_demos'])
        # The generation query is built here directly: the get_query defined
        # in this file is the six-argument likelihood-evaluation variant and
        # cannot be called with (template, demos_template, data).
        demos = demos_template.fill((inputs, outputs))
        queries.append(prompt_gen_template.fill(
            full_demo=demos,
            # [INPUT]/[OUTPUT] take the first demo; empty when no demos were drawn.
            input=inputs[0] if inputs else '',
            output=outputs[0] if outputs else ''))

    # Relies on the module-level `model` object.
    prompts = model.generate_text(
        queries, n=config['num_prompts_per_subsample'])
    return prompts

def find_prompts(eval_template,
                 demos_template,
                 prompt_gen_data,
                 eval_data,
                 conf,
                 base_conf='configs/default.yaml',
                 few_shot_data=None,
                 prompt_gen_template=None):
    """
    Function to generate prompts using APE.
    Parameters:
        eval_template: The template for the evaluation queries.
        demos_template: The template for the demos.
        prompt_gen_data: The data to use for prompt generation.
        eval_data: The data to use for evaluation.
        conf: The configuration dictionary, overlaid on base_conf.
        base_conf: The base configuration file.
        few_shot_data: The data to use for demonstrations during eval;
            defaults to prompt_gen_data.
        prompt_gen_template: The template to use for prompt generation;
            derived from eval_template when None.
    Returns:
        An evaluation result. Also returns a function to evaluate the prompts with new inputs.
    """

    conf = update_config(conf, base_conf)

    # Wrap the raw template strings
    eval_template = EvalTemplate(eval_template)
    demos_template = DemosTemplate(demos_template)
    if prompt_gen_template is None:
        prompt_gen_template = eval_template.convert_to_generation_template()
    else:
        prompt_gen_template = GenerationTemplate(prompt_gen_template)

    if few_shot_data is None:
        few_shot_data = prompt_gen_data

    print('Generating prompts...')
    prompts = generate_prompts(
        prompt_gen_template, demos_template, prompt_gen_data, conf['generation'])

    print('Model returned {} prompts. Deduplicating...'.format(len(prompts)))
    # dict.fromkeys keeps the first occurrence of each prompt in generation
    # order. A set would give an order that changes between runs, which in
    # turn changes which index each prompt has during evaluation.
    prompts = list(dict.fromkeys(prompts))
    print('Deduplicated to {} prompts.'.format(len(prompts)))

    print('Evaluating prompts...')

    res = evalute_prompts(prompts, eval_template, eval_data, demos_template, few_shot_data,
                          conf['evaluation']['method'], conf['evaluation'])

    print('Finished evaluating.')

    demo_fn = demo_function(eval_template, conf['demo'])

    return res, demo_fn


def evaluate_prompts(prompts, eval_template, eval_data, demos_template, few_shot_data, conf,
                     base_conf='configs/default.yaml'):
    """
    Scores a list of prompts using a config overlaid on a base config file.
    Parameters:
        prompts: The list of prompts to evaluate.
        eval_template: The template for the evaluation queries.
        eval_data: The data to use for evaluation.
        demos_template: The template for the demos.
        few_shot_data: The data to draw few-shot demos from.
        conf: The configuration dictionary.
        base_conf: The base configuration file.
    Returns:
        The evaluation result object produced by the configured method.
    """
    merged_conf = update_config(conf, base_conf)
    eval_conf = merged_conf['evaluation']

    # Wrap the raw template strings
    wrapped_eval_template = EvalTemplate(eval_template)
    wrapped_demos_template = DemosTemplate(demos_template)

    print('Evaluating prompts...')
    result = evalute_prompts(
        prompts, wrapped_eval_template, eval_data, wrapped_demos_template,
        few_shot_data, eval_conf['method'], eval_conf)
    print('Finished evaluating.')

    return result

def get_generation_query(eval_template,
                         demos_template,
                         conf,
                         prompt_gen_data,
                         prompt_gen_template=None,
                         num_query=1):
    """
    Builds example prompt generation queries without sending them to a model.
    Parameters:
        eval_template: The template for the evaluation queries; used to derive
            the generation template when prompt_gen_template is None.
        demos_template: The template for the demos.
        conf: The configuration dictionary (reads ['generation']['num_demos']).
        prompt_gen_data: The data to use for prompt generation.
        prompt_gen_template: The template to use for prompt generation.
        num_query: The number of queries to build.
    Returns:
        A list of query strings.
    """
    # Wrap the raw template strings
    eval_template = EvalTemplate(eval_template)
    demos_template = DemosTemplate(demos_template)
    if prompt_gen_template is None:
        prompt_gen_template = eval_template.convert_to_generation_template()
    else:
        prompt_gen_template = GenerationTemplate(prompt_gen_template)

    # First, generate a few prompt queries:
    queries = []
    for _ in range(num_query):
        inputs, outputs = subsample_data(
            prompt_gen_data, conf['generation']['num_demos'])
        # The generation query is built here directly: the get_query defined
        # in this file is the six-argument likelihood-evaluation variant and
        # cannot be called with (template, demos_template, data).
        demos = demos_template.fill((inputs, outputs))
        queries.append(prompt_gen_template.fill(
            full_demo=demos,
            # [INPUT]/[OUTPUT] take the first demo; empty when no demos were drawn.
            input=inputs[0] if inputs else '',
            output=outputs[0] if outputs else ''))

    return queries


def get_evaluation_query(eval_template,
                         demos_template,
                         conf,
                         eval_data,
                         few_shot_data,
                         eval_query=None,
                         num_query=1
                         ):
    """
    Builds example evaluation queries using a filler prompt of maximum length.
    Parameters:
        eval_template: The template for the evaluation queries.
        demos_template: The template for the demos.
        conf: The configuration dictionary.
        eval_data: The data to draw evaluation examples from.
        few_shot_data: The data to draw few-shot demos from.
        eval_query: Function building (query, ...) tuples; defaults to
            get_query when the base evaluation method is 'likelihood'.
        num_query: The number of queries to build.
    Returns:
        A list of query strings.
    Raises:
        ValueError: If eval_query is None and the base method is not 'likelihood'.
    """
    eval_template = EvalTemplate(eval_template)
    demos_template = DemosTemplate(demos_template)

    # With bandits, the settings of the wrapped base method are the relevant ones.
    eval_conf = conf['evaluation']
    if eval_conf['method'] == 'bandits':
        eval_base_method = eval_conf['base_eval_method']
        num_few_shot = eval_conf['base_eval_config']['num_few_shot']
    else:
        eval_base_method = eval_conf['method']
        num_few_shot = eval_conf['num_few_shot']

    if eval_query is None:
        if eval_base_method != 'likelihood':
            raise ValueError(
                'Cannot estimate costs for: {}'.format(eval_base_method))
        eval_query = get_query

    # Stand-in prompt: 'GGGG' repeated max_tokens times.
    max_prompt_len = conf['generation']['model']['gpt_config']['max_tokens']
    filler_prompt = 'GGGG' * max_prompt_len

    queries = []
    for _ in range(num_query):
        pick = random.randint(0, len(eval_data[0]) - 1)
        input_, output_ = eval_data[0][pick], eval_data[1][pick]
        demo_data = subsample_data(few_shot_data, num_few_shot)
        built = eval_query(filler_prompt, eval_template, input_,
                           output_, demo_data, demos_template)
        # Only the query text (first element) is kept.
        queries.append(built[0])
    return queries

In [4]:
# Driver cell: build a ZOPOAlgorithm from the notebook's helper functions, run
# it on a handful of labelled sentiment examples and print the result.

# Labelled sentiment examples. They are passed to zopo.run() below.
# NOTE(review): the previous comment said these are "not used"; whether run()
# actually consumes them is not visible from this cell — confirm.
demo_data = [
        "Example input: 'This movie was fantastic! I loved every minute of it.' Expected output: 'positive'",
        "Example input: 'The service was terrible and the food was cold.' Expected output: 'negative'",
        "Example input: 'I can't believe how amazing this product is!' Expected output: 'positive'",
        "Example input: 'I requested a refund because it didn't work as advertised.' Expected output: 'negative'"
    ]

task_description = "sentiment analysis of product reviews, classifying them as positive or negative"
# NOTE(review): prompt_gen binds task_description but is never used below —
# the constructor receives the bare prompt_generation instead. Presumably
# prompt_gen was meant to be passed; verify against ZOPOAlgorithm before changing.
prompt_gen = partial(prompt_generation, task_description=task_description)

# Initialize ZOPO with our functions
zopo = ZOPOAlgorithm(
    prompt_generation=prompt_generation,
    nlp_embedding=nlp_embedding,
    inverse_embedding=inverse_embedding,
    projection_function=projection_function,
    evaluation_function=evaluation_function,
    m=5,      # Number of initial prompt candidates
    T=3,      # Number of optimization iterations
    delta=0.1, # Confidence parameter
    kappa=1.0, # Kernel bound
    alpha=0.1  # Step size
)

# Run the algorithm
best_prompt = zopo.run(demo_data)

print("Optimized prompt:", best_prompt)

# Show the embedding for the best prompt
# (uses the embedding_model / embedding_tokenizer attributes of the zopo object)
best_embedding = nlp_embedding(best_prompt, zopo.embedding_model, zopo.embedding_tokenizer)
print("Embedding:", best_embedding)

# Show the evaluation score
best_score = evaluation_function(best_embedding)
print("Evaluation score:", best_score)

Loading checkpoint shards: 100%|██████████| 2/2 [00:04<00:00,  2.07s/it]
Device set to use cuda:0


Optimized prompt: Is the sentiment of the following text positive or negative?
Embedding: (-0.011109919287264347, 0.3535281717777252, -0.0517016164958477, 0.02186959609389305, -0.10469207912683487, -0.17762413620948792, 0.030455095693469048, -0.06047740951180458, 0.24406816065311432, -0.014500077813863754, 0.18065515160560608, 0.11902257055044174, 0.15227621793746948, 0.121745266020298, 0.03753717243671417, 0.024582503363490105, 0.05646296590566635, -0.22020116448402405, 0.047522012144327164, -0.024562573060393333, -0.06697848439216614, 0.12067969143390656, 0.032858602702617645, 0.04359618201851845, -0.060819435864686966, 0.2984614372253418, -0.133926659822464, 0.15761683881282806, 0.017571687698364258, -0.6304754614830017, -0.2724848687648773, 0.05615871399641037, 0.0970522090792656, 0.044642701745033264, -0.10183541476726532, -0.04565545916557312, 0.028056440874934196, -0.027529120445251465, 0.04863203316926956, 0.17135927081108093, -0.19201165437698364, -0.10806091129779816, 0.16634