**Generative Models for Code** -- Final Project<br><br>
**Maria Gancayco (mig2131@columbia.edu)**<br>
**Stephen Wright (svw2112@columbia.edu)**<br>
*Due:* Thursday, 19 Dec 2024 at 11:59pm ET

### Notebook Evaluation and Setup

The following notebook will provide all results when run sequentially for the following models: GPT-4, DeepSeek 7B, and Semcoder (10-30). A GPU must be used in order to generate new results. All needed results are printed to stdout with appropriate documentation.

The current setup is intended for Google Colab usage. To get results for GPT-4 in Google Colab, please provide an api key with the environment variable name OPENAI_API_KEY in Google Colab secrets. To reproduce novelty results using Claude as judge, please similarly add an ANTHROPIC_API_KEY in Google Colab secrets. For GCP VM environments, please use a shell environment variable with the same names (or if you're willing, you can just put the API key value in directly).




### Imports and Setup

In [9]:
# Setup: Environment and Memory Management

import torch
import gc
from pathlib import Path
from dataclasses import dataclass
from typing import Optional

# Check and display GPU availability for transparency
print("CUDA available:", torch.cuda.is_available())
print("GPU device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU found")

# Memory management utilities
def clear_memory() -> None:
    """
    Clears GPU memory cache and performs garbage collection.

    This function is crucial for maintaining optimal memory usage during model evaluation,
    especially when loading and comparing multiple large language models.
    """
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # Clear CUDA cache
    gc.collect()  # Trigger Python garbage collection

def get_memory_status() -> None:
    """
    Displays current GPU memory usage statistics.

    Reports both allocated and reserved memory in megabytes (MB).
    This helps monitor memory consumption during model operations.

    Note:
        - Allocated memory: Actually used GPU memory
        - Reserved memory: Total memory reserved by PyTorch
    """
    if torch.cuda.is_available():
        # Convert bytes to MB for better readability
        allocated = torch.cuda.memory_allocated() / 1024**2
        reserved = torch.cuda.memory_reserved() / 1024**2
        print(f"GPU Memory: Allocated: {allocated:.2f}MB, Reserved: {reserved:.2f}MB")
clear_memory()
# Initialize by checking current memory status
get_memory_status()

CUDA available: True
GPU device name: NVIDIA A100-SXM4-40GB
GPU Memory: Allocated: 13180.49MB, Reserved: 13182.00MB


In [10]:
# Configuration and Setup

@dataclass
class ExperimentConfig:
    """
    Configuration dataclass containing all hyperparameters and settings for model evaluation.

    Attributes:
        model_name (str): Name/path of the model to be evaluated
        batch_size (int): Number of samples processed in each batch
        learning_rate (float): Learning rate for model optimization
        num_epochs (int): Number of training epochs
        max_seq_length (int): Maximum sequence length for input tokenization
        gradient_accumulation_steps (int): Number of steps to accumulate gradients
        warmup_steps (Optional[int]): Number of warmup steps for learning rate scheduler
        weight_decay (float): L2 regularization factor
        eval_steps (int): Frequency of evaluation steps
        save_steps (int): Frequency of model checkpoint saves
        logging_steps (int): Frequency of logging training metrics
    """
    model_name: str
    batch_size: int
    learning_rate: float
    num_epochs: int
    max_seq_length: int
    gradient_accumulation_steps: int
    warmup_steps: Optional[int] = None
    weight_decay: float = 0.01
    eval_steps: int = 100
    save_steps: int = 100
    logging_steps: int = 10

# Set up results directory for storing evaluation outputs
results_dir = Path("./results")
results_dir.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist

print("Configuration and directories initialized!")

Configuration and directories initialized!


In [11]:
config = ExperimentConfig(
    model_name="deepseek-ai/deepseek-coder-7b-instruct-v1.5",
    batch_size=1,                    # Small batch size due to model size
    learning_rate=5e-5,             # Conservative learning rate for fine-tuning
    num_epochs=3,                   # Number of training epochs
    max_seq_length=512,            # Maximum sequence length for input processing
    gradient_accumulation_steps=32, # Accumulate gradients to simulate larger batch size
    warmup_steps=100               # Warmup steps for learning rate scheduler
)

In [74]:
# Model Dependencies and Imports

# Install core dependencies for transformer model handling and evaluation
!pip install transformers torch timeout-decorator

# Import required libraries
import torch  # PyTorch for deep learning operations
from transformers import (
    AutoTokenizer,         # For tokenization of input text
    AutoModelForCausalLM   # For loading pre-trained causal language models
)
import timeout_decorator
!pip install datasets
from datasets import load_dataset
import numpy as np
from anthropic import Anthropic
import json
from google.colab import userdata
import os
from typing import Dict, List, Tuple

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from typing import Optional
import re

import tempfile
import subprocess
import statistics
import json
from pathlib import Path
from google.colab import files
!pip install openai==0.28
import openai
openai.api_key = userdata.get('OPENAI_API_KEY')



In [13]:
dataset = load_dataset("openai_humaneval")

In [14]:
# Model Loading and Code Generation

def load_model_and_tokenizer(config: ExperimentConfig) -> tuple[AutoModelForCausalLM, AutoTokenizer]:

    try:
        # Clear memory before loading new model to prevent OOM errors
        clear_memory()

        print(f"Loading {config.model_name}...")

        # Initialize tokenizer with remote code execution enabled
        tokenizer = AutoTokenizer.from_pretrained(
            config.model_name,
            trust_remote_code=True  # Required for custom tokenizer implementations
        )

        # Load model with memory-efficient settings
        model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,    # Use bfloat16 for memory efficiency
            device_map="auto",             # Optimize model placement across available devices
            low_cpu_mem_usage=True         # Minimize CPU memory during loading
        )

        # Enable gradient checkpointing if available
        if hasattr(model, "gradient_checkpointing_enable"):
            model.gradient_checkpointing_enable()  # Trade compute for memory savings

        print("Model loaded successfully!")
        get_memory_status()  # Display current memory usage

        return model, tokenizer

    except Exception as e:
        print(f"Error loading model: {str(e)}")
        raise

def generate_code(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    prompt: str,
    max_new_tokens: int = 512,
    temperature: float = 0.8,
    top_p: float = 0.95,
    top_k: int = 50
) -> str:

    try:
        # Format prompt as chat message
        messages = [{"role": "user", "content": prompt}]
        print("Generating inputs...")
        # Tokenize input with chat template
        inputs = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(model.device)
        print("Generating outputs...")
        # Generate code with specified parameters
        outputs = model.generate(
            inputs,
            max_new_tokens=max_new_tokens,  # Control generation length
            do_sample=True,                 # Enable sampling-based generation
            temperature=temperature,         # Control randomness
            top_p=top_p,                    # Nucleus sampling threshold
            top_k=top_k,                    # Top-k sampling parameter
            num_return_sequences=1,         # Generate single sequence
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id
        )

        # Decode and return only the generated portion (excluding prompt)
        return tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)

    except Exception as e:
        print(f"Error in code generation: {str(e)}")
        return ""


In [7]:
# Initialize model and tokenizer using configuration
deepseek_7b_model, deepseek_7b_tokenizer = load_model_and_tokenizer(config)

Loading deepseek-ai/deepseek-coder-7b-instruct-v1.5...


tokenizer_config.json:   0%|          | 0.00/1.87k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/4.61M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/621 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/22.5k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/3 [00:00<?, ?it/s]

model-00001-of-00003.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

model-00002-of-00003.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00003-of-00003.safetensors:   0%|          | 0.00/3.85G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/121 [00:00<?, ?B/s]

Model loaded successfully!
GPU Memory: Allocated: 13180.49MB, Reserved: 13182.00MB


In [20]:
"""
#################################
# SemCoder Model Setup
#################################
This section handles the installation and setup of the SemCoder model,
including Git LFS setup and repository cloning.
"""

# Clear GPU memory before new model setup
clear_memory()  # Ensure clean memory state for new model

# Install Git LFS and clone SemCoder repository
print("Installing Git LFS and cloning SemCoder...")
!git lfs install  # Initialize Git Large File Storage for model weights

# Clone SemCoder from HuggingFace repository
# Note: Using /content/SemCoder path for Google Colab compatibility
!git clone https://huggingface.co/semcoder/semcoder /content/SemCoder

if os.path.exists('/content/SemCoder'):
    print("SemCoder repository cloned successfully!")
else:
    raise RuntimeError("Failed to clone SemCoder repository")  # Critical error if clone fails

Installing Git LFS and cloning SemCoder...
Git LFS initialized.
Cloning into '/content/SemCoder'...
remote: Enumerating objects: 20, done.[K
remote: Counting objects: 100% (16/16), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 20 (delta 1), reused 0 (delta 0), pack-reused 4 (from 1)[K
Unpacking objects: 100% (20/20), 399.12 KiB | 1.56 MiB/s, done.
Filtering content: 100% (4/4), 4.55 GiB | 18.40 MiB/s, done.
Encountered 2 file(s) that may not have been copied correctly on Windows:
	model-00002-of-00003.safetensors
	model-00001-of-00003.safetensors

See: `git lfs help smudge` for more details.
SemCoder repository cloned successfully!


In [21]:
"""
#################################
# SemCoder File Verification
#################################
This module verifies the integrity of the SemCoder installation by checking
for all required model files in the safetensors format.
"""

def verify_semcoder_files() -> None:
    """
    Verifies the presence of all required SemCoder model files.

    Checks for:
        - Configuration files (config.json, tokenizer.json)
        - Model weight files in safetensors format
        - Model index file

    Raises:
        RuntimeError: If any required files are missing from the installation
    """
    # Define required files for model functionality
    required_files = [
        'config.json',           # Model configuration
        'tokenizer.json',        # Tokenizer configuration
        'model.safetensors.index.json',  # Model weights index
        # Sharded model weights in safetensors format
        'model-00001-of-00003.safetensors',
        'model-00002-of-00003.safetensors',
        'model-00003-of-00003.safetensors'
    ]
    missing_files: List[str] = []

    # Display current directory contents for debugging
    print("SemCoder directory contents:")
    files = os.listdir('/content/SemCoder')
    print("\n".join(files))

    # Check for missing files
    for file in required_files:
        if file not in files:
            missing_files.append(file)

    # Handle verification results
    if missing_files:
        raise RuntimeError(f"Missing required files: {', '.join(missing_files)}")
    else:
        print("\nAll required files present!")
        print("\nModel files verification successful!")

# Execute verification
verify_semcoder_files()

SemCoder directory contents:
model-00001-of-00003.safetensors
generation_config.json
model.safetensors.index.json
tokenizer_config.json
.git
model-00002-of-00003.safetensors
special_tokens_map.json
training_args.bin
model-00003-of-00003.safetensors
.gitattributes
config.json
README.md
trainer_state.json
tokenizer.json

All required files present!

Model files verification successful!


In [22]:
"""
#################################
# SemCoder Model Implementation
#################################
This module implements the SemCoder model class with memory-efficient loading
and code generation capabilities.
"""

class SemCoderModel:
    """
    A class implementing the SemCoder model with optimized loading and generation.

    Attributes:
        model_path (str): Path to the local SemCoder model files
        model: The loaded language model (initialized in load())
        tokenizer: The model's tokenizer (initialized in load())
    """

    def __init__(self, model_path: str):
        """
        Initialize SemCoder model instance.

        Args:
            model_path (str): Path to the local model directory
        """
        self.model_path = model_path
        self.model: Optional[AutoModelForCausalLM] = None
        self.tokenizer: Optional[AutoTokenizer] = None

    def load(self) -> None:
        """
        Load the SemCoder model and tokenizer with memory optimizations.

        Implements:
            - Memory clearing before load
            - bfloat16 precision for efficiency
            - Automatic device mapping
            - Gradient checkpointing

        Raises:
            Exception: If model loading fails
        """
        try:
            # Ensure clean memory state
            clear_memory()

            # Load tokenizer first
            print("Loading SemCoder tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            # Load model with optimizations
            print("Loading SemCoder model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,    # Use bfloat16 for memory efficiency
                device_map="auto",             # Automatic device placement
                low_cpu_mem_usage=True         # Minimize CPU memory usage
            )

            # Enable memory optimization
            if hasattr(self.model, "gradient_checkpointing_enable"):
                self.model.gradient_checkpointing_enable()

            print("Successfully loaded SemCoder!")
            get_memory_status()  # Display memory usage

        except Exception as e:
            print(f"Error loading SemCoder: {str(e)}")
            raise

    def generate_code(self, prompt: str, max_new_tokens: int = 512) -> str:
        """
        Generate code using the loaded SemCoder model.

        Args:
            prompt (str): Input prompt for code generation
            max_new_tokens (int): Maximum number of tokens to generate

        Returns:
            str: Generated code or empty string if generation fails

        Note:
            Uses sampling-based generation with temperature=0.7 and top_p=0.95
            for balanced creativity and coherence
        """
        try:
            # Tokenize input with proper device placement
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True
            ).to(self.model.device)

            # Generate with specified parameters
            outputs = self.model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=max_new_tokens,
                do_sample=True,         # Enable sampling
                temperature=0.7,        # Control randomness
                top_p=0.95             # Nucleus sampling threshold
            )

            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        except Exception as e:
            print(f"Error generating code: {str(e)}")
            return ""

# Initialize and load SemCoder model
semcoder = SemCoderModel("/content/SemCoder")
semcoder.load()

Loading SemCoder tokenizer...
Loading SemCoder model...


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Successfully loaded SemCoder!
GPU Memory: Allocated: 26045.14MB, Reserved: 26058.00MB


### General Utilities

In [16]:
def extract_test_suites(content: str) -> list[str]:
    """
    Extract test suites from the content and format them with function calls.
    Handles both standalone assert statements and function definitions.
    Returns a list of formatted test suite strings.
    """
    # Split content into test suite blocks
    test_blocks = re.split(r'Generated \d+ enhanced tests\nTotal tests so far: \d+/\d+\n+Generated tests:', content)

    # Remove empty blocks
    test_blocks = [block.strip() for block in test_blocks if block.strip()]

    formatted_suites = []
    for block in test_blocks:
        if "unittest.TestCase" in block:
          print("FORMATTED TEST SUITE:")
          print(block)
          formatted_suites.append(block)
          continue


        print("ORIGINAL TEST SUITE:")
        print(block)
        suite_parts = []

        # First, collect any imports at the start of the block
        import_statements = re.findall(r'^import [^\n]+', block, re.MULTILINE)

        # Extract function-based tests
        test_functions = re.finditer(r'def (test_\w+)\(\):\n((?:[ ]{4}.*\n?)+)', block)

        # Extract standalone assert statements (not within functions)
        # Looking for asserts that are at the start of a line and not indented
        standalone_asserts = re.finditer(r'^assert [^\n]+$', block, re.MULTILINE)

        # Extract standalone pytest.raises statements
        standalone_raises = re.finditer(r'^with pytest\.raises\([^\)]+\):\n[ ]{4}[^\n]+\n', block, re.MULTILINE)

        # Add imports if they exist
        if import_statements:
            suite_parts.extend(import_statements)
            suite_parts.append("")  # Add blank line after imports

        # Add standalone asserts
        for match in standalone_asserts:
            suite_parts.append(match.group(0))

        # Add standalone pytest.raises
        for match in standalone_raises:
            suite_parts.append(match.group(0).rstrip())

        # Add function-based tests
        for match in test_functions:
            func_name = match.group(1)
            func_body = match.group(2).rstrip()
            formatted_func = f"def {func_name}():\n{func_body}\n{func_name}()"
            suite_parts.append(formatted_func)

        if suite_parts:
            formatted_suite = "\n".join(suite_parts)
            print("FORMATTED TEST SUITE:")
            print(formatted_suite)
            print("-" * 50)
            formatted_suites.append(formatted_suite)

    return formatted_suites

def process_file_path(file_path: str) -> list[str]:
    """Process a file by path and return list of formatted test suite strings."""
    with open(file_path, 'r') as f:
        content = f.read()
    return extract_test_suites(content)

def process_file_content(content: str) -> list[str]:
    """Process file content directly and return list of formatted test suite strings."""
    return extract_test_suites(content)

In [24]:
import pytest
def execute_test_case(code: str, test_case: str) -> bool:
    try:
        namespace = {}
        # Execute the function code
        exec(code, namespace)
        # Execute the test case
        exec("import pytest", namespace)
        exec(test_case, namespace)
        return True
    except pytest.raises.Exception:
        # This catches when pytest.raises() fails (i.e., expected exception wasn't raised)
        return False
    except Exception as e:
        # Catch any other exceptions
        return False

def check_syntax(code: str) -> bool:
        try:
            compile(code, '<string>', 'exec')
            return True
        except SyntaxError:
            return False
@timeout_decorator.timeout(5)
def evaluate_single_test_suite(solution: str,
                               generated_tests: str) -> Dict:
        syntax_valid = check_syntax(solution + "\n" + generated_tests)

        # Execute test cases if syntax is valid
        if syntax_valid:
            # TODO:- consider using thread pool for parallel test execution
            execution_success = execute_test_case(solution, generated_tests)
        else:
            execution_success = False

        return {
            "syntax_valid": syntax_valid,
            "execution_success": execution_success
        }
def evaluate_test_suite(model_type, dataset, n_tasks, test_suites):
  solutions = dataset['test']["canonical_solution"]
  metrics = {"syntax_validity": 0.0,  # Syntactic correctness
            "execution_accuracy": 0.0  # Functional correctness
  }
  results = []
  with open(f'{model_type}_test_case_generation_accuracy_results.txt', 'w') as f:
          for i in range(n_tasks):
              solution = solutions[i]
              full_solution = dataset['test']["prompt"][i] + solution
              cleaned_tests = test_suites[i]
              result = evaluate_single_test_suite(full_solution, cleaned_tests)

              f.write(f"PROBLEM {i}:\n")
              print(f"PROBLEM {i}:\n")
              f.write("CANONICAL SOLUTION:\n")
              print("CANONICAL SOLUTION:\n")
              f.write(full_solution + "\n")
              print(full_solution + "\n")
              f.write("CLEANED TESTS:\n")
              print("CLEANED TESTS:\n")
              f.write(cleaned_tests + "\n")
              print(cleaned_tests)
              f.write("RESULT:\n" + str(result) + "\n")
              print("RESULT:\n" + str(result))

              results.append(result)

          metrics["syntax_validity"] = np.mean([r["syntax_valid"] for r in results])
          metrics["execution_accuracy"] = np.mean([r["execution_success"] for r in results])
          f.write(str(metrics))

def clean_deepseek_generated_code(code: str) -> str:
        """Clean up generated code to extract only the functions."""
        lines = code.split('\n')
        cleaned_lines = []
        found_start = False
        found_test_func_call = False
        for line in lines:
            if line.startswith('```python'):
                found_start = True
            elif line.startswith('```'):
                if found_test_func_call: break
                else: found_start = False
            elif found_start:
                if line.startswith('test_') and line.endswith('()'):
                    found_test_func_call = True
                cleaned_lines.append(line)

        return '\n'.join(cleaned_lines).strip()
def clean_semcoder_generated_code(code: str) -> str:
    """Clean up generated code to extract only the functions."""
    lines = code.split('\n')
    cleaned_lines = []
    in_function = False

    for line in lines:
        if line.strip().startswith('def '):
            in_function = True
            cleaned_lines.append(line)
        elif in_function and (line.startswith('    ') or not line.strip()):
            cleaned_lines.append(line)
        elif in_function and line.strip() and not line.startswith('    '):
            in_function = False
            cleaned_lines.append('')

    return '\n'.join(cleaned_lines).strip()

### HumanEval+ Test Case Generation

In [25]:
def generate_humaneval_plus_tests(model_type, deepseek_model=None, deepseek_tokenizer=None, num_total_tests=100):
    dataset = load_dataset("openai_humaneval")
    results = []
    total_tests_generated = 0
    with open(f'{model_type}_test_case_generation_results.txt', 'w') as f:
      for i in range(len(dataset['test'])):
          if total_tests_generated >= num_total_tests:
              break

          problem = dataset['test'][i]
          prompt = problem['prompt']
          solution = problem['canonical_solution']
          entry_point = problem['entry_point']
          test_code = problem['test']

          # Extract working test cases
          check_match = re.search(r'def check\(candidate\):\s*(.*?)(?=\n\n|$)', test_code, re.DOTALL)
          test_cases = re.findall(r'assert.*?(?=\n|$)', check_match.group(1) if check_match else '')

          test_prompt = f"""
Please provide executable test cases for this function:
{prompt}

Working test examples:
{test_cases}

Include these types of tests:
1. Performance test:
def test_{entry_point}_perf():
    {test_cases[0].replace('candidate', entry_point)}

2. Edge case test:
def test_{entry_point}_edge():
    {test_cases[-1].replace('candidate', entry_point)}

3. Error test:
def test_{entry_point}_error():
    with pytest.raises(TypeError):
        {entry_point}(None)

Only provide executable test cases. No placeholders."""

          try:
              generated_tests, cleaned_tests = None, None
              if model_type == "semcoder":
                generated_tests = semcoder.generate_code(test_prompt)
                cleaned_tests = clean_semcoder_generated_code(generated_tests)
              elif "deepseek"in model_type:
                generated_tests = generate_code(deepseek_model, deepseek_tokenizer, test_prompt, max_new_tokens=4096)
                cleaned_tests = clean_deepseek_generated_code(generated_tests)
              elif model_type == "gpt-4":
                response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": test_prompt}
                    ],
                    max_tokens=4096,
                    temperature=0.8,
                    top_p=0.95
                )
                generated_tests = response["choices"][0]["message"]["content"].strip()
                cleaned_tests = clean_deepseek_generated_code(generated_tests) #TODO:- Please rename this since we're also using for OpenAI
              if cleaned_tests:
                  num_tests = len(re.findall(r'def test_', cleaned_tests))
                  total_tests_generated += num_tests

                  result = {
                      'problem_id': i,
                      'entry_point': entry_point,
                      'tests': cleaned_tests,
                      'num_tests': num_tests
                  }
                  results.append(result)

                  print(f"Generated {num_tests} enhanced tests")
                  print(f"Total tests so far: {total_tests_generated}/{num_total_tests}")
                  print("\nTest prompt:")
                  print(test_prompt)
                  print("\nGenerated tests:")
                  print(generated_tests)
                  print("\nCleaned tests:")
                  print(cleaned_tests)

                  f.write(f"Generated {num_tests} enhanced tests\n")
                  f.write(f"Total tests so far: {total_tests_generated}/{num_total_tests}")
                  f.write("\nGenerated tests:\n")
                  f.write(cleaned_tests + "\n")
              else:
                  print("No valid tests generated")

          except Exception as e:
              print(f"Error generating tests: {str(e)}")
              continue

    return results, total_tests_generated

In [18]:
# Generate HumanEval+ tests for DeepSeek 7B
print("Generating HumanEval+ test cases...")
plus_results, total_plus_tests = generate_humaneval_plus_tests("deepseek_7b", deepseek_model=deepseek_7b_model, deepseek_tokenizer=deepseek_7b_tokenizer, num_total_tests=100)

Generating HumanEval+ test cases...
Generating inputs...
Generating outputs...
Generated 9 enhanced tests
Total tests so far: 9/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """


Working test examples:
['assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True', 'assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False', 'assert candidate([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1],

In [26]:
# Generate HumanEval+ tests for SemCoder
print("Generating HumanEval+ test cases...")
plus_results, total_plus_tests = generate_humaneval_plus_tests("semcoder", num_total_tests=100)

Generating HumanEval+ test cases...


Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 3/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """


Working test examples:
['assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True', 'assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False', 'assert candidate([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False']

Include these types of tests:
1. Performance test:
def test_h

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 15/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def mean_absolute_deviation(numbers: List[float]) -> float:
    """ For a given list of input numbers, calculate Mean Absolute Deviation
    around the mean of this dataset.
    Mean Absolute Deviation is the average absolute difference between each
    element and a centerpoint (mean in this case):
    MAD = average | x - x_mean |
    >>> mean_absolute_deviation([1.0, 2.0, 3.0, 4.0])
    1.0
    """


Working test examples:
['assert abs(candidate([1.0, 2.0, 3.0]) - 2.0/3.0) < 1e-6', 'assert abs(candidate([1.0, 2.0, 3.0, 4.0]) - 1.0) < 1e-6', 'assert abs(candidate([1.0, 2.0, 3.0, 4.0, 5.0]) - 6.0/5.0) < 1e-6']

Include these types of tests:
1. Performance test:
def test_mean_absolute_deviation_perf():
    assert abs(mean_absolute_deviation([1.0, 2.0, 3.0]) - 2.0/3.0) < 1e-6

2. Edge case test:
def test_mean_absolute_deviation_edge():
   

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 30/100

Test prompt:

Please provide executable test cases for this function:
from typing import List, Tuple


def rolling_max(numbers: List[int]) -> List[int]:
    """ From a given list of integers, generate a list of rolling maximum element found until given moment
    in the sequence.
    >>> rolling_max([1, 2, 3, 2, 3, 4, 2])
    [1, 2, 3, 3, 3, 4, 4]
    """


Working test examples:
['assert candidate([]) == []', 'assert candidate([1, 2, 3, 4]) == [1, 2, 3, 4]', 'assert candidate([4, 3, 2, 1]) == [4, 4, 4, 4]', 'assert candidate([3, 2, 3, 100, 3]) == [3, 3, 3, 100, 100]']

Include these types of tests:
1. Performance test:
def test_rolling_max_perf():
    assert rolling_max([]) == []

2. Edge case test:
def test_rolling_max_edge():
    assert rolling_max([3, 2, 3, 100, 3]) == [3, 3, 3, 100, 100]

3. Error test:
def test_rolling_max_error():
    with pytest.raises(TypeError):
        rolling_max(None)

Only provide executable test case

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 45/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def all_prefixes(string: str) -> List[str]:
    """ Return list of all prefixes from shortest to longest of the input string
    >>> all_prefixes('abc')
    ['a', 'ab', 'abc']
    """


Working test examples:
["assert candidate('') == []", "assert candidate('asdfgh') == ['a', 'as', 'asd', 'asdf', 'asdfg', 'asdfgh']", "assert candidate('WWW') == ['W', 'WW', 'WWW']"]

Include these types of tests:
1. Performance test:
def test_all_prefixes_perf():
    assert all_prefixes('') == []

2. Edge case test:
def test_all_prefixes_edge():
    assert all_prefixes('WWW') == ['W', 'WW', 'WWW']

3. Error test:
def test_all_prefixes_error():
    with pytest.raises(TypeError):
        all_prefixes(None)

Only provide executable test cases. No placeholders.

Generated tests:

Please provide executable test cases for this function:
from typing import List


Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 60/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def sort_numbers(numbers: str) -> str:
    """ Input is a space-delimited string of numberals from 'zero' to 'nine'.
    Valid choices are 'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight' and 'nine'.
    Return the string with numbers sorted from smallest to largest
    >>> sort_numbers('three one five')
    'one three five'
    """


Working test examples:
["assert candidate('') == ''", "assert candidate('three') == 'three'", "assert candidate('three five nine') == 'three five nine'", "assert candidate('five zero four seven nine eight') == 'zero four five seven eight nine'", "assert candidate('six five four three two one zero') == 'zero one two three four five six'"]

Include these types of tests:
1. Performance test:
def test_sort_numbers_perf():
    assert sort_numbers('') == ''

2. Edge case test:
def test_sort_

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 72/100

Test prompt:

Please provide executable test cases for this function:


def strlen(string: str) -> int:
    """ Return length of given string
    >>> strlen('')
    0
    >>> strlen('abc')
    3
    """


Working test examples:
["assert candidate('') == 0", "assert candidate('x') == 1", "assert candidate('asdasnakj') == 9"]

Include these types of tests:
1. Performance test:
def test_strlen_perf():
    assert strlen('') == 0

2. Edge case test:
def test_strlen_edge():
    assert strlen('asdasnakj') == 9

3. Error test:
def test_strlen_error():
    with pytest.raises(TypeError):
        strlen(None)

Only provide executable test cases. No placeholders.

Generated tests:

Please provide executable test cases for this function:


def strlen(string: str) -> int:
    """ Return length of given string
    >>> strlen('')
    0
    >>> strlen('abc')
    3
    """


Working test examples:
["assert candidate('') == 0", "assert candidate('x')

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 87/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def concatenate(strings: List[str]) -> str:
    """ Concatenate list of strings into a single string
    >>> concatenate([])
    ''
    >>> concatenate(['a', 'b', 'c'])
    'abc'
    """


Working test examples:
["assert candidate([]) == ''", "assert candidate(['x', 'y', 'z']) == 'xyz'", "assert candidate(['x', 'y', 'z', 'w', 'k']) == 'xyzwk'"]

Include these types of tests:
1. Performance test:
def test_concatenate_perf():
    assert concatenate([]) == ''

2. Edge case test:
def test_concatenate_edge():
    assert concatenate(['x', 'y', 'z', 'w', 'k']) == 'xyzwk'

3. Error test:
def test_concatenate_error():
    with pytest.raises(TypeError):
        concatenate(None)

Only provide executable test cases. No placeholders.

Generated tests:

Please provide executable test cases for this function:
from typing import List


def concatenate(

Setting `pad_token_id` to `eos_token_id`:32014 for open-end generation.


Generated 3 enhanced tests
Total tests so far: 102/100

Test prompt:

Please provide executable test cases for this function:


def sort_third(l: list):
    """This function takes a list l and returns a list l' such that
    l' is identical to l in the indicies that are not divisible by three, while its values at the indicies that are divisible by three are equal
    to the values of the corresponding indicies of l, but sorted.
    >>> sort_third([1, 2, 3])
    [1, 2, 3]
    >>> sort_third([5, 6, 3, 4, 8, 9, 2])
    [2, 6, 3, 4, 8, 9, 5]
    """


Working test examples:
['assert tuple(candidate([1, 2, 3])) == tuple(sort_third([1, 2, 3]))', 'assert tuple(candidate([5, 3, -5, 2, -3, 3, 9, 0, 123, 1, -10])) == tuple(sort_third([5, 3, -5, 2, -3, 3, 9, 0, 123, 1, -10]))', 'assert tuple(candidate([5, 8, -12, 4, 23, 2, 3, 11, 12, -10])) == tuple(sort_third([5, 8, -12, 4, 23, 2, 3, 11, 12, -10]))', 'assert tuple(candidate([5, 6, 3, 4, 8, 9, 2])) == tuple([2, 6, 3, 4, 8, 9, 5])', 'assert tuple(

### Final Results: DeepSeek vs SemCoder

In [27]:
deepseek_7b_extracted_test_suites = process_file_path("/content/deepseek_7b_test_case_generation_results.txt")

ORIGINAL TEST SUITE:
import pytest
from typing import List

def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    """
    # Implementation of the function goes here

# Test case 1: Basic functionality with positive numbers and threshold
def test_has_close_elements_basic():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True

# Test case 2: Basic functionality with positive numbers and threshold
def test_has_close_elements_basic2():
    assert has_close_elements([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.3) == False

# Test case 3: Performance test with a large list and threshold
def test_has_close_elements_perf():
    large_list = [i * 1.0 for i in range(100000)] + [99999.0]
    assert has_close_elements(large_list, 1000.0) == True

# Test case 4: Edge case test with a list where no two elements are close
def test_has_close_elements_edge():
 

In [28]:
semcoder_extracted_test_suites = process_file_path("/content/semcoder_test_case_generation_results.txt")

ORIGINAL TEST SUITE:
def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """



def test_has_close_elements_perf():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True


def test_has_close_elements_edge():
    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False


def test_has_close_elements_error():
    with pytest.raises(TypeError):
        has_close_elements(None)
FORMATTED TEST SUITE:
def test_has_close_elements_perf():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
test_has_close_elements_perf()
def test_has_close_elements_edge():
    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False
test_has_close_elements_edge()
def test_has_close_elemen

In [29]:
evaluate_test_suite("deepseek_7b", dataset, len(deepseek_7b_extracted_test_suites), deepseek_7b_extracted_test_suites)

PROBLEM 0:

CANONICAL SOLUTION:

from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True

    return False


CLEANED TESTS:

import pytest

def test_has_close_elements_basic():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
test_has_close_elements_basic()
def test_has_close_elements_basic2():
    assert has_close_elements([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.3) == False
test_has_close_elements_basic2()
def test_has_close_elements_perf():
    lar

In [30]:
evaluate_test_suite("semcoder", dataset, len(semcoder_extracted_test_suites), semcoder_extracted_test_suites)

PROBLEM 0:

CANONICAL SOLUTION:

from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True

    return False


CLEANED TESTS:

def test_has_close_elements_perf():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
test_has_close_elements_perf()
def test_has_close_elements_edge():
    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False
test_has_close_elements_edge()
def test_has_close_elements_error():
    with pytest.raises(TypeError

### DeepSeek Simple Prompt Results

In [None]:
metrics = {"syntax_validity": 0.0,  # Syntactic correctness
            "execution_accuracy": 0.0  # Functional correctness
}
def evaluate_model(model, dataset, model_type, tokenizer, n_tasks: int = None):
        solutions = dataset['test']["canonical_solution"]
        if n_tasks is None:
            n_tasks = len(solutions)

        results = []
        with open(f'{model_type}_test_case_generation_results.txt', 'w') as f:
          for i in range(n_tasks):
              solution = solutions[i]
              full_solution = dataset['test']["prompt"][i] + solution

              prompt = f"""
              Please provide and execute a set of test cases for the following function:
              {full_solution}

              Please do not include natural language or anything that cannot be compiled/executed.
              Please only provided the test cases and their immediate execution.

              Example:
              def test_hello_with_name():
                  assert hello("Alice") == "Hello, Alice"
                  assert hello("Bob") == "Hello, Bob"
              test_hello_with_name()

              def test_hello_without_name():
                  assert hello(None) == "Hello, world"
                  assert hello("") == "Hello, world"
              test_hello_without_name()
              """
              generated_tests = ""
              if model_type == "deepseek":
                  generated_tests = generate_code(
                      model,
                      tokenizer,
                      prompt,
                      max_new_tokens=4096
                  )
              elif model_type == "semcoder":
                  generated_tests = model.generate_code(prompt, max_new_tokens=4096)

              cleaned_tests = clean_deepseek_generated_code(generated_tests) if model_type == "deepseek" else "" #no-op for now
              result = evaluate_single_test_suite(full_solution, cleaned_tests)

              f.write(f"PROBLEM {i}:\n")
              print(f"PROBLEM {i}:\n")
              f.write("CANONICAL SOLUTION:\n")
              print("CANONICAL SOLUTION:\n")
              f.write(full_solution + "\n")
              print(full_solution + "\n")
              f.write("GENERATED TESTS:\n")
              print("GENERATED TESTS:\n")
              f.write(generated_tests + "\n")
              print(generated_tests)
              f.write("CLEANED TESTS:\n")
              print("CLEANED TESTS:\n")
              f.write(cleaned_tests + "\n")
              print(cleaned_tests)
              f.write("RESULT:\n" + str(result) + "\n")
              print("RESULT:\n" + str(result))

              results.append(result)

          # Calculate aggregate metrics
          metrics["syntax_validity"] = np.mean([r["syntax_valid"] for r in results])
          metrics["execution_accuracy"] = np.mean([r["execution_success"] for r in results])
          f.write(str(metrics))
        return metrics

### SemCoder Simple Prompt Results

In [None]:
metrics = evaluate_model(semcoder, "semcoder", tokenizer, 100)
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")

 ### Code Coverage Assessment

In [31]:
# First, install required packages
!pip install pytest pytest-cov coverage
from google.colab import files  # Colab-specific import

Collecting pytest-cov
  Downloading pytest_cov-6.0.0-py3-none-any.whl.metadata (27 kB)
Collecting coverage
  Downloading coverage-7.6.9-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.2 kB)
Downloading pytest_cov-6.0.0-py3-none-any.whl (22 kB)
Downloading coverage-7.6.9-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (234 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.0/235.0 kB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: coverage, pytest-cov
Successfully installed coverage-7.6.9 pytest-cov-6.0.0


In [33]:
def calculate_aggregate_metrics(results, target_score_name) -> Dict:
    if not results:
        return {'error': 'No valid results to analyze'}

    score_values = [r[target_score_name] for r in results if target_score_name in r]

    if not score_values:
        return {'error': 'No valid score values found'}

    return {
        f'mean_{target_score_name}': statistics.mean(score_values),
        f'median_{target_score_name}': statistics.median(score_values),
        f'min_{target_score_name}': min(score_values),
        f'max_{target_score_name}': max(score_values),
        f'std_dev': statistics.stdev(score_values) if len(score_values) > 1 else 0,
        'total_entries_analyzed': len(score_values)
    }

In [61]:
class TestCoverageAnalyzer:
    def __init__(self, input_file: str = "", output_dir: str = "/content/coverage_results"):
        """Initialize the analyzer with input file path and output directory."""
        self.input_file = input_file
        self.output_dir = output_dir
        self.coverage_results = []
        os.makedirs(output_dir, exist_ok=True)

    def create_test_files(self, solution: str, tests: str, temp_dir: str) -> Tuple[str, str]:
        """Create temporary Python files for the solution and tests."""
        # Create solution file
        solution_file = Path(temp_dir) / "solution.py"
        with open(solution_file, 'w') as f:
            f.write(solution)

        # Create test file with proper imports for Colab
        test_file = Path(temp_dir) / "test_solution.py"
        with open(test_file, 'w') as f:
            f.write("import sys\n")
            f.write(f"sys.path.append('{temp_dir}')\n")
            f.write("from solution import *\n")
            f.write(tests)

        return str(solution_file), str(test_file)

    def get_coverage_data(self):
        with open('coverage.json') as f:
            coverage_data = json.load(f)
            for file_path, file_data in coverage_data['files'].items():
                  if 'solution.py' in file_path:
                     return {
                        'line_coverage': file_data['summary']['percent_covered'],
                        'total_lines': file_data['summary']['num_statements'],
                        'covered_lines': file_data['summary']['covered_lines'],
                        'missing_lines': file_data['summary']['missing_lines']
                     }
    def run_coverage_analysis(self, solution_file: str, test_file: str, temp_dir: str) -> Dict:
        """Run pytest with coverage and return results."""
        try:
            orig_dir = os.getcwd()
            os.chdir(temp_dir)

            # Run pytest with coverage using python -m to ensure proper module resolution
            cmd = ['python3', '-m', 'pytest', '--cov=solution',
                '--cov-report=json', 'test_solution.py', '-v']

            env = os.environ.copy()
            env['PYTHONPATH'] = temp_dir  # Ensure proper module resolution

            result = subprocess.run(cmd, capture_output=True, text=True, env=env)
            if os.path.exists('coverage.json'): return self.get_coverage_data()
            return {'error': 'No coverage data generated'}

        except subprocess.CalledProcessError as e:
            print(f"Command output: {e.output}")
            return {'error': f'pytest failed: {str(e)}'}
        except Exception as e:
            print(f"Exception details: {str(e)}")
            return {'error': f'Analysis failed: {str(e)}'}
        finally:
            os.chdir(orig_dir)

In [62]:
coverage_analyzer = TestCoverageAnalyzer()
deep_seek_coverage_results = []
for index, test_suite in enumerate(deepseek_7b_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  with tempfile.TemporaryDirectory() as temp_dir:
    solution_file, test_file = coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
    result = coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
    if 'line_coverage' in result:
      deep_seek_coverage_results.append(result)
print(calculate_aggregate_metrics(deep_seek_coverage_results, "line_coverage"))

{'mean_line_coverage': 98.48484848484848, 'median_line_coverage': 100.0, 'min_line_coverage': 77.77777777777777, 'max_line_coverage': 100.0, 'std_dev': 5.195139212178621, 'total_entries_analyzed': 22}


In [63]:
semcoder_coverage_results = []
for index, test_suite in enumerate(semcoder_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  with tempfile.TemporaryDirectory() as temp_dir:
    solution_file, test_file = coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
    result = coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
    if 'line_coverage' in result:
      semcoder_coverage_results.append(result)
print(calculate_aggregate_metrics(semcoder_coverage_results, "line_coverage"))

{'mean_line_coverage': 96.75324675324676, 'median_line_coverage': 100.0, 'min_line_coverage': 21.428571428571427, 'max_line_coverage': 100.0, 'std_dev': 14.40695307294402, 'total_entries_analyzed': 33}


### Measuring Novelty and Diversity

#### Measuring with LLM as Judge

In [37]:
!pip install anthropic

Collecting anthropic
  Downloading anthropic-0.42.0-py3-none-any.whl.metadata (23 kB)
Downloading anthropic-0.42.0-py3-none-any.whl (203 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/203.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.4/203.4 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: anthropic
Successfully installed anthropic-0.42.0


In [38]:
def analyze_novelty_with_claude(source_function: str, generated_tests: str, original_tests: str = None) -> dict:
    """Use Claude API to analyze test novelty."""

    anthropic = Anthropic(api_key=userdata.get('ANTHROPIC_API_KEY'))

    prompt = f"""
As an expert test engineer, analyze the semantic novelty and diversity of the generated test cases for the given function. Consider the function's purpose, edge cases, and expected behaviors.

Source Function:

{source_function}


Generated Test Suite:

{generated_tests}

Original Test Suite:

{original_tests}

Please analyze:
1. How well do the tests cover different aspects of the function's behavior?
2. What novel testing scenarios are introduced?
3. Are there important edge cases or boundary conditions tested?
4. How diverse are the test inputs and scenarios?
5. Are the tests relevant to the function's purpose?

Provide your analysis in the following JSON format:
{{
    "novelty_score": <float between 0.0 and 1.0>,
    "novel_aspects": [<list of strings describing novel aspects>],
    "unique_scenarios": [<list of strings describing unique test scenarios>],
    "coverage_assessment": <string describing overall test coverage>,
    "recommendations": [<list of strings with suggested additional test cases>]
}}
Do not provide any other additonal text other than the JSON in order to facilitate
text processing.

"""

    message = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=4096,
        temperature=0,  # Use 0 for consistent analysis
        messages=[{
            "role": "user",
            "content": prompt
        }]
    )

    try:
        # Parse the response as JSON
        analysis = json.loads(message.content[0].text)
        return analysis
    except json.JSONDecodeError:
        print("Failed to parse Claude's response as JSON")
        return None

In [41]:
deepseek_7b_novelty_results = []
for index, test_suite in enumerate(deepseek_7b_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  original_tests = dataset['test']["test"][index]
  result = analyze_novelty_with_claude(solution, test_suite, original_tests)
  print(result)
  deepseek_7b_novelty_results.append(result)
print(calculate_aggregate_metrics(deepseek_7b_novelty_results, "novelty_score"))

{'novelty_score': 0.7, 'novel_aspects': ['Tests for performance with a large input list', 'Tests for error handling with invalid input types', 'Tests for edge cases with duplicate elements and zero threshold'], 'unique_scenarios': ['Large input list with a large threshold', 'Input list with all elements equal and zero threshold', 'Invalid input types for both list and threshold'], 'coverage_assessment': 'The generated test suite covers a good range of scenarios, including basic cases, edge cases, performance, and error handling. However, it lacks some important edge cases and boundary conditions.', 'recommendations': ['Test with an empty list input', 'Test with a list containing only one element', 'Test with a threshold of 0.0 and a list with distinct elements', 'Test with a negative threshold value', 'Test with a threshold value larger than the maximum difference between any two elements in the list']}
{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (passing None as

In [42]:
semcoder_novelty_results = []
for index, test_suite in enumerate(semcoder_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  original_tests = dataset['test']["test"][index]
  result = analyze_novelty_with_claude(solution, test_suite, original_tests)
  semcoder_novelty_results.append(result)
  print(result)
print(calculate_aggregate_metrics(semcoder_novelty_results, "novelty_score"))

{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (TypeError)', 'Tests for performance edge case'], 'unique_scenarios': ['Passing None as input', 'Large list with close elements'], 'coverage_assessment': 'The generated test suite covers some important aspects like error handling and performance edge cases, but lacks comprehensive coverage of boundary conditions and diverse input scenarios.', 'recommendations': ['Test with empty list', 'Test with list containing duplicate elements', 'Test with list containing negative numbers', 'Test with threshold values at or near 0', 'Test with large threshold values']}
{'novelty_score': 0.6, 'novel_aspects': ['Tests for error handling (passing None as input)', 'Tests for performance (large input string)'], 'unique_scenarios': ['Empty string input', 'Nested parentheses', 'Single group of parentheses', 'Multiple groups of parentheses', 'Unbalanced parentheses (not tested)'], 'coverage_assessment': 'The tests cover a good range of scen

In [43]:
import json

# Assuming deep_seek_novelty_results and semcoder_novelty_results are lists of dictionaries
# as produced by your analyze_novelty_with_claude function.


def write_results_to_file(results, filename):
    with open(filename, 'w') as f:
        json.dump(results, f, indent=4)


write_results_to_file(deepseek_7b_novelty_results, 'deep_seek_7b_novelty_results.json')
write_results_to_file(semcoder_novelty_results, 'semcoder_novelty_results.json')

from google.colab import files

files.download('deep_seek_7b_novelty_results.json')
files.download('semcoder_novelty_results.json')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

#### Measuring with Patterns

In [68]:
from typing import Dict, List
import re
from collections import defaultdict

class CoveragePatternAnalyzer:
    """Analyzes test coverage patterns focusing on types of test cases."""

    def __init__(self):
        self.patterns = {
            'edge_cases': {
                'empty_input': r'assert.*(?:empty|\[\]|\{\}|\(\)|""|\'\'|\b==\s*\[\]|\b==\s*""|\b==\s*\'\')',
                'null_input': r'assert.*(?:None|null)',
                'single_element': r'assert.*\[[^,\]]+\]'
            },
            'boundary_testing': {
                'zero_values': r'assert.*\b0\b',
                'negative_values': r'assert.*-\d+',
                'large_values': r'assert.*\d{5,}'
            },
            'error_handling': {
                'exception_testing': r'with\s+pytest\.raises\([^)]+\)',
                'invalid_input': r'assert.*(invalid|wrong|incorrect|bad)'
            },
            'functionality': {
                'typical_case': r'assert.*normal|typical|standard',
                'complex_input': r'assert.*(?:\[.*,.*,.*\]|\{.*:.*,.*:.*\}|\(.*,.*,.*\))'
            }
        }

    def _extract_assertions(self, test_code: str) -> List[str]:
        """Extract assertions with improved handling of multi-line and truncated assertions."""
        lines = test_code.split('\n')
        assertions = []
        current_assertion = None
        in_raises_block = False
        bracket_count = 0
        paren_count = 0

        for line in lines:
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Start of pytest.raises block
            if 'pytest.raises' in line:
                in_raises_block = True
                current_assertion = line
                paren_count = line.count('(') - line.count(')')
                if paren_count == 0:
                    assertions.append(current_assertion)
                    current_assertion = None
                    in_raises_block = False
                continue

            # Start of regular assertion
            if line.startswith('assert'):
                current_assertion = line
                bracket_count = line.count('[') - line.count(']')
                paren_count = line.count('(') - line.count(')')
                if bracket_count == 0 and paren_count == 0:
                    assertions.append(current_assertion)
                    current_assertion = None
                continue

            # Continue previous assertion
            if current_assertion:
                current_assertion += ' ' + line
                if in_raises_block:
                    paren_count += line.count('(') - line.count(')')
                    if paren_count == 0:
                        assertions.append(current_assertion)
                        current_assertion = None
                        in_raises_block = False
                else:
                    bracket_count += line.count('[') - line.count(']')
                    paren_count += line.count('(') - line.count(')')
                    if bracket_count == 0 and paren_count == 0:
                        assertions.append(current_assertion)
                        current_assertion = None

        # Handle any remaining incomplete assertion
        if current_assertion:
            assertions.append(current_assertion + ' ...')

        return assertions

    def analyze_test_suite(self, test_code: str) -> Dict:
        """Analyze a test suite and return detailed coverage metrics."""
        assertions = self._extract_assertions(test_code)
        total_assertions = len(assertions)
        if total_assertions == 0:
            return {'error': 'No assertions found'}

        # Track which patterns match each assertion
        assertion_patterns = {i: set() for i in range(total_assertions)}
        pattern_counts = defaultdict(lambda: defaultdict(int))
        uncategorized_assertions = []

        # Analyze each assertion
        for i, assertion in enumerate(assertions):
            matches_found = False
            for category, patterns in self.patterns.items():
                for name, pattern in patterns.items():
                    if re.search(pattern, assertion):
                        assertion_patterns[i].add(f"{category}:{name}")
                        pattern_counts[category][name] += 1
                        matches_found = True

            if not matches_found:
                uncategorized_assertions.append(assertion)

        # Calculate metrics
        results = {}
        for category, patterns in pattern_counts.items():
            category_assertions = len([i for i in assertion_patterns.values()
                                    if any(p.startswith(f"{category}:") for p in i)])
            results[category] = {
                'total_matches': category_assertions,
                'coverage_ratio': category_assertions / total_assertions,
                'pattern_breakdown': dict(patterns)
            }

        # Add overall metrics
        results['overall'] = {
            'total_assertions': total_assertions,
            'patterns_per_assertion': sum(len(p) for p in assertion_patterns.values()) / total_assertions,
            'pattern_coverage': len([p for p in sum([list(p.values()) for p in pattern_counts.values()], []) if p > 0]) / \
                              len(sum([list(p.values()) for p in self.patterns.values()], [])),
            'uncategorized': len(uncategorized_assertions),
            'uncategorized_assertions': uncategorized_assertions
        }

        return results

In [70]:
pattern_analyzer = CoveragePatternAnalyzer()

In [46]:
deepseek_7b_extracted_test_suites_str = "\n\n".join(deepseek_7b_extracted_test_suites)

In [71]:
deepseek7b_pattern_analyzer_results = pattern_analyzer.analyze_test_suite(deepseek_7b_extracted_test_suites_str)

In [48]:
for item in deepseek7b_pattern_analyzer_results:
  print(item)
  print(deepseek7b_pattern_analyzer_results[item])

boundary_testing
{'total_matches': 43, 'coverage_ratio': 0.3333333333333333, 'pattern_breakdown': {'zero_values': 37, 'negative_values': 9, 'large_values': 3}}
functionality
{'total_matches': 47, 'coverage_ratio': 0.3643410852713178, 'pattern_breakdown': {'complex_input': 47}}
error_handling
{'total_matches': 22, 'coverage_ratio': 0.17054263565891473, 'pattern_breakdown': {'exception_testing': 22}}
edge_cases
{'total_matches': 21, 'coverage_ratio': 0.16279069767441862, 'pattern_breakdown': {'empty_input': 20, 'single_element': 3, 'null_input': 1}}
overall
{'total_assertions': 129, 'patterns_per_assertion': 1.1007751937984496, 'pattern_coverage': 0.8, 'uncategorized': 26, 'uncategorized_assertions': ['assert False, "Expected TypeError"', 'assert end_time - start_time < 1, "Performance test failed"', "assert string_xor('010', '110') == '100'", "assert string_xor('0101', '0000') == '0101'", "assert string_xor('0101', '0000') == '0101'", 'assert greatest_common_divisor(3, 5) == 1', 'assert

In [49]:
semcoder_extracted_test_suites_str = "\n\n".join(semcoder_extracted_test_suites)

In [72]:
semcoder_pattern_analyzer_results = pattern_analyzer.analyze_test_suite(semcoder_extracted_test_suites_str)

In [51]:
for item in semcoder_pattern_analyzer_results:
  print(item)
  print(semcoder_pattern_analyzer_results[item])

boundary_testing
{'total_matches': 21, 'coverage_ratio': 0.2079207920792079, 'pattern_breakdown': {'zero_values': 15, 'negative_values': 7, 'large_values': 2}}
functionality
{'total_matches': 25, 'coverage_ratio': 0.24752475247524752, 'pattern_breakdown': {'complex_input': 25}}
error_handling
{'total_matches': 34, 'coverage_ratio': 0.33663366336633666, 'pattern_breakdown': {'exception_testing': 34}}
edge_cases
{'total_matches': 24, 'coverage_ratio': 0.2376237623762376, 'pattern_breakdown': {'empty_input': 22, 'single_element': 3, 'null_input': 1}}
overall
{'total_assertions': 101, 'patterns_per_assertion': 1.0792079207920793, 'pattern_coverage': 0.8, 'uncategorized': 12, 'uncategorized_assertions': ["assert make_palindrome('jerry') == 'jerryrrej'", "assert string_xor('0101', '0000') == '0101'", 'assert greatest_common_divisor(3, 7) == 1', 'assert greatest_common_divisor(144, 60) == 12', "assert count_distinct_characters('Jerry jERRY JeRRRY') == 5", "assert how_many_times('john doe', 'j

### GPT-4 Results

In [75]:
generate_humaneval_plus_tests("gpt-4", num_total_tests=100)

Generated 10 enhanced tests
Total tests so far: 10/100

Test prompt:

Please provide executable test cases for this function:
from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """


Working test examples:
['assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True', 'assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True', 'assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False', 'assert candidate([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) == True', 'assert candidate([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False']

Include these types of tests:
1. Performance test:
def test

([{'problem_id': 0,
   'entry_point': 'has_close_elements',
   'tests': 'def test_has_close_elements_case_1():\n    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True\n\ndef test_has_close_elements_case_2():\n    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False\n\ndef test_has_close_elements_case_3():\n    assert has_close_elements([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True\n\ndef test_has_close_elements_case_4():\n    assert has_close_elements([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False\n\ndef test_has_close_elements_case_5():\n    assert has_close_elements([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True\n\ndef test_has_close_elements_case_6():\n    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) == True\n\ndef test_has_close_elements_case_7():\n    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False\n\ndef test_has_close_elements_perf():\n    assert has_close_elements([1.0]*1000000 + [2.0], 0.3) == True\n\ndef test_has_clos

In [76]:
gpt_4_extracted_test_suites = process_file_path("/content/gpt-4_test_case_generation_results.txt")

ORIGINAL TEST SUITE:
def test_has_close_elements_case_1():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True

def test_has_close_elements_case_2():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False

def test_has_close_elements_case_3():
    assert has_close_elements([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True

def test_has_close_elements_case_4():
    assert has_close_elements([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False

def test_has_close_elements_case_5():
    assert has_close_elements([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True

def test_has_close_elements_case_6():
    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 1.0) == True

def test_has_close_elements_case_7():
    assert has_close_elements([1.1, 2.2, 3.1, 4.1, 5.1], 0.5) == False

def test_has_close_elements_perf():
    assert has_close_elements([1.0]*1000000 + [2.0], 0.3) == True

def test_has_close_elements_edge():
    assert has_close_elements([1.0, 2.0], 0.1) == False



In [77]:
evaluate_test_suite("gpt-4", dataset, len(gpt_4_extracted_test_suites), gpt_4_extracted_test_suites)

PROBLEM 0:

CANONICAL SOLUTION:

from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True

    return False


CLEANED TESTS:

def test_has_close_elements_case_1():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
test_has_close_elements_case_1()
def test_has_close_elements_case_2():
    assert has_close_elements([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False
test_has_close_elements_case_2()
def test_has_close_elements_case_3():
    assert has_cl

In [78]:
gpt_4_coverage_analyzer = TestCoverageAnalyzer()
gpt_4_coverage_results = []
for index, test_suite in enumerate(gpt_4_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  with tempfile.TemporaryDirectory() as temp_dir:
    solution_file, test_file = gpt_4_coverage_analyzer.create_test_files(solution, test_suite, temp_dir)
    result = gpt_4_coverage_analyzer.run_coverage_analysis(solution_file, test_file, temp_dir)
    if 'line_coverage' in result:
      gpt_4_coverage_results.append(result)
print(calculate_aggregate_metrics(gpt_4_coverage_results, "line_coverage"))

{'mean_line_coverage': 96.60493827160494, 'median_line_coverage': 100.0, 'min_line_coverage': 50.0, 'max_line_coverage': 100.0, 'std_dev': 11.92126324635307, 'total_entries_analyzed': 18}


In [79]:
gpt_4_novelty_results = []
for index, test_suite in enumerate(gpt_4_extracted_test_suites):
  solution = dataset['test']["prompt"][index] + dataset['test']["canonical_solution"][index]
  original_tests = dataset['test']["test"][index]
  result = analyze_novelty_with_claude(solution, test_suite, original_tests)
  print(result)
  gpt_4_novelty_results.append(result)
print(calculate_aggregate_metrics(gpt_4_novelty_results, "novelty_score"))

{'novelty_score': 0.7, 'novel_aspects': ['Tests performance with a large input list', 'Tests behavior when input list is None (error handling)', 'Tests edge case with a list of length 2'], 'unique_scenarios': ['Large input list to test performance', 'Input list is None (error handling)', 'Edge case with a list of length 2', 'Threshold values close to 0 and 1', 'Duplicate values in the input list'], 'coverage_assessment': 'The generated test suite covers a good range of scenarios, including edge cases, boundary conditions, and error handling. However, it lacks tests for empty lists and negative threshold values.', 'recommendations': ['Add tests for empty lists', 'Add tests with negative threshold values', 'Add tests with floating-point precision issues (e.g., 0.1 + 0.2 != 0.3)']}
{'novelty_score': 0.7, 'novel_aspects': ['Performance testing', 'Error handling (TypeError)', 'Larger input size'], 'unique_scenarios': ['Empty string input', 'Single group input', 'Large input size (10000 grou

In [80]:
write_results_to_file(gpt_4_novelty_results, 'gpt_4_novelty_results.json')
files.download('gpt_4_novelty_results.json')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>