In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import beta

def plot_cumulative_success(history_df, prompts, title="Cumulative Success Rate", marker_size=3, evolution_start=None):
    """Plot cumulative success rate for each prompt with optional evolution marker."""
    plt.figure(figsize=(12 if evolution_start else 10, 6))
    colors = {}
    
    prompt_set = set(history_df["chosen_prompt"]) if evolution_start else prompts
    for prompt in prompt_set:
        prompt_hist = history_df[history_df["chosen_prompt"] == prompt]
        if len(prompt_hist) > 0:
            cumsum = prompt_hist["reward"].cumsum() / (np.arange(len(prompt_hist)) + 1)
            x_vals = prompt_hist.index if evolution_start else cumsum.index
            label = (prompt[:40] + "...") if len(prompt) > 40 else prompt
            line = plt.plot(x_vals, cumsum, label=label, marker='o', markersize=marker_size, alpha=0.7)
            colors[prompt] = line[0].get_color()
    
    if evolution_start:
        plt.axvline(x=evolution_start, color='red', linestyle='--', linewidth=2, label='Evolution Start')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    else:
        plt.legend()
    
    plt.title(title)
    plt.xlabel("Trial Number")
    plt.ylabel("Observed Success Rate")
    plt.tight_layout()
    plt.show()
    return colors

def plot_beta_distributions(results, prompts, colors, show_prior=True, original_winner=None, original_alpha=None, original_beta=None):
    """Plot Beta distributions (prior/posterior or evolution comparison)."""
    x = np.linspace(0, 1, 100)
    
    if show_prior:
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        # Prior
        axes[0].set_title("Initial Beta Distributions (Prior)")
        for i, prompt in enumerate(prompts):
            axes[0].plot(x, np.ones_like(x), label=prompt, color=colors[prompt], alpha=0.7)
        axes[0].set_xlabel("Success Rate")
        axes[0].set_ylabel("Density")
        axes[0].legend()
        
        # Posterior
        axes[1].set_title("Final Beta Distributions (Posterior)")
        for i, prompt in enumerate(prompts):
            y = beta.pdf(x, results.loc[i, "successes"] + 1, results.loc[i, "failures"] + 1)
            axes[1].plot(x, y, label=prompt, color=colors[prompt], alpha=0.7, linewidth=2)
        axes[1].set_xlabel("Success Rate")
        axes[1].set_ylabel("Density")
        axes[1].legend()
    else:
        # Evolution comparison
        plt.figure(figsize=(12, 6))
        if original_winner and original_alpha and original_beta:
            y_orig = beta.pdf(x, original_alpha, original_beta)
            label = f"{original_winner[:40]}... (Original)" if len(original_winner) > 40 else f"{original_winner} (Original)"
            plt.fill_between(x, y_orig, alpha=0.2, color='gray', label=label)
            plt.plot(x, y_orig, color='gray', alpha=0.4, linewidth=2, linestyle='--')
        
        for i, prompt in enumerate(results["prompt"]):
            if prompt != original_winner:
                y = beta.pdf(x, results.loc[i, "successes"] + 1, results.loc[i, "failures"] + 1)
                label = (prompt[:40] + "...") if len(prompt) > 40 else prompt
                plt.plot(x, y, label=label, alpha=0.7, linewidth=2)
        
        plt.title("Final Beta Distributions After Evolution")
        plt.xlabel("Success Rate")
        plt.ylabel("Density")
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    
    plt.tight_layout()
    plt.show()

In [None]:
import numpy as np
import pandas as pd
import random
from tqdm import tqdm

# ------------------------
# Setup
# ------------------------
prompts = [
    "Hi, how are you today?",
    "We have an amazing offer just for you!",
    "Did you know we can save you time and money?",
    "I'd love to show you something that could help your business.",
    "What's your biggest challenge right now?"
]

true_effectiveness = np.random.beta(2, 2, len(prompts))

results = pd.DataFrame({"prompt": prompts, "successes": np.zeros(len(prompts)), "failures": np.zeros(len(prompts))})

def get_reward(prompt_index, effectiveness_array):
    """Simulate customer response according to hidden effectiveness."""
    return 1 if random.random() < effectiveness_array[prompt_index] else 0

# ------------------------
# Thompson Sampling
# ------------------------
calls_per_day, days = 10, 30
n_rounds = calls_per_day * days
history = []

for t in tqdm(range(n_rounds), desc="Simulating calls"):
    samples = [np.random.beta(results.loc[i, "successes"] + 1, results.loc[i, "failures"] + 1) for i in range(len(prompts))]
    chosen = np.argmax(samples) # Choose which prompt to use
    reward = get_reward(chosen, true_effectiveness)
    
    results.loc[chosen, "successes" if reward == 1 else "failures"] += 1
    history.append({"round": t, "chosen_prompt": prompts[chosen], "reward": reward, "true_effectiveness": true_effectiveness[chosen]})

history_df = pd.DataFrame(history)

# ------------------------
# Visualization
# ------------------------
colors = plot_cumulative_success(history_df, prompts, "Cumulative Success Rate per Prompt (Simulated)")
plot_beta_distributions(results, prompts, colors, show_prior=True)

# Print results
results["estimated_rate"] = results["successes"] / (results["successes"] + results["failures"])
print("True effectiveness:", true_effectiveness)
print(results)

# Save winner for comparison
initial_best_idx = results["estimated_rate"].idxmax()
original_winner = prompts[initial_best_idx]
original_winner_alpha = results.loc[initial_best_idx, "successes"] + 1
original_winner_beta = results.loc[initial_best_idx, "failures"] + 1

In [None]:
from openai import OpenAI
import scipy.stats as sps
import scipy.integrate as spi

OPENAI_API_KEY = "your-api-key-here"
OPENAI_MODEL_NAME = "gpt-4.1-mini" # If you cannot do it with 4.1-mini, you are doing something wrong ;)
RETIREMENT_THRESHOLD = 0.05

# ------------------------
# Evolutionary Extension
# ------------------------
total_tokens_used = 0
all_used_prompts = set(prompts)

def mutate_prompt(base_prompt, existing_prompts):
    """Generate a variation using LLM."""
    global total_tokens_used
    client = OpenAI(api_key=OPENAI_API_KEY)
    
    for attempt in range(5):
        response = client.chat.completions.create(
            model=OPENAI_MODEL_NAME,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that creates variations of sales prompts."},
                {"role": "user", "content": f"Create a single variation of this sales prompt: '{base_prompt}'. Avoid these: {existing_prompts}. Only respond with the new prompt."}
            ],
            max_tokens=50,
            temperature=0.8
        )
        total_tokens_used += response.usage.total_tokens
        new_prompt = response.choices[0].message.content.strip()
        if new_prompt not in existing_prompts:
            return new_prompt
    return f"{new_prompt} (v{attempt})"

def prob_beta_mean_higher_numerical(alpha1, beta1, alpha2, beta2):
    """
    Calculates the probability that a random variable drawn from the first
    Beta distribution is greater than a random variable drawn from the second
    Beta distribution. P(X1 > X2).
    """
    def integrand(x):
        return sps.beta.pdf(x, a=alpha1, b=beta1) * sps.beta.cdf(x, a=alpha2, b=beta2)
    prob_X1_gt_X2, _ = spi.quad(integrand, 0, 1)
    return prob_X1_gt_X2

# Initialize evolution
evolution_history = []
active_prompts = prompts.copy()
active_true_effectiveness = true_effectiveness.copy()

# Create enhanced evolution results DataFrame with all necessary columns
evolution_results = pd.DataFrame({
    "prompt": active_prompts,
    "successes": results["successes"].values.copy(),
    "failures": results["failures"].values.copy(),
    "alpha": results["successes"].values.copy() + 1,
    "beta": results["failures"].values.copy() + 1,
    "estimated_rate": 0.0,
    "ci_lower": 0.0,
    "ci_upper": 0.0
})

# Update derived columns
evolution_results["estimated_rate"] = evolution_results["alpha"] / (evolution_results["alpha"] + evolution_results["beta"])

for i in range(len(evolution_results)):
    evolution_results.loc[i, "ci_lower"] = sps.beta.ppf(0.05, evolution_results.loc[i, "alpha"], evolution_results.loc[i, "beta"])
    evolution_results.loc[i, "ci_upper"] = sps.beta.ppf(0.95, evolution_results.loc[i, "alpha"], evolution_results.loc[i, "beta"])

for t in tqdm(range(n_rounds), desc="Evolving prompts"):
    # Thompson Sampling: sample from each prompt's beta distribution
    samples = [np.random.beta(evolution_results.loc[i, "alpha"], evolution_results.loc[i, "beta"]) 
               for i in range(len(active_prompts))]
    chosen = np.argmax(samples)
    reward = get_reward(chosen, active_true_effectiveness)
    
    # Update successes/failures and derived columns
    evolution_results.loc[chosen, "successes"] += reward
    evolution_results.loc[chosen, "failures"] += (1 - reward)
    evolution_results.loc[chosen, "alpha"] = evolution_results.loc[chosen, "successes"] + 1
    evolution_results.loc[chosen, "beta"] = evolution_results.loc[chosen, "failures"] + 1
    evolution_results.loc[chosen, "estimated_rate"] = evolution_results.loc[chosen, "alpha"] / (evolution_results.loc[chosen, "alpha"] + evolution_results.loc[chosen, "beta"])
    evolution_results.loc[chosen, "ci_lower"] = sps.beta.ppf(0.05, evolution_results.loc[chosen, "alpha"], evolution_results.loc[chosen, "beta"])
    evolution_results.loc[chosen, "ci_upper"] = sps.beta.ppf(0.95, evolution_results.loc[chosen, "alpha"], evolution_results.loc[chosen, "beta"])

    evolution_history.append({
        "round": t + n_rounds,
        "chosen_prompt": active_prompts[chosen],
        "reward": reward,
        "true_effectiveness": active_true_effectiveness[chosen],
        "alpha": evolution_results.loc[chosen, "alpha"],
        "beta": evolution_results.loc[chosen, "beta"],
        "estimated_rate": evolution_results.loc[chosen, "estimated_rate"]
    })

    # Every day (after 10 calls), retire weak prompts
    if (t + 1) % 10 == 0:
        # Find the best prompt (highest mean success rate)
        best_idx = evolution_results["estimated_rate"].idxmax()
        best_alpha = evolution_results.loc[best_idx, "alpha"]
        best_beta = evolution_results.loc[best_idx, "beta"]
        best_mean = evolution_results.loc[best_idx, "estimated_rate"]
        
        retire_indices = []
        
        # Compare each prompt against the best prompt's beta distribution
        for i in range(len(active_prompts)):
            if i != best_idx:
                current_alpha = evolution_results.loc[i, "alpha"]
                current_beta = evolution_results.loc[i, "beta"]

                # Calculate P(current_mean > best_mean) using the numerical integration function
                # Note: Parameters are alpha1, beta1, alpha2, beta2.
                # We want P(current > best), so current's params go first, best's params go second.
                prob_current_greater_than_best = prob_beta_mean_higher_numerical(
                    alpha1=current_alpha,
                    beta1=current_beta,
                    alpha2=best_alpha,
                    beta2=best_beta
                )

                # Retire if there's less than a 5% chance that the current prompt is actually better than the best
                # This means we're 95% confident the current prompt is NOT better than the best.
                if prob_current_greater_than_best < RETIREMENT_THRESHOLD:
                    retire_indices.append(i)

        if len(retire_indices) > 0:
            best_prompt = active_prompts[best_idx]
            
            for idx in retire_indices:
                new_prompt = mutate_prompt(best_prompt, all_used_prompts)
                active_prompts[idx] = new_prompt
                all_used_prompts.add(new_prompt)
                
                # Perturb effectiveness
                best_effectiveness = active_true_effectiveness[best_idx]
                variation = np.random.uniform(-0.2, 0.2)
                active_true_effectiveness[idx] = np.clip(best_effectiveness * (1 + variation), 0, 1)
                
                # Inherit prior with uncertainty from best prompt
                best_total = evolution_results.loc[best_idx, "successes"] + evolution_results.loc[best_idx, "failures"]
                best_rate = evolution_results.loc[best_idx, "estimated_rate"]
                new_total = max(10, best_total * 0.2)
                
                new_successes = best_rate * new_total
                new_failures = new_total - new_successes
                
                # Update all columns for the new prompt
                evolution_results.loc[idx, "prompt"] = new_prompt
                evolution_results.loc[idx, "successes"] = new_successes
                evolution_results.loc[idx, "failures"] = new_failures
                evolution_results.loc[idx, "alpha"] = new_successes + 1
                evolution_results.loc[idx, "beta"] = new_failures + 1
                evolution_results.loc[idx, "estimated_rate"] = evolution_results.loc[idx, "alpha"] / (evolution_results.loc[idx, "alpha"] + evolution_results.loc[idx, "beta"])
                evolution_results.loc[idx, "ci_lower"] = sps.beta.ppf(0.05, evolution_results.loc[idx, "alpha"], evolution_results.loc[idx, "beta"])
                evolution_results.loc[idx, "ci_upper"] = sps.beta.ppf(0.95, evolution_results.loc[idx, "alpha"], evolution_results.loc[idx, "beta"])
# ------------------------
# Visualization: Evolution
# ------------------------
evolution_df = pd.DataFrame(evolution_history)
full_history = pd.concat([history_df, evolution_df], ignore_index=True)

plot_cumulative_success(full_history, None, "Cumulative Success Rate with Evolution", marker_size=2, evolution_start=n_rounds)

evolution_results["estimated_rate"] = evolution_results["successes"] / (evolution_results["successes"] + evolution_results["failures"])
print("\nFinal Active Prompts after Evolution:")
print(evolution_results)

plot_beta_distributions(evolution_results, None, None, show_prior=False, original_winner=original_winner, original_alpha=original_winner_alpha, original_beta=original_winner_beta)

print(f"\nTotal OpenAI tokens used: {total_tokens_used}")
print(f"Total unique prompts generated: {len(all_used_prompts)}")