# SWEBench Agent Local Evaluation

This notebook runs the SWEBench agent evaluation locally without using Modal. It's based on the code in the `swebench_agent_run` directory.


## Setup

First, let's import the necessary libraries and set up the environment.



In [1]:
import asyncio
import json
import traceback
import uuid
from pathlib import Path
from datetime import datetime
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Import SWEBench utilities
from codegen.extensions.swebench.utils import SWEBenchDataset, SweBenchExample, get_swe_bench_example, get_swe_bench_examples
from codegen.extensions.swebench.harness import run_agent_on_entry
from codegen.extensions.swebench.report import generate_report
from codegen.sdk.core.codebase import Codebase

# Output locations, relative to the current working directory:
# PREDS_DNAME holds prediction/result files, LOG_DIR holds logs.
PREDS_DNAME = Path("predictions")
LOG_DIR = Path("logs")

# Ensure both directories exist (no error if they are already present).
for _required_dir in (PREDS_DNAME, LOG_DIR):
    _required_dir.mkdir(exist_ok=True, parents=True)


## Define Local Processing Functions

Now, let's define functions to process SWEBench examples locally.


In [2]:
async def process_example(example: SweBenchExample):
    """Process a single SWEBench example locally.

    Builds a ``Codebase`` for the example's repository at its base commit and
    runs the agent on it. Failures are reported in the return value instead of
    being raised, so one bad example does not abort a whole batch.

    Args:
        example: The SWEBench example to run the agent on.

    Returns:
        The value produced by ``run_agent_on_entry`` on success. If any
        ``Exception`` is raised, a dict with ``instance_id``, ``status`` set
        to ``"error"`` and an ``error_info`` dict containing ``error_type``,
        ``error_message`` and ``traceback`` (a list of formatted lines).
    """
    # Function-scope import so this cell does not depend on another cell's imports.
    import inspect

    try:
        # Create a Codebase object for the example
        codebase = Codebase.from_repo(repo_full_name=example.repo, commit=example.base_commit, language="python")

        # Run the agent on the example. The single-example cell in this
        # notebook calls run_agent_on_entry as a plain function, so awaiting
        # its return value unconditionally would raise TypeError and turn
        # every example into an error result. Await only when it is awaitable.
        # NOTE(review): when the call is synchronous it blocks the event loop,
        # so examples in a batch effectively run one after another - confirm
        # whether offloading to a worker thread is safe for the agent.
        result = run_agent_on_entry(example, codebase=codebase)
        if inspect.isawaitable(result):
            result = await result
        return result
    except Exception as e:
        error_type = type(e).__name__
        error_info = {
            "error_type": error_type,
            "error_message": str(e),
            "traceback": traceback.format_exception(type(e), e, e.__traceback__),
        }

        print(f"Error processing {example.instance_id}:")
        print(f"Type: {error_type}")
        print(f"Message: {str(e)}")
        print("Traceback:")
        print("".join(error_info["traceback"]))

        return {"instance_id": example.instance_id, "status": "error", "error_info": error_info}

In [3]:
async def process_batch_locally(examples: list[SweBenchExample], batch_size=3):
    """Process examples locally, ``batch_size`` at a time.

    Each batch is submitted to ``asyncio.gather`` and must finish before the
    next batch starts.

    Args:
        examples: List of SweBenchExample objects to process.
        batch_size: Number of examples to process concurrently.
                   Default is 3 which is reasonable for local processing.

    Returns:
        A list with one entry per example, in input order. Each entry is
        either the value returned by ``process_example`` or an error dict
        with ``instance_id``, ``status`` (``"error"``) and ``error_info``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    # A negative step would make range() empty and silently drop every example.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    results = []

    # Ceiling division: an exact multiple of batch_size must not report an
    # extra, non-existent batch in the progress line.
    total_batches = (len(examples) + batch_size - 1) // batch_size

    # Process examples in batches
    for batch_number, start in enumerate(range(0, len(examples), batch_size), start=1):
        batch = examples[start : start + batch_size]

        # Create tasks for this batch
        batch_tasks = [process_example(example) for example in batch]

        # Wait for all tasks in this batch to complete
        print(f"Processing batch {batch_number}/{total_batches} (examples {start + 1}-{min(start + batch_size, len(examples))})")

        try:
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

            # Store results
            for example, result in zip(batch, batch_results):
                # BaseException rather than Exception: with return_exceptions=True
                # gather can also hand back asyncio.CancelledError, which is not
                # an Exception subclass and must not be stored as a normal result.
                if isinstance(result, BaseException):
                    error_type = type(result).__name__
                    error_info = {
                        "error_type": error_type,
                        "error_message": str(result),
                        "traceback": traceback.format_exception(type(result), result, result.__traceback__),
                    }

                    print(f"Error processing {example.instance_id}:")
                    print(f"Type: {error_type}")
                    print(f"Message: {str(result)}")
                    print("Traceback:")
                    print("".join(error_info["traceback"]))

                    results.append({"instance_id": example.instance_id, "status": "error", "error_info": error_info})
                elif result is None:
                    print(f"Warning: Null result for {example.instance_id}")
                    results.append({"instance_id": example.instance_id, "status": "error", "error_info": {"error_type": "NullResult", "error_message": "Process returned None"}})
                else:
                    results.append(result)

        except Exception as e:
            print("Batch processing error:")
            print(f"Type: {type(e).__name__}")
            print(f"Message: {str(e)}")
            traceback.print_exc()

            # The formatted traceback is the same for every example in the
            # batch, so build it once.
            batch_traceback = traceback.format_exc()

            # Mark all examples in the batch as failed
            for example in batch:
                results.append(
                    {
                        "instance_id": example.instance_id,
                        "status": "error",
                        "error_info": {"error_type": type(e).__name__, "error_message": str(e), "traceback": batch_traceback, "batch_failure": True},
                    }
                )

    return results

In [4]:
async def run_local_eval(dataset_name: str = SWEBenchDataset.LITE.value, length: int = 3, instance_id: str = None):
    """Run the evaluation locally and write results, a summary and a report.

    Args:
        dataset_name: The name of the dataset to use.
        length: The number of examples to process. Not used when
            ``instance_id`` is given.
        instance_id: The instance ID of a specific example to process. When
            falsy (the default), ``length`` examples are fetched instead.

    Returns:
        A ``(summary, predictions_dir)`` tuple: the summary dict that was
        written to the summary JSON file, and the directory holding this
        run's per-example result files.

    Raises:
        Exception: Any error raised while processing or saving results is
            printed with its traceback and then re-raised. Errors from
            report generation are printed but not propagated.
    """
    run_id = str(uuid.uuid4())
    predictions_dir = PREDS_DNAME / f"results_{run_id}"
    dataset = SWEBenchDataset(dataset_name)

    if instance_id:
        examples = [get_swe_bench_example(instance_id, dataset=dataset)]
    else:
        examples = get_swe_bench_examples(dataset=dataset, length=length)

    try:
        print(f"Processing {len(examples)} examples...")

        # Create output directory if it doesn't exist
        predictions_dir.mkdir(exist_ok=True, parents=True)

        # Create a timestamp for this run
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Process all examples in parallel batches
        results = await process_batch_locally(examples)

        # Save individual results. The ID is read into its own local name so
        # the ``instance_id`` parameter is not overwritten inside the loop.
        for result in results:
            if result and "instance_id" in result:
                result_instance_id = result["instance_id"]
                output_file = predictions_dir / f"{result_instance_id}.json"
                output_file.parent.mkdir(exist_ok=True, parents=True)
                with open(output_file, "w") as f:
                    json.dump(result, f, indent=4)

        # Collect success/failure counts and error statistics in one pass.
        # A result without a "status" key counts as successful; only
        # status == "error" counts as failed.
        successful = 0
        failed = 0
        error_types = {}
        for result in results:
            if not result:
                continue
            if "status" not in result:
                successful += 1
            elif result["status"] == "error":
                failed += 1
                error_type = result.get("error_info", {}).get("error_type", "Unknown")
                error_types[error_type] = error_types.get(error_type, 0) + 1

        # Save summary file
        summary_file = predictions_dir / f"summary_{timestamp}.json"
        summary = {
            "timestamp": timestamp,
            "total_examples": len(examples),
            "successful": successful,
            "failed": failed,
            "error_types": error_types,
            "results": results,
        }

        with open(summary_file, "w") as f:
            json.dump(summary, f, indent=4)

        print("\nProcessing complete!")
        print(f"Results saved to: {predictions_dir}")
        print(f"Summary saved to: {summary_file}")
        print(f"Successful: {summary['successful']}/{summary['total_examples']}")
        print(f"Failed: {summary['failed']}/{summary['total_examples']}")
        if summary["error_types"]:
            print("\nError type distribution:")
            for error_type, count in summary["error_types"].items():
                print(f"  {error_type}: {count}")

        # Generate report locally. This is best-effort: a report failure is
        # printed but must not discard results that are already saved.
        try:
            generate_report(predictions_dir, LOG_DIR, dataset, run_id)
        except Exception as e:
            print(f"Error generating report: {e}")
            traceback.print_exc()

        return summary, predictions_dir
    except Exception:
        print("Fatal error in run_local_eval:")
        traceback.print_exc()
        raise

## Run a Single Example

Let's run a single example to test our local evaluation setup.

In [5]:
# Request 20 examples (length=20) from the LITE dataset to choose from
examples = get_swe_bench_examples(dataset=SWEBenchDataset.LITE, length=20)



In [6]:
# Print a 1-based, numbered listing of every loaded example.
print("Available instance IDs:")
for position, candidate in enumerate(examples, start=1):
    print(f"{position}. {candidate.instance_id} - {candidate.repo}")



Available instance IDs:
1. astropy__astropy-12907 - astropy/astropy
2. astropy__astropy-14182 - astropy/astropy
3. astropy__astropy-14365 - astropy/astropy
4. astropy__astropy-14995 - astropy/astropy
5. astropy__astropy-6938 - astropy/astropy
6. astropy__astropy-7746 - astropy/astropy
7. django__django-10914 - django/django
8. django__django-10924 - django/django
9. django__django-11001 - django/django
10. django__django-11019 - django/django
11. django__django-11039 - django/django
12. django__django-11049 - django/django
13. django__django-11099 - django/django
14. django__django-11133 - django/django
15. django__django-11179 - django/django
16. django__django-11283 - django/django
17. django__django-11422 - django/django
18. django__django-11564 - django/django
19. django__django-11583 - django/django
20. django__django-11620 - django/django


In [7]:
# Select the example at index 10 (the 11th entry of the list printed above)
selected_example = examples[10]
print(f"\nSelected example: {selected_example.instance_id} - {selected_example.repo}")

# Process the selected example directly
async def process_single_example():
    """Run the agent on ``selected_example`` and save its result as JSON.

    The result is written to ``<PREDS_DNAME>/results_<uuid>/<instance_id>.json``.

    Returns:
        The value returned by ``run_agent_on_entry``, or ``None`` if any
        exception was raised (the error and its traceback are printed).
    """
    try:
        # Build a Codebase for the example's repository at its base commit.
        repo_checkout = Codebase.from_repo(
            repo_full_name=selected_example.repo,
            commit=selected_example.base_commit,
            language="python",
        )

        # Run the agent on the example.
        agent_output = run_agent_on_entry(selected_example, codebase=repo_checkout)

        # Each invocation writes into its own freshly named results directory.
        results_dir = PREDS_DNAME / f"results_{uuid.uuid4()}"
        results_dir.mkdir(exist_ok=True, parents=True)

        output_file = results_dir / f"{selected_example.instance_id}.json"
        with output_file.open("w") as handle:
            json.dump(agent_output, handle, indent=4)

        print("\nProcessing complete!")
        print(f"Result saved to: {output_file}")
    except Exception as e:
        print(f"Error processing example: {e}")
        traceback.print_exc()
        return None
    else:
        return agent_output

# Run the processing. Top-level ``await`` works in a notebook/IPython cell;
# a plain Python script would have to use asyncio.run(...) instead.
result = await process_single_example()


Selected example: django__django-11039 - django/django
'django__django-11039'
sqlmigrate wraps it's outpout in BEGIN/COMMIT even if the database doesn't support transactional DDL
Description
	 
		(last modified by Simon Charette)
	 
The migration executor only adds the outer BEGIN/COMMIT ​if the migration is atomic and ​the schema editor can rollback DDL but the current sqlmigrate logic only takes migration.atomic into consideration.
The issue can be addressed by
Changing sqlmigrate ​assignment of self.output_transaction to consider connection.features.can_rollback_ddl as well.
Adding a test in tests/migrations/test_commands.py based on ​an existing test for non-atomic migrations that mocks connection.features.can_rollback_ddl to False instead of overdidding MIGRATION_MODULES to point to a non-atomic migration.
I marked the ticket as easy picking because I included the above guidelines but feel free to uncheck it if you deem it inappropriate.

'django__django-11039'
['django/core/mana

In [9]:
# Display the value returned by process_single_example() (None if it failed)
result