From 68ac32a1be199ac912999034acb98c81183e2107 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 19 Sep 2025 15:44:29 -0700 Subject: [PATCH 01/11] Update cepo to 2025Q5 version --- optillm/cepo/cepo.py | 501 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 439 insertions(+), 62 deletions(-) diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index 3f098bcf..c6e3b497 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -2,12 +2,15 @@ import yaml import json import optillm -from optillm import conversation_logger +import time +from optillm import conversation_logger +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from typing import Literal, Any, Optional from cerebras.cloud.sdk import BadRequestError as CerebrasBadRequestError from openai import BadRequestError as OpenAIBadRequestError +from openai import InternalServerError as OpenAIInternalServerError @dataclass @@ -52,7 +55,8 @@ def init_cepo_config(cmd_line_args: dict) -> CepoConfig: def extract_question_only(task: str) -> str: - """We noticed that sometimes if the task includes specific formatting instructions, they may interfere with the reasoning flow. This + """ + We noticed that sometimes if the task includes specific formatting instructions, they may interfere with the reasoning flow. This is a temporary workaround to extract the question only from the task. Work in progress. """ question_only = task.replace('\n## Question: \n\n', '') @@ -60,6 +64,195 @@ def extract_question_only(task: str) -> str: return question_only +def remove_think_section(response): + """ + Remove a ... block from the response text, if present. + + Args: + response (str): Raw model output. + + Returns: + str: Response without the section, or an empty string if input + is invalid or empty. + """ + if not isinstance(response, str) or not response: + return "" + if not response.startswith("") and "" not in response: + return response + match = re.search(r"\s*(.*)", response, re.DOTALL) + if match: + parsed_response = match.group(1) + return parsed_response + else: + return response + + +def extract_llm_response(response): + """ + Extract text content and finish reason from an LLM response. + + Supports both non-streaming responses (dict-like with `.choices[0].message.content`) + and streaming responses (iterable of chunks with `.choices[0].delta.content`). + + Args: + response: LLM response object or streaming generator. + + Returns: + Tuple[str, Optional[str]]: + - Extracted text content (stripped). + - Finish reason (or None if unavailable). + """ + # Case 1: non-streaming response (dict-like object) + if hasattr(response, "choices") and hasattr(response.choices[0], "message"): + content = response.choices[0].message.content + if content: + content = content.strip() + finish_reason = getattr(response.choices[0], "finish_reason", None) + return content, finish_reason + + # Case 2: streaming response (generator) + full_content = "" + finish_reason = None + for chunk in response: + delta = chunk.choices[0].delta + if hasattr(delta, "content") and delta.content: + full_content += delta.content + if chunk.choices[0].finish_reason is not None: + finish_reason = chunk.choices[0].finish_reason + return full_content.strip(), finish_reason + + +def llm_call( + client: Any, + provider_request: dict, +) -> tuple[str, str, int]: + """ + Call the LLM with retries on transient errors. + + Makes a chat completion request to the given client and extracts the response. + Retries up to 2 times on 400/500 errors with exponential backoff. + + Args: + client (Any): LLM API client instance. + provider_request (dict): LMM call params. + + Returns: + tuple[str, str, int]: + - response_text: Model output (post-processed, never None). + - finish_reason: Why generation stopped. + - completion_tokens: Number of tokens generated. + """ + retries = 2 # total attempts = retries + 1 initial call + for attempt in range(retries): + try: + response_object = client.chat.completions.create( + stream=False, + **provider_request + ) + response_text, finish_reason = extract_llm_response(response_object) + completion_tokens = getattr(getattr(response_object, "usage", None), "completion_tokens", 0) or 0 + response_text = response_text or "" # Normalize None to "" + if response_text is not None: + response_text = remove_think_section(response_text) + return response_text, finish_reason, completion_tokens + + except (OpenAIBadRequestError, OpenAIInternalServerError) as e: + # Retry on 400 or 500 + if attempt < retries - 1: + sleep_time = 0.2 * (attempt + 1) + print(f"Got {e.__class__.__name__}, retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + continue + raise + + +def llm_call_reason_effort_fallback( + client: Any, + provider_request: dict, + reasoning_effort_levels: list +) -> tuple[Optional[Any], str, int]: + """ + Call LLM with fallback on reasoning effort levels. + + This function wraps `llm_call` with retry and degradation logic to handle + two main classes of errors: + + 1. **Incomplete generation (finish_reason = "length")**: + - The model returns a response object but does not finish generation + (e.g., truncated output). + - In this case, the reasoning effort is reduced, and another attempt + is made with lower levels. + + 2. **Server/validation errors (e.g., 400 BadRequest, 500 InternalServerError)**: + - Often caused by gpt-oss's "expected output number" error, which cannot be + fully recovered within the current API. + - The function retries once, and if the error persists, reasoning effort + is degraded to try again at lower levels. + + The fallback sequence continues until either: + - A valid response is obtained (not truncated and not `None`), or + - All reasoning effort levels are exhausted, in which case the last + attempted result (possibly `None`) is returned. + + Args: + client (Any): LLM API client instance used for making calls. + provider_request (dict): LMM call params. + reasoning_effort_levels (list): Ordered list of reasoning effort levels + (e.g., ["high", "medium", "low"]) to try in fallback. + + Returns: + tuple: + - response: The LLM response object, or `None` if all attempts failed. + - finish_reason (str): Reason why generation finished ("stop", + "length", "error", etc.). + - completion_tokens (int): Number of tokens generated in the final attempt. + + Notes: + - This function prints diagnostic information when degrading reasoning effort. + - For persistent server-side issues (400/500), degradation is attempted + automatically, but a permanent fix may require upstream changes + (see https://github.com/pydantic/pydantic-ai/issues/2449). + """ + for effort in reasoning_effort_levels: + try: + # Try with the current reasoning effort level + provider_request["reasoning_effort"] = effort + response, finish_reason, completion_tokens = llm_call( + client=client, + provider_request=provider_request, + ) + if response is not None and finish_reason != "length": + return response, finish_reason, completion_tokens + print(f"Reasoning fallback from {effort}, to lower one") + except (OpenAIBadRequestError, OpenAIInternalServerError) as e: + # After 2 retries at this reasoning effort level it failed with error 400/500, lower level + print("400/500 persisted after retries at reasoning effort", effort, "→ degrading") + continue + + return None, "error", 0 + + +def fallback_direct_answer(client, model, question, max_tokens=None, temperature=1.0, top_p=1.0): # TODO clean-up + messages = [ + {"role": "user", "content": question}, + ] + + response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( + messages=messages, + client=client, + model=model, + max_tokens=max_tokens, + temperature=temperature, + top_p=top_p, + reasoning_effort_levels=["high", "medium", "low"] + ) + if response is None or finish_reason == "length": + print("Direct answer failed, empty response or length") + response = "" + messages.append({"role": "assistant", "content": response}) + return response, messages + + def generate_completion(system_prompt: str, task: str, client: Any, model: str, cepo_config: CepoConfig, approach: Optional[str] = None, request_id: str = None) -> str: """ Generates a completion based on the provided system prompt and task. @@ -80,33 +273,197 @@ def generate_completion(system_prompt: str, task: str, client: Any, model: str, cb_log = {} plans = [] - for i in range(cepo_config.planning_m): # m is the maximum number of attempts to generate n plans - # Step 1 - Generate a plan - if cepo_config.use_plan_diversity: - assert approach - assert isinstance(approach, str) - content = f"To answer this question, can you come up with a concise plan using to solve it step-by-step but do not provide the "\ - f"final answer. Here is the approach you need to follow to generate the plan: {approach}. "\ - f"Also, for each step, provide your confidence in the correctness of that step as well as your ability "\ - f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}" + def generate_single_plan(i): + local_cb_log = {} + local_completion_tokens = 0 + + if cepo_config.planning_max_tokens_step1 != 0: + if cepo_config.use_plan_diversity: + assert approach + content = ( + f"To answer this question, can you come up with a concise plan using to solve it step-by-step but do not provide the " + f"final answer. Here is the approach you need to follow to generate the plan: {approach}. " + f"Also, for each step, provide your confidence in the correctness of that step as well as your ability " + f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}" + ) + else: + assert not approach + content = ( + f"To answer this question, can you come up with a concise plan to solve it step-by-step but do not provide the " + f"final answer. Also, for each step, provide your confidence in the correctness of that step as well as your ability " + f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}" + ) + + messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": content}] + + provider_request = { + "model": model, + "messages": messages, + "max_tokens": cepo_config.planning_max_tokens_step1, + "temperature": cepo_config.planning_temperature_step1, + "top_p": 1.0 + } + + response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + reasoning_effort_levels=["high", "medium"] + ) + local_completion_tokens += completion_tokens + # Log provider call if conversation logging is enabled + if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: + response_dict = response.model_dump() if hasattr(response, 'model_dump') else response + optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + + if finish_reason == "length": + return i, None, local_completion_tokens, {f"messages_planning_{i}_rejected_due_to_length": messages} + parsed_plan = response else: - assert not approach - content = f"To answer this question, can you come up with a concise plan to solve it step-by-step but do not provide the "\ - f"final answer. Also, for each step, provide your confidence in the correctness of that step as well as your ability "\ - f"to execute it correctly. Here is the question:\n{question_only}\nRead the question again:\n\n{question_only}" + messages = [] + parsed_plan = "" + + # Step 2 – Execute plan + if cepo_config.planning_max_tokens_step1 != 0: + messages.append({"role": "assistant", "content": parsed_plan}) + messages.append({"role": "user", "content": "Can you execute the above plan step-by-step to produce the final answer. Be extra careful when executing steps where your confidence is lower. /think"}) + else: + messages.append({"role": "user", "content": f"Can you solve this problem step-by-step to produce the final answer? Here is the question:\n{question_only}\nRead the question again:\n\n{question_only} /think"}) - messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": content}] - - # Prepare request for logging provider_request = { - "model": model, - "messages": messages, - "max_tokens": cepo_config.planning_max_tokens_step1, - "temperature": cepo_config.planning_temperature_step1, - "stream": False, - } + "model": model, + "messages": messages, + "max_tokens": cepo_config.planning_max_tokens_step1, + "temperature": cepo_config.planning_temperature_step1, + "top_p": 1.0 + } - response = client.chat.completions.create(**provider_request) + response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + reasoning_effort_levels=["high", "medium"] + ) + local_completion_tokens += completion_tokens + + # Log provider call if conversation logging is enabled + if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: + response_dict = response.model_dump() if hasattr(response, 'model_dump') else response + optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + + if finish_reason == "length": + return i, None, local_completion_tokens, {f"messages_planning_{i}_rejected_due_to_length": messages} + + parsed_exec = response + messages.append({"role": "assistant", "content": parsed_exec}) + local_cb_log[f"messages_planning_{i}"] = messages + return i, parsed_exec, local_completion_tokens, local_cb_log + + # Step 1 & 2: Parallel planning + execution + with ThreadPoolExecutor(max_workers=cepo_config.planning_m) as executor: + futures = [executor.submit(generate_single_plan, i) for i in range(cepo_config.planning_m)] + + for future in as_completed(futures): + i, plan, tokens_used, log_entry = future.result() + completion_tokens += tokens_used + cb_log.update(log_entry) + if plan: + plans.append((i, plan)) + if cepo_config.print_output: + print(f"\nCePO: Plan proposal generated. Attempt {i + 1} out of {cepo_config.planning_m}.\n") + if len(plans) == cepo_config.planning_n: + break + + plans = [plan for _, plan in sorted(plans)] # keep original order + + if not plans: + # Fallback plan + fallback_generation, fallback_messages = fallback_direct_answer(client, model, question_only) + plans.append(fallback_generation) + cb_log[f"messages_planning_fallback_used"] = fallback_messages + if cepo_config.print_output: + print(f"\nCePO: No plans generated successfully. Taking the fallback.\n") + + # Step 3 - Review and consolidate plans + plans_message = "" + for i, plan in enumerate(plans): + plans_message += f"Response {i + 1}:\n{plan}\n\n" + plans_message = plans_message.rstrip() + + content = f"Can you review your last {len(plans)} responses and identify any inconsistency between them. After that, can you address "\ + f"it and present a final step-by-step solution to the problem? Here is the question:\n{question_only} /think" + + user_content = f"Previous responses to review:\n\n{plans_message}\n\n{content}" + messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}] + + provider_request = { + "model": model, + "messages": messages, + "max_tokens": cepo_config.planning_max_tokens_step1, + "temperature": cepo_config.planning_temperature_step1, + "top_p": 1.0 + } + + response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + reasoning_effort_levels=["high", "medium"] + ) + completion_tokens += completion_tokens_ + + # Log provider call if conversation logging is enabled + if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: + response_dict = response.model_dump() if hasattr(response, 'model_dump') else response + optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + + if response is None or finish_reason == "length": + print("Step 3 failed and only taking plans[0]") + final_solution = plans[0] + else: + completion_tokens += completion_tokens + final_solution = response + messages.append({"role": "assistant", "content": final_solution}) + + # Step 4 – Final answer + if cepo_config.planning_max_tokens_step4 != 0: + content = f"Use your final solution from above to correctly answer the question. Here is the question:\n{task} /think" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"Here's my final solution: {final_solution}\n\nNow {content}"} + ] + + provider_request = { + "model": model, + "messages": messages, + "max_tokens": cepo_config.planning_max_tokens_step1, + "temperature": cepo_config.planning_temperature_step1, + "top_p": 1.0 + } + + response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + reasoning_effort_levels=["high", "medium"] + ) + completion_tokens += completion_tokens_ + + # Log provider call if conversation logging is enabled + if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: + response_dict = response.model_dump() if hasattr(response, 'model_dump') else response + optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + + if response is None or finish_reason == "length": + print("Step 4 failed and only taking step 3 output") + final_output = final_solution + else: + final_output = response + else: + final_output = final_solution + + cb_log["messages"] = messages + if cepo_config.print_output: + print(f"\nCePO: Answer generated for one bestofn_n attempt.") + + return final_output, completion_tokens, cb_log + # Log provider call if conversation logging is enabled if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: @@ -279,7 +636,7 @@ def generate_approaches(system_prompt: str, initial_query: str, num_approach: in return approaches, completion_tokens -def generate_n_completions(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig) -> tuple[list[str], int, dict]: +def generate_n_completions(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig, request_id: str) -> tuple[list[str], int, dict]: """ Generates n completions for the Best of N step of CePO. @@ -297,7 +654,7 @@ def generate_n_completions(system_prompt: str, initial_query: str, client: Any, cb_log = {} cb_log["system_prompt"] = system_prompt cb_log["initial_query"] = initial_query - completions = [] + completions = [None] * cepo_config.bestofn_n approaches = None # Generate Approach and Descriptions @@ -316,24 +673,35 @@ def generate_n_completions(system_prompt: str, initial_query: str, client: Any, if cepo_config.print_output: print(f"\nCePO: Plan diversity approaches ({cepo_config.bestofn_n}):\n{approaches}\n") - for i in range(cepo_config.bestofn_n): + def run_single_completion(i): if cepo_config.print_output: print(f"\nCePO: Generating completion {i + 1} out of {cepo_config.bestofn_n} \n") approach = approaches[i] if approaches else None response_i, completion_tokens_i, cb_log_i = generate_completion(system_prompt, initial_query, client, model, cepo_config, approach, request_id) - completions.append(response_i) - completion_tokens += completion_tokens_i - cb_log[f"completion_{i}_response"] = response_i - cb_log[f"completion_{i}_log"] = cb_log_i - cb_log[f"completion_{i}_completion_tokens"] = completion_tokens_i + return i, response_i, completion_tokens_i, cb_log_i + + # Run in parallel + with ThreadPoolExecutor(max_workers=cepo_config.bestofn_n) as executor: + futures = [executor.submit(run_single_completion, i) for i in range(cepo_config.bestofn_n)] + for future in as_completed(futures): + i, response_i, tokens_i, cb_log_i = future.result() + completions[i] = response_i + completion_tokens += tokens_i + cb_log[f"completion_{i}_response"] = response_i + cb_log[f"completion_{i}_log"] = cb_log_i + cb_log[f"completion_{i}_completion_tokens"] = tokens_i + + if cepo_config.print_output: + print(f"\nCePO: All Answers generated!") + completions = [c if isinstance(c, str) else "" for c in completions] return completions, completion_tokens, cb_log def rate_completions_absolute(system_prompt: str, initial_query: str, client: Any, model: str, completions: list[str], cepo_config: CepoConfig, cb_log: dict, request_id: str = None) -> tuple[str, int, dict]: """ Rates completions for the Best of N step of CePO. Each completion is rated on a scale of 1 to 10 individually. - + Parameters: system_prompt (str): The system prompt to guide the model. initial_query (str): The task or question to be addressed. @@ -346,7 +714,7 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An Tuple[str, int, dict]: The generated completion, number of tokens used, and a log dictionary. """ completion_tokens = 0 - content = "Please act as an impartial judge and evaluate the accuracy of the response provided by an AI assistant to "\ + rating_prompt = "Please act as an impartial judge and evaluate the accuracy of the response provided by an AI assistant to "\ "the user question displayed below. Your evaluation should consider only correctness and accuracy as the primary factor. "\ "Evaluation Criteria:\n"\ "- Correctness: How free is it from errors or mistakes?\n"\ @@ -362,33 +730,42 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An "Be as objective as possible. After providing your detailed explanation, "\ "please rate the response as 0 or 1, (0 for incorrect and 1 for correct) by strictly following this format: "\ "\"Rating: [[rating]]\", for example: \"Rating: [[0]]\"" - rating_messages = [{"role": "system", "content": system_prompt + content}, - {"role": "user", "content": initial_query}] + + rating_format_instruction = "\n\nRate the above response beginning with the detailed explanation followed by a rating of 0 or 1 by strictly following this format: \"Explanation: \n\nRating: [[rating]]\"" ratings = [] for i, completion in enumerate(completions): - rating_messages.append({"role": "assistant", "content": completion}) - content = "Rate the above response beginning with the detailed explanation followed by a rating of 0 or 1 "\ - "by strictly following this format: \"Explanation: \n\nRating: [[rating]]\"." - rating_messages.append({"role": "user", "content": content}) + # Create a fresh conversation with proper role alternation for each completion + system_content = f"USER QUESTION: {initial_query}\n\nRESPONSE: {completion}" + rating_messages = [ + {"role": "system", "content": system_prompt + "\n\n" + rating_prompt}, + {"role": "user", "content": system_content + rating_format_instruction} + ] # Prepare request for logging provider_request = { "model": model, "messages": rating_messages, "max_tokens": cepo_config.bestofn_max_tokens, - "temperature": cepo_config.bestofn_temperature + "temperature": cepo_config.bestofn_temperature, + "top_p": 1.0 } rating_response = client.chat.completions.create(**provider_request) - + rating_response, _, completion_tokens = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + reasoning_effort_levels=["high", "medium"] + ) + # Log provider call if conversation logging is enabled if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: response_dict = rating_response.model_dump() if hasattr(rating_response, 'model_dump') else rating_response optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + completion_tokens += rating_response.usage.completion_tokens - rating_response = rating_response.choices[0].message.content.strip() + cb_log[f"rating_response_{i}"] = rating_response if cepo_config.print_output: print(f"\nCePO: Rating response for completion {i}: {rating_response}") @@ -401,8 +778,6 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An ratings.append(float(rating_response)) except ValueError: ratings.append(-1) - - rating_messages = rating_messages[:-2] # clear the last two messages to start over in the next iteration best_index = ratings.index(max(ratings)) cb_log["ratings"] = ratings @@ -428,9 +803,7 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An Tuple[str, int, dict]: The generated completion, number of tokens used, and a log dictionary. """ completion_tokens = 0 - rating_messages = [{"role": "system", "content": system_prompt}, - {"role": "user", "content": initial_query}] - content = "Please act as an impartial judge and compare the quality of the two responses provided by the AI assistant " \ + rating_prompt = "Please act as an impartial judge and compare the quality of the two responses provided by the AI assistant " \ "to the user's question displayed below. Evaluation Criteria:\n" \ "- Helpfulness: How effectively does the response meet the user's needs?\n" \ "- Relevance: How directly does the response address the original question?\n" \ @@ -446,17 +819,22 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An "Reply with \"Better Response: [[response id]]\".\n" \ "If the first response is better, reply with \"Better Response: [[0]]\". " \ "If the second response is better, reply with \"Better Response: [[1]]\"." - rating_messages.append({"role": "system", "content": content}) ratings = [0] * cepo_config.bestofn_n pairs = [(i, j) for i in range(cepo_config.bestofn_n) for j in range(cepo_config.bestofn_n) if i != j] + for pair in pairs: - responses_pair = f"Response 0: {completions[pair[0]]}\n\nResponse 1: {completions[pair[1]]}" - rating_messages.append({"role": "assistant", "content": responses_pair}) - content = "Reply with \"Better Response: [[response id]]\".\n" \ - "If the first response is better, reply with \"Better Response: [[0]]\". " \ - "If the second response is better, reply with \"Better Response: [[1]]\"." - rating_messages.append({"role": "system", "content": content}) + # Create comparison content with both responses + comparison_content = f"User Question: {initial_query}\n\n" \ + f"Response 0: {completions[pair[0]]}\n\n" \ + f"Response 1: {completions[pair[1]]}\n\n" \ + f"Which response is better? Please provide your reasoning and then indicate your choice with \"Better Response: [[0]]\" if the first response is better, or \"Better Response: [[1]]\" if the second response is better." + + # Create a fresh conversation for each comparison with proper system→user structure + rating_messages = [ + {"role": "system", "content": system_prompt + "\n\n" + rating_prompt}, + {"role": "user", "content": comparison_content} + ] # Prepare request for logging provider_request = { @@ -465,16 +843,16 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An "max_tokens": cepo_config.bestofn_max_tokens, "temperature": cepo_config.bestofn_temperature } - rating_response = client.chat.completions.create(**provider_request) # Log provider call if conversation logging is enabled if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: response_dict = rating_response.model_dump() if hasattr(rating_response, 'model_dump') else rating_response optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - completion_tokens += rating_response.usage.completion_tokens + completion_tokens += rating_response.usage.completion_tokens rating_response = rating_response.choices[0].message.content.strip() + cb_log[f"rating_response_for_pair_{pair[0]}_{pair[1]}"] = rating_response if cepo_config.print_output: print(f"\nCePO: Rating response for pair {pair}: {rating_response}") @@ -490,8 +868,6 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An ratings[pair[0]] += 1 # if parsing unsuccessful, default to the first response else: ratings[pair[0]] += 1 # if parsing unsuccessful, default to the first response - - rating_messages = rating_messages[:-2] best_index = ratings.index(max(ratings)) cb_log["ratings"] = ratings @@ -527,8 +903,9 @@ def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_c """ # Generate completions - completions, completion_tokens_planning, cb_log = generate_n_completions(system_prompt, initial_query, client, model, cepo_config) # cb_log is a dictionary for debugging purposes - + completions, completion_tokens_planning, cb_log = generate_n_completions(system_prompt, initial_query, client, model, cepo_config, request_id) # cb_log is a dictionary for debugging purposes + completions = [c for c in completions if c] # safeguard in case completion is None (observed with GPT OSS) + # Rate the completions if cepo_config.bestofn_rating_type == "absolute": rate_completions_fn = rate_completions_absolute From 79604cdad7fe4aa4dfcbbda530962a64a560b385 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Wed, 24 Sep 2025 10:33:52 -0700 Subject: [PATCH 02/11] Add majority vote rating --- optillm/cepo/cepo.py | 277 ++++++++++++++++++++++++------------------- 1 file changed, 157 insertions(+), 120 deletions(-) diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index c6e3b497..3bf39a4d 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -3,8 +3,10 @@ import json import optillm import time +import math_verify from optillm import conversation_logger +from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from typing import Literal, Any, Optional @@ -34,6 +36,82 @@ class CepoConfig: print_output: bool = False # whether to print the output of each stage +MCQ_PATTERNS = [ + # 0)"**Answer:** A" or "*Answers* – B", i.e. markdown‐wrapped "Answer(s)" with an unwrapped letter. + re.compile( + r'''(?ix) # case‐insensitive, ignore‐space + (?:\*{1,2}|_{1,2}) # leading *…* or _…_ + Answer[s]? # Answer or Answers + \s*[:\-–]? # optional separator + (?:\*{1,2}|_{1,2}) # closing wrapper + \s* # optional space + ([ABCD])\b # the actual letter + ''', + re.X + ), + + # 0.1) + re.compile(r'''(?ix) # ignore case, allow verbose mode + ^\s* # optional leading whitespace + (?:\*{1,2}|_{1,2})? # optional markdown wrapper + Answer:? # the word 'answer' with an optional colon + (?:\*{1,2}|_{1,2})? # optional markdown wrapper again + \s*:?\s* # optional colon with optional spaces + (?:\*{1,2}|_{1,2})? # optional markdown wrapper before letter + ([ABCD]) # capture the letter + (?:\*{1,2}|_{1,2})? # optional markdown wrapper after letter + \s* # optional trailing whitespace, end of line + ''', re.MULTILINE), + + # 1) Answer: (C) or Answers: (B) + re.compile(r'(?ix)\bAnswer[s]?\b\s*[:\-–]?\s*\(\s*([ABCD])\s*\)'), + + # 2) Answer: C or Answers – D + re.compile(r'(?ix)\bAnswer[s]?\b\s*[:\-–]?\s*([ABCD])\b'), + + # 3) Option B or Choice: C + re.compile(r'(?ix)\b(?:Option|Choice)\b\s*[:\-–]?\s*([ABCD])\b'), + + # 7) LaTeX \boxed{...A...}, catches both \boxed{A} and + # \boxed{\text{A } 2.08\times10^{-6}\,\mathrm{m}} etc. + re.compile(r'(?x)\\boxed\{[^}]*?([ABCD])[^}]*\}', re.MULTILINE), + + # 7.5) LaTeX \boxed{\textbf{...C...}} + re.compile(r'(?x)\\boxed\{[^}]*?\\textbf\{[^}]*?([ABCD])[^}]*\}[^}]*\}', re.MULTILINE), + + # 7.51) LaTeX \boxed{\text{...C...}} + re.compile(r'(?x)\\boxed\{[^}]*?\\text\{[^}]*?([ABCD])[^}]*\}[^}]*\}', re.MULTILINE), + + # 4) bare singletons: (A) [B] + re.compile(r'(?x)(? CepoConfig: # get the command line arguments @@ -463,121 +541,6 @@ def generate_single_plan(i): print(f"\nCePO: Answer generated for one bestofn_n attempt.") return final_output, completion_tokens, cb_log - - - # Log provider call if conversation logging is enabled - if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: - response_dict = response.model_dump() if hasattr(response, 'model_dump') else response - optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - completion_tokens += response.usage.completion_tokens - - if response.choices[0].finish_reason == "length": - # Skipping plan generation due to exceeding the token budget. Usually it means the plan is incomplete. - continue - - # Step 2 - Execute the plan - content = f"Can you execute the above plan step-by-step to produce the final answer. "\ - f"Be extra careful when executing steps where your confidence is lower." - messages.extend([{"role": "assistant", "content": response.choices[0].message.content}, {"role": "user", "content": content}]) - - # Prepare request for logging - provider_request = { - "model": model, - "messages": messages, - "max_tokens": cepo_config.planning_max_tokens_step2, - "temperature": cepo_config.planning_temperature_step2, - "stream": False, - } - - response = client.chat.completions.create(**provider_request) - - # Log provider call if conversation logging is enabled - if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: - response_dict = response.model_dump() if hasattr(response, 'model_dump') else response - optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - completion_tokens += response.usage.completion_tokens - - if response.choices[0].finish_reason == "length": - messages.append({"role": "assistant", "content": response.choices[0].message.content}) - cb_log[f"messages_planning_{i}_rejected_due_to_length"] = messages - if cepo_config.print_output: - print(f"\nCePO: Plan proposal rejected due to length. Attempt {i + 1} out of {cepo_config.planning_m}.\nMessages: {messages}") - continue - - plans.append(response.choices[0].message.content) - messages.append({"role": "assistant", "content": response.choices[0].message.content}) - cb_log[f"messages_planning_{i}"] = messages - if cepo_config.print_output: - print(f"\nCePO: Plan proposal generated. Attempt {i + 1} out of {cepo_config.planning_m}.\nMessages: {messages}") - - if len(plans) == cepo_config.planning_n: - break - - if not plans: - # If no plans were generated succesfully, take the last one even if it was rejected due to length - plans.append(response.choices[0].message.content) - messages.append({"role": "assistant", "content": response.choices[0].message.content}) - cb_log[f"messages_planning_{i}_no_plans_so_taking_the_last_one"] = messages - if cepo_config.print_output: - print(f"\nCePO: No plans generated successfully. Taking the last one from rejected due to length.\nMessages: {messages}") - - # Step 3 - Review and address inconsistencies - try: - plans_message = "" - for i, plan in enumerate(plans): - plans_message += f"Response {i + 1}:\n{plan}\n\n" - plans_message = plans_message[:-2] # remove the last 2x newline - content = f"Can you review your last {len(plans)} responses and identify any inconsistency between them. After that, can you address "\ - f"it and present a final step-by-step solution to the problem? Here is the question:\n{question_only}" - messages = [{"role": "assistant", "content": plans_message}, {"role": "user", "content": content}] - - # Prepare request for logging - provider_request = { - "model": model, - "messages": messages, - "max_tokens": cepo_config.planning_max_tokens_step3, - "temperature": cepo_config.planning_temperature_step3, - "stream": False, - } - - response = client.chat.completions.create(**provider_request) - - # Log provider call if conversation logging is enabled - if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: - response_dict = response.model_dump() if hasattr(response, 'model_dump') else response - optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - final_solution = response.choices[0].message.content - completion_tokens += response.usage.completion_tokens - except (CerebrasBadRequestError, OpenAIBadRequestError) as e: - # In case of an error, take the first plan as the final solution - final_solution = plans[0] - messages = [] - - # Step 4 - Answer the question - content = f"Use your final solution from above to correctly answer the question. Here is the question:\n{task}" - messages = [{"role": "assistant", "content": final_solution}, {"role": "user", "content": content}] - - # Prepare request for logging - provider_request = { - "model": model, - "messages": messages, - "max_tokens": cepo_config.planning_max_tokens_step4, - "temperature": cepo_config.planning_temperature_step4, - "stream": False, - } - - response = client.chat.completions.create(**provider_request) - - # Log provider call if conversation logging is enabled - if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: - response_dict = response.model_dump() if hasattr(response, 'model_dump') else response - optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - completion_tokens += response.usage.completion_tokens - - cb_log["messages"] = messages - if cepo_config.print_output: - print(f"\nCePO: Answer generated.\nMessages: {messages}") - return response.choices[0].message.content, completion_tokens, cb_log def generate_approaches(system_prompt: str, initial_query: str, num_approach: int, client: Any, model: str, cepo_config: CepoConfig, max_retry: int = 2, request_id: str = None) -> tuple[list[str], int]: @@ -877,6 +840,79 @@ def rate_completions_pairwise(system_prompt: str, initial_query: str, client: An return completions[best_index], completion_tokens, cb_log +def extract_answer_mathverify(response_str, last_n_chars=100): + response_str = str(response_str) + try: + float(response_str) + return [float(response_str)] + except: + response_str = response_str.split("", 1)[1] if "" in response_str else response_str + if last_n_chars is not None: + response_str = response_str[-last_n_chars:] + parsed_result = math_verify.parse(response_str, parsing_timeout=None) + return parsed_result + + +def extract_abcd(text: str) -> str | None: + """ + Scan text (with Markdown/LaTeX wrappers intact) and return + 'A', 'B', 'C', or 'D' if a correct-answer declaration is found. + Otherwise return None. + """ + matches = [] + for prio, pat in enumerate(MCQ_PATTERNS): + m = pat.search(text) + if m: + letter = m.group(1).upper() + if letter in 'ABCD': + matches.append((prio, m, letter)) + + matches.sort(key=lambda triple: ( + triple[0], + len(triple[1].group(0)) + )) + for _, match, letter in matches: + return letter + return text.removeprefix('**')[:1] + + +def majority_vote_math(completions, last_n_chars=100): + extracted_answer_map = [] + for response in completions: + extracted_answer = extract_answer_mathverify(response, last_n_chars) + extracted_answer = extracted_answer[0] if extracted_answer else None + extracted_answer_map.append((response, extracted_answer)) + + counts = Counter(answer for _, answer in extracted_answer_map) + majority_answer, count = counts.most_common(1)[0] + # TODO it may return all "None", we probably should handle this case + # Return one response whose extracted answer matches the majority + for response, answer in extracted_answer_map: + if answer == majority_answer: + return response, count + + +def majority_vote_mcq(completions, last_n_chars=100): + extracted_answer_map = [] + for response in completions: + extracted_answer = extract_abcd(response[-last_n_chars:]) + extracted_answer_map.append((response, extracted_answer)) + + counts = Counter(answer for _, answer in extracted_answer_map) + majority_answer, count = counts.most_common(1)[0] + # TODO it may return all "None", we probably should handle this case + for response, answer in extracted_answer_map: + if answer == majority_answer: + return response, count + + +def rate_completions_majority_vote(completions: list[str], last_n_chars: int = 150) -> tuple[str, int, dict]: + mcq_majority, count = majority_vote_mcq(completions, last_n_chars) + if mcq_majority is None: + return majority_vote_math(completions, last_n_chars) + return mcq_majority, count + + def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_config: CepoConfig, request_id: str = None) -> tuple[str, int]: """ Applies CePO reasoning flow for the given task. First, it generates multiple completions, and then rates them to select the best one. @@ -907,14 +943,15 @@ def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_c completions = [c for c in completions if c] # safeguard in case completion is None (observed with GPT OSS) # Rate the completions + rating_model = cepo_config.rating_model if cepo_config.rating_model else model if cepo_config.bestofn_rating_type == "absolute": - rate_completions_fn = rate_completions_absolute + best_completion, completion_tokens_rating, cb_log = rate_completions_absolute(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id) elif cepo_config.bestofn_rating_type == "pairwise": - rate_completions_fn = rate_completions_pairwise + best_completion, completion_tokens_rating, cb_log = rate_completions_pairwise(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id) + elif cepo_config.bestofn_rating_type == "majority_with_code_exec": + best_completion, _ = rate_completions_majority_vote(completions) + completion_tokens_rating = 0 else: raise ValueError("Invalid rating type in cepo_config") - rating_model = cepo_config.rating_model if cepo_config.rating_model else model - - best_completion, completion_tokens_rating, cb_log = rate_completions_fn(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id) return best_completion, completion_tokens_planning + completion_tokens_rating From f35d53429affeb5ff63dd561fd999b0ca25c2f27 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Thu, 25 Sep 2025 18:09:56 +0000 Subject: [PATCH 03/11] Integrate qwen cepo flow --- optillm/cepo/cepo.py | 94 ++++++++++++-------- optillm/cepo/configs/cepo_config.yaml | 5 +- optillm/cepo/configs/cepo_config_gptoss.yaml | 21 +++++ optillm/cepo/configs/cepo_config_qwen3.yaml | 21 +++++ 4 files changed, 104 insertions(+), 37 deletions(-) create mode 100644 optillm/cepo/configs/cepo_config_gptoss.yaml create mode 100644 optillm/cepo/configs/cepo_config_qwen3.yaml diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index 3bf39a4d..9c7bcf16 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -20,18 +20,22 @@ class CepoConfig: bestofn_n: int # number of responses to be generated in best of n stage bestofn_temperature: float # temperature for verifier in best of n stage bestofn_max_tokens: int # maximum number of tokens for verifier in best of n stage - bestofn_rating_type: Literal["absolute", "pairwise"] # type of rating in best of n stage + bestofn_rating_type: Literal["absolute", "pairwise", "majority"] # type of rating in best of n stage planning_n: int # number of plans generated in planning stage planning_m: int # number of attempts to generate n plans in planning stage planning_temperature_step1: float # temperature for generator in step 1 of planning stage planning_temperature_step2: float # temperature for generator in step 2 of planning stage + planning_temperature_direct_resp: float # temperature for generator after step 2 if planning fails and answer directly planning_temperature_step3: float # temperature for generator in step 3 of planning stage planning_temperature_step4: float # temperature for generator in step 4 of planning stage planning_max_tokens_step1: int # maximum number of tokens in step 1 of planning stage planning_max_tokens_step2: int # maximum number of tokens in step 2 of planning stage + planning_max_tokens_direct_resp: float # maximum number of tokens after step 2 if planning fails and answer directly planning_max_tokens_step3: int # maximum number of tokens in step 3 of planning stage planning_max_tokens_step4: int # maximum number of tokens in step 4 of planning stage use_plan_diversity: bool # whether to use plan diversity + use_reasoning_fallback: bool # whether to fallback to lower levels of reasoning when higher level fails + num_of_retries: int # number of retries if llm call fails, 0 for no retries rating_model: Optional[str] = None # model to be used for rating print_output: bool = False # whether to print the output of each stage @@ -203,6 +207,7 @@ def extract_llm_response(response): def llm_call( client: Any, provider_request: dict, + cepo_config: CepoConfig ) -> tuple[str, str, int]: """ Call the LLM with retries on transient errors. @@ -220,7 +225,7 @@ def llm_call( - finish_reason: Why generation stopped. - completion_tokens: Number of tokens generated. """ - retries = 2 # total attempts = retries + 1 initial call + retries = cepo_config.num_of_retries # total attempts = retries + 1 initial call for attempt in range(retries): try: response_object = client.chat.completions.create( @@ -247,7 +252,8 @@ def llm_call( def llm_call_reason_effort_fallback( client: Any, provider_request: dict, - reasoning_effort_levels: list + reasoning_effort_levels: list, + cepo_config: CepoConfig ) -> tuple[Optional[Any], str, int]: """ Call LLM with fallback on reasoning effort levels. @@ -291,6 +297,8 @@ def llm_call_reason_effort_fallback( automatically, but a permanent fix may require upstream changes (see https://github.com/pydantic/pydantic-ai/issues/2449). """ + if not cepo_config.use_reasoning_fallback: + reasoning_effort_levels = ["high"] for effort in reasoning_effort_levels: try: # Try with the current reasoning effort level @@ -298,6 +306,7 @@ def llm_call_reason_effort_fallback( response, finish_reason, completion_tokens = llm_call( client=client, provider_request=provider_request, + cepo_config=cepo_config ) if response is not None and finish_reason != "length": return response, finish_reason, completion_tokens @@ -310,27 +319,6 @@ def llm_call_reason_effort_fallback( return None, "error", 0 -def fallback_direct_answer(client, model, question, max_tokens=None, temperature=1.0, top_p=1.0): # TODO clean-up - messages = [ - {"role": "user", "content": question}, - ] - - response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( - messages=messages, - client=client, - model=model, - max_tokens=max_tokens, - temperature=temperature, - top_p=top_p, - reasoning_effort_levels=["high", "medium", "low"] - ) - if response is None or finish_reason == "length": - print("Direct answer failed, empty response or length") - response = "" - messages.append({"role": "assistant", "content": response}) - return response, messages - - def generate_completion(system_prompt: str, task: str, client: Any, model: str, cepo_config: CepoConfig, approach: Optional[str] = None, request_id: str = None) -> str: """ Generates a completion based on the provided system prompt and task. @@ -385,7 +373,8 @@ def generate_single_plan(i): response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, - reasoning_effort_levels=["high", "medium"] + reasoning_effort_levels=["high", "medium"], + cepo_config=cepo_config ) local_completion_tokens += completion_tokens # Log provider call if conversation logging is enabled @@ -418,7 +407,8 @@ def generate_single_plan(i): response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, - reasoning_effort_levels=["high", "medium"] + reasoning_effort_levels=["high", "medium"], + cepo_config=cepo_config ) local_completion_tokens += completion_tokens @@ -453,10 +443,39 @@ def generate_single_plan(i): plans = [plan for _, plan in sorted(plans)] # keep original order if not plans: - # Fallback plan - fallback_generation, fallback_messages = fallback_direct_answer(client, model, question_only) - plans.append(fallback_generation) - cb_log[f"messages_planning_fallback_used"] = fallback_messages + # If no plans were generated, attempt to answer directly + messages = [ + {"role": "user", "content": question_only}, + ] + + provider_request = { + "model": model, + "messages": messages, + "max_tokens": cepo_config.planning_max_tokens_step2_direct, + "temperature":cepo_config.planning_temperature_step2_direct, + "top_p": 0.95, + "reasoning_effort_levels": ["high", "medium", "low"] + } + + response, finish_reason, completion_tokens = llm_call_reason_effort_fallback( + client=client, + provider_request=provider_request, + cepo_config=cepo_config + ) + local_completion_tokens += completion_tokens + + # Log provider call if conversation logging is enabled + if hasattr(optillm, 'conversation_logger') and optillm.conversation_logger and request_id: + response_dict = response.model_dump() if hasattr(response, 'model_dump') else response + optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) + + if response is None or finish_reason == "length": + print("Direct answer failed, empty response or length") + response = "" + messages.append({"role": "assistant", "content": response}) + + plans.append(response) + cb_log[f"messages_planning_fallback_used"] = messages if cepo_config.print_output: print(f"\nCePO: No plans generated successfully. Taking the fallback.\n") @@ -483,7 +502,8 @@ def generate_single_plan(i): response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, - reasoning_effort_levels=["high", "medium"] + reasoning_effort_levels=["high", "medium"], + cepo_config=cepo_config ) completion_tokens += completion_tokens_ @@ -519,7 +539,8 @@ def generate_single_plan(i): response, finish_reason, completion_tokens_ = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, - reasoning_effort_levels=["high", "medium"] + reasoning_effort_levels=["high", "medium"], + cepo_config=cepo_config ) completion_tokens += completion_tokens_ @@ -718,7 +739,8 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An rating_response, _, completion_tokens = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, - reasoning_effort_levels=["high", "medium"] + reasoning_effort_levels=["high", "medium"], + cepo_config=cepo_config ) # Log provider call if conversation logging is enabled @@ -906,7 +928,7 @@ def majority_vote_mcq(completions, last_n_chars=100): return response, count -def rate_completions_majority_vote(completions: list[str], last_n_chars: int = 150) -> tuple[str, int, dict]: +def rate_completions_majority(completions: list[str], last_n_chars: int = 150) -> tuple[str, int, dict]: mcq_majority, count = majority_vote_mcq(completions, last_n_chars) if mcq_majority is None: return majority_vote_math(completions, last_n_chars) @@ -948,8 +970,8 @@ def cepo(system_prompt: str, initial_query: str, client: Any, model: str, cepo_c best_completion, completion_tokens_rating, cb_log = rate_completions_absolute(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id) elif cepo_config.bestofn_rating_type == "pairwise": best_completion, completion_tokens_rating, cb_log = rate_completions_pairwise(system_prompt, initial_query, client, rating_model, completions, cepo_config, cb_log, request_id) - elif cepo_config.bestofn_rating_type == "majority_with_code_exec": - best_completion, _ = rate_completions_majority_vote(completions) + elif cepo_config.bestofn_rating_type == "majority": + best_completion, _ = rate_completions_majority(completions) completion_tokens_rating = 0 else: raise ValueError("Invalid rating type in cepo_config") diff --git a/optillm/cepo/configs/cepo_config.yaml b/optillm/cepo/configs/cepo_config.yaml index 27da0da5..f96db582 100644 --- a/optillm/cepo/configs/cepo_config.yaml +++ b/optillm/cepo/configs/cepo_config.yaml @@ -1,17 +1,20 @@ bestofn_n: 3 bestofn_temperature: 0.1 bestofn_max_tokens: 4096 -bestofn_rating_type: "absolute" # or "pairwise" +bestofn_rating_type: "absolute" # or "pairwise", "majority" planning_n: 3 planning_m: 6 planning_temperature_step1: 0.55 planning_temperature_step2: 0.25 +planning_temperature_direct_resp: 0.1 planning_temperature_step3: 0.1 planning_temperature_step4: 0 planning_max_tokens_step1: 4096 planning_max_tokens_step2: 4096 +planning_max_tokens_direct_resp: 4096 planning_max_tokens_step3: 4096 planning_max_tokens_step4: 4096 use_plan_diversity: False rating_model: null +use_reasoning_effort_fallback: False print_output: False \ No newline at end of file diff --git a/optillm/cepo/configs/cepo_config_gptoss.yaml b/optillm/cepo/configs/cepo_config_gptoss.yaml new file mode 100644 index 00000000..0eb9cf4c --- /dev/null +++ b/optillm/cepo/configs/cepo_config_gptoss.yaml @@ -0,0 +1,21 @@ +bestofn_n: 1 +bestofn_temperature: 0.6 +bestofn_max_tokens: 40960 +bestofn_rating_type: "absolute" +planning_n: 2 +planning_m: 4 +planning_temperature_step1: 1.0 +planning_temperature_step2: 1.0 +planning_temperature_direct_resp: 0.6 +planning_temperature_step3: 1.0 +planning_temperature_step4: 0.5 +planning_max_tokens_step1: 40960 +planning_max_tokens_step2: 40960 +planning_max_tokens_direct_resp: 32768 +planning_max_tokens_step3: 40960 +planning_max_tokens_step4: 40960 +use_plan_diversity: False +rating_model: null +use_reasoning_fallback: True +num_of_retries: 2 +print_output: true \ No newline at end of file diff --git a/optillm/cepo/configs/cepo_config_qwen3.yaml b/optillm/cepo/configs/cepo_config_qwen3.yaml new file mode 100644 index 00000000..af1b8dfa --- /dev/null +++ b/optillm/cepo/configs/cepo_config_qwen3.yaml @@ -0,0 +1,21 @@ +bestofn_n: 3 +bestofn_temperature: 0.6 +bestofn_max_tokens: 20480 +bestofn_rating_type: "majority" +planning_n: 2 +planning_m: 4 +planning_temperature_step1: 0.8 +planning_temperature_step2: 0.8 +planning_temperature_direct_resp: 0.6 +planning_temperature_step3: 0.8 +planning_temperature_step4: 0.8 +planning_max_tokens_step1: 28672 +planning_max_tokens_step2: 24576 +planning_max_tokens_direct_resp: 32768 +planning_max_tokens_step3: 20481 +planning_max_tokens_step4: 20482 +use_plan_diversity: False +rating_model: null +use_reasoning_fallback: False +num_of_retries: 0 +print_output: False \ No newline at end of file From 19f6ae0c4257df5c5a51d13d4da58d3b7a6f7842 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Thu, 25 Sep 2025 20:57:36 +0000 Subject: [PATCH 04/11] Fix retries and handle corner cases --- optillm/cepo/cepo.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index 9c7bcf16..b8ec65c8 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -225,7 +225,7 @@ def llm_call( - finish_reason: Why generation stopped. - completion_tokens: Number of tokens generated. """ - retries = cepo_config.num_of_retries # total attempts = retries + 1 initial call + retries = cepo_config.num_of_retries + 1 # total attempts = retries + 1 initial call for attempt in range(retries): try: response_object = client.chat.completions.create( @@ -907,11 +907,11 @@ def majority_vote_math(completions, last_n_chars=100): counts = Counter(answer for _, answer in extracted_answer_map) majority_answer, count = counts.most_common(1)[0] - # TODO it may return all "None", we probably should handle this case - # Return one response whose extracted answer matches the majority + for response, answer in extracted_answer_map: if answer == majority_answer: return response, count + return extracted_answer_map[0][0], 0 def majority_vote_mcq(completions, last_n_chars=100): @@ -922,10 +922,11 @@ def majority_vote_mcq(completions, last_n_chars=100): counts = Counter(answer for _, answer in extracted_answer_map) majority_answer, count = counts.most_common(1)[0] - # TODO it may return all "None", we probably should handle this case + for response, answer in extracted_answer_map: if answer == majority_answer: return response, count + return extracted_answer_map[0][0], 0 def rate_completions_majority(completions: list[str], last_n_chars: int = 150) -> tuple[str, int, dict]: From de308a48b9eafebfc7cfe8dacb51774d7d3642f2 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Thu, 25 Sep 2025 21:25:44 +0000 Subject: [PATCH 05/11] Fix formatting --- optillm/cepo/cepo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index b8ec65c8..31be1102 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -30,7 +30,7 @@ class CepoConfig: planning_temperature_step4: float # temperature for generator in step 4 of planning stage planning_max_tokens_step1: int # maximum number of tokens in step 1 of planning stage planning_max_tokens_step2: int # maximum number of tokens in step 2 of planning stage - planning_max_tokens_direct_resp: float # maximum number of tokens after step 2 if planning fails and answer directly + planning_max_tokens_direct_resp: float # maximum number of tokens after step 2 if planning fails and answer directly planning_max_tokens_step3: int # maximum number of tokens in step 3 of planning stage planning_max_tokens_step4: int # maximum number of tokens in step 4 of planning stage use_plan_diversity: bool # whether to use plan diversity From 36cf197acf2e86c44dc4cf43a49a0cad36cc9050 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Thu, 25 Sep 2025 21:26:05 +0000 Subject: [PATCH 06/11] Update readme --- README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.md b/README.md index 47f4aa83..f17049f5 100644 --- a/README.md +++ b/README.md @@ -490,12 +490,16 @@ optillm supports various command-line arguments for configuration. When using Do | `--cepo_planning_m` | Number of attempts to generate n plans in planning stage | 6 | | `--cepo_planning_temperature_step1` | Temperature for generator in step 1 of planning stage | 0.55 | | `--cepo_planning_temperature_step2` | Temperature for generator in step 2 of planning stage | 0.25 | +| `--cepo_planning_temperature_direct_resp` | Temperature for generator after step 2 if planning fails and answer directly | 0.1 | | `--cepo_planning_temperature_step3` | Temperature for generator in step 3 of planning stage | 0.1 | | `--cepo_planning_temperature_step4` | Temperature for generator in step 4 of planning stage | 0 | | `--cepo_planning_max_tokens_step1` | Maximum number of tokens in step 1 of planning stage | 4096 | | `--cepo_planning_max_tokens_step2` | Maximum number of tokens in step 2 of planning stage | 4096 | +| `--cepo_planning_max_tokens_direct_resp` | Maximum number of tokens after step 2 if planning fails and answer directly | 4096 | | `--cepo_planning_max_tokens_step3` | Maximum number of tokens in step 3 of planning stage | 4096 | | `--cepo_planning_max_tokens_step4` | Maximum number of tokens in step 4 of planning stage | 4096 | +| `--cepo_use_reasoning_fallback` | Whether to fallback to lower levels of reasoning when higher level fails | False | +| `--cepo_num_of_retries` | Number of retries if llm call fails, 0 for no retries | 0 | | `--cepo_print_output` | Whether to print the output of each stage | `False` | | `--cepo_config_file` | Path to CePO configuration file | `None` | | `--cepo_use_plan_diversity` | Use additional plan diversity step | `False` | @@ -584,6 +588,19 @@ Authorization: Bearer your_secret_api_key ¹ Numbers in parentheses for LongCePO indicate accuracy of majority voting from 5 runs. +### CePO on math and code benchmarks (Sep 2025) + +| Method | AIME 2024 | AIME 2025 | GPQA | LiveCodeBench | +| ----------------------: | :-------: | :-------: | :----: | :-----------: | +| Qwen3 8B | 74.0 | 68.3 | 59.3 | 55.7 | +| CePO (using Qwen3 8B) | 86.7 | 80.0 | 62.5 | 60.5 | +| Qwen3 32B | 81.4 | 72.9 | 66.8 | 65.7 | +| CePO (using Qwen3 32B) | **90.7** | **83.3** | 70.0 | **71.9** | +| Qwen3 235B | 85.7 | 81.5 | 71.1 | 70.7 | +| DeepSeek R1 | 79.8 | 70.0 | 71.5 | 64.3 | +| OpenAI o3-mini | 79.6 | 74.8 | 76.8 | 66.3 | +| Grok3 Think | 83.9 | 77.3 |**80.2**| 70.6 | + ### CePO on math and code benchmarks (Mar 2025) | Method | Math-L5 | MMLU-Pro (Math) | CRUX | LiveCodeBench (pass@1) | Simple QA | From 5557c5e90593d60860a26bca5803aea59c492c19 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 26 Sep 2025 17:24:04 +0000 Subject: [PATCH 07/11] Fix bestofn_n in OSS config --- optillm/cepo/configs/cepo_config_gptoss.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optillm/cepo/configs/cepo_config_gptoss.yaml b/optillm/cepo/configs/cepo_config_gptoss.yaml index 0eb9cf4c..72d78252 100644 --- a/optillm/cepo/configs/cepo_config_gptoss.yaml +++ b/optillm/cepo/configs/cepo_config_gptoss.yaml @@ -1,4 +1,4 @@ -bestofn_n: 1 +bestofn_n: 3 bestofn_temperature: 0.6 bestofn_max_tokens: 40960 bestofn_rating_type: "absolute" From 3166e7edeb0f2693d138d62f2501342e29782acc Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 26 Sep 2025 17:24:27 +0000 Subject: [PATCH 08/11] Add example usage --- optillm/cepo/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/optillm/cepo/README.md b/optillm/cepo/README.md index baa0b06b..bb7b949e 100644 --- a/optillm/cepo/README.md +++ b/optillm/cepo/README.md @@ -23,6 +23,19 @@ The model reviews all generated solution proposals and their associated plans, i **Step 4**: Final Solution The model uses the refined plan from Step 3 to produce the final answer. +## Example Usage + +Here’s an example of running Optillm using the CePO method for Qwen3 deployed with VLLM on port 8001: + +```bash +OPENAI_API_KEY=serving-on-vllm \ +python optillm.py \ + --base-url http://localhost:8001/v1 \ + --approach cepo \ + --port 8000 \ + --cepo_config_file ./optillm/cepo/cepo_configs/cepo_qwen3.yaml +``` + ## CePO Current Status This project is a work in progress, and the provided code is in an early experimental stage. While the proposed approach works well across the benchmarks we tested, further improvements can be achieved by task-specific customizations to prompts. \ No newline at end of file From 92451b0945a7ad72dc51a05133474249f234e515 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 26 Sep 2025 18:18:55 +0000 Subject: [PATCH 09/11] Fix absolute rating --- optillm/cepo/cepo.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/optillm/cepo/cepo.py b/optillm/cepo/cepo.py index 31be1102..c9a7c2cf 100644 --- a/optillm/cepo/cepo.py +++ b/optillm/cepo/cepo.py @@ -736,7 +736,7 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An } rating_response = client.chat.completions.create(**provider_request) - rating_response, _, completion_tokens = llm_call_reason_effort_fallback( + rating_response, _, completion_tokens_ = llm_call_reason_effort_fallback( client=client, provider_request=provider_request, reasoning_effort_levels=["high", "medium"], @@ -748,8 +748,7 @@ def rate_completions_absolute(system_prompt: str, initial_query: str, client: An response_dict = rating_response.model_dump() if hasattr(rating_response, 'model_dump') else rating_response optillm.conversation_logger.log_provider_call(request_id, provider_request, response_dict) - completion_tokens += rating_response.usage.completion_tokens - rating_response = rating_response.choices[0].message.content.strip() + completion_tokens += completion_tokens_ cb_log[f"rating_response_{i}"] = rating_response if cepo_config.print_output: From ca112b435e5f31464c3e64e2b16e3366da560f42 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 26 Sep 2025 20:10:41 +0000 Subject: [PATCH 10/11] Add math_verify to requirements --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b0c13f6c..a849f1c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,4 +33,5 @@ adaptive-classifier datasets mcp # MLX support for Apple Silicon optimization -mlx-lm>=0.24.0; platform_machine=="arm64" and sys_platform=="darwin" \ No newline at end of file +mlx-lm>=0.24.0; platform_machine=="arm64" and sys_platform=="darwin" +math_verify \ No newline at end of file From ba45c609d117b08f2df99d8dd89f01fc01d54213 Mon Sep 17 00:00:00 2001 From: Pawel Filipczuk Date: Fri, 26 Sep 2025 20:20:34 +0000 Subject: [PATCH 11/11] Fix config - incorrect param name --- optillm/cepo/configs/cepo_config.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/optillm/cepo/configs/cepo_config.yaml b/optillm/cepo/configs/cepo_config.yaml index f96db582..7b28957a 100644 --- a/optillm/cepo/configs/cepo_config.yaml +++ b/optillm/cepo/configs/cepo_config.yaml @@ -16,5 +16,6 @@ planning_max_tokens_step3: 4096 planning_max_tokens_step4: 4096 use_plan_diversity: False rating_model: null -use_reasoning_effort_fallback: False +use_reasoning_fallback: False +num_of_retries: 0 print_output: False \ No newline at end of file