diff --git a/README.md b/README.md
index 47f4aa83..f17049f5 100644
--- a/README.md
+++ b/README.md
@@ -490,12 +490,16 @@ optillm supports various command-line arguments for configuration. When using Do
| `--cepo_planning_m` | Number of attempts to generate n plans in planning stage | 6 |
| `--cepo_planning_temperature_step1` | Temperature for generator in step 1 of planning stage | 0.55 |
| `--cepo_planning_temperature_step2` | Temperature for generator in step 2 of planning stage | 0.25 |
+| `--cepo_planning_temperature_direct_resp` | Temperature for generator when answering directly (fallback if planning fails) | 0.1 |
| `--cepo_planning_temperature_step3` | Temperature for generator in step 3 of planning stage | 0.1 |
| `--cepo_planning_temperature_step4` | Temperature for generator in step 4 of planning stage | 0 |
| `--cepo_planning_max_tokens_step1` | Maximum number of tokens in step 1 of planning stage | 4096 |
| `--cepo_planning_max_tokens_step2` | Maximum number of tokens in step 2 of planning stage | 4096 |
+| `--cepo_planning_max_tokens_direct_resp` | Maximum number of tokens when answering directly (fallback if planning fails) | 4096 |
| `--cepo_planning_max_tokens_step3` | Maximum number of tokens in step 3 of planning stage | 4096 |
| `--cepo_planning_max_tokens_step4` | Maximum number of tokens in step 4 of planning stage | 4096 |
+| `--cepo_use_reasoning_fallback` | Whether to fall back to lower reasoning effort levels when a higher level fails | `False` |
+| `--cepo_num_of_retries` | Number of retries if an LLM call fails (0 for no retries) | 0 |
| `--cepo_print_output` | Whether to print the output of each stage | `False` |
| `--cepo_config_file` | Path to CePO configuration file | `None` |
| `--cepo_use_plan_diversity` | Use additional plan diversity step | `False` |
@@ -584,6 +588,19 @@ Authorization: Bearer your_secret_api_key
¹ Numbers in parentheses for LongCePO indicate accuracy of majority voting from 5 runs.
+### CePO on math and code benchmarks (Sep 2025)
+
+| Method | AIME 2024 | AIME 2025 | GPQA | LiveCodeBench |
+| ----------------------: | :-------: | :-------: | :----: | :-----------: |
+| Qwen3 8B | 74.0 | 68.3 | 59.3 | 55.7 |
+| CePO (using Qwen3 8B) | 86.7 | 80.0 | 62.5 | 60.5 |
+| Qwen3 32B | 81.4 | 72.9 | 66.8 | 65.7 |
+| CePO (using Qwen3 32B) | **90.7** | **83.3** | 70.0 | **71.9** |
+| Qwen3 235B | 85.7 | 81.5 | 71.1 | 70.7 |
+| DeepSeek R1 | 79.8 | 70.0 | 71.5 | 64.3 |
+| OpenAI o3-mini | 79.6 | 74.8 | 76.8 | 66.3 |
+| Grok3 Think | 83.9 | 77.3 |**80.2**| 70.6 |
+
### CePO on math and code benchmarks (Mar 2025)
| Method | Math-L5 | MMLU-Pro (Math) | CRUX | LiveCodeBench (pass@1) | Simple QA |
diff --git a/optillm/cepo/README.md b/optillm/cepo/README.md
index baa0b06b..bb7b949e 100644
--- a/optillm/cepo/README.md
+++ b/optillm/cepo/README.md
@@ -23,6 +23,19 @@ The model reviews all generated solution proposals and their associated plans, i
**Step 4**: Final Solution
The model uses the refined plan from Step 3 to produce the final answer.
+## Example Usage
+
+Here's an example of running optillm with the CePO method against a Qwen3 model served by vLLM on port 8001:
+
+```bash
+OPENAI_API_KEY=serving-on-vllm \
+python optillm.py \
+ --base-url http://localhost:8001/v1 \
+ --approach cepo \
+ --port 8000 \
+  --cepo_config_file ./optillm/cepo/configs/cepo_config_qwen3.yaml
+```
+
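+Once the proxy is running, any OpenAI-compatible client can talk to it on port 8000. A minimal sketch with `curl` (the model name `Qwen/Qwen3-8B` is an assumption; substitute whatever name your vLLM deployment actually serves):
+
+```bash
+curl http://localhost:8000/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model": "Qwen/Qwen3-8B",
+    "messages": [{"role": "user", "content": "How many positive divisors does 360 have?"}]
+  }'
+```
+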
## CePO Current Status
This project is a work in progress, and the provided code is in an early experimental stage. While the proposed approach works well across the benchmarks we tested, further improvements can be achieved by task-specific customizations to prompts.
\ No newline at end of file
diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py
index 3f098bcf..c9a7c2cf 100644
--- a/optillm/cepo/cepo.py
+++ b/optillm/cepo/cepo.py
@@ -2,12 +2,17 @@
import yaml
import json
import optillm
-from optillm import conversation_logger
+import time
+import math_verify
+from optillm import conversation_logger
+from collections import Counter
+from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Literal, Any, Optional
from cerebras.cloud.sdk import BadRequestError as CerebrasBadRequestError
from openai import BadRequestError as OpenAIBadRequestError
+from openai import InternalServerError as OpenAIInternalServerError
@dataclass
@@ -15,22 +20,102 @@ class CepoConfig:
bestofn_n: int # number of responses to be generated in best of n stage
bestofn_temperature: float # temperature for verifier in best of n stage
bestofn_max_tokens: int # maximum number of tokens for verifier in best of n stage
- bestofn_rating_type: Literal["absolute", "pairwise"] # type of rating in best of n stage
+ bestofn_rating_type: Literal["absolute", "pairwise", "majority"] # type of rating in best of n stage
planning_n: int # number of plans generated in planning stage
planning_m: int # number of attempts to generate n plans in planning stage
planning_temperature_step1: float # temperature for generator in step 1 of planning stage
planning_temperature_step2: float # temperature for generator in step 2 of planning stage
+    planning_temperature_direct_resp: float  # temperature for generator when answering directly (fallback if planning fails)
planning_temperature_step3: float # temperature for generator in step 3 of planning stage
planning_temperature_step4: float # temperature for generator in step 4 of planning stage
planning_max_tokens_step1: int # maximum number of tokens in step 1 of planning stage
planning_max_tokens_step2: int # maximum number of tokens in step 2 of planning stage
+    planning_max_tokens_direct_resp: int  # maximum number of tokens when answering directly (fallback if planning fails)
planning_max_tokens_step3: int # maximum number of tokens in step 3 of planning stage
planning_max_tokens_step4: int # maximum number of tokens in step 4 of planning stage
use_plan_diversity: bool # whether to use plan diversity
+    use_reasoning_fallback: bool  # whether to fall back to lower reasoning effort levels when a higher level fails
+    num_of_retries: int  # number of retries if an LLM call fails, 0 for no retries
rating_model: Optional[str] = None # model to be used for rating
print_output: bool = False # whether to print the output of each stage
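+
+
+# Regex patterns used by extract_abcd() below to pull a multiple-choice answer letter (A-D)
+# out of model output. They are tried in priority order: earlier entries win over later ones.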
+MCQ_PATTERNS = [
+ # 0)"**Answer:** A" or "*Answers* – B", i.e. markdown‐wrapped "Answer(s)" with an unwrapped letter.
+ re.compile(
+ r'''(?ix) # case‐insensitive, ignore‐space
+ (?:\*{1,2}|_{1,2}) # leading *…* or _…_
+ Answer[s]? # Answer or Answers
+ \s*[:\-–]? # optional separator
+ (?:\*{1,2}|_{1,2}) # closing wrapper
+ \s* # optional space
+ ([ABCD])\b # the actual letter
+ ''',
+ re.X
+ ),
+
+ # 0.1)
+ re.compile(r'''(?ix) # ignore case, allow verbose mode
+ ^\s* # optional leading whitespace
+ (?:\*{1,2}|_{1,2})? # optional markdown wrapper
+ Answer:? # the word 'answer' with an optional colon
+ (?:\*{1,2}|_{1,2})? # optional markdown wrapper again
+ \s*:?\s* # optional colon with optional spaces
+ (?:\*{1,2}|_{1,2})? # optional markdown wrapper before letter
+ ([ABCD]) # capture the letter
+ (?:\*{1,2}|_{1,2})? # optional markdown wrapper after letter
+ \s* # optional trailing whitespace, end of line
+ ''', re.MULTILINE),
+
+ # 1) Answer: (C) or Answers: (B)
+ re.compile(r'(?ix)\bAnswer[s]?\b\s*[:\-–]?\s*\(\s*([ABCD])\s*\)'),
+
+ # 2) Answer: C or Answers – D
+ re.compile(r'(?ix)\bAnswer[s]?\b\s*[:\-–]?\s*([ABCD])\b'),
+
+ # 3) Option B or Choice: C
+ re.compile(r'(?ix)\b(?:Option|Choice)\b\s*[:\-–]?\s*([ABCD])\b'),
+
+ # 7) LaTeX \boxed{...A...}, catches both \boxed{A} and
+ # \boxed{\text{A } 2.08\times10^{-6}\,\mathrm{m}} etc.
+ re.compile(r'(?x)\\boxed\{[^}]*?([ABCD])[^}]*\}', re.MULTILINE),
+
+ # 7.5) LaTeX \boxed{\textbf{...C...}}
+ re.compile(r'(?x)\\boxed\{[^}]*?\\textbf\{[^}]*?([ABCD])[^}]*\}[^}]*\}', re.MULTILINE),
+
+ # 7.51) LaTeX \boxed{\text{...C...}}
+ re.compile(r'(?x)\\boxed\{[^}]*?\\text\{[^}]*?([ABCD])[^}]*\}[^}]*\}', re.MULTILINE),
+
+ # 4) bare singletons: (A) [B]
+    re.compile(r'(?x)(?<![A-Za-z0-9])[\(\[]?([ABCD])[\)\]]?(?![A-Za-z0-9])'),
+]
+
+
 def init_cepo_config(cmd_line_args: dict) -> CepoConfig:
# get the command line arguments
@@ -52,7 +137,8 @@ def init_cepo_config(cmd_line_args: dict) -> CepoConfig:
def extract_question_only(task: str) -> str:
- """We noticed that sometimes if the task includes specific formatting instructions, they may interfere with the reasoning flow. This
+ """
+ We noticed that sometimes if the task includes specific formatting instructions, they may interfere with the reasoning flow. This
is a temporary workaround to extract the question only from the task. Work in progress.
"""
question_only = task.replace('\n## Question: \n\n', '')
@@ -60,6 +146,179 @@ def extract_question_only(task: str) -> str:
return question_only
+def remove_think_section(response):
+    """
+    Remove a <think> ... </think> block from the response text, if present.
+
+    Args:
+        response (str): Raw model output.
+
+    Returns:
+        str: Response without the <think> section, or an empty string if input
+             is invalid or empty.
+    """
+    if not isinstance(response, str) or not response:
+        return ""
+    if not response.startswith("<think>") and "</think>" not in response:
+        return response
+    match = re.search(r"</think>\s*(.*)", response, re.DOTALL)
+    if match:
+        parsed_response = match.group(1)
+        return parsed_response
+    else:
+        return response
+
+
+def extract_llm_response(response):
+ """
+ Extract text content and finish reason from an LLM response.
+
+ Supports both non-streaming responses (dict-like with `.choices[0].message.content`)
+ and streaming responses (iterable of chunks with `.choices[0].delta.content`).
+
+ Args:
+ response: LLM response object or streaming generator.
+
+ Returns:
+ Tuple[str, Optional[str]]:
+ - Extracted text content (stripped).
+ - Finish reason (or None if unavailable).
+ """
+ # Case 1: non-streaming response (dict-like object)
+ if hasattr(response, "choices") and hasattr(response.choices[0], "message"):
+ content = response.choices[0].message.content
+ if content:
+ content = content.strip()
+ finish_reason = getattr(response.choices[0], "finish_reason", None)
+ return content, finish_reason
+
+ # Case 2: streaming response (generator)
+ full_content = ""
+ finish_reason = None
+ for chunk in response:
+ delta = chunk.choices[0].delta
+ if hasattr(delta, "content") and delta.content:
+ full_content += delta.content
+ if chunk.choices[0].finish_reason is not None:
+ finish_reason = chunk.choices[0].finish_reason
+ return full_content.strip(), finish_reason
+
+
+def llm_call(
+ client: Any,
+ provider_request: dict,
+ cepo_config: CepoConfig
+) -> tuple[str, str, int]:
+ """
+ Call the LLM with retries on transient errors.
+
+ Makes a chat completion request to the given client and extracts the response.
+    Retries up to cepo_config.num_of_retries times on 400/500 errors, with a linearly increasing delay between attempts.
+
+ Args:
+ client (Any): LLM API client instance.
+        provider_request (dict): LLM call params.
+        cepo_config (CepoConfig): CePO configuration (provides num_of_retries).
+
+ Returns:
+ tuple[str, str, int]:
+ - response_text: Model output (post-processed, never None).
+ - finish_reason: Why generation stopped.
+ - completion_tokens: Number of tokens generated.
+ """
+    retries = cepo_config.num_of_retries + 1  # total attempts = 1 initial call + num_of_retries retries
+ for attempt in range(retries):
+ try:
+ response_object = client.chat.completions.create(
+ stream=False,
+ **provider_request
+ )
+ response_text, finish_reason = extract_llm_response(response_object)
+ completion_tokens = getattr(getattr(response_object, "usage", None), "completion_tokens", 0) or 0
+            response_text = response_text or ""  # Normalize None to ""
+            response_text = remove_think_section(response_text)
+ return response_text, finish_reason, completion_tokens
+
+ except (OpenAIBadRequestError, OpenAIInternalServerError) as e:
+ # Retry on 400 or 500
+ if attempt < retries - 1:
+ sleep_time = 0.2 * (attempt + 1)
+ print(f"Got {e.__class__.__name__}, retrying in {sleep_time:.1f}s...")
+ time.sleep(sleep_time)
+ continue
+ raise
+
+
+def llm_call_reason_effort_fallback(
+ client: Any,
+ provider_request: dict,
+ reasoning_effort_levels: list,
+ cepo_config: CepoConfig
+) -> tuple[Optional[Any], str, int]:
+ """
+ Call LLM with fallback on reasoning effort levels.
+
+ This function wraps `llm_call` with retry and degradation logic to handle
+ two main classes of errors:
+
+ 1. **Incomplete generation (finish_reason = "length")**:
+ - The model returns a response object but does not finish generation
+ (e.g., truncated output).
+ - In this case, the reasoning effort is reduced, and another attempt
+ is made with lower levels.
+
+ 2. **Server/validation errors (e.g., 400 BadRequest, 500 InternalServerError)**:
+ - Often caused by gpt-oss's "expected output number" error, which cannot be
+ fully recovered within the current API.
+ - The function retries once, and if the error persists, reasoning effort
+ is degraded to try again at lower levels.
+
+ The fallback sequence continues until either:
+ - A valid response is obtained (not truncated and not `None`), or
+ - All reasoning effort levels are exhausted, in which case the last
+ attempted result (possibly `None`) is returned.
+
+ Args:
+ client (Any): LLM API client instance used for making calls.
+        provider_request (dict): LLM call params.
+ reasoning_effort_levels (list): Ordered list of reasoning effort levels
+ (e.g., ["high", "medium", "low"]) to try in fallback.
+
+ Returns:
+ tuple:
+            - response: The response text, or `None` if all attempts failed.
+ - finish_reason (str): Reason why generation finished ("stop",
+ "length", "error", etc.).
+ - completion_tokens (int): Number of tokens generated in the final attempt.
+
+ Notes:
+ - This function prints diagnostic information when degrading reasoning effort.
+ - For persistent server-side issues (400/500), degradation is attempted
+ automatically, but a permanent fix may require upstream changes
+ (see https://github.com/pydantic/pydantic-ai/issues/2449).
+ """
+ if not cepo_config.use_reasoning_fallback:
+ reasoning_effort_levels = ["high"]
+ for effort in reasoning_effort_levels:
+ try:
+ # Try with the current reasoning effort level
+ provider_request["reasoning_effort"] = effort
+ response, finish_reason, completion_tokens = llm_call(
+ client=client,
+ provider_request=provider_request,
+ cepo_config=cepo_config
+ )
+ if response is not None and finish_reason != "length":
+ return response, finish_reason, completion_tokens
+            print(f"Reasoning effort {effort} gave no usable response, falling back to a lower level")
+        except (OpenAIBadRequestError, OpenAIInternalServerError):
+            # Retries at this reasoning effort level were exhausted with a 400/500 error; fall back to a lower level
+ print("400/500 persisted after retries at reasoning effort", effort, "→ degrading")
+ continue
+
+ return None, "error", 0
+
+
def generate_completion(system_prompt: str, task: str, client: Any, model: str, cepo_config: CepoConfig, approach: Optional[str] = None, request_id: str = None) -> str:
"""
Generates a completion based on the provided system prompt and task.
@@ -80,147 +339,229 @@ def generate_completion(system_prompt: str, task: str, client: Any, model: str,
cb_log = {}
plans = []
- for i in range(cepo_config.planning_m): # m is the maximum number of attempts to generate n plans
- # Step 1 - Generate a plan
- if cepo_config.use_plan_diversity:
- assert approach
- assert isinstance(approach, str)
- content = f"To answer this question, can you come up with a concise plan using to solve it step-by-step but do not provide the "\
- f"final answer. Here is the approach you need to follow to generate the plan: {approach}. "\
- f"Also, for each step, provide your confidence in the correctness of that step as well as your ability "\
- f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}"
+ def generate_single_plan(i):
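+        # Runs planning Steps 1 (generate a plan) and 2 (execute it) for a single attempt i.
+        # Invoked in parallel via ThreadPoolExecutor below; returns (i, plan_or_None, tokens, log entry).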
+ local_cb_log = {}
+ local_completion_tokens = 0
+
+ if cepo_config.planning_max_tokens_step1 != 0:
+ if cepo_config.use_plan_diversity:
+ assert approach
+ content = (
+                    f"To answer this question, can you come up with a concise plan using the given approach to solve it step-by-step but do not provide the "
+ f"final answer. Here is the approach you need to follow to generate the plan: {approach}. "
+ f"Also, for each step, provide your confidence in the correctness of that step as well as your ability "
+ f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}"
+ )
+ else:
+ assert not approach
+ content = (
+ f"To answer this question, can you come up with a concise plan to solve it step-by-step but do not provide the "
+ f"final answer. Also, for each step, provide your confidence in the correctness of that step as well as your ability "
+ f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}"
+ )
+
+ messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": content}]
+
+ provider_request = {
+ "model": model,
+ "messages": messages,
+ "max_tokens": cepo_config.planning_max_tokens_step1,
+ "temperature": cepo_config.planning_temperature_step1,
+ "top_p": 1.0
+ }
+
+ response, finish_reason, completion_tokens = llm_call_reason_effort_fallback(
+ client=client,
+ provider_request=provider_request,
+ reasoning_effort_levels=["high", "medium"],
+ cepo_config=cepo_config
+ )
+ local_completion_tokens += completion_tokens
+ # Log provider call if conversation logging is enabled
+ if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
+ response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
+ optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
+
+ if finish_reason == "length":
+ return i, None, local_completion_tokens, {f"messages_planning_{i}_rejected_due_to_length": messages}
+ parsed_plan = response
else:
- assert not approach
- content = f"To answer this question, can you come up with a concise plan to solve it step-by-step but do not provide the "\
- f"final answer. Also, for each step, provide your confidence in the correctness of that step as well as your ability "\
- f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}"
+ messages = []
+ parsed_plan = ""
+
+ # Step 2 – Execute plan
+ if cepo_config.planning_max_tokens_step1 != 0:
+ messages.append({"role": "assistant", "content": parsed_plan})
+ messages.append({"role": "user", "content": "Can you execute the above plan step-by-step to produce the final answer. Be extra careful when executing steps where your confidence is lower. /think"})
+ else:
+ messages.append({"role": "user", "content": f"Can you solve this problem step-by-step to produce the final answer? Here is the question:\n{question_only}\nRead the question again:\n\n{question_only} /think"})
- messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": content}]
-
- # Prepare request for logging
provider_request = {
- "model": model,
- "messages": messages,
- "max_tokens": cepo_config.planning_max_tokens_step1,
- "temperature": cepo_config.planning_temperature_step1,
- "stream": False,
- }
-
-        response = client.chat.completions.create(**provider_request)
+            "model": model,
+            "messages": messages,
+            "max_tokens": cepo_config.planning_max_tokens_step2,
+            "temperature": cepo_config.planning_temperature_step2,
+ "top_p": 1.0
+ }
+ response, finish_reason, completion_tokens = llm_call_reason_effort_fallback(
+ client=client,
+ provider_request=provider_request,
+ reasoning_effort_levels=["high", "medium"],
+ cepo_config=cepo_config
+ )
+ local_completion_tokens += completion_tokens
+
# Log provider call if conversation logging is enabled
if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- completion_tokens += response.usage.completion_tokens
- if response.choices[0].finish_reason == "length":
- # Skipping plan generation due to exceeding the token budget. Usually it means the plan is incomplete.
- continue
+ if finish_reason == "length":
+ return i, None, local_completion_tokens, {f"messages_planning_{i}_rejected_due_to_length": messages}
- # Step 2 - Execute the plan
- content = f"Can you execute the above plan step-by-step to produce the final answer. "\
- f"Be extra careful when executing steps where your confidence is lower."
- messages.extend([{"role": "assistant", "content": response.choices[0].message.content}, {"role": "user", "content": content}])
-
- # Prepare request for logging
- provider_request = {
- "model": model,
- "messages": messages,
- "max_tokens": cepo_config.planning_max_tokens_step2,
- "temperature": cepo_config.planning_temperature_step2,
- "stream": False,
- }
-
- response = client.chat.completions.create(**provider_request)
-
- # Log provider call if conversation logging is enabled
- if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
- response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
- optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- completion_tokens += response.usage.completion_tokens
+ parsed_exec = response
+ messages.append({"role": "assistant", "content": parsed_exec})
+ local_cb_log[f"messages_planning_{i}"] = messages
+ return i, parsed_exec, local_completion_tokens, local_cb_log
- if response.choices[0].finish_reason == "length":
- messages.append({"role": "assistant", "content": response.choices[0].message.content})
- cb_log[f"messages_planning_{i}_rejected_due_to_length"] = messages
- if cepo_config.print_output:
- print(f"\nCePO: Plan proposal rejected due to length. Attempt {i + 1} out of {cepo_config.planning_m}.\nMessages: {messages}")
- continue
+ # Step 1 & 2: Parallel planning + execution
+ with ThreadPoolExecutor(max_workers=cepo_config.planning_m) as executor:
+ futures = [executor.submit(generate_single_plan, i) for i in range(cepo_config.planning_m)]
- plans.append(response.choices[0].message.content)
- messages.append({"role": "assistant", "content": response.choices[0].message.content})
- cb_log[f"messages_planning_{i}"] = messages
- if cepo_config.print_output:
- print(f"\nCePO: Plan proposal generated. Attempt {i + 1} out of {cepo_config.planning_m}.\nMessages: {messages}")
-
- if len(plans) == cepo_config.planning_n:
- break
+ for future in as_completed(futures):
+ i, plan, tokens_used, log_entry = future.result()
+ completion_tokens += tokens_used
+ cb_log.update(log_entry)
+ if plan:
+ plans.append((i, plan))
+ if cepo_config.print_output:
+ print(f"\nCePO: Plan proposal generated. Attempt {i + 1} out of {cepo_config.planning_m}.\n")
+ if len(plans) == cepo_config.planning_n:
+ break
- if not plans:
- # If no plans were generated succesfully, take the last one even if it was rejected due to length
- plans.append(response.choices[0].message.content)
- messages.append({"role": "assistant", "content": response.choices[0].message.content})
- cb_log[f"messages_planning_{i}_no_plans_so_taking_the_last_one"] = messages
- if cepo_config.print_output:
- print(f"\nCePO: No plans generated successfully. Taking the last one from rejected due to length.\nMessages: {messages}")
+ plans = [plan for _, plan in sorted(plans)] # keep original order
- # Step 3 - Review and address inconsistencies
- try:
- plans_message = ""
- for i, plan in enumerate(plans):
- plans_message += f"Response {i + 1}:\n{plan}\n\n"
- plans_message = plans_message[:-2] # remove the last 2x newline
- content = f"Can you review your last {len(plans)} responses and identify any inconsistency between them. After that, can you address "\
- f"it and present a final step-by-step solution to the problem? Here is the question:\n{question_only}"
- messages = [{"role": "assistant", "content": plans_message}, {"role": "user", "content": content}]
+ if not plans:
+ # If no plans were generated, attempt to answer directly
+ messages = [
+ {"role": "user", "content": question_only},
+ ]
-        # Prepare request for logging
         provider_request = {
             "model": model,
             "messages": messages,
-            "max_tokens": cepo_config.planning_max_tokens_step3,
-            "temperature": cepo_config.planning_temperature_step3,
-            "stream": False,
+            "max_tokens": cepo_config.planning_max_tokens_direct_resp,
+            "temperature": cepo_config.planning_temperature_direct_resp,
+            "top_p": 0.95
}
-
- response = client.chat.completions.create(**provider_request)
-
+
+        response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback(
+            client=client,
+            provider_request=provider_request,
+            reasoning_effort_levels=["high", "medium", "low"],
+            cepo_config=cepo_config
+        )
+        completion_tokens += completion_tokens_
+
# Log provider call if conversation logging is enabled
if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
- optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- final_solution = response.choices[0].message.content
- completion_tokens += response.usage.completion_tokens
- except (CerebrasBadRequestError, OpenAIBadRequestError) as e:
- # In case of an error, take the first plan as the final solution
- final_solution = plans[0]
- messages = []
+ optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
+
+ if response is None or finish_reason == "length":
+            print("Direct answer failed (empty or truncated response)")
+ response = ""
+ messages.append({"role": "assistant", "content": response})
- # Step 4 - Answer the question
- content = f"Use your final solution from above to correctly answer the question. Here is the question:\n{task}"
- messages = [{"role": "assistant", "content": final_solution}, {"role": "user", "content": content}]
+ plans.append(response)
+ cb_log[f"messages_planning_fallback_used"] = messages
+ if cepo_config.print_output:
+ print(f"\nCePO: No plans generated successfully. Taking the fallback.\n")
- # Prepare request for logging
- provider_request = {
- "model": model,
- "messages": messages,
- "max_tokens": cepo_config.planning_max_tokens_step4,
- "temperature": cepo_config.planning_temperature_step4,
- "stream": False,
- }
+ # Step 3 - Review and consolidate plans
+ plans_message = ""
+ for i, plan in enumerate(plans):
+ plans_message += f"Response {i + 1}:\n{plan}\n\n"
+ plans_message = plans_message.rstrip()
+
+ content = f"Can you review your last {len(plans)} responses and identify any inconsistency between them. After that, can you address "\
+ f"it and present a final step-by-step solution to the problem? Here is the question:\n{question_only} /think"
+
+ user_content = f"Previous responses to review:\n\n{plans_message}\n\n{content}"
+ messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}]
- response = client.chat.completions.create(**provider_request)
+    provider_request = {
+        "model": model,
+        "messages": messages,
+        "max_tokens": cepo_config.planning_max_tokens_step3,
+        "temperature": cepo_config.planning_temperature_step3,
+ "top_p": 1.0
+ }
+ response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback(
+ client=client,
+ provider_request=provider_request,
+ reasoning_effort_levels=["high", "medium"],
+ cepo_config=cepo_config
+ )
+ completion_tokens += completion_tokens_
+
# Log provider call if conversation logging is enabled
if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- completion_tokens += response.usage.completion_tokens
+
+ if response is None or finish_reason == "length":
+        print("Step 3 failed (empty or truncated response); falling back to the first plan")
+        final_solution = plans[0]
+    else:
+ final_solution = response
+ messages.append({"role": "assistant", "content": final_solution})
+
+ # Step 4 – Final answer
+ if cepo_config.planning_max_tokens_step4 != 0:
+ content = f"Use your final solution from above to correctly answer the question. Here is the question:\n{task} /think"
+ messages = [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": f"Here's my final solution: {final_solution}\n\nNow {content}"}
+ ]
+
+        provider_request = {
+            "model": model,
+            "messages": messages,
+            "max_tokens": cepo_config.planning_max_tokens_step4,
+            "temperature": cepo_config.planning_temperature_step4,
+ "top_p": 1.0
+ }
+
+ response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback(
+ client=client,
+ provider_request=provider_request,
+ reasoning_effort_levels=["high", "medium"],
+ cepo_config=cepo_config
+ )
+ completion_tokens += completion_tokens_
+
+ # Log provider call if conversation logging is enabled
+ if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
+ response_dict = response.model_dump() if hasattr(response, 'model_dump') else response
+ optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
+
+ if response is None or finish_reason == "length":
+            print("Step 4 failed (empty or truncated response); using the Step 3 solution instead")
+ final_output = final_solution
+ else:
+ final_output = response
+ else:
+ final_output = final_solution
cb_log["messages"] = messages
if cepo_config.print_output:
- print(f"\nCePO: Answer generated.\nMessages: {messages}")
- return response.choices[0].message.content, completion_tokens, cb_log
+ print(f"\nCePO: Answer generated for one bestofn_n attempt.")
+
+ return final_output, completion_tokens, cb_log
def generate_approaches(system_prompt: str, initial_query: str, num_approach: int, client: Any, model: str, cepo_config: CepoConfig, max_retry: int = 2, request_id: str = None) -> tuple[list[str], int]:
@@ -279,7 +620,7 @@ def generate_approaches(system_prompt: str, initial_query: str, num_approach: in
return approaches, completion_tokens
-def generate_n_completions(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig) -> tuple[list[str], int, dict]:
+def generate_n_completions(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig, request_id: str) -> tuple[list[str], int, dict]:
"""
Generates n completions for the Best of N step of CePO.
@@ -297,7 +638,7 @@ def generate_n_completions(system_prompt: str, initial_query: str, client: Any,
cb_log = {}
cb_log["system_prompt"] = system_prompt
cb_log["initial_query"] = initial_query
- completions = []
+ completions = [None] * cepo_config.bestofn_n
approaches = None
# Generate Approach and Descriptions
@@ -316,24 +657,35 @@ def generate_n_completions(system_prompt: str, initial_query: str, client: Any,
if cepo_config.print_output:
print(f"\nCePO: Plan diversity approaches ({cepo_config.bestofn_n}):\n{approaches}\n")
- for i in range(cepo_config.bestofn_n):
+ def run_single_completion(i):
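+        # Generates one of the bestofn_n candidate completions; executed in parallel below.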
if cepo_config.print_output:
print(f"\nCePO: Generating completion {i + 1} out of {cepo_config.bestofn_n} \n")
approach = approaches[i] if approaches else None
response_i, completion_tokens_i, cb_log_i = generate_completion(system_prompt, initial_query, client, model, cepo_config, approach, request_id)
- completions.append(response_i)
- completion_tokens += completion_tokens_i
- cb_log[f"completion_{i}_response"] = response_i
- cb_log[f"completion_{i}_log"] = cb_log_i
- cb_log[f"completion_{i}_completion_tokens"] = completion_tokens_i
+ return i, response_i, completion_tokens_i, cb_log_i
+
+ # Run in parallel
+ with ThreadPoolExecutor(max_workers=cepo_config.bestofn_n) as executor:
+ futures = [executor.submit(run_single_completion, i) for i in range(cepo_config.bestofn_n)]
+ for future in as_completed(futures):
+ i, response_i, tokens_i, cb_log_i = future.result()
+ completions[i] = response_i
+ completion_tokens += tokens_i
+ cb_log[f"completion_{i}_response"] = response_i
+ cb_log[f"completion_{i}_log"] = cb_log_i
+ cb_log[f"completion_{i}_completion_tokens"] = tokens_i
+
+ if cepo_config.print_output:
+        print("\nCePO: All answers generated!")
+ completions = [c if isinstance(c, str) else "" for c in completions]
return completions, completion_tokens, cb_log
def rate_completions_absolute(system_prompt: str, initial_query: str, client: Any, model: str, completions: list[str], cepo_config: CepoConfig, cb_log: dict, request_id: str = None) -> tuple[str, int, dict]:
"""
Rates completions for the Best of N step of CePO. Each completion is rated on a scale of 1 to 10 individually.
-
+
Parameters:
system_prompt (str): The system prompt to guide the model.
initial_query (str): The task or question to be addressed.
@@ -346,7 +698,7 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An
Tuple[str, int, dict]: The generated completion, number of tokens used, and a log dictionary.
"""
completion_tokens = 0
- content = "Please act as an impartial judge and evaluate the accuracy of the response provided by an AI assistant to "\
+ rating_prompt = "Please act as an impartial judge and evaluate the accuracy of the response provided by an AI assistant to "\
"the user question displayed below. Your evaluation should consider only correctness and accuracy as the primary factor. "\
"Evaluation Criteria:\n"\
"- Correctness: How free is it from errors or mistakes?\n"\
@@ -362,33 +714,42 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An
"Be as objective as possible. After providing your detailed explanation, "\
"please rate the response as 0 or 1, (0 for incorrect and 1 for correct) by strictly following this format: "\
"\"Rating: [[rating]]\", for example: \"Rating: [[0]]\""
- rating_messages = [{"role": "system", "content": system_prompt + content},
- {"role": "user", "content": initial_query}]
+
+ rating_format_instruction = "\n\nRate the above response beginning with the detailed explanation followed by a rating of 0 or 1 by strictly following this format: \"Explanation: \n\nRating: [[rating]]\""
ratings = []
for i, completion in enumerate(completions):
- rating_messages.append({"role": "assistant", "content": completion})
- content = "Rate the above response beginning with the detailed explanation followed by a rating of 0 or 1 "\
- "by strictly following this format: \"Explanation: \n\nRating: [[rating]]\"."
- rating_messages.append({"role": "user", "content": content})
+ # Create a fresh conversation with proper role alternation for each completion
+        evaluation_content = f"USER QUESTION: {initial_query}\n\nRESPONSE: {completion}"
+        rating_messages = [
+            {"role": "system", "content": system_prompt + "\n\n" + rating_prompt},
+            {"role": "user", "content": evaluation_content + rating_format_instruction}
+ ]
# Prepare request for logging
provider_request = {
"model": model,
"messages": rating_messages,
"max_tokens": cepo_config.bestofn_max_tokens,
- "temperature": cepo_config.bestofn_temperature
+ "temperature": cepo_config.bestofn_temperature,
+ "top_p": 1.0
}
-        rating_response = client.chat.completions.create(**provider_request)
-
+ rating_response, _, completion_tokens_ = llm_call_reason_effort_fallback(
+ client=client,
+ provider_request=provider_request,
+ reasoning_effort_levels=["high", "medium"],
+ cepo_config=cepo_config
+ )
+
# Log provider call if conversation logging is enabled
if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
response_dict = rating_response.model_dump() if hasattr(rating_response, 'model_dump') else rating_response
optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- completion_tokens += rating_response.usage.completion_tokens
-
- rating_response = rating_response.choices[0].message.content.strip()
+
+ completion_tokens += completion_tokens_
+
cb_log[f"rating_response_{i}"] = rating_response
if cepo_config.print_output:
print(f"\nCePO: Rating response for completion {i}: {rating_response}")
@@ -401,8 +762,6 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An
ratings.append(float(rating_response))
except ValueError:
ratings.append(-1)
-
- rating_messages = rating_messages[:-2] # clear the last two messages to start over in the next iteration
best_index = ratings.index(max(ratings))
cb_log["ratings"] = ratings
@@ -428,9 +787,7 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An
Tuple[str, int, dict]: The generated completion, number of tokens used, and a log dictionary.
"""
completion_tokens = 0
- rating_messages = [{"role": "system", "content": system_prompt},
- {"role": "user", "content": initial_query}]
- content = "Please act as an impartial judge and compare the quality of the two responses provided by the AI assistant " \
+ rating_prompt = "Please act as an impartial judge and compare the quality of the two responses provided by the AI assistant " \
"to the user's question displayed below. Evaluation Criteria:\n" \
"- Helpfulness: How effectively does the response meet the user's needs?\n" \
"- Relevance: How directly does the response address the original question?\n" \
@@ -446,17 +803,22 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An
"Reply with \"Better Response: [[response id]]\".\n" \
"If the first response is better, reply with \"Better Response: [[0]]\". " \
"If the second response is better, reply with \"Better Response: [[1]]\"."
- rating_messages.append({"role": "system", "content": content})
ratings = [0] * cepo_config.bestofn_n
pairs = [(i, j) for i in range(cepo_config.bestofn_n) for j in range(cepo_config.bestofn_n) if i != j]
+
for pair in pairs:
- responses_pair = f"Response 0: {completions[pair[0]]}\n\nResponse 1: {completions[pair[1]]}"
- rating_messages.append({"role": "assistant", "content": responses_pair})
- content = "Reply with \"Better Response: [[response id]]\".\n" \
- "If the first response is better, reply with \"Better Response: [[0]]\". " \
- "If the second response is better, reply with \"Better Response: [[1]]\"."
- rating_messages.append({"role": "system", "content": content})
+ # Create comparison content with both responses
+ comparison_content = f"User Question: {initial_query}\n\n" \
+ f"Response 0: {completions[pair[0]]}\n\n" \
+ f"Response 1: {completions[pair[1]]}\n\n" \
+ f"Which response is better? Please provide your reasoning and then indicate your choice with \"Better Response: [[0]]\" if the first response is better, or \"Better Response: [[1]]\" if the second response is better."
+
+ # Create a fresh conversation for each comparison with proper system→user structure
+ rating_messages = [
+ {"role": "system", "content": system_prompt + "\n\n" + rating_prompt},
+ {"role": "user", "content": comparison_content}
+ ]
# Prepare request for logging
provider_request = {
@@ -465,16 +827,16 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An
"max_tokens": cepo_config.bestofn_max_tokens,
"temperature": cepo_config.bestofn_temperature
}
-
rating_response = client.chat.completions.create(**provider_request)
# Log provider call if conversation logging is enabled
if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id:
response_dict = rating_response.model_dump() if hasattr(rating_response, 'model_dump') else rating_response
optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict)
- completion_tokens += rating_response.usage.completion_tokens
+ completion_tokens += rating_response.usage.completion_tokens
rating_response = rating_response.choices[0].message.content.strip()
+
cb_log[f"rating_response_for_pair_{pair[0]}_{pair[1]}"] = rating_response
if cepo_config.print_output:
print(f"\nCePO: Rating response for pair {pair}: {rating_response}")
@@ -490,8 +852,6 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An
ratings[pair[0]] += 1 # if parsing unsuccessful, default to the first response
else:
ratings[pair[0]] += 1 # if parsing unsuccessful, default to the first response
-
- rating_messages = rating_messages[:-2]
best_index = ratings.index(max(ratings))
cb_log["ratings"] = ratings
@@ -501,6 +861,80 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An
return completions[best_index], completion_tokens, cb_log
+def extract_answer_mathverify(response_str, last_n_chars=100):
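+    """Parse the final numeric/symbolic answer from the tail of a response using math_verify.
+
+    Plain numeric strings are returned as [float]; otherwise the text after any </think> tag is
+    truncated to the last `last_n_chars` characters and parsed with math_verify.parse.
+    """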
+ response_str = str(response_str)
+ try:
+ float(response_str)
+ return [float(response_str)]
+    except ValueError:
+        response_str = response_str.split("</think>", 1)[1] if "</think>" in response_str else response_str
+ if last_n_chars is not None:
+ response_str = response_str[-last_n_chars:]
+ parsed_result = math_verify.parse(response_str, parsing_timeout=None)
+ return parsed_result
+
+
+def extract_abcd(text: str) -> str | None:
+ """
+ Scan text (with Markdown/LaTeX wrappers intact) and return
+ 'A', 'B', 'C', or 'D' if a correct-answer declaration is found.
+    Otherwise fall back to the first character of the text (after stripping a leading '**').
+ """
+ matches = []
+ for prio, pat in enumerate(MCQ_PATTERNS):
+ m = pat.search(text)
+ if m:
+ letter = m.group(1).upper()
+ if letter in 'ABCD':
+ matches.append((prio, m, letter))
+
+ matches.sort(key=lambda triple: (
+ triple[0],
+ len(triple[1].group(0))
+ ))
+ for _, match, letter in matches:
+ return letter
+ return text.removeprefix('**')[:1]
+
+
+def majority_vote_math(completions, last_n_chars=100):
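+    """Majority vote over math answers parsed with math_verify; returns (winning response, vote count)."""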
+ extracted_answer_map = []
+ for response in completions:
+ extracted_answer = extract_answer_mathverify(response, last_n_chars)
+ extracted_answer = extracted_answer[0] if extracted_answer else None
+ extracted_answer_map.append((response, extracted_answer))
+
+ counts = Counter(answer for _, answer in extracted_answer_map)
+ majority_answer, count = counts.most_common(1)[0]
+
+ for response, answer in extracted_answer_map:
+ if answer == majority_answer:
+ return response, count
+ return extracted_answer_map[0][0], 0
+
+
+def majority_vote_mcq(completions, last_n_chars=100):
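+    """Majority vote over extracted MCQ letters; returns (winning response, vote count)."""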
+ extracted_answer_map = []
+ for response in completions:
+ extracted_answer = extract_abcd(response[-last_n_chars:])
+ extracted_answer_map.append((response, extracted_answer))
+
+ counts = Counter(answer for _, answer in extracted_answer_map)
+ majority_answer, count = counts.most_common(1)[0]
+
+ for response, answer in extracted_answer_map:
+ if answer == majority_answer:
+ return response, count
+ return extracted_answer_map[0][0], 0
+
+
+def rate_completions_majority(completions: list[str], last_n_chars: int = 150) -> tuple[str, int]:
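+    """Pick the majority-vote completion: MCQ letter voting first, math answer voting as fallback."""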
+ mcq_majority, count = majority_vote_mcq(completions, last_n_chars)
+ if mcq_majority is None:
+ return majority_vote_math(completions, last_n_chars)
+ return mcq_majority, count
+
+
def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig, request_id: str = None) -> tuple[str, int]:
"""
Applies CePO reasoning flow for the given task. First, it generates multiple completions, and then rates them to select the best one.
@@ -527,17 +961,19 @@ def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_c
"""
# Generate completions
- completions, completion_tokens_planning, cb_log = generate_n_completions(system_prompt, initial_query, client, model, cepo_config) # cb_log is a dictionary for debugging purposes
-
+ completions, completion_tokens_planning, cb_log = generate_n_completions(system_prompt, initial_query, client, model, cepo_config, request_id) # cb_log is a dictionary for debugging purposes
+ completions = [c for c in completions if c] # safeguard in case completion is None (observed with GPT OSS)
+
# Rate the completions
+ rating_model = cepo_config.rating_model if cepo_config.rating_model else model
if cepo_config.bestofn_rating_type == "absolute":
- rate_completions_fn = rate_completions_absolute
+ best_completion, completion_tokens_rating, cb_log = rate_completions_absolute(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id)
elif cepo_config.bestofn_rating_type == "pairwise":
- rate_completions_fn = rate_completions_pairwise
+ best_completion, completion_tokens_rating, cb_log = rate_completions_pairwise(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id)
+ elif cepo_config.bestofn_rating_type == "majority":
+ best_completion, _ = rate_completions_majority(completions)
+ completion_tokens_rating = 0
else:
raise ValueError("Invalid rating type in cepo_config")
- rating_model = cepo_config.rating_model if cepo_config.rating_model else model
-
- best_completion, completion_tokens_rating, cb_log = rate_completions_fn(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id)
return best_completion, completion_tokens_planning + completion_tokens_rating
diff --git a/optillm/cepo/configs/cepo_config.yaml b/optillm/cepo/configs/cepo_config.yaml
index 27da0da5..7b28957a 100644
--- a/optillm/cepo/configs/cepo_config.yaml
+++ b/optillm/cepo/configs/cepo_config.yaml
@@ -1,17 +1,21 @@
bestofn_n: 3
bestofn_temperature: 0.1
bestofn_max_tokens: 4096
-bestofn_rating_type: "absolute" # or "pairwise"
+bestofn_rating_type: "absolute" # or "pairwise", "majority"
planning_n: 3
planning_m: 6
planning_temperature_step1: 0.55
planning_temperature_step2: 0.25
+planning_temperature_direct_resp: 0.1
planning_temperature_step3: 0.1
planning_temperature_step4: 0
planning_max_tokens_step1: 4096
planning_max_tokens_step2: 4096
+planning_max_tokens_direct_resp: 4096
planning_max_tokens_step3: 4096
planning_max_tokens_step4: 4096
use_plan_diversity: False
rating_model: null
+use_reasoning_fallback: False
+num_of_retries: 0
print_output: False
\ No newline at end of file
diff --git a/optillm/cepo/configs/cepo_config_gptoss.yaml b/optillm/cepo/configs/cepo_config_gptoss.yaml
new file mode 100644
index 00000000..72d78252
--- /dev/null
+++ b/optillm/cepo/configs/cepo_config_gptoss.yaml
@@ -0,0 +1,21 @@
+bestofn_n: 3
+bestofn_temperature: 0.6
+bestofn_max_tokens: 40960
+bestofn_rating_type: "absolute"
+planning_n: 2
+planning_m: 4
+planning_temperature_step1: 1.0
+planning_temperature_step2: 1.0
+planning_temperature_direct_resp: 0.6
+planning_temperature_step3: 1.0
+planning_temperature_step4: 0.5
+planning_max_tokens_step1: 40960
+planning_max_tokens_step2: 40960
+planning_max_tokens_direct_resp: 32768
+planning_max_tokens_step3: 40960
+planning_max_tokens_step4: 40960
+use_plan_diversity: False
+rating_model: null
+use_reasoning_fallback: True
+num_of_retries: 2
+print_output: true
\ No newline at end of file
diff --git a/optillm/cepo/configs/cepo_config_qwen3.yaml b/optillm/cepo/configs/cepo_config_qwen3.yaml
new file mode 100644
index 00000000..af1b8dfa
--- /dev/null
+++ b/optillm/cepo/configs/cepo_config_qwen3.yaml
@@ -0,0 +1,21 @@
+bestofn_n: 3
+bestofn_temperature: 0.6
+bestofn_max_tokens: 20480
+bestofn_rating_type: "majority"
+planning_n: 2
+planning_m: 4
+planning_temperature_step1: 0.8
+planning_temperature_step2: 0.8
+planning_temperature_direct_resp: 0.6
+planning_temperature_step3: 0.8
+planning_temperature_step4: 0.8
+planning_max_tokens_step1: 28672
+planning_max_tokens_step2: 24576
+planning_max_tokens_direct_resp: 32768
+planning_max_tokens_step3: 20481
+planning_max_tokens_step4: 20482
+use_plan_diversity: False
+rating_model: null
+use_reasoning_fallback: False
+num_of_retries: 0
+print_output: False
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index b0c13f6c..a849f1c3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,4 +33,5 @@ adaptive-classifier
datasets
mcp
# MLX support for Apple Silicon optimization
-mlx-lm>=0.24.0; platform_machine=="arm64" and sys_platform=="darwin"
\ No newline at end of file
+mlx-lm>=0.24.0; platform_machine=="arm64" and sys_platform=="darwin"
+math_verify
\ No newline at end of file