diff --git a/examples/circle_packing/evaluator.py b/examples/circle_packing/evaluator.py index 9f4cd1c94..677365357 100644 --- a/examples/circle_packing/evaluator.py +++ b/examples/circle_packing/evaluator.py @@ -35,22 +35,22 @@ def validate_packing(centers, radii): True if valid, False otherwise """ n = centers.shape[0] - + # Check for NaN values if np.isnan(centers).any(): print("NaN values detected in circle centers") return False - + if np.isnan(radii).any(): print("NaN values detected in circle radii") return False # Check if radii are nonnegative and not nan for i in range(n): - if(radii[i] < 0): + if radii[i] < 0: print(f"Circle {i} has negative radius {radii[i]}") return False - elif(np.isnan(radii[i])): + elif np.isnan(radii[i]): print(f"Circle {i} has nan radius") return False @@ -214,7 +214,7 @@ def evaluate(program_path): centers = np.array(centers) if not isinstance(radii, np.ndarray): radii = np.array(radii) - + # Check for NaN values before validation if np.isnan(centers).any() or np.isnan(radii).any(): print("NaN values detected in solution") diff --git a/examples/circle_packing_with_artifacts/evaluator.py b/examples/circle_packing_with_artifacts/evaluator.py index a8692c936..ea3202546 100644 --- a/examples/circle_packing_with_artifacts/evaluator.py +++ b/examples/circle_packing_with_artifacts/evaluator.py @@ -295,9 +295,9 @@ def evaluate(program_path): # Add successful packing stats for good solutions if valid and target_ratio > 0.95: # Near-optimal solutions artifacts["stdout"] = f"Excellent packing! Achieved {target_ratio:.1%} of target value" - artifacts[ - "radius_stats" - ] = f"Min: {validation_details['min_radius']:.6f}, Max: {validation_details['max_radius']:.6f}, Avg: {validation_details['avg_radius']:.6f}" + artifacts["radius_stats"] = ( + f"Min: {validation_details['min_radius']:.6f}, Max: {validation_details['max_radius']:.6f}, Avg: {validation_details['avg_radius']:.6f}" + ) return EvaluationResult( metrics={ @@ -404,9 +404,9 @@ def evaluate_stage1(program_path): # Add validation issues if any if not valid: - artifacts[ - "stderr" - ] = f"Validation failed: {len(validation_details.get('boundary_violations', []))} boundary violations, {len(validation_details.get('overlaps', []))} overlaps" + artifacts["stderr"] = ( + f"Validation failed: {len(validation_details.get('boundary_violations', []))} boundary violations, {len(validation_details.get('overlaps', []))} overlaps" + ) artifacts["failure_stage"] = "stage1_geometric_validation" if validation_details.get("boundary_violations"): artifacts["boundary_issues"] = validation_details["boundary_violations"][ diff --git a/examples/llm_prompt_optimization/evaluate_prompts.py b/examples/llm_prompt_optimization/evaluate_prompts.py index b4e9c795a..2505a9024 100755 --- a/examples/llm_prompt_optimization/evaluate_prompts.py +++ b/examples/llm_prompt_optimization/evaluate_prompts.py @@ -14,41 +14,42 @@ from openai import OpenAI from tqdm import tqdm + # Initialize OpenAI client def get_client(): - api_key = os.environ.get('OPENAI_API_KEY') + api_key = os.environ.get("OPENAI_API_KEY") if not api_key: raise ValueError("OPENAI_API_KEY environment variable not set") - - return OpenAI( - base_url="https://openrouter.ai/api/v1", - api_key=api_key - ) -def load_prompt(dataset_name, prompt_type='baseline'): + return OpenAI(base_url="https://openrouter.ai/api/v1", api_key=api_key) + + +def load_prompt(dataset_name, prompt_type="baseline"): """Load prompt template for a dataset.""" - if prompt_type == 'baseline': + if prompt_type == 
"baseline": prompt_path = f"{dataset_name}_prompt.txt" else: # evolved prompt_path = f"openevolve_output_qwen3_{dataset_name}/best/best_program.txt" - + if not os.path.exists(prompt_path): raise FileNotFoundError(f"Prompt file not found: {prompt_path}") - - with open(prompt_path, 'r') as f: + + with open(prompt_path, "r") as f: return f.read().strip() + def load_dataset_config(dataset_name): """Load dataset configuration.""" config_path = f"{dataset_name}_prompt_dataset.yaml" - - with open(config_path, 'r') as f: + + with open(config_path, "r") as f: return yaml.safe_load(f) + def evaluate_ifeval(client, prompt_template, num_samples, model): """Evaluate IFEval dataset.""" print("\nLoading IFEval dataset...") - + # Try test split first, then train try: dataset = load_dataset("google/IFEval", split="test") @@ -56,7 +57,7 @@ def evaluate_ifeval(client, prompt_template, num_samples, model): except: dataset = load_dataset("google/IFEval", split="train") split_used = "train" - + # Determine samples to process if num_samples is None: samples_to_process = len(dataset) @@ -66,23 +67,25 @@ def evaluate_ifeval(client, prompt_template, num_samples, model): samples_to_process = min(num_samples, len(dataset)) print(f"Using {samples_to_process} samples from {split_used} split") dataset = load_dataset("google/IFEval", split=split_used, streaming=True) - dataset_iter = tqdm(dataset.take(samples_to_process), total=samples_to_process, desc="Evaluating") - + dataset_iter = tqdm( + dataset.take(samples_to_process), total=samples_to_process, desc="Evaluating" + ) + correct = 0 total = 0 empty_responses = 0 - + for i, example in enumerate(dataset_iter): if num_samples is not None and i >= samples_to_process: break - instruction = example['prompt'] - + instruction = example["prompt"] + try: formatted_prompt = prompt_template.format(instruction=instruction) except KeyError: # Handle prompts with different placeholder names formatted_prompt = prompt_template.replace("{instruction}", instruction) - + # Call LLM with retries output_text = None for attempt in range(3): @@ -91,9 +94,9 @@ def evaluate_ifeval(client, prompt_template, num_samples, model): model=model, messages=[{"role": "user", "content": formatted_prompt}], temperature=0.1, - max_tokens=4096 + max_tokens=4096, ) - + if response and response.choices and response.choices[0].message: output_text = response.choices[0].message.content if output_text and output_text.strip(): @@ -102,28 +105,29 @@ def evaluate_ifeval(client, prompt_template, num_samples, model): if attempt == 2: print(f"\nError after 3 attempts: {e}") time.sleep(2) - + if not output_text or not output_text.strip(): empty_responses += 1 else: # Simple evaluation: response has reasonable length if len(output_text.strip()) > 20: correct += 1 - + total += 1 - + accuracy = correct / total if total > 0 else 0.0 return accuracy, correct, total, empty_responses + def evaluate_hover(client, prompt_template, num_samples, model): """Evaluate HoVer dataset.""" print("\nLoading HoVer dataset...") - + # Try test split first (but it's unlabeled), then validation try: test_dataset = load_dataset("hover", split="test", trust_remote_code=True) # Check if test set has labels - if test_dataset[0]['label'] != -1: + if test_dataset[0]["label"] != -1: dataset = test_dataset split_used = "test" else: @@ -133,7 +137,7 @@ def evaluate_hover(client, prompt_template, num_samples, model): except: dataset = load_dataset("hover", split="validation", trust_remote_code=True) split_used = "validation" - + # Determine samples to 
process if num_samples is None: samples_to_process = len(dataset) @@ -143,23 +147,25 @@ def evaluate_hover(client, prompt_template, num_samples, model): samples_to_process = min(num_samples, len(dataset)) print(f"Using {samples_to_process} samples from {split_used} split") dataset = load_dataset("hover", split=split_used, streaming=True, trust_remote_code=True) - dataset_iter = tqdm(dataset.take(samples_to_process), total=samples_to_process, desc="Evaluating") - + dataset_iter = tqdm( + dataset.take(samples_to_process), total=samples_to_process, desc="Evaluating" + ) + correct = 0 total = 0 empty_responses = 0 - + for i, example in enumerate(dataset_iter): if num_samples is not None and i >= samples_to_process: break - claim = example['claim'] - label = example['label'] # Integer: 0=SUPPORTED, 1=NOT_SUPPORTED - + claim = example["claim"] + label = example["label"] # Integer: 0=SUPPORTED, 1=NOT_SUPPORTED + try: formatted_prompt = prompt_template.format(claim=claim) except KeyError: formatted_prompt = prompt_template.replace("{claim}", claim) - + # Call LLM with retries output_text = None for attempt in range(3): @@ -168,9 +174,9 @@ def evaluate_hover(client, prompt_template, num_samples, model): model=model, messages=[{"role": "user", "content": formatted_prompt}], temperature=0.1, - max_tokens=4096 + max_tokens=4096, ) - + if response and response.choices and response.choices[0].message: output_text = response.choices[0].message.content if output_text and output_text.strip(): @@ -179,43 +185,48 @@ def evaluate_hover(client, prompt_template, num_samples, model): if attempt == 2: print(f"\nError after 3 attempts: {e}") time.sleep(2) - + if not output_text or not output_text.strip(): empty_responses += 1 else: output_upper = output_text.strip().upper() - + # Parse prediction from output - if 'NOT SUPPORTED' in output_upper or 'NOT_SUPPORTED' in output_upper: + if "NOT SUPPORTED" in output_upper or "NOT_SUPPORTED" in output_upper: prediction = 1 # NOT_SUPPORTED - elif 'SUPPORTED' in output_upper: - prediction = 0 # SUPPORTED + elif "SUPPORTED" in output_upper: + prediction = 0 # SUPPORTED else: prediction = -1 # Invalid/unclear response - + # Compare with actual label if prediction == label: correct += 1 - + total += 1 - + accuracy = correct / total if total > 0 else 0.0 return accuracy, correct, total, empty_responses + def evaluate_hotpotqa(client, prompt_template, num_samples, model): """Evaluate HotpotQA dataset.""" print("\nLoading HotpotQA dataset (this may take a moment)...") - + # Try test split first, then validation try: - dataset = load_dataset("hotpotqa/hotpot_qa", "distractor", split="test", trust_remote_code=True) + dataset = load_dataset( + "hotpotqa/hotpot_qa", "distractor", split="test", trust_remote_code=True + ) split_used = "test" except: - dataset = load_dataset("hotpotqa/hotpot_qa", "distractor", split="validation", trust_remote_code=True) + dataset = load_dataset( + "hotpotqa/hotpot_qa", "distractor", split="validation", trust_remote_code=True + ) split_used = "validation" - + print(f"Dataset loaded. 
Using {split_used} split with {len(dataset)} samples") - + # Determine samples to process if num_samples is None: samples_to_process = len(dataset) @@ -223,36 +234,35 @@ def evaluate_hotpotqa(client, prompt_template, num_samples, model): else: samples_to_process = min(num_samples, len(dataset)) print(f"Using {samples_to_process} samples") - + correct = 0 total = 0 empty_responses = 0 - + for i in tqdm(range(samples_to_process), desc="Evaluating"): example = dataset[i] - - question = example['question'] - context = example['context'] - answer = example['answer'].lower().strip() - + + question = example["question"] + context = example["context"] + answer = example["answer"].lower().strip() + # Format context context_str = "" - titles = context['title'] - sentences = context['sentences'] - + titles = context["title"] + sentences = context["sentences"] + for title, sents in zip(titles, sentences): context_str += f"{title}: {' '.join(sents)}\n" - + try: formatted_prompt = prompt_template.format( - context=context_str.strip(), - question=question + context=context_str.strip(), question=question ) except KeyError: # Try alternative formatting formatted_prompt = prompt_template.replace("{context}", context_str.strip()) formatted_prompt = formatted_prompt.replace("{question}", question) - + # Call LLM with retries output_text = None for attempt in range(3): @@ -261,9 +271,9 @@ def evaluate_hotpotqa(client, prompt_template, num_samples, model): model=model, messages=[{"role": "user", "content": formatted_prompt}], temperature=0.1, - max_tokens=4096 + max_tokens=4096, ) - + if response and response.choices and response.choices[0].message: output_text = response.choices[0].message.content if output_text and output_text.strip(): @@ -272,65 +282,76 @@ def evaluate_hotpotqa(client, prompt_template, num_samples, model): if attempt == 2: print(f"\nError after 3 attempts: {e}") time.sleep(2) - + if not output_text or not output_text.strip(): empty_responses += 1 else: output_lower = output_text.strip().lower() - + # Check if answer is in output if answer in output_lower: correct += 1 - + total += 1 - + accuracy = correct / total if total > 0 else 0.0 return accuracy, correct, total, empty_responses + def main(): - parser = argparse.ArgumentParser(description='Evaluate prompts on GEPA benchmark datasets') - parser.add_argument('--dataset', type=str, required=True, - choices=['ifeval', 'hover', 'hotpotqa', 'all'], - help='Dataset to evaluate on') - parser.add_argument('--prompt-type', type=str, default='baseline', - choices=['baseline', 'evolved'], - help='Type of prompt to use') - parser.add_argument('--samples', type=int, default=None, - help='Number of samples to evaluate (default: full dataset)') - parser.add_argument('--model', type=str, default='qwen/qwen3-8b', - help='Model to use for evaluation') - parser.add_argument('--output', type=str, default=None, - help='Output file for results (default: auto-generated)') - + parser = argparse.ArgumentParser(description="Evaluate prompts on GEPA benchmark datasets") + parser.add_argument( + "--dataset", + type=str, + required=True, + choices=["ifeval", "hover", "hotpotqa", "all"], + help="Dataset to evaluate on", + ) + parser.add_argument( + "--prompt-type", + type=str, + default="baseline", + choices=["baseline", "evolved"], + help="Type of prompt to use", + ) + parser.add_argument( + "--samples", + type=int, + default=None, + help="Number of samples to evaluate (default: full dataset)", + ) + parser.add_argument( + "--model", type=str, 
default="qwen/qwen3-8b", help="Model to use for evaluation" + ) + parser.add_argument( + "--output", type=str, default=None, help="Output file for results (default: auto-generated)" + ) + args = parser.parse_args() - + # Initialize client client = get_client() - + # Determine which datasets to evaluate - if args.dataset == 'all': - datasets = ['ifeval', 'hover', 'hotpotqa'] + if args.dataset == "all": + datasets = ["ifeval", "hover", "hotpotqa"] else: datasets = [args.dataset] - + # Evaluation functions - eval_funcs = { - 'ifeval': evaluate_ifeval, - 'hover': evaluate_hover, - 'hotpotqa': evaluate_hotpotqa - } - + eval_funcs = {"ifeval": evaluate_ifeval, "hover": evaluate_hover, "hotpotqa": evaluate_hotpotqa} + # Load baseline results for comparison baseline_results = {} - if os.path.exists('baseline_results_50samples.json'): - with open('baseline_results_50samples.json', 'r') as f: + if os.path.exists("baseline_results_50samples.json"): + with open("baseline_results_50samples.json", "r") as f: baseline_data = json.load(f) - for result in baseline_data.get('results', []): - baseline_results[result['dataset']] = result['accuracy'] - + for result in baseline_data.get("results", []): + baseline_results[result["dataset"]] = result["accuracy"] + # Store results all_results = [] - + print(f"\n{'='*60}") print(f"PROMPT EVALUATION - {args.prompt_type.upper()}") print(f"Model: {args.model}") @@ -339,45 +360,45 @@ def main(): else: print(f"Samples per dataset: Full dataset") print(f"{'='*60}") - + for dataset_name in datasets: print(f"\nEvaluating {dataset_name.upper()}...") - + try: # Load prompt prompt_template = load_prompt(dataset_name, args.prompt_type) print(f"Loaded {args.prompt_type} prompt ({len(prompt_template)} chars)") - + # Run evaluation start_time = time.time() accuracy, correct, total, empty_responses = eval_funcs[dataset_name]( client, prompt_template, args.samples, args.model ) elapsed_time = time.time() - start_time - + # Get baseline accuracy baseline_acc = baseline_results.get(dataset_name) if baseline_acc: improvement = ((accuracy - baseline_acc) / baseline_acc) * 100 else: improvement = 0 - + # Store result result = { - 'dataset': dataset_name, - 'prompt_type': args.prompt_type, - 'accuracy': accuracy, - 'baseline_accuracy': baseline_acc, - 'improvement_percent': improvement, - 'correct': correct, - 'total': total, - 'empty_responses': empty_responses, - 'elapsed_time': elapsed_time, - 'timestamp': datetime.now().isoformat() + "dataset": dataset_name, + "prompt_type": args.prompt_type, + "accuracy": accuracy, + "baseline_accuracy": baseline_acc, + "improvement_percent": improvement, + "correct": correct, + "total": total, + "empty_responses": empty_responses, + "elapsed_time": elapsed_time, + "timestamp": datetime.now().isoformat(), } - + all_results.append(result) - + # Print results print(f"\nResults for {dataset_name.upper()}:") print(f" Accuracy: {accuracy:.3f} ({correct}/{total})") @@ -386,65 +407,68 @@ def main(): print(f" Improvement: {improvement:+.1f}%") print(f" Empty responses: {empty_responses}") print(f" Time: {elapsed_time:.1f}s ({elapsed_time/total:.1f}s per sample)") - + except Exception as e: print(f"Error evaluating {dataset_name}: {str(e)}") - all_results.append({ - 'dataset': dataset_name, - 'prompt_type': args.prompt_type, - 'error': str(e), - 'timestamp': datetime.now().isoformat() - }) - + all_results.append( + { + "dataset": dataset_name, + "prompt_type": args.prompt_type, + "error": str(e), + "timestamp": datetime.now().isoformat(), + } + ) + # Save 
results output_path = args.output if not output_path: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = f"evaluation_results_{args.prompt_type}_{timestamp}.json" - + final_results = { - 'prompt_type': args.prompt_type, - 'model': args.model, - 'samples_per_dataset': args.samples, - 'timestamp': datetime.now().isoformat(), - 'results': all_results + "prompt_type": args.prompt_type, + "model": args.model, + "samples_per_dataset": args.samples, + "timestamp": datetime.now().isoformat(), + "results": all_results, } - + # Calculate aggregate statistics - valid_results = [r for r in all_results if 'error' not in r] + valid_results = [r for r in all_results if "error" not in r] if valid_results: - total_correct = sum(r['correct'] for r in valid_results) - total_samples = sum(r['total'] for r in valid_results) + total_correct = sum(r["correct"] for r in valid_results) + total_samples = sum(r["total"] for r in valid_results) aggregate_accuracy = total_correct / total_samples if total_samples > 0 else 0 - - final_results['summary'] = { - 'aggregate_accuracy': aggregate_accuracy, - 'total_correct': total_correct, - 'total_samples': total_samples, - 'datasets_evaluated': len(valid_results) + + final_results["summary"] = { + "aggregate_accuracy": aggregate_accuracy, + "total_correct": total_correct, + "total_samples": total_samples, + "datasets_evaluated": len(valid_results), } - - with open(output_path, 'w') as f: + + with open(output_path, "w") as f: json.dump(final_results, f, indent=2) - + # Print summary print(f"\n{'='*60}") print("EVALUATION SUMMARY") print(f"{'='*60}") - + for result in all_results: - if 'error' not in result: + if "error" not in result: print(f"\n{result['dataset'].upper()}:") print(f" Accuracy: {result['accuracy']:.3f}") - if result.get('baseline_accuracy'): + if result.get("baseline_accuracy"): print(f" vs Baseline: {result['improvement_percent']:+.1f}%") - - if 'summary' in final_results: + + if "summary" in final_results: print(f"\nAGGREGATE:") print(f" Overall Accuracy: {final_results['summary']['aggregate_accuracy']:.3f}") print(f" Total Samples: {final_results['summary']['total_samples']}") - + print(f"\nResults saved to: {output_path}") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/examples/llm_prompt_optimization/evaluator.py b/examples/llm_prompt_optimization/evaluator.py index c5c36dd81..98c5294c5 100644 --- a/examples/llm_prompt_optimization/evaluator.py +++ b/examples/llm_prompt_optimization/evaluator.py @@ -12,28 +12,28 @@ from datasets import load_dataset # Read config.yaml to get model settings -with open(os.path.join(os.path.dirname(__file__), "config.yaml"), 'r') as f: +with open(os.path.join(os.path.dirname(__file__), "config.yaml"), "r") as f: config = yaml.safe_load(f) # Get model settings from config -llm_config = config.get('llm', {}) -api_base = llm_config.get('api_base', 'http://localhost:1234/v1') +llm_config = config.get("llm", {}) +api_base = llm_config.get("api_base", "http://localhost:1234/v1") # Handle both single model and model list configurations -models = llm_config.get('models', []) +models = llm_config.get("models", []) if models: # Use first model from list - TASK_MODEL_NAME = models[0].get('name', 'default-model') + TASK_MODEL_NAME = models[0].get("name", "default-model") else: # Fallback to direct model specification - TASK_MODEL_NAME = llm_config.get('primary_model', 'default-model') + TASK_MODEL_NAME = llm_config.get("primary_model", "default-model") # Get evaluator settings 
-evaluator_config = config.get('evaluator', {}) -MAX_RETRIES = evaluator_config.get('max_retries', 3) +evaluator_config = config.get("evaluator", {}) +MAX_RETRIES = evaluator_config.get("max_retries", 3) # Get max_tokens from LLM config -MAX_TOKENS = llm_config.get('max_tokens', 16000) +MAX_TOKENS = llm_config.get("max_tokens", 16000) print(f"Using max_tokens: {MAX_TOKENS}") # Initialize OpenAI client once for all evaluations @@ -42,15 +42,18 @@ # Determine which dataset to use based on the OPENEVOLVE_PROMPT environment variable import sys -prompt_file = os.environ.get('OPENEVOLVE_PROMPT') + +prompt_file = os.environ.get("OPENEVOLVE_PROMPT") if not prompt_file: # Default to a generic dataset config if not using the wrapper script evaluator_dir = os.path.dirname(os.path.abspath(__file__)) - DATASET_CONFIG_PATH = os.path.join(evaluator_dir, 'dataset_config.yaml') + DATASET_CONFIG_PATH = os.path.join(evaluator_dir, "dataset_config.yaml") print("Warning: OPENEVOLVE_PROMPT not set. Using default dataset_config.yaml") else: basename = os.path.basename(prompt_file) - dataset_filename = basename.replace('_prompt.txt', '_prompt_dataset.yaml').replace('.txt', '_dataset.yaml') + dataset_filename = basename.replace("_prompt.txt", "_prompt_dataset.yaml").replace( + ".txt", "_dataset.yaml" + ) evaluator_dir = os.path.dirname(os.path.abspath(__file__)) DATASET_CONFIG_PATH = os.path.join(evaluator_dir, dataset_filename) print(f"Dataset configuration: {dataset_filename}") @@ -59,47 +62,51 @@ def calculate_prompt_features(prompt): """ Calculate custom features for MAP-Elites binning - + Returns: tuple: (prompt_length, reasoning_strategy) - both in range 0-9 """ # Feature 1: Prompt length bin (0-9) length = len(prompt) if length < 100: - prompt_length = 0 # Minimal + prompt_length = 0 # Minimal elif length < 200: - prompt_length = 1 # Very short + prompt_length = 1 # Very short elif length < 400: - prompt_length = 2 # Short + prompt_length = 2 # Short elif length < 600: - prompt_length = 3 # Medium-short + prompt_length = 3 # Medium-short elif length < 900: - prompt_length = 4 # Medium + prompt_length = 4 # Medium elif length < 1200: - prompt_length = 5 # Medium-long + prompt_length = 5 # Medium-long elif length < 1600: - prompt_length = 6 # Long + prompt_length = 6 # Long elif length < 2000: - prompt_length = 7 # Very long + prompt_length = 7 # Very long elif length < 2500: - prompt_length = 8 # Extensive + prompt_length = 8 # Extensive else: - prompt_length = 9 # Very extensive - + prompt_length = 9 # Very extensive + # Feature 2: Reasoning strategy (0-9) prompt_lower = prompt.lower() - + # Check for few-shot examples - has_example = ('example' in prompt_lower or - prompt.count('####') >= 4 or - bool(re.search(r'problem:.*?solution:', prompt_lower, re.DOTALL))) - + has_example = ( + "example" in prompt_lower + or prompt.count("####") >= 4 + or bool(re.search(r"problem:.*?solution:", prompt_lower, re.DOTALL)) + ) + # Check for Chain-of-Thought (CoT) indicators - has_cot = ('step by step' in prompt_lower or - 'step-by-step' in prompt_lower or - any(phrase in prompt_lower for phrase in ['think through', 'reasoning', 'explain your']) or - bool(re.search(r'(first|then|next|finally)', prompt_lower))) - + has_cot = ( + "step by step" in prompt_lower + or "step-by-step" in prompt_lower + or any(phrase in prompt_lower for phrase in ["think through", "reasoning", "explain your"]) + or bool(re.search(r"(first|then|next|finally)", prompt_lower)) + ) + # Assign reasoning strategy bins if has_example: # Few-shot 
examples (bins 7-9) @@ -111,7 +118,7 @@ def calculate_prompt_features(prompt): reasoning_strategy = 7 # Basic few-shot elif has_cot: # Chain-of-thought (bins 4-6) - if 'must' in prompt_lower or 'exactly' in prompt_lower: + if "must" in prompt_lower or "exactly" in prompt_lower: reasoning_strategy = 6 # Strict CoT elif length > 500: reasoning_strategy = 5 # Detailed CoT @@ -121,87 +128,104 @@ def calculate_prompt_features(prompt): # Basic prompts (bins 0-3) if length < 100: reasoning_strategy = 0 # Minimal - elif 'solve' in prompt_lower or 'calculate' in prompt_lower: + elif "solve" in prompt_lower or "calculate" in prompt_lower: reasoning_strategy = 2 # Direct instruction else: reasoning_strategy = 1 # Simple prompt - + return prompt_length, reasoning_strategy def load_prompt_config(prompt_path): """Load the prompt from text file and dataset config from matching _dataset.yaml file.""" # Load prompt from text file - with open(prompt_path, 'r') as f: + with open(prompt_path, "r") as f: prompt = f.read().strip() - + # Load the configuration (already determined from environment variable) if not os.path.exists(DATASET_CONFIG_PATH): raise FileNotFoundError(f"Dataset configuration not found: {DATASET_CONFIG_PATH}") - - with open(DATASET_CONFIG_PATH, 'r') as f: + + with open(DATASET_CONFIG_PATH, "r") as f: config = yaml.safe_load(f) - + return config, prompt + def load_hf_dataset(config): """Load HuggingFace dataset based on configuration.""" - dataset_name = config['dataset_name'] - dataset_config = config.get('dataset_config', None) - split = config.get('split', 'test') - trust_remote_code = config.get('trust_remote_code', True) # Default to True for convenience - + dataset_name = config["dataset_name"] + dataset_config = config.get("dataset_config", None) + split = config.get("split", "test") + trust_remote_code = config.get("trust_remote_code", True) # Default to True for convenience + print(f"Loading dataset: {dataset_name}") - + # Special handling for HotpotQA - always use non-streaming mode - if dataset_name == "hotpot_qa" or config.get('is_hotpotqa', False): + if dataset_name == "hotpot_qa" or config.get("is_hotpotqa", False): print("Using non-streaming mode for HotpotQA to avoid PyArrow issues") streaming = False else: # For other datasets, use streaming if not specified - streaming = config.get('streaming', True) - + streaming = config.get("streaming", True) + try: # Try to load the specified split if dataset_config: - dataset = load_dataset(dataset_name, dataset_config, split=split, - trust_remote_code=trust_remote_code, streaming=streaming) + dataset = load_dataset( + dataset_name, + dataset_config, + split=split, + trust_remote_code=trust_remote_code, + streaming=streaming, + ) else: - dataset = load_dataset(dataset_name, split=split, - trust_remote_code=trust_remote_code, streaming=streaming) + dataset = load_dataset( + dataset_name, split=split, trust_remote_code=trust_remote_code, streaming=streaming + ) except: # Fallback to train split if test is not available print(f"Split '{split}' not found, falling back to 'train'") if dataset_config: - dataset = load_dataset(dataset_name, dataset_config, split='train', - trust_remote_code=trust_remote_code, streaming=streaming) + dataset = load_dataset( + dataset_name, + dataset_config, + split="train", + trust_remote_code=trust_remote_code, + streaming=streaming, + ) else: - dataset = load_dataset(dataset_name, split='train', - trust_remote_code=trust_remote_code, streaming=streaming) - + dataset = load_dataset( + dataset_name, + 
split="train", + trust_remote_code=trust_remote_code, + streaming=streaming, + ) + # Print dataset info - if hasattr(dataset, '__len__'): + if hasattr(dataset, "__len__"): print(f"Dataset loaded with {len(dataset)} examples") else: print(f"Dataset loaded (streaming mode)") - + return dataset + def evaluate_prompt(prompt, dataset, config, num_samples): """Evaluate a prompt on a subset of the dataset.""" - input_field = config['input_field'] - target_field = config['target_field'] - + input_field = config["input_field"] + target_field = config["target_field"] + # Check dataset type - dataset_name = config.get('dataset_name', '').lower() - is_emotion = 'emotion' in dataset_name - is_gsm8k = 'gsm8k' in dataset_name - is_hotpotqa = config.get('is_hotpotqa', False) - is_ifeval = config.get('is_ifeval', False) - is_hover = config.get('is_hover', False) - + dataset_name = config.get("dataset_name", "").lower() + is_emotion = "emotion" in dataset_name + is_gsm8k = "gsm8k" in dataset_name + is_hotpotqa = config.get("is_hotpotqa", False) + is_ifeval = config.get("is_ifeval", False) + is_hover = config.get("is_hover", False) + # Sample from dataset - handle both streaming and non-streaming - if hasattr(dataset, 'take'): + if hasattr(dataset, "take"): # Streaming dataset samples = dataset.take(num_samples) sample_iter = tqdm(samples, desc=f"Evaluating {num_samples} samples", total=num_samples) @@ -210,22 +234,24 @@ def evaluate_prompt(prompt, dataset, config, num_samples): indices = range(min(num_samples, len(dataset))) samples = dataset.select(indices) sample_iter = tqdm(samples, desc=f"Evaluating {num_samples} samples") - + correct = 0 total = 0 - + for example in sample_iter: input_text = example[input_field] expected = example[target_field] - + # Prepare the prompt with appropriate formatting if is_hotpotqa: # Format context from paragraphs - context_items = example.get('context', {}) + context_items = example.get("context", {}) context_text = "" - if 'title' in context_items and 'sentences' in context_items: + if "title" in context_items and "sentences" in context_items: # Handle the specific structure of HotpotQA - for i, (title, sentences) in enumerate(zip(context_items['title'], context_items['sentences'])): + for i, (title, sentences) in enumerate( + zip(context_items["title"], context_items["sentences"]) + ): context_text += f"Paragraph {i+1} ({title}):\n" context_text += " ".join(sentences) + "\n\n" formatted_prompt = prompt.format(context=context_text.strip(), question=input_text) @@ -238,12 +264,10 @@ def evaluate_prompt(prompt, dataset, config, num_samples): else: # Default formatting for other datasets formatted_prompt = prompt.format(input_text=input_text) - + # Prepare the message for the LLM - messages = [ - {"role": "user", "content": formatted_prompt} - ] - + messages = [{"role": "user", "content": formatted_prompt}] + # Call the LLM with retry logic for attempt in range(MAX_RETRIES): try: @@ -252,7 +276,7 @@ def evaluate_prompt(prompt, dataset, config, num_samples): model=TASK_MODEL_NAME, messages=messages, temperature=0.1, # Low temperature for consistent results - max_tokens=MAX_TOKENS + max_tokens=MAX_TOKENS, ) break except Exception as e: @@ -260,134 +284,144 @@ def evaluate_prompt(prompt, dataset, config, num_samples): print(f"Failed to get response after {MAX_RETRIES} attempts: {e}") raise e time.sleep(1) - + # Handle potential None response if not response: print(f"Warning: No response object from LLM") total += 1 # Count as incorrect continue - + if not response.choices: 
print(f"Warning: No choices in response from LLM") total += 1 # Count as incorrect continue - + if not response.choices[0].message: print(f"Warning: No message in response choice") total += 1 # Count as incorrect continue - + output_text = response.choices[0].message.content if output_text is None: print(f"Warning: None content in LLM response") print(f"Full response: {response}") total += 1 # Count as incorrect continue - + output_text = output_text.strip() - + # Extract prediction from output try: if is_gsm8k: # For GSM8K, extract the numeric answer after #### # First, extract the expected answer from the ground truth - expected_answer = expected.split('####')[-1].strip() + expected_answer = expected.split("####")[-1].strip() try: - expected_number = float(expected_answer.replace(',', '')) + expected_number = float(expected_answer.replace(",", "")) except: print(f"Warning: Could not parse expected answer: {expected_answer}") total += 1 continue - + # Extract prediction from model output prediction = None - if '####' in output_text: - predicted_answer = output_text.split('####')[-1].strip() + if "####" in output_text: + predicted_answer = output_text.split("####")[-1].strip() # Extract just the number, removing any extra text like $ signs import re - numbers = re.findall(r'-?\$?[\d,]+\.?\d*', predicted_answer) + + numbers = re.findall(r"-?\$?[\d,]+\.?\d*", predicted_answer) if numbers: try: # Remove $ and , from the number - number_str = numbers[0].replace('$', '').replace(',', '') + number_str = numbers[0].replace("$", "").replace(",", "") prediction = float(number_str) except: pass - + # If we found a prediction, check if it matches if prediction is not None: # Check if answers match (with small tolerance for floats) if abs(prediction - expected_number) < 0.001: correct += 1 - + total += 1 continue # Skip the general case to avoid double counting - + elif is_hotpotqa: # For HotpotQA, do exact match comparison (case-insensitive) output_lower = output_text.lower().strip() expected_lower = str(expected).lower().strip() - + # Remove common punctuation for better matching - output_lower = output_lower.rstrip('.,!?;:') - expected_lower = expected_lower.rstrip('.,!?;:') - + output_lower = output_lower.rstrip(".,!?;:") + expected_lower = expected_lower.rstrip(".,!?;:") + if output_lower == expected_lower: correct += 1 elif expected_lower in output_lower: # Partial credit if answer is contained in response correct += 1 - + total += 1 continue - + elif is_ifeval: # For IFEval, we need more complex evaluation # For now, do basic keyword matching # Note: Full IFEval requires checking multiple constraints output_lower = output_text.lower() - + # Simple heuristic: check if response seems to follow instruction format if len(output_text.strip()) > 10: # Non-trivial response correct += 1 # Simplified - real IFEval needs constraint checking - + total += 1 continue - + elif is_hover: # For HoVer, check if prediction matches SUPPORTED/NOT_SUPPORTED output_upper = output_text.upper() expected_upper = str(expected).upper() - + # Look for the verdict in the output - if 'SUPPORTED' in output_upper and 'NOT' not in output_upper.replace('NOT SUPPORTED', ''): - prediction = 'SUPPORTED' - elif 'NOT SUPPORTED' in output_upper or 'NOT_SUPPORTED' in output_upper: - prediction = 'NOT_SUPPORTED' + if "SUPPORTED" in output_upper and "NOT" not in output_upper.replace( + "NOT SUPPORTED", "" + ): + prediction = "SUPPORTED" + elif "NOT SUPPORTED" in output_upper or "NOT_SUPPORTED" in output_upper: + prediction = 
"NOT_SUPPORTED" else: prediction = None - + if prediction == expected_upper: correct += 1 - + total += 1 continue - + elif is_emotion: # For emotion classification (0-5) - numbers = re.findall(r'\b[0-5]\b', output_text) + numbers = re.findall(r"\b[0-5]\b", output_text) if numbers: prediction = int(numbers[-1]) # Use the last number found else: # Try to infer from emotion keywords output_lower = output_text.lower() emotion_map = { - 'sadness': 0, 'sad': 0, - 'joy': 1, 'happy': 1, 'happiness': 1, - 'love': 2, - 'anger': 3, 'angry': 3, - 'fear': 4, 'afraid': 4, 'scared': 4, - 'surprise': 5, 'surprised': 5 + "sadness": 0, + "sad": 0, + "joy": 1, + "happy": 1, + "happiness": 1, + "love": 2, + "anger": 3, + "angry": 3, + "fear": 4, + "afraid": 4, + "scared": 4, + "surprise": 5, + "surprised": 5, } prediction = -1 for emotion, label in emotion_map.items(): @@ -396,168 +430,165 @@ def evaluate_prompt(prompt, dataset, config, num_samples): break else: # For sentiment classification (0-1) - numbers = re.findall(r'\b[01]\b', output_text) + numbers = re.findall(r"\b[01]\b", output_text) if numbers: prediction = int(numbers[-1]) # Use the last number found else: # Try to infer from keywords output_lower = output_text.lower() - if 'positive' in output_lower: + if "positive" in output_lower: prediction = 1 - elif 'negative' in output_lower: + elif "negative" in output_lower: prediction = 0 else: prediction = -1 # Invalid prediction - + if prediction == expected: correct += 1 - + total += 1 - + except Exception as e: print(f"Error parsing response '{output_text}': {e}") total += 1 # Count as incorrect - + accuracy = correct / total if total > 0 else 0.0 return accuracy, correct, total + def evaluate_stage1(prompt_path): """ Stage 1 evaluation: Quick evaluation with 10% of samples - + Args: prompt_path: Path to the prompt file - + Returns: Dictionary with combined_score metric """ - print('-' * 80) + print("-" * 80) print("Starting Stage 1 evaluation...") - print('-' * 80) - + print("-" * 80) + try: # Load prompt configuration config, prompt = load_prompt_config(prompt_path) print(f"Loaded prompt configuration") - + # Load dataset dataset = load_hf_dataset(config) - + # Get number of samples from config - num_samples = config.get('max_samples', 50) + num_samples = config.get("max_samples", 50) # Fixed to 10 samples for Stage 1 (quick evaluation) stage1_samples = 10 - + print(f"Stage 1: Evaluating {stage1_samples} samples...") - + # Run evaluation - accuracy, correct, total = evaluate_prompt( - prompt, dataset, config, stage1_samples - ) - + accuracy, correct, total = evaluate_prompt(prompt, dataset, config, stage1_samples) + print(f"Stage 1 accuracy: {accuracy:.3f} ({correct}/{total})") - print('-' * 80) - + print("-" * 80) + # Calculate custom features prompt_length, reasoning_strategy = calculate_prompt_features(prompt) print(f"Prompt features - Length bin: {prompt_length}, Reasoning bin: {reasoning_strategy}") - + return { "combined_score": accuracy, "prompt_length": prompt_length, - "reasoning_strategy": reasoning_strategy + "reasoning_strategy": reasoning_strategy, } - + except Exception as e: print(f"Stage 1 evaluation failed: {str(e)}") traceback.print_exc() - print('-' * 80) - + print("-" * 80) + # Always return feature dimensions, even on failure try: # Try to calculate features from the failed prompt - with open(prompt_path, 'r') as f: + with open(prompt_path, "r") as f: failed_prompt = f.read().strip() prompt_length, reasoning_strategy = calculate_prompt_features(failed_prompt) except: # 
Fallback values if prompt can't be read prompt_length, reasoning_strategy = 0, 0 - + return { "combined_score": 0.0, "prompt_length": prompt_length, "reasoning_strategy": reasoning_strategy, - "error": str(e) + "error": str(e), } def evaluate_stage2(prompt_path): """ Stage 2 evaluation: Full evaluation with all samples - + Args: prompt_path: Path to the prompt file - + Returns: Dictionary with combined_score metric """ - print('-' * 80) + print("-" * 80) print("Starting Stage 2 evaluation...") - print('-' * 80) - + print("-" * 80) + try: # Load prompt configuration config, prompt = load_prompt_config(prompt_path) print(f"Loaded prompt configuration") - + # Load dataset dataset = load_hf_dataset(config) - + # Get number of samples from config - num_samples = config.get('max_samples', 50) + num_samples = config.get("max_samples", 50) # Fixed to 40 samples for Stage 2 (comprehensive evaluation) stage2_samples = 40 - + print(f"Stage 2: Evaluating {stage2_samples} samples...") - + # Run evaluation - accuracy, correct, total = evaluate_prompt( - prompt, dataset, config, stage2_samples - ) - + accuracy, correct, total = evaluate_prompt(prompt, dataset, config, stage2_samples) + print(f"Stage 2 accuracy: {accuracy:.3f} ({correct}/{total})") - print('-' * 80) - + print("-" * 80) + # Calculate custom features prompt_length, reasoning_strategy = calculate_prompt_features(prompt) print(f"Prompt features - Length bin: {prompt_length}, Reasoning bin: {reasoning_strategy}") - + return { "combined_score": accuracy, "prompt_length": prompt_length, - "reasoning_strategy": reasoning_strategy + "reasoning_strategy": reasoning_strategy, } - + except Exception as e: print(f"Stage 2 evaluation failed: {str(e)}") traceback.print_exc() - print('-' * 80) - + print("-" * 80) + # Always return feature dimensions, even on failure try: # Try to calculate features from the failed prompt - with open(prompt_path, 'r') as f: + with open(prompt_path, "r") as f: failed_prompt = f.read().strip() prompt_length, reasoning_strategy = calculate_prompt_features(failed_prompt) except: # Fallback values if prompt can't be read prompt_length, reasoning_strategy = 0, 0 - + return { "combined_score": 0.0, "prompt_length": prompt_length, "reasoning_strategy": reasoning_strategy, - "error": str(e) + "error": str(e), } @@ -565,11 +596,11 @@ def evaluate(prompt_path): """ Main evaluation function - for backwards compatibility Calls evaluate_stage2 for full evaluation - + Args: prompt_path: Path to the prompt file - + Returns: Dictionary with combined_score metric """ - return evaluate_stage2(prompt_path) \ No newline at end of file + return evaluate_stage2(prompt_path) diff --git a/examples/mlx_metal_kernel_opt/run_benchmarks.py b/examples/mlx_metal_kernel_opt/run_benchmarks.py index 3095a8523..bc7c5fc2b 100644 --- a/examples/mlx_metal_kernel_opt/run_benchmarks.py +++ b/examples/mlx_metal_kernel_opt/run_benchmarks.py @@ -457,7 +457,9 @@ def print_comparison_summary(comparison_results): print(f" ā±ļø Average Time Reduction: {summary['avg_time_reduction_pct']:+.2f}%") print(f"\nš ABSOLUTE PERFORMANCE:") - print(f" šµ Standard MLX-LM: {summary['avg_standard_decode_speed']:.1f} tokens/sec average") + print( + f" šµ Standard MLX-LM: {summary['avg_standard_decode_speed']:.1f} tokens/sec average" + ) print( f" š Metal Kernel Optimized: {summary['avg_optimized_decode_speed']:.1f} tokens/sec average" ) diff --git a/examples/r_robust_regression/evaluator.py b/examples/r_robust_regression/evaluator.py index 66b9577b1..12527478e 100644 --- 
a/examples/r_robust_regression/evaluator.py +++ b/examples/r_robust_regression/evaluator.py @@ -19,7 +19,7 @@ async def evaluate(program_path: str) -> EvaluationResult: """ Evaluate an R program implementing robust regression. - + Tests the program on synthetic data with outliers to measure: - Accuracy (MSE, MAE, R-squared) - Robustness to outliers @@ -33,7 +33,7 @@ async def evaluate(program_path: str) -> EvaluationResult: generate_regression_data(n_samples=100, n_features=3, outlier_fraction=0.2, noise=0.1), generate_regression_data(n_samples=200, n_features=5, outlier_fraction=0.15, noise=0.2), ] - + total_score = 0 total_mse = 0 total_mae = 0 @@ -41,13 +41,14 @@ async def evaluate(program_path: str) -> EvaluationResult: total_r_squared = 0 total_outlier_robustness = 0 total_time = 0 - + artifacts = {"test_results": []} - + for i, (X, y, true_coeffs) in enumerate(test_cases): # Create a temporary R script that sources the program and runs it - with tempfile.NamedTemporaryFile(mode='w', suffix='.r', delete=False) as f: - f.write(f""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".r", delete=False) as f: + f.write( + f""" # Source the program source("{program_path}") @@ -66,115 +67,109 @@ async def evaluate(program_path: str) -> EvaluationResult: # Save results write(jsonlite::toJSON(metrics, auto_unbox=TRUE), "results.json") -""") +""" + ) test_script = f.name - + # Save test data to temporary CSV files - X_file = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) - y_file = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) - np.savetxt(X_file.name, X, delimiter=',', fmt='%.6f') - np.savetxt(y_file.name, y, delimiter=',', fmt='%.6f') + X_file = tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) + y_file = tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) + np.savetxt(X_file.name, X, delimiter=",", fmt="%.6f") + np.savetxt(y_file.name, y, delimiter=",", fmt="%.6f") X_file.close() y_file.close() - + # Run the R script try: result = subprocess.run( - ['Rscript', test_script], + ["Rscript", test_script], capture_output=True, text=True, timeout=30, - cwd=os.path.dirname(test_script) + cwd=os.path.dirname(test_script), ) - + if result.returncode != 0: - artifacts["test_results"].append({ - "test_case": i, - "error": "R execution failed", - "stderr": result.stderr - }) + artifacts["test_results"].append( + {"test_case": i, "error": "R execution failed", "stderr": result.stderr} + ) continue - + # Read results - results_path = os.path.join(os.path.dirname(test_script), 'results.json') + results_path = os.path.join(os.path.dirname(test_script), "results.json") if not os.path.exists(results_path): - artifacts["test_results"].append({ - "test_case": i, - "error": "No results file produced" - }) + artifacts["test_results"].append( + {"test_case": i, "error": "No results file produced"} + ) continue - - with open(results_path, 'r') as f: + + with open(results_path, "r") as f: metrics = json.load(f) - + # Calculate case score (emphasize robustness for cases with outliers) outlier_fraction = [0.0, 0.1, 0.2, 0.15][i] if outlier_fraction > 0: # For cases with outliers, prioritize robust metrics case_score = ( - 0.2 * (1 - min(metrics.get('mse', 1), 1)) + - 0.3 * (1 - min(metrics.get('medae', 1), 1)) + - 0.4 * metrics.get('outlier_robustness', 0) + - 0.1 * max(0, metrics.get('r_squared', 0)) + 0.2 * (1 - min(metrics.get("mse", 1), 1)) + + 0.3 * (1 - min(metrics.get("medae", 1), 1)) + + 0.4 * metrics.get("outlier_robustness", 0) + + 
0.1 * max(0, metrics.get("r_squared", 0)) ) else: # For clean data, prioritize accuracy case_score = ( - 0.4 * (1 - min(metrics.get('mse', 1), 1)) + - 0.3 * (1 - min(metrics.get('mae', 1), 1)) + - 0.2 * max(0, metrics.get('r_squared', 0)) + - 0.1 * metrics.get('outlier_robustness', 0) + 0.4 * (1 - min(metrics.get("mse", 1), 1)) + + 0.3 * (1 - min(metrics.get("mae", 1), 1)) + + 0.2 * max(0, metrics.get("r_squared", 0)) + + 0.1 * metrics.get("outlier_robustness", 0) ) - + total_score += case_score - total_mse += metrics.get('mse', 1) - total_mae += metrics.get('mae', 1) - total_medae += metrics.get('medae', 1) - total_r_squared += max(0, metrics.get('r_squared', 0)) - total_outlier_robustness += metrics.get('outlier_robustness', 0) - total_time += metrics.get('execution_time', 1) - - artifacts["test_results"].append({ - "test_case": i, - "outlier_fraction": outlier_fraction, - "metrics": metrics, - "case_score": case_score - }) - + total_mse += metrics.get("mse", 1) + total_mae += metrics.get("mae", 1) + total_medae += metrics.get("medae", 1) + total_r_squared += max(0, metrics.get("r_squared", 0)) + total_outlier_robustness += metrics.get("outlier_robustness", 0) + total_time += metrics.get("execution_time", 1) + + artifacts["test_results"].append( + { + "test_case": i, + "outlier_fraction": outlier_fraction, + "metrics": metrics, + "case_score": case_score, + } + ) + except subprocess.TimeoutExpired: - artifacts["test_results"].append({ - "test_case": i, - "error": "Timeout" - }) + artifacts["test_results"].append({"test_case": i, "error": "Timeout"}) except Exception as e: - artifacts["test_results"].append({ - "test_case": i, - "error": str(e) - }) + artifacts["test_results"].append({"test_case": i, "error": str(e)}) finally: # Cleanup os.unlink(test_script) os.unlink(X_file.name) os.unlink(y_file.name) - if os.path.exists(os.path.join(os.path.dirname(test_script), 'results.json')): - os.unlink(os.path.join(os.path.dirname(test_script), 'results.json')) - + if os.path.exists(os.path.join(os.path.dirname(test_script), "results.json")): + os.unlink(os.path.join(os.path.dirname(test_script), "results.json")) + # Calculate average metrics n_successful = len([r for r in artifacts["test_results"] if "error" not in r]) if n_successful == 0: return EvaluationResult( metrics={ "score": 0.0, - "mse": float('inf'), - "mae": float('inf'), - "medae": float('inf'), + "mse": float("inf"), + "mae": float("inf"), + "medae": float("inf"), "r_squared": 0.0, "outlier_robustness": 0.0, - "execution_time": float('inf') + "execution_time": float("inf"), }, - artifacts=artifacts + artifacts=artifacts, ) - + avg_score = total_score / n_successful avg_mse = total_mse / n_successful avg_mae = total_mae / n_successful @@ -182,11 +177,11 @@ async def evaluate(program_path: str) -> EvaluationResult: avg_r_squared = total_r_squared / n_successful avg_outlier_robustness = total_outlier_robustness / n_successful avg_time = total_time / n_successful - + # Add efficiency bonus for faster execution efficiency_bonus = max(0, 1 - avg_time) * 0.1 final_score = min(1.0, avg_score + efficiency_bonus) - + return EvaluationResult( metrics={ "score": final_score, @@ -195,54 +190,57 @@ async def evaluate(program_path: str) -> EvaluationResult: "medae": avg_medae, "r_squared": avg_r_squared, "outlier_robustness": avg_outlier_robustness, - "execution_time": avg_time + "execution_time": avg_time, }, - artifacts=artifacts + artifacts=artifacts, ) - + except Exception as e: return EvaluationResult( metrics={ "score": 0.0, - "mse": 
float('inf'), - "mae": float('inf'), - "medae": float('inf'), + "mse": float("inf"), + "mae": float("inf"), + "medae": float("inf"), "r_squared": 0.0, "outlier_robustness": 0.0, - "execution_time": float('inf') + "execution_time": float("inf"), }, - artifacts={"error": str(e), "type": "evaluation_error"} + artifacts={"error": str(e), "type": "evaluation_error"}, ) def generate_regression_data(n_samples=100, n_features=3, outlier_fraction=0.1, noise=0.1): """Generate synthetic regression data with outliers.""" np.random.seed(42) - + # Generate features X = np.random.randn(n_samples, n_features) - + # True coefficients true_coeffs = np.random.randn(n_features + 1) # +1 for intercept - + # Generate target values y = true_coeffs[0] + X @ true_coeffs[1:] + noise * np.random.randn(n_samples) - + # Add outliers n_outliers = int(n_samples * outlier_fraction) if n_outliers > 0: outlier_indices = np.random.choice(n_samples, n_outliers, replace=False) # Make outliers by adding large errors - y[outlier_indices] += np.random.choice([-1, 1], n_outliers) * np.random.uniform(3, 10, n_outliers) - + y[outlier_indices] += np.random.choice([-1, 1], n_outliers) * np.random.uniform( + 3, 10, n_outliers + ) + return X, y, true_coeffs # For testing if __name__ == "__main__": import sys + if len(sys.argv) > 1: result = asyncio.run(evaluate(sys.argv[1])) print(f"Score: {result.metrics['score']:.4f}") print(f"MSE: {result.metrics['mse']:.4f}") - print(f"Outlier Robustness: {result.metrics['outlier_robustness']:.4f}") \ No newline at end of file + print(f"Outlier Robustness: {result.metrics['outlier_robustness']:.4f}") diff --git a/examples/rust_adaptive_sort/evaluator.py b/examples/rust_adaptive_sort/evaluator.py index 1a2d7005a..65f4ae290 100644 --- a/examples/rust_adaptive_sort/evaluator.py +++ b/examples/rust_adaptive_sort/evaluator.py @@ -19,7 +19,7 @@ async def evaluate(program_path: str) -> EvaluationResult: """ Evaluate a Rust sorting algorithm implementation. 
- + Tests the algorithm on various data patterns to measure: - Correctness - Performance (speed) @@ -30,27 +30,27 @@ async def evaluate(program_path: str) -> EvaluationResult: # Create a temporary Rust project with tempfile.TemporaryDirectory() as temp_dir: project_dir = Path(temp_dir) / "sort_test" - + # Initialize Cargo project result = subprocess.run( ["cargo", "init", "--name", "sort_test", str(project_dir)], capture_output=True, - text=True + text=True, ) - + if result.returncode != 0: return EvaluationResult( metrics={"score": 0.0, "compile_success": 0.0}, - artifacts={"error": "Failed to create Cargo project", "stderr": result.stderr} + artifacts={"error": "Failed to create Cargo project", "stderr": result.stderr}, ) - + # Copy the program to src/lib.rs lib_path = project_dir / "src" / "lib.rs" - with open(program_path, 'r') as src: + with open(program_path, "r") as src: lib_content = src.read() - with open(lib_path, 'w') as dst: + with open(lib_path, "w") as dst: dst.write(lib_content) - + # Create main.rs with benchmark code main_content = """ use sort_test::{adaptive_sort, run_benchmark}; @@ -170,18 +170,18 @@ async def evaluate(program_path: str) -> EvaluationResult: } """ main_path = project_dir / "src" / "main.rs" - with open(main_path, 'w') as f: + with open(main_path, "w") as f: f.write(main_content) - + # Build the project build_result = subprocess.run( ["cargo", "build", "--release"], cwd=project_dir, capture_output=True, text=True, - timeout=60 + timeout=60, ) - + if build_result.returncode != 0: # Extract compilation errors return EvaluationResult( @@ -190,24 +190,24 @@ async def evaluate(program_path: str) -> EvaluationResult: "compile_success": 0.0, "correctness": 0.0, "performance_score": 0.0, - "adaptability_score": 0.0 + "adaptability_score": 0.0, }, artifacts={ "error": "Compilation failed", "stderr": build_result.stderr, - "stdout": build_result.stdout - } + "stdout": build_result.stdout, + }, ) - + # Run the benchmark run_result = subprocess.run( ["cargo", "run", "--release"], cwd=project_dir, capture_output=True, text=True, - timeout=30 + timeout=30, ) - + if run_result.returncode != 0: return EvaluationResult( metrics={ @@ -215,41 +215,35 @@ async def evaluate(program_path: str) -> EvaluationResult: "compile_success": 1.0, "correctness": 0.0, "performance_score": 0.0, - "adaptability_score": 0.0 + "adaptability_score": 0.0, }, - artifacts={ - "error": "Runtime error", - "stderr": run_result.stderr - } + artifacts={"error": "Runtime error", "stderr": run_result.stderr}, ) - + # Parse JSON output try: # Find JSON in output (between first { and last }) output = run_result.stdout - start = output.find('{') - end = output.rfind('}') + 1 + start = output.find("{") + end = output.rfind("}") + 1 json_str = output[start:end] - + results = json.loads(json_str) - + # Calculate overall score - correctness = results['correctness'] - performance = results['performance_score'] - adaptability = results['adaptability_score'] - + correctness = results["correctness"] + performance = results["performance_score"] + adaptability = results["adaptability_score"] + # Weighted score (correctness is mandatory) if correctness < 1.0: overall_score = 0.0 else: - overall_score = ( - 0.6 * performance + - 0.4 * adaptability - ) - + overall_score = 0.6 * performance + 0.4 * adaptability + # Check for memory safety (basic check via valgrind if available) memory_safe = 1.0 # Rust is memory safe by default - + return EvaluationResult( metrics={ "score": overall_score, @@ -257,16 +251,16 @@ async 
def evaluate(program_path: str) -> EvaluationResult: "correctness": correctness, "performance_score": performance, "adaptability_score": adaptability, - "avg_time": results['avg_time'], - "memory_safe": memory_safe + "avg_time": results["avg_time"], + "memory_safe": memory_safe, }, artifacts={ - "times": results['times'], - "all_correct": results['all_correct'], - "build_output": build_result.stdout - } + "times": results["times"], + "all_correct": results["all_correct"], + "build_output": build_result.stdout, + }, ) - + except (json.JSONDecodeError, KeyError) as e: return EvaluationResult( metrics={ @@ -274,14 +268,14 @@ async def evaluate(program_path: str) -> EvaluationResult: "compile_success": 1.0, "correctness": 0.0, "performance_score": 0.0, - "adaptability_score": 0.0 + "adaptability_score": 0.0, }, artifacts={ "error": f"Failed to parse results: {str(e)}", - "stdout": run_result.stdout - } + "stdout": run_result.stdout, + }, ) - + except subprocess.TimeoutExpired: return EvaluationResult( metrics={ @@ -289,9 +283,9 @@ async def evaluate(program_path: str) -> EvaluationResult: "compile_success": 0.0, "correctness": 0.0, "performance_score": 0.0, - "adaptability_score": 0.0 + "adaptability_score": 0.0, }, - artifacts={"error": "Timeout during evaluation"} + artifacts={"error": "Timeout during evaluation"}, ) except Exception as e: return EvaluationResult( @@ -300,18 +294,19 @@ async def evaluate(program_path: str) -> EvaluationResult: "compile_success": 0.0, "correctness": 0.0, "performance_score": 0.0, - "adaptability_score": 0.0 + "adaptability_score": 0.0, }, - artifacts={"error": str(e), "type": "evaluation_error"} + artifacts={"error": str(e), "type": "evaluation_error"}, ) # For testing if __name__ == "__main__": import sys + if len(sys.argv) > 1: result = asyncio.run(evaluate(sys.argv[1])) print(f"Score: {result.metrics['score']:.4f}") print(f"Correctness: {result.metrics['correctness']:.4f}") print(f"Performance: {result.metrics['performance_score']:.4f}") - print(f"Adaptability: {result.metrics['adaptability_score']:.4f}") \ No newline at end of file + print(f"Adaptability: {result.metrics['adaptability_score']:.4f}") diff --git a/examples/signal_processing/evaluator.py b/examples/signal_processing/evaluator.py index 67911002b..6abe2021a 100644 --- a/examples/signal_processing/evaluator.py +++ b/examples/signal_processing/evaluator.py @@ -47,86 +47,86 @@ def safe_float(value): def calculate_slope_changes(signal_data): """ Calculate slope change penalty S(Īø) - counts directional reversals - + Args: signal_data: 1D array of signal values - + Returns: Number of slope changes (directional reversals) """ if len(signal_data) < 3: return 0 - + # Calculate differences diffs = np.diff(signal_data) - + # Count sign changes in consecutive differences sign_changes = 0 for i in range(1, len(diffs)): - if np.sign(diffs[i]) != np.sign(diffs[i-1]) and diffs[i-1] != 0: + if np.sign(diffs[i]) != np.sign(diffs[i - 1]) and diffs[i - 1] != 0: sign_changes += 1 - + return sign_changes def calculate_lag_error(filtered_signal, original_signal, window_size): """ Calculate instantaneous lag error L_recent(Īø) = |y[n] - x[n]| - + Args: filtered_signal: Output of the filter original_signal: Original input signal window_size: Size of the processing window - + Returns: Instantaneous lag error at the most recent sample """ if len(filtered_signal) == 0: return 1.0 # Maximum penalty - + # Account for processing delay delay = window_size - 1 if len(original_signal) <= delay: return 1.0 - + # Compare the 
last filtered sample with the corresponding original sample recent_filtered = filtered_signal[-1] recent_original = original_signal[delay + len(filtered_signal) - 1] - + return abs(recent_filtered - recent_original) def calculate_average_tracking_error(filtered_signal, original_signal, window_size): """ Calculate average tracking error L_avg(Īø) over the window - + Args: filtered_signal: Output of the filter original_signal: Original input signal window_size: Size of the processing window - + Returns: Average absolute error over the processed samples """ if len(filtered_signal) == 0: return 1.0 # Maximum penalty - + # Account for processing delay delay = window_size - 1 if len(original_signal) <= delay: return 1.0 - + # Align signals - aligned_original = original_signal[delay:delay + len(filtered_signal)] - + aligned_original = original_signal[delay : delay + len(filtered_signal)] + # Ensure same length min_length = min(len(filtered_signal), len(aligned_original)) if min_length == 0: return 1.0 - + filtered_aligned = filtered_signal[:min_length] original_aligned = aligned_original[:min_length] - + # Calculate mean absolute error return np.mean(np.abs(filtered_aligned - original_aligned)) @@ -134,78 +134,83 @@ def calculate_average_tracking_error(filtered_signal, original_signal, window_si def calculate_false_reversal_penalty(filtered_signal, clean_signal, window_size): """ Calculate false reversal penalty R(Īø) - mismatched trend changes - + Args: filtered_signal: Output of the filter clean_signal: Ground truth clean signal window_size: Size of the processing window - + Returns: Penalty for trend changes that don't match the clean signal """ if len(filtered_signal) < 3 or len(clean_signal) < 3: return 0 - + # Account for processing delay delay = window_size - 1 if len(clean_signal) <= delay: return 1.0 - + # Align signals - aligned_clean = clean_signal[delay:delay + len(filtered_signal)] + aligned_clean = clean_signal[delay : delay + len(filtered_signal)] min_length = min(len(filtered_signal), len(aligned_clean)) - + if min_length < 3: return 0 - + filtered_aligned = filtered_signal[:min_length] clean_aligned = aligned_clean[:min_length] - + # Calculate trend changes for both signals filtered_diffs = np.diff(filtered_aligned) clean_diffs = np.diff(clean_aligned) - + # Count mismatched trend changes false_reversals = 0 for i in range(1, len(filtered_diffs)): # Check if there's a trend change in filtered signal - filtered_change = (np.sign(filtered_diffs[i]) != np.sign(filtered_diffs[i-1]) - and filtered_diffs[i-1] != 0) - + filtered_change = ( + np.sign(filtered_diffs[i]) != np.sign(filtered_diffs[i - 1]) + and filtered_diffs[i - 1] != 0 + ) + # Check if there's a corresponding trend change in clean signal - clean_change = (np.sign(clean_diffs[i]) != np.sign(clean_diffs[i-1]) - and clean_diffs[i-1] != 0) - + clean_change = ( + np.sign(clean_diffs[i]) != np.sign(clean_diffs[i - 1]) and clean_diffs[i - 1] != 0 + ) + # Count as false reversal if filtered has change but clean doesn't if filtered_change and not clean_change: false_reversals += 1 - + return false_reversals def calculate_composite_score(S, L_recent, L_avg, R, alpha=[0.3, 0.2, 0.2, 0.3]): """ Calculate the composite metric J(Īø) = αāĀ·S(Īø) + αāĀ·L_recent(Īø) + αāĀ·L_avg(Īø) + αāĀ·R(Īø) - + All metrics are normalized and converted to penalties (higher = worse) The final score is converted to a maximization problem (higher = better) """ # Normalize slope changes (typical range 0-100) S_norm = min(S / 50.0, 2.0) - + # Lag errors are 
already in reasonable range (0-10 typically) L_recent_norm = min(L_recent, 2.0) L_avg_norm = min(L_avg, 2.0) - + # Normalize false reversals (typical range 0-50) R_norm = min(R / 25.0, 2.0) - + # Calculate weighted penalty - penalty = alpha[0] * S_norm + alpha[1] * L_recent_norm + alpha[2] * L_avg_norm + alpha[3] * R_norm - + penalty = ( + alpha[0] * S_norm + alpha[1] * L_recent_norm + alpha[2] * L_avg_norm + alpha[3] * R_norm + ) + # Convert to maximization score (higher is better) score = 1.0 / (1.0 + penalty) - + return score @@ -214,43 +219,47 @@ def generate_test_signals(num_signals=5): Generate multiple test signals with different characteristics """ test_signals = [] - + for i in range(num_signals): np.random.seed(42 + i) # Different seed for each signal length = 500 + i * 100 # Varying lengths noise_level = 0.2 + i * 0.1 # Varying noise levels - + t = np.linspace(0, 10, length) - + # Different signal characteristics if i == 0: # Smooth sinusoidal with trend clean = 2 * np.sin(2 * np.pi * 0.5 * t) + 0.1 * t elif i == 1: # Multiple frequency components - clean = (np.sin(2 * np.pi * 0.5 * t) + - 0.5 * np.sin(2 * np.pi * 2 * t) + - 0.2 * np.sin(2 * np.pi * 5 * t)) + clean = ( + np.sin(2 * np.pi * 0.5 * t) + + 0.5 * np.sin(2 * np.pi * 2 * t) + + 0.2 * np.sin(2 * np.pi * 5 * t) + ) elif i == 2: # Non-stationary with changing frequency clean = np.sin(2 * np.pi * (0.5 + 0.2 * t) * t) elif i == 3: # Step changes - clean = np.concatenate([ - np.ones(length//3), - 2 * np.ones(length//3), - 0.5 * np.ones(length - 2*(length//3)) - ]) + clean = np.concatenate( + [ + np.ones(length // 3), + 2 * np.ones(length // 3), + 0.5 * np.ones(length - 2 * (length // 3)), + ] + ) else: # Random walk with trend clean = np.cumsum(np.random.randn(length) * 0.1) + 0.05 * t - + # Add noise noise = np.random.normal(0, noise_level, length) noisy = clean + noise - + test_signals.append((noisy, clean)) - + return test_signals @@ -267,120 +276,120 @@ def evaluate(program_path): # Check if required function exists if not hasattr(program, "run_signal_processing"): - return { - "composite_score": 0.0, - "error": "Missing run_signal_processing function" - } + return {"composite_score": 0.0, "error": "Missing run_signal_processing function"} # Generate test signals test_signals = generate_test_signals(5) - + # Collect metrics across all test signals all_scores = [] all_metrics = [] successful_runs = 0 - + for i, (noisy_signal, clean_signal) in enumerate(test_signals): try: # Run the algorithm with timeout start_time = time.time() - + # Call the program's main function result = run_with_timeout( program.run_signal_processing, kwargs={ - 'signal_length': len(noisy_signal), - 'noise_level': 0.3, - 'window_size': 20 + "signal_length": len(noisy_signal), + "noise_level": 0.3, + "window_size": 20, }, - timeout_seconds=10 + timeout_seconds=10, ) - + execution_time = time.time() - start_time - + # Validate result format if not isinstance(result, dict): print(f"Signal {i}: Invalid result format") continue - - if 'filtered_signal' not in result: + + if "filtered_signal" not in result: print(f"Signal {i}: Missing filtered_signal in result") continue - - filtered_signal = result['filtered_signal'] - + + filtered_signal = result["filtered_signal"] + if len(filtered_signal) == 0: print(f"Signal {i}: Empty filtered signal") continue - + # Convert to numpy arrays filtered_signal = np.array(filtered_signal) - + # Calculate metrics using the generated test signal window_size = 20 - + # Calculate all penalty components S = 
calculate_slope_changes(filtered_signal) L_recent = calculate_lag_error(filtered_signal, noisy_signal, window_size) L_avg = calculate_average_tracking_error(filtered_signal, noisy_signal, window_size) R = calculate_false_reversal_penalty(filtered_signal, clean_signal, window_size) - + # Calculate composite score composite_score = calculate_composite_score(S, L_recent, L_avg, R) - + # Additional quality metrics correlation = 0.0 noise_reduction = 0.0 - + try: # Calculate correlation with clean signal delay = window_size - 1 - aligned_clean = clean_signal[delay:delay + len(filtered_signal)] + aligned_clean = clean_signal[delay : delay + len(filtered_signal)] min_length = min(len(filtered_signal), len(aligned_clean)) - + if min_length > 1: - corr_result = pearsonr(filtered_signal[:min_length], aligned_clean[:min_length]) + corr_result = pearsonr( + filtered_signal[:min_length], aligned_clean[:min_length] + ) correlation = corr_result[0] if not np.isnan(corr_result[0]) else 0.0 - + # Calculate noise reduction - aligned_noisy = noisy_signal[delay:delay + len(filtered_signal)] + aligned_noisy = noisy_signal[delay : delay + len(filtered_signal)] aligned_noisy = aligned_noisy[:min_length] aligned_clean = aligned_clean[:min_length] - + if min_length > 0: noise_before = np.var(aligned_noisy - aligned_clean) noise_after = np.var(filtered_signal[:min_length] - aligned_clean) - noise_reduction = ((noise_before - noise_after) / noise_before - if noise_before > 0 else 0) + noise_reduction = ( + (noise_before - noise_after) / noise_before if noise_before > 0 else 0 + ) noise_reduction = max(0, noise_reduction) # Ensure non-negative - + except Exception as e: print(f"Signal {i}: Error calculating additional metrics: {e}") - + # Store metrics metrics = { - 'slope_changes': safe_float(S), - 'lag_error': safe_float(L_recent), - 'avg_error': safe_float(L_avg), - 'false_reversals': safe_float(R), - 'composite_score': safe_float(composite_score), - 'correlation': safe_float(correlation), - 'noise_reduction': safe_float(noise_reduction), - 'execution_time': safe_float(execution_time), - 'signal_length': len(filtered_signal) + "slope_changes": safe_float(S), + "lag_error": safe_float(L_recent), + "avg_error": safe_float(L_avg), + "false_reversals": safe_float(R), + "composite_score": safe_float(composite_score), + "correlation": safe_float(correlation), + "noise_reduction": safe_float(noise_reduction), + "execution_time": safe_float(execution_time), + "signal_length": len(filtered_signal), } - + all_scores.append(composite_score) all_metrics.append(metrics) successful_runs += 1 - + except TimeoutError: print(f"Signal {i}: Timeout") continue except Exception as e: print(f"Signal {i}: Error - {str(e)}") continue - + # If no successful runs, return minimal scores if successful_runs == 0: return { @@ -392,35 +401,35 @@ def evaluate(program_path): "correlation": 0.0, "noise_reduction": 0.0, "success_rate": 0.0, - "error": "All test signals failed" + "error": "All test signals failed", } - + # Calculate aggregate metrics avg_composite_score = np.mean(all_scores) - avg_slope_changes = np.mean([m['slope_changes'] for m in all_metrics]) - avg_lag_error = np.mean([m['lag_error'] for m in all_metrics]) - avg_avg_error = np.mean([m['avg_error'] for m in all_metrics]) - avg_false_reversals = np.mean([m['false_reversals'] for m in all_metrics]) - avg_correlation = np.mean([m['correlation'] for m in all_metrics]) - avg_noise_reduction = np.mean([m['noise_reduction'] for m in all_metrics]) - avg_execution_time = 
np.mean([m['execution_time'] for m in all_metrics]) + avg_slope_changes = np.mean([m["slope_changes"] for m in all_metrics]) + avg_lag_error = np.mean([m["lag_error"] for m in all_metrics]) + avg_avg_error = np.mean([m["avg_error"] for m in all_metrics]) + avg_false_reversals = np.mean([m["false_reversals"] for m in all_metrics]) + avg_correlation = np.mean([m["correlation"] for m in all_metrics]) + avg_noise_reduction = np.mean([m["noise_reduction"] for m in all_metrics]) + avg_execution_time = np.mean([m["execution_time"] for m in all_metrics]) success_rate = successful_runs / len(test_signals) - + # Calculate additional derived scores smoothness_score = 1.0 / (1.0 + avg_slope_changes / 20.0) # Higher is better responsiveness_score = 1.0 / (1.0 + avg_lag_error) # Higher is better accuracy_score = max(0, avg_correlation) # 0-1, higher is better efficiency_score = min(1.0, 1.0 / max(0.001, avg_execution_time)) # Speed bonus - + # Overall score combining multiple factors overall_score = ( - 0.4 * avg_composite_score + # Primary metric - 0.2 * smoothness_score + # Smoothness - 0.2 * accuracy_score + # Correlation with clean signal - 0.1 * avg_noise_reduction + # Noise reduction capability - 0.1 * success_rate # Reliability + 0.4 * avg_composite_score # Primary metric + + 0.2 * smoothness_score # Smoothness + + 0.2 * accuracy_score # Correlation with clean signal + + 0.1 * avg_noise_reduction # Noise reduction capability + + 0.1 * success_rate # Reliability ) - + return { "composite_score": safe_float(avg_composite_score), "overall_score": safe_float(overall_score), # Primary selection metric @@ -435,17 +444,13 @@ def evaluate(program_path): "accuracy_score": safe_float(accuracy_score), "efficiency_score": safe_float(efficiency_score), "execution_time": safe_float(avg_execution_time), - "success_rate": safe_float(success_rate) + "success_rate": safe_float(success_rate), } - + except Exception as e: print(f"Evaluation failed: {str(e)}") print(traceback.format_exc()) - return { - "composite_score": 0.0, - "overall_score": 0.0, - "error": str(e) - } + return {"composite_score": 0.0, "overall_score": 0.0, "error": str(e)} def evaluate_stage1(program_path): @@ -460,62 +465,44 @@ def evaluate_stage1(program_path): # Check if required function exists if not hasattr(program, "run_signal_processing"): - return { - "runs_successfully": 0.0, - "error": "Missing run_signal_processing function" - } + return {"runs_successfully": 0.0, "error": "Missing run_signal_processing function"} # Quick test with small signal try: result = run_with_timeout( program.run_signal_processing, - kwargs={'signal_length': 100, 'noise_level': 0.3, 'window_size': 10}, - timeout_seconds=5 + kwargs={"signal_length": 100, "noise_level": 0.3, "window_size": 10}, + timeout_seconds=5, ) - - if isinstance(result, dict) and 'filtered_signal' in result: - filtered_signal = result['filtered_signal'] + + if isinstance(result, dict) and "filtered_signal" in result: + filtered_signal = result["filtered_signal"] if len(filtered_signal) > 0: # Quick quality check composite_score = 0.5 # Baseline score for working programs - + # Bonus for reasonable output length expected_length = 100 - 10 + 1 # signal_length - window_size + 1 if len(filtered_signal) == expected_length: composite_score += 0.2 - + return { "runs_successfully": 1.0, "composite_score": composite_score, - "output_length": len(filtered_signal) + "output_length": len(filtered_signal), } else: - return { - "runs_successfully": 0.5, - "error": "Empty filtered signal" - } + return 
{"runs_successfully": 0.5, "error": "Empty filtered signal"} else: - return { - "runs_successfully": 0.3, - "error": "Invalid result format" - } - + return {"runs_successfully": 0.3, "error": "Invalid result format"} + except TimeoutError: - return { - "runs_successfully": 0.0, - "error": "Timeout in stage 1" - } + return {"runs_successfully": 0.0, "error": "Timeout in stage 1"} except Exception as e: - return { - "runs_successfully": 0.0, - "error": f"Stage 1 error: {str(e)}" - } - + return {"runs_successfully": 0.0, "error": f"Stage 1 error: {str(e)}"} + except Exception as e: - return { - "runs_successfully": 0.0, - "error": f"Stage 1 failed: {str(e)}" - } + return {"runs_successfully": 0.0, "error": f"Stage 1 failed: {str(e)}"} def evaluate_stage2(program_path): diff --git a/examples/signal_processing/initial_program.py b/examples/signal_processing/initial_program.py index 2636e4746..1350c577a 100644 --- a/examples/signal_processing/initial_program.py +++ b/examples/signal_processing/initial_program.py @@ -2,7 +2,7 @@ """ Real-Time Adaptive Signal Processing Algorithm for Non-Stationary Time Series -This algorithm implements a sliding window approach to filter volatile, non-stationary +This algorithm implements a sliding window approach to filter volatile, non-stationary time series data while minimizing noise and preserving signal dynamics. """ import numpy as np @@ -13,70 +13,70 @@ def adaptive_filter(x, window_size=20): """ Adaptive signal processing algorithm using sliding window approach. - + Args: x: Input signal (1D array of real-valued samples) window_size: Size of the sliding window (W samples) - + Returns: y: Filtered output signal with length = len(x) - window_size + 1 """ if len(x) < window_size: raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})") - + # Initialize output array output_length = len(x) - window_size + 1 y = np.zeros(output_length) - + # Simple moving average as baseline for i in range(output_length): - window = x[i:i + window_size] - + window = x[i : i + window_size] + # Basic moving average filter y[i] = np.mean(window) - + return y def enhanced_filter_with_trend_preservation(x, window_size=20): """ Enhanced version with trend preservation using weighted moving average. - + Args: x: Input signal (1D array of real-valued samples) window_size: Size of the sliding window - + Returns: y: Filtered output signal """ if len(x) < window_size: raise ValueError(f"Input signal length ({len(x)}) must be >= window_size ({window_size})") - + output_length = len(x) - window_size + 1 y = np.zeros(output_length) - + # Create weights that emphasize recent samples weights = np.exp(np.linspace(-2, 0, window_size)) weights = weights / np.sum(weights) - + for i in range(output_length): - window = x[i:i + window_size] - + window = x[i : i + window_size] + # Weighted moving average with exponential weights y[i] = np.sum(window * weights) - + return y def process_signal(input_signal, window_size=20, algorithm_type="enhanced"): """ Main signal processing function that applies the selected algorithm. - + Args: input_signal: Input time series data window_size: Window size for processing algorithm_type: Type of algorithm to use ("basic" or "enhanced") - + Returns: Filtered signal """ @@ -92,91 +92,91 @@ def process_signal(input_signal, window_size=20, algorithm_type="enhanced"): def generate_test_signal(length=1000, noise_level=0.3, seed=42): """ Generate synthetic test signal with known characteristics. 
- + Args: length: Length of the signal noise_level: Standard deviation of noise to add seed: Random seed for reproducibility - + Returns: Tuple of (noisy_signal, clean_signal) """ np.random.seed(seed) t = np.linspace(0, 10, length) - + # Create a complex signal with multiple components clean_signal = ( - 2 * np.sin(2 * np.pi * 0.5 * t) + # Low frequency component - 1.5 * np.sin(2 * np.pi * 2 * t) + # Medium frequency component - 0.5 * np.sin(2 * np.pi * 5 * t) + # Higher frequency component - 0.8 * np.exp(-t/5) * np.sin(2 * np.pi * 1.5 * t) # Decaying oscillation + 2 * np.sin(2 * np.pi * 0.5 * t) # Low frequency component + + 1.5 * np.sin(2 * np.pi * 2 * t) # Medium frequency component + + 0.5 * np.sin(2 * np.pi * 5 * t) # Higher frequency component + + 0.8 * np.exp(-t / 5) * np.sin(2 * np.pi * 1.5 * t) # Decaying oscillation ) - + # Add non-stationary behavior trend = 0.1 * t * np.sin(0.2 * t) # Slowly varying trend clean_signal += trend - + # Add random walk component for non-stationarity random_walk = np.cumsum(np.random.randn(length) * 0.05) clean_signal += random_walk - + # Add noise noise = np.random.normal(0, noise_level, length) noisy_signal = clean_signal + noise - + return noisy_signal, clean_signal def run_signal_processing(signal_length=1000, noise_level=0.3, window_size=20): """ Run the signal processing algorithm on a test signal. - + Returns: Dictionary containing results and metrics """ # Generate test signal noisy_signal, clean_signal = generate_test_signal(signal_length, noise_level) - + # Process the signal filtered_signal = process_signal(noisy_signal, window_size, "enhanced") - + # Calculate basic metrics if len(filtered_signal) > 0: # Align signals for comparison (account for processing delay) delay = window_size - 1 aligned_clean = clean_signal[delay:] aligned_noisy = noisy_signal[delay:] - + # Ensure same length min_length = min(len(filtered_signal), len(aligned_clean)) filtered_signal = filtered_signal[:min_length] aligned_clean = aligned_clean[:min_length] aligned_noisy = aligned_noisy[:min_length] - + # Calculate correlation with clean signal correlation = np.corrcoef(filtered_signal, aligned_clean)[0, 1] if min_length > 1 else 0 - + # Calculate noise reduction noise_before = np.var(aligned_noisy - aligned_clean) noise_after = np.var(filtered_signal - aligned_clean) noise_reduction = (noise_before - noise_after) / noise_before if noise_before > 0 else 0 - + return { - 'filtered_signal': filtered_signal, - 'clean_signal': aligned_clean, - 'noisy_signal': aligned_noisy, - 'correlation': correlation, - 'noise_reduction': noise_reduction, - 'signal_length': min_length + "filtered_signal": filtered_signal, + "clean_signal": aligned_clean, + "noisy_signal": aligned_noisy, + "correlation": correlation, + "noise_reduction": noise_reduction, + "signal_length": min_length, } else: return { - 'filtered_signal': [], - 'clean_signal': [], - 'noisy_signal': [], - 'correlation': 0, - 'noise_reduction': 0, - 'signal_length': 0 + "filtered_signal": [], + "clean_signal": [], + "noisy_signal": [], + "correlation": 0, + "noise_reduction": 0, + "signal_length": 0, } diff --git a/examples/web_scraper_optillm/evaluator.py b/examples/web_scraper_optillm/evaluator.py index 14098b31c..c0a1e1c65 100644 --- a/examples/web_scraper_optillm/evaluator.py +++ b/examples/web_scraper_optillm/evaluator.py @@ -15,120 +15,117 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - def evaluate(program_path: str) -> Dict: """ Evaluate the web scraper program. 
- + Args: program_path: Path to the program to evaluate - + Returns: Dictionary with metrics and artifacts for OpenEvolve compatibility """ try: # Import the program sys.path.insert(0, os.path.dirname(program_path)) - program_name = os.path.basename(program_path).replace('.py', '') + program_name = os.path.basename(program_path).replace(".py", "") program = __import__(program_name) - + # Test data: HTML content from various documentation sources test_cases = get_test_cases() - + # Evaluate each test case metrics = { - 'accuracy': 0.0, - 'completeness': 0.0, - 'robustness': 0.0, - 'parsing_errors': 0.0, - 'total_score': 0.0 + "accuracy": 0.0, + "completeness": 0.0, + "robustness": 0.0, + "parsing_errors": 0.0, + "total_score": 0.0, } - + artifacts = {} - + total_correct = 0 total_expected = 0 parsing_errors = 0 - + for i, test_case in enumerate(test_cases): try: # Run the scraper - docs = program.scrape_api_docs(test_case['html']) - + docs = program.scrape_api_docs(test_case["html"]) + # Evaluate accuracy - correct, expected = evaluate_extraction(docs, test_case['expected']) + correct, expected = evaluate_extraction(docs, test_case["expected"]) total_correct += correct total_expected += expected - + # Test parameter extraction for doc in docs: - if 'parameters' not in doc: - doc['parameters'] = program.extract_parameters(doc.get('signature', '')) - + if "parameters" not in doc: + doc["parameters"] = program.extract_parameters(doc.get("signature", "")) + # Test formatting formatted = program.format_documentation(docs) - + # Store results for debugging - artifacts[f'test_case_{i}'] = { - 'expected_count': expected, - 'found_count': correct, - 'extracted_functions': [doc.get('name', 'unknown') for doc in docs], - 'formatted_length': len(formatted) + artifacts[f"test_case_{i}"] = { + "expected_count": expected, + "found_count": correct, + "extracted_functions": [doc.get("name", "unknown") for doc in docs], + "formatted_length": len(formatted), } - + except Exception as e: parsing_errors += 1 - artifacts[f'test_case_{i}_error'] = str(e) - + artifacts[f"test_case_{i}_error"] = str(e) + # Calculate metrics if total_expected > 0: - metrics['accuracy'] = total_correct / total_expected - - metrics['completeness'] = min(1.0, total_correct / 20) # Expect ~20 functions total - metrics['robustness'] = max(0.0, 1.0 - (parsing_errors / len(test_cases))) - metrics['parsing_errors'] = parsing_errors / len(test_cases) - + metrics["accuracy"] = total_correct / total_expected + + metrics["completeness"] = min(1.0, total_correct / 20) # Expect ~20 functions total + metrics["robustness"] = max(0.0, 1.0 - (parsing_errors / len(test_cases))) + metrics["parsing_errors"] = parsing_errors / len(test_cases) + # Overall score - use 'combined_score' as primary metric for evolution - metrics['combined_score'] = ( - metrics['accuracy'] * 0.4 + - metrics['completeness'] * 0.3 + - metrics['robustness'] * 0.3 + metrics["combined_score"] = ( + metrics["accuracy"] * 0.4 + metrics["completeness"] * 0.3 + metrics["robustness"] * 0.3 ) - + # Add detailed feedback for the LLM - artifacts['evaluation_feedback'] = generate_feedback(metrics, artifacts) - + artifacts["evaluation_feedback"] = generate_feedback(metrics, artifacts) + # Return dictionary format for OpenEvolve compatibility return metrics - + except Exception as e: return { - 'accuracy': 0.0, - 'completeness': 0.0, - 'robustness': 0.0, - 'parsing_errors': 1.0, - 'combined_score': 0.0, - 'error': str(e), - 'traceback': traceback.format_exc(), - 'stage': 'program_import' + 
"accuracy": 0.0, + "completeness": 0.0, + "robustness": 0.0, + "parsing_errors": 1.0, + "combined_score": 0.0, + "error": str(e), + "traceback": traceback.format_exc(), + "stage": "program_import", } def get_test_cases() -> List[Dict[str, Any]]: """ Get test cases with HTML content and expected results. - + These test cases include URLs that will be fetched by optillm's readurls plugin during evolution, providing the LLM with actual documentation structure. - + Returns: List of test cases with HTML content and expected results """ return [ { - 'name': 'json_module_docs', - 'html': ''' + "name": "json_module_docs", + "html": """
tags.")
- elif metrics['accuracy'] < 0.8:
+ feedback.append(
+ "Consider improving the HTML parsing logic to handle different documentation formats."
+ )
+ feedback.append(
+ "Look for patterns like , , and tags."
+ )
+ elif metrics["accuracy"] < 0.8:
feedback.append("✅ **Good Accuracy**: Most functions are found, but some are missed.")
feedback.append("Fine-tune the extraction logic for edge cases.")
else:
feedback.append("🎉 **Excellent Accuracy**: Function extraction is working well!")
-
+
feedback.append("")
-
+
# Completeness feedback
- if metrics['completeness'] < 0.5:
+ if metrics["completeness"] < 0.5:
feedback.append("⚠️ **Low Completeness**: Not extracting enough functions overall.")
feedback.append("Increase the limit or improve the search scope.")
-
+
# Robustness feedback
- if metrics['robustness'] < 0.8:
+ if metrics["robustness"] < 0.8:
feedback.append("⚠️ **Low Robustness**: The scraper fails on some HTML formats.")
feedback.append("Add try-catch blocks and handle different documentation structures.")
feedback.append("Consider multiple parsing strategies and fallback methods.")
-
+
# Specific improvements
feedback.append("")
feedback.append("## Specific Improvements:")
-
+
# Analyze test case results
for key, value in artifacts.items():
- if key.startswith('test_case_') and isinstance(value, dict):
- if 'error' in key:
+ if key.startswith("test_case_") and isinstance(value, dict):
+ if "error" in key:
feedback.append(f"- Fix error in {key}: {value}")
- elif value.get('found_count', 0) < value.get('expected_count', 0):
- feedback.append(f"- Improve extraction for {key}: found {value.get('found_count', 0)}/{value.get('expected_count', 0)} functions")
-
+ elif value.get("found_count", 0) < value.get("expected_count", 0):
+ feedback.append(
+ f"- Improve extraction for {key}: found {value.get('found_count', 0)}/{value.get('expected_count', 0)} functions"
+ )
+
# Documentation URL hints (these will be fetched by readurls plugin)
feedback.append("")
feedback.append("## Documentation References:")
@@ -362,5 +367,5 @@ def generate_feedback(metrics: Dict[str, float], artifacts: Dict[str, Any]) -> s
feedback.append("- Python docs: https://docs.python.org/3/library/json.html")
feedback.append("- Requests docs: https://requests.readthedocs.io/en/latest/api/")
feedback.append("- BeautifulSoup docs: https://www.crummy.com/software/BeautifulSoup/bs4/doc/")
-
- return '\n'.join(feedback)
\ No newline at end of file
+
+ return "\n".join(feedback)
diff --git a/examples/web_scraper_optillm/initial_program.py b/examples/web_scraper_optillm/initial_program.py
index f0c97f5a2..4daead447 100644
--- a/examples/web_scraper_optillm/initial_program.py
+++ b/examples/web_scraper_optillm/initial_program.py
@@ -18,94 +18,102 @@
def scrape_api_docs(html_content: str) -> List[Dict[str, any]]:
"""
Extract API documentation from HTML content.
-
+
Args:
html_content: Raw HTML content of a documentation page
-
+
Returns:
List of dictionaries containing function documentation
"""
- soup = BeautifulSoup(html_content, 'html.parser')
+ soup = BeautifulSoup(html_content, "html.parser")
functions = []
-
+
# Try multiple approaches to find functions
# 1. Look for code blocks
- code_blocks = soup.find_all('code')
+ code_blocks = soup.find_all("code")
for block in code_blocks:
text = block.get_text(strip=True)
- if '(' in text and ')' in text:
- functions.append({
- 'name': text.split('(')[0].strip(),
- 'signature': text,
- 'description': 'No description found',
- 'parameters': extract_parameters(text)
- })
-
+ if "(" in text and ")" in text:
+ functions.append(
+ {
+ "name": text.split("(")[0].strip(),
+ "signature": text,
+ "description": "No description found",
+ "parameters": extract_parameters(text),
+ }
+ )
+
# 2. Look for function signatures in headers (h3)
- h3_blocks = soup.find_all('h3')
+ h3_blocks = soup.find_all("h3")
for block in h3_blocks:
text = block.get_text(strip=True)
- if '(' in text and ')' in text:
- functions.append({
- 'name': text.split('(')[0].strip(),
- 'signature': text,
- 'description': 'No description found',
- 'parameters': extract_parameters(text)
- })
-
+ if "(" in text and ")" in text:
+ functions.append(
+ {
+ "name": text.split("(")[0].strip(),
+ "signature": text,
+ "description": "No description found",
+ "parameters": extract_parameters(text),
+ }
+ )
+
# 3. Look for dt elements with sig class
- dt_blocks = soup.find_all('dt', class_='sig')
+ dt_blocks = soup.find_all("dt", class_="sig")
for block in dt_blocks:
- sig_name = block.find(class_='sig-name')
+ sig_name = block.find(class_="sig-name")
if sig_name:
name = sig_name.get_text(strip=True)
- functions.append({
- 'name': name,
- 'signature': block.get_text(strip=True),
- 'description': 'No description found',
- 'parameters': extract_parameters(block.get_text(strip=True))
- })
-
+ functions.append(
+ {
+ "name": name,
+ "signature": block.get_text(strip=True),
+ "description": "No description found",
+ "parameters": extract_parameters(block.get_text(strip=True)),
+ }
+ )
+
return functions[:20] # Return more functions
def extract_parameters(signature: str) -> List[Dict[str, str]]:
"""
Extract parameter information from a function signature.
-
+
Args:
signature: Function signature string
-
+
Returns:
List of parameter dictionaries
"""
params = []
# Very basic parameter extraction
- match = re.search(r'\((.*?)\)', signature)
+ match = re.search(r"\((.*?)\)", signature)
if match:
param_string = match.group(1)
if param_string:
- param_parts = param_string.split(',')
+ param_parts = param_string.split(",")
for part in param_parts:
part = part.strip()
if part:
- params.append({
- 'name': part.split('=')[0].strip(),
- 'type': 'unknown',
- 'default': None,
- 'description': ''
- })
-
+ params.append(
+ {
+ "name": part.split("=")[0].strip(),
+ "type": "unknown",
+ "default": None,
+ "description": "",
+ }
+ )
+
return params
def format_documentation(api_docs: List[Dict[str, any]]) -> str:
"""
Format extracted documentation into a readable string.
-
+
Args:
api_docs: List of API documentation dictionaries
-
+
Returns:
Formatted documentation string
"""
@@ -114,15 +122,17 @@ def format_documentation(api_docs: List[Dict[str, any]]) -> str:
output.append(f"Function: {doc['name']}")
output.append(f"Signature: {doc['signature']}")
output.append(f"Description: {doc['description']}")
-
- if doc.get('parameters'):
+
+ if doc.get("parameters"):
output.append("Parameters:")
- for param in doc['parameters']:
+ for param in doc["parameters"]:
output.append(f" - {param['name']}: {param.get('description', 'No description')}")
-
+
output.append("") # Empty line between functions
-
- return '\n'.join(output)
+
+ return "\n".join(output)
+
+
# EVOLVE-BLOCK-END
@@ -143,7 +153,7 @@ def format_documentation(api_docs: List[Dict[str, any]]) -> str: