In [1]:
%%capture
!apt-get install -y ffmpeg freeglut3-dev xvfb
!pip install setuptools
!pip install gymnasium shimmy
!pip install stable-baselines3
!pip install sentence-transformers
!pip install cerebras-cloud-sdk
!pip install gradio

In [2]:
import os
import time
import json
import random
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from cerebras.cloud.sdk import Cerebras
from sentence_transformers import SentenceTransformer, util
import gradio as gr



In [3]:
###############################
# SETUP
###############################

import json

CONFIG_FILE = "config.json"
with open(CONFIG_FILE, "r") as file:
    config = json.load(file)
    api_key = config.get("api_key")

client = Cerebras(api_key=api_key)


# Load the dataset
with open("questions_dataset.json", "r") as f:
    questions_data = json.load(f)

# Define prompt variants (actions)
prompt_variants = [
    "Please answer concisely: {question}",
    "Answer with step-by-step reasoning: {question}",
    "You are an expert, give a detailed explanation: {question}",
    "Provide the shortest possible correct answer: {question}",
    "Reframe the question in simpler terms and then answer: {question}"
]
num_actions = len(prompt_variants)

# Load a sentence-transformers model for semantic similarity
model_embeddings = SentenceTransformer('all-MiniLM-L6-v2')

  and should_run_async(code)
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [4]:
###############################
# HELPER FUNCTIONS
###############################

def make_key(question, prompt):
    """Create a unique key for the question-prompt pair for caching."""
    return question.strip() + "||" + prompt.strip()

def count_tokens(text):
    """Simple token counting heuristic based on whitespace."""
    return len(text.split())

def compute_reward(llm_answer, gold_answer):
    """Compute the reward using semantic similarity and a token penalty."""
    if llm_answer is None:
        return 0.0

    # Encode answers and compute similarity
    emb_answer = model_embeddings.encode(llm_answer, convert_to_tensor=True)
    emb_gold = model_embeddings.encode(gold_answer, convert_to_tensor=True)
    similarity = util.cos_sim(emb_answer, emb_gold).item()

    # Introduce a token penalty
    token_penalty = count_tokens(llm_answer) * 0.001
    reward = similarity - token_penalty

    # Ensure reward is not negative
    reward = max(0.0, reward)
    return reward

# LLM models to cycle through
models = ["llama3.1-70b", "llama-3.3-70b"]
model_index = 0

def query_llm(prompt):
    """Call the Cerebras LLM and return its response."""
    global model_index
    retry_attempts = 5
    while retry_attempts > 0:
        try:
            model = models[model_index]
            time.sleep(2)  # Delay to avoid rate limit issues
            chat_completion = client.chat.completions.create(
                messages=[{"role":"user","content":prompt}],
                model=model,
            )
            # Rotate model for load balancing
            model_index = (model_index + 1) % len(models)
            return chat_completion.choices[0].message.content
        except Exception as e:
            error_message = str(e)
            if "Requests per minute limit exceeded" in error_message:
                print(f"Rate limit exceeded for {model}. Retrying after 60 seconds...")
                time.sleep(60)
                retry_attempts -= 1
            else:
                print(f"Error: {e}")
                return None
    print("Rate limit retry attempts exhausted.")
    return None

In [5]:
###############################
# CACHING RESPONSES
###############################

# Try to load cached responses if available
try:
    with open("cached_responses.json", "r") as f:
        llm_responses = json.load(f)
except FileNotFoundError:
    llm_responses = {}

# Prefetch responses for all question-prompt pairs
for q in questions_data:
    for pv in prompt_variants:
        fp = pv.format(question=q["question"])
        key = make_key(q["question"], fp)
        if key not in llm_responses:
            ans = query_llm(fp)
            llm_responses[key] = ans
            with open("cached_responses.json", "w") as f:
                json.dump(llm_responses, f)

In [6]:
###############################
# CUSTOM GYM ENVIRONMENT (MULTI-STEP EPISODES)
###############################

class PromptEnv(gym.Env):
    metadata = {"render_modes":["human"]}

    def __init__(self, questions, prompt_variants, max_steps=3):
        """
        A custom environment where the agent chooses prompt variants for a question.
        The agent has multiple steps to refine the prompt. Each step yields a reward
        based on semantic similarity and token length.
        """
        super(PromptEnv, self).__init__()
        self.questions = questions
        self.prompt_variants = prompt_variants
        self.num_actions = len(prompt_variants)
        self.observation_space = spaces.Discrete(len(questions))
        self.action_space = spaces.Discrete(self.num_actions)
        self.max_steps = max_steps

        self.current_question_idx = None
        self.step_count = 0
        self.current_prompt = None

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        self.current_question_idx = self.np_random.integers(0, len(self.questions))
        self.step_count = 0
        self.current_prompt = None
        return self.current_question_idx, {}

    def step(self, action):
        q_obj = self.questions[self.current_question_idx]
        question = q_obj["question"]
        gold = q_obj["gold_answer"]

        chosen_prompt = self.prompt_variants[action].format(question=question)

        # If it's the first step, initialize the prompt; otherwise, refine it
        if self.step_count == 0:
            self.current_prompt = chosen_prompt
        else:
            self.current_prompt += "\n" + chosen_prompt

        key = make_key(question, self.current_prompt)
        llm_answer = llm_responses.get(key, None)

        # Compute the reward for this step
        reward = compute_reward(llm_answer, gold)

        self.step_count += 1
        done = (self.step_count >= self.max_steps)
        return self.current_question_idx, reward, done, False, {}

    def render(self):
        pass

In [7]:
###############################
# TEST AND TRAIN THE MODEL
###############################

env = PromptEnv(questions_data, prompt_variants, max_steps=3)

# Test a single step for sanity check
obs, info = env.reset()
action = env.action_space.sample()
obs, reward, done, truncated, info = env.step(action)
print("Test step reward:", reward)

# Train the PPO model
model = PPO("MlpPolicy", env, verbose=1, device="cpu")
model.learn(total_timesteps=5000)

def evaluate(model, env, n_episodes=20):
    """Evaluate the model by running multiple episodes and averaging the reward."""
    rewards = []
    for _ in range(n_episodes):
        obs, info = env.reset()
        done = False
        episode_reward = 0.0
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, r, done, truncated, _ = env.step(action)
            episode_reward += r
        rewards.append(episode_reward)
    return np.mean(rewards)

avg_reward = evaluate(model, env, n_episodes=50)
print("Average Reward after training:", avg_reward)


Test step reward: 0.09384522247314453
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 3        |
|    ep_rew_mean     | 0.455    |
| time/              |          |
|    fps             | 27       |
|    iterations      | 1        |
|    time_elapsed    | 74       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 3           |
|    ep_rew_mean          | 0.578       |
| time/                   |             |
|    fps                  | 28          |
|    iterations           | 2           |
|    time_elapsed         | 144         |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.017274698 |
|    clip_fraction        | 0.278       |
|    clip_range         

In [8]:
###############################
# GRADIO DEMO
###############################
question_texts = [q["question"] for q in questions_data]
question_to_index = {q["question"]: i for i,q in enumerate(questions_data)}

def get_model_answer(selected_question):
    """
    For the selected question, use the trained model to pick one best action (prompt variant).
    Display the chosen prompt variant and retrieve the LLM's cached answer.
    """
    q_idx = question_to_index[selected_question]
    action, _ = model.predict(q_idx, deterministic=True)
    chosen_prompt_variant = prompt_variants[action]
    prompt = chosen_prompt_variant.format(question=selected_question)
    key = make_key(selected_question, prompt)
    llm_answer = llm_responses.get(key, "No response found.")
    return f"**Chosen Prompt Variant:** {chosen_prompt_variant}\n\n**LLM Answer:** {llm_answer}"

dropdown = gr.Dropdown(choices=question_texts, label="Select a Question", value=question_texts[0])
output = gr.Markdown()

interface = gr.Interface(fn=get_model_answer, inputs=dropdown, outputs=output,
                         title="RL-Optimized Prompt Demo (Semantic Similarity & Multi-Step)",
                         description="Select a question. The RL agent (trained with semantic similarity reward and multi-step episodes) chooses a prompt variant. The LLM's answer is shown.")

interface.launch()

  from websockets.server import WebSocketServerProtocol


Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://be9b39aa1491338fef.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


