Helpful commentary on defects4j: https://greg4cr.github.io/pdf/20d4j.pdf
May need this to set path: export PATH=$PATH:/Users/clairecallon/defects4j/framework/bin

In [6]:
import subprocess
import os

DEFECTS4J_HOME = "/Users/clairecallon/defects4j"  # Adjust to your path
os.environ["DEFECTS4J_HOME"] = DEFECTS4J_HOME
os.environ["PATH"] += os.pathsep + f"{DEFECTS4J_HOME}/framework/bin"

def checkout_version(project, bug_id, version):
    """Checkout a specific version of a project from Defects4J."""
    work_dir = f"/tmp/{project}_{bug_id}_{version}"
    os.makedirs(work_dir, exist_ok=True)
    
    cmd = f"defects4j checkout -p {project} -v {bug_id}{version} -w {work_dir}"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

    if result.returncode != 0:
        print("Error:", result.stderr)
    
    return work_dir

# Checkout buggy and fixed versions
buggy_dir = checkout_version("Math", "35", "b")  # Buggy version
fixed_dir = checkout_version("Math", "35", "f")  # Fixed version
src_dir = os.path.join(buggy_dir, "src")


path = "/tmp/Math_35_b/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java"


"""""
file_path = os.path.join(buggy_dir, "src", "test", "java", "org", "apache", "commons","math3", "geometry", "euclidean","twod", "LineTest.java")

# Open and read the file
with open(file_path, 'r') as file:
    content = file.read()
    print(content)
"""


'""\nfile_path = os.path.join(buggy_dir, "src", "test", "java", "org", "apache", "commons","math3", "geometry", "euclidean","twod", "LineTest.java")\n\n# Open and read the file\nwith open(file_path, \'r\') as file:\n    content = file.read()\n    print(content)\n'

In [7]:
#now begin training neural networks
#first tokenize
import javalang
import os
from typing import List

def tokenize_java_file(file_path: str) -> List[str]:
    """Tokenize a single Java file using javalang."""
    with open(file_path, "r", encoding="utf-8") as f:
        java_code = f.read()
    try:
        tokens = list(javalang.tokenizer.tokenize(java_code))
        return [token.value for token in tokens]
    except javalang.tokenizer.LexerError:
        print(f"Lexer error in file: {file_path} (possibly invalid Java syntax)")
        return []

def tokenize_defects4j_project(project_dir: str) -> List[List[str]]:
    """Tokenize all Java files in a Defects4J project."""
    all_tokens = []
    for root, _, files in os.walk(project_dir):
        for file in files:
            if file.endswith(".java"):
                file_path = os.path.join(root, file)
                tokens = tokenize_java_file(file_path)
                if tokens:  # Only add if tokenization succeeded
                    all_tokens.append(tokens)
    return all_tokens

# Example: Tokenize a Defects4J buggy version
project_dir = "/tmp/Lang_1_b"  # Path after checking out Lang-1
buggy_tokens = tokenize_defects4j_project(project_dir)
project_dir = "/tmp/Lang_1_f"
fixed_tokens = tokenize_defects4j_project(project_dir)


In [28]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from typing import List, Dict, Tuple
import difflib

# 1. Find differences between buggy and fixed files to create token-level labels
def identify_bug_locations(buggy_tokens: List[str], fixed_tokens: List[str]) -> List[int]:
    """
    Identify which tokens in the buggy file are likely to contain bugs by comparing with fixed tokens.
    Returns a list of 0s and 1s where 1 indicates a potentially buggy token.
    """
    # Use difflib to find differences
    matcher = difflib.SequenceMatcher(None, buggy_tokens, fixed_tokens)
    bug_locations = [0] * len(buggy_tokens)
    
    # Mark tokens that are different or absent in the fixed version
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag in ('replace', 'delete'):
            for i in range(i1, i2):
                if i < len(bug_locations):  # Safety check
                    bug_locations[i] = 1
                    
    return bug_locations

# 2. Prepare paired data with token-level annotations
def prepare_paired_data(buggy_tokens_list: List[List[str]], fixed_tokens_list: List[List[str]]):
    """
    Prepare paired data with token-level bug annotations.
    Returns file-level labels and token-level labels.
    """
    paired_data = []
    
    for i, (buggy_tokens, fixed_tokens) in enumerate(zip(buggy_tokens_list, fixed_tokens_list)):
        # Skip empty files
        if not buggy_tokens or not fixed_tokens:
            continue
            
        # Find bug locations
        token_level_labels = identify_bug_locations(buggy_tokens, fixed_tokens)
        
        # Determine if file is buggy (if any token is marked as buggy)
        file_is_buggy = 1 if sum(token_level_labels) > 0 else 0
        
        paired_data.append({
            'id': i,
            'buggy_tokens': buggy_tokens,
            'token_level_labels': token_level_labels,
            'file_is_buggy': file_is_buggy
        })
        
    return paired_data

# 3. Build vocabulary
def build_vocabulary(token_sequences: List[List[str]]) -> Dict[str, int]:
    """Build a vocabulary from all tokens."""
    vocab = {"<PAD>": 0, "<UNK>": 1}
    for tokens in token_sequences:
        for token in tokens:
            if token not in vocab:
                vocab[token] = len(vocab)
    return vocab

# 4. Convert and pad sequences
def prepare_sequences(data_items, vocab: Dict[str, int], max_len: int):
    """Convert token sequences and labels to padded numerical arrays."""
    token_data = []
    file_labels = []
    token_labels = []
    
    for item in data_items:
        tokens = item['buggy_tokens']
        token_level_label = item['token_level_labels']
        
        # Process tokens (truncate or pad)
        if len(tokens) > max_len:
            token_indices = [vocab.get(token, vocab["<UNK>"]) for token in tokens[:max_len]]
            item_token_labels = token_level_label[:max_len]
        else:
            token_indices = [vocab.get(token, vocab["<UNK>"]) for token in tokens]
            item_token_labels = token_level_label.copy()
            # Pad tokens and labels
            padding_length = max_len - len(tokens)
            token_indices += [vocab["<PAD>"]] * padding_length
            item_token_labels += [0] * padding_length  # Pad labels with 0 (non-buggy)
            
        token_data.append(token_indices)
        file_labels.append(item['file_is_buggy'])
        token_labels.append(item_token_labels)
    
    return np.array(token_data), np.array(file_labels), np.array(token_labels)

# 5. Custom Dataset for Bug Localization
class BugLocalizationDataset(Dataset):
    def __init__(self, token_features, file_labels, token_labels):
        self.token_features = torch.tensor(token_features, dtype=torch.long)
        self.file_labels = torch.tensor(file_labels, dtype=torch.float)
        self.token_labels = torch.tensor(token_labels, dtype=torch.float)
    
    def __len__(self):
        return len(self.token_features)
    
    def __getitem__(self, idx):
        return self.token_features[idx], self.file_labels[idx], self.token_labels[idx]

# 6. Bug Localization Model (with both file-level and token-level predictions)
class BugLocalizationModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(BugLocalizationModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, batch_first=True, bidirectional=True)
        
        # Token-level classification
        self.token_classifier = nn.Linear(hidden_dim, 1)
        
        # File-level classification
        self.file_classifier = nn.Linear(hidden_dim, 1)
        
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        # x shape: [batch_size, seq_len]
        embedded = self.embedding(x)  # [batch_size, seq_len, embedding_dim]
        lstm_out, (hidden, _) = self.lstm(embedded)  # lstm_out: [batch_size, seq_len, hidden_dim]
        
        # Token-level predictions
        token_logits = self.token_classifier(lstm_out)  # [batch_size, seq_len, 1]
        token_probs = self.sigmoid(token_logits.squeeze(-1))  # [batch_size, seq_len]
        
        # File-level prediction (using the final hidden state)
        hidden = hidden.transpose(0, 1).contiguous().view(x.size(0), -1)  # [batch_size, hidden_dim]
        file_logit = self.file_classifier(hidden)  # [batch_size, 1]
        file_prob = self.sigmoid(file_logit).squeeze(-1)  # [batch_size]
        
        return file_prob, token_probs

# 7. Split data and train model
def split_and_train_bug_localization(buggy_tokens_list, fixed_tokens_list, max_len=500, test_size=0.2):
    """Split data and train a bug localization model with both file and token-level labels."""
    # Prepare paired data
    paired_data = prepare_paired_data(buggy_tokens_list, fixed_tokens_list)
    
    # Split into training and testing sets
    train_data, test_data = train_test_split(
        paired_data, test_size=test_size, random_state=42,
        # Stratify based on file_is_buggy to maintain class balance
        stratify=[item['file_is_buggy'] for item in paired_data]
    )
    
    print(f"Training set: {len(train_data)} files")
    print(f"Testing set: {len(test_data)} files")
    
    # Extract token sequences for vocabulary building
    train_token_sequences = [item['buggy_tokens'] for item in train_data]
    
    # Build vocabulary from training data only
    vocab = build_vocabulary(train_token_sequences)
    print(f"Vocabulary size: {len(vocab)}")
    
    # Prepare sequences for training and testing
    train_features, train_file_labels, train_token_labels = prepare_sequences(train_data, vocab, max_len)
    test_features, test_file_labels, test_token_labels = prepare_sequences(test_data, vocab, max_len)
    
    # Create datasets
    train_dataset = BugLocalizationDataset(train_features, train_file_labels, train_token_labels)
    test_dataset = BugLocalizationDataset(test_features, test_file_labels, test_token_labels)
    
    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=16)
    
    # Initialize model
    vocab_size = len(vocab)
    embedding_dim = 64
    hidden_dim = 128
    
    model = BugLocalizationModel(vocab_size, embedding_dim, hidden_dim)
    
    # Define loss function and optimizer
    file_criterion = nn.BCELoss()
    token_criterion = nn.BCELoss(reduction='none')
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        
        for tokens, file_labels, token_labels in train_loader:
            optimizer.zero_grad()
            file_preds, token_preds = model(tokens)
            
            # Calculate file-level loss
            file_loss = file_criterion(file_preds, file_labels)
            
            # Calculate token-level loss with masking for padding
            mask = (tokens != 0).float()
            token_loss = token_criterion(token_preds, token_labels)
            masked_token_loss = (token_loss * mask).sum() / mask.sum().clamp(min=1e-5)
            
            # Combined loss (with weighting)
            combined_loss = file_loss + masked_token_loss
            combined_loss.backward()
            optimizer.step()
            
            total_loss += combined_loss.item()
        
        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")
        
        # Evaluation
        if (epoch + 1) % 2 == 0 or epoch == num_epochs - 1:
            model.eval()
            file_correct = 0
            file_total = 0
            token_metrics = {
                'true_pos': 0, 'false_pos': 0, 'true_neg': 0, 'false_neg': 0
            }
            
            with torch.no_grad():
                for tokens, file_labels, token_labels in test_loader:
                    file_preds, token_preds = model(tokens)
                    
                    # File-level metrics
                    file_pred_labels = (file_preds > 0.5).float()
                    file_correct += (file_pred_labels == file_labels).sum().item()
                    file_total += len(file_labels)
                    
                    # Token-level metrics with mask for padding
                    mask = (tokens != 0).float()
                    token_pred_labels = (token_preds > 0.5).float()
                    
                    # Calculate token metrics
                    token_metrics['true_pos'] += ((token_pred_labels == 1) & (token_labels == 1) & (mask == 1)).sum().item()
                    token_metrics['false_pos'] += ((token_pred_labels == 1) & (token_labels == 0) & (mask == 1)).sum().item()
                    token_metrics['true_neg'] += ((token_pred_labels == 0) & (token_labels == 0) & (mask == 1)).sum().item()
                    token_metrics['false_neg'] += ((token_pred_labels == 0) & (token_labels == 1) & (mask == 1)).sum().item()
            
            # Calculate file-level accuracy
            file_accuracy = file_correct / file_total
            
            # Calculate token-level metrics
            token_precision = token_metrics['true_pos'] / (token_metrics['true_pos'] + token_metrics['false_pos']) if (token_metrics['true_pos'] + token_metrics['false_pos']) > 0 else 0
            token_recall = token_metrics['true_pos'] / (token_metrics['true_pos'] + token_metrics['false_neg']) if (token_metrics['true_pos'] + token_metrics['false_neg']) > 0 else 0
            token_f1 = 2 * (token_precision * token_recall) / (token_precision + token_recall) if (token_precision + token_recall) > 0 else 0
            
            print(f"File-level Accuracy: {file_accuracy:.4f}")
            print(f"Token-level Precision: {token_precision:.4f}, Recall: {token_recall:.4f}, F1: {token_f1:.4f}")
    
    return model, vocab, test_data

# 8. Function to predict bugs in a new file
def predict_bugs_in_file(model, vocab, file_tokens, max_len=500, threshold=0.5):
    """Predict if a file has bugs and identify their locations."""
    model.eval()
    
    # Tokenize and pad
    if len(file_tokens) > max_len:
        indices = [vocab.get(token, vocab["<UNK>"]) for token in file_tokens[:max_len]]
        effective_len = max_len
    else:
        indices = [vocab.get(token, vocab["<UNK>"]) for token in file_tokens]
        effective_len = len(indices)
        indices += [vocab["<PAD>"]] * (max_len - len(indices))
    
    # Convert to tensor
    tensor = torch.tensor(indices, dtype=torch.long).unsqueeze(0)  # Add batch dimension
    
    # Predict
    with torch.no_grad():
        file_prob, token_probs = model(tensor)
        file_is_buggy = file_prob.item() > threshold
        
    # Get token-level predictions for original (non-padded) tokens
    token_predictions = []
    for i, (token, prob) in enumerate(zip(file_tokens[:effective_len], token_probs[0][:effective_len])):
        if prob.item() > threshold:
            token_predictions.append((i, token, prob.item()))
    
    return {
        'file_is_buggy': file_is_buggy,
        'file_bug_probability': file_prob.item(),
        'bug_locations': token_predictions
    }

# Example usage:
#model, vocab, test_data = split_and_train_bug_localization(buggy_tokens, fixed_tokens)
# 
# # Save the model
""""
torch.save({
        'model_state_dict': model.state_dict(),
    'vocab': vocab
}, 'bug_localization_model.pt')
"""
""""
buggy_tokens = tokenize_defects4j_project("/tmp/Lang_1_b/") # Bug: uses - instead of +
fixed_tokens = ["def", "add", "(", "a", ",", "b", ")", ":", "return", "a", "+", "b"]

# Call the function to identify and print bugs
bug_labels = identify_bug_locations(buggy_tokens, fixed_tokens)
print(bug_labels)
# # Example: Test on the first test item
if test_data:
    for i in range(0,48):
        test_file = test_data[i]
        result = predict_bugs_in_file(model, vocab, test_file['buggy_tokens'])
    #     
        print(f"File is buggy: {result['file_is_buggy']} (Probability: {result['file_bug_probability']:.4f})")
        print(f"Found {len(result['bug_locations'])} potential bug locations")
    #     
        if result['bug_locations']:
            print("\nPotential bugs:")
            for idx, token, prob in result['bug_locations']:
                print(f"Token {idx}: '{token}' (probability: {prob:.4f})")
"""
#bug = tokenize_java_file("/tmp/Math_35_b/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java")
#fix = tokenize_java_file("/tmp/Math_35_f/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java")
#list = identify_bug_locations(bug,fix)
#print(list)
#if 1 in list:
    #print("bug")
buggy_path = "/tmp/Math_35_b/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java"
fixed_path = "/tmp/Math_35_f/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java"

# Read files
with open(buggy_path, 'r') as f1, open(fixed_path, 'r') as f2:
    buggy_lines = f1.readlines()
    fixed_lines = f2.readlines()

# Compute differences
diff = difflib.unified_diff(buggy_lines, fixed_lines, lineterm='')

# Print results
print("\n".join(diff))




In [8]:
#example for comparing individual directories
import os
import difflib
import javalang
from typing import List

def get_java_files(directory):
    """Recursively get all Java file paths in a directory."""
    java_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".java"):
                java_files.append(os.path.join(root, file))
    return java_files

def get_relative_paths(files, base_dir):
    """Convert absolute file paths to relative paths for comparison."""
    return {os.path.relpath(f, base_dir): f for f in files}

def tokenize_java_file(file_path: str) -> List[str]:
    """Tokenize a single Java file using javalang."""
    with open(file_path, "r", encoding="utf-8") as f:
        java_code = f.read()
    try:
        tokens = list(javalang.tokenizer.tokenize(java_code))
        return [token.value for token in tokens]
    except javalang.tokenizer.LexerError:
        print(f"Lexer error in file: {file_path} (possibly invalid Java syntax)")
        return []

def find_bug_locations(buggy_tokens, fixed_tokens):
    """Find locations where tokens differ."""
    diff = list(difflib.ndiff(buggy_tokens, fixed_tokens))
    changes = [i for i, line in enumerate(diff) if line.startswith('- ') or line.startswith('+ ')]
    return changes  # Returns indices of differences

# Debugging: Check dictionary structure
buggy_dir = checkout_version("Math","11","b")  # Make sure this function is defined correctly
fixed_dir = checkout_version("Math", "11","f")  # Same for this one

buggy_files = get_java_files(buggy_dir)
fixed_files = get_java_files(fixed_dir)

print(f"Buggy version has {len(buggy_files)} Java files.")
print(f"Fixed version has {len(fixed_files)} Java files.")

buggy_rel = get_relative_paths(buggy_files, buggy_dir)
fixed_rel = get_relative_paths(fixed_files, fixed_dir)

common_files = set(buggy_rel.keys()) & set(fixed_rel.keys())

print(f"Comparing {len(common_files)} files found in both versions.")

# Tokenize the common files
buggy_tokens = {file: tokenize_java_file(buggy_rel[file]) for file in common_files}
fixed_tokens = {file: tokenize_java_file(fixed_rel[file]) for file in common_files}

# Compare the tokens and find bug locations
bug_locations = {file: find_bug_locations(buggy_tokens[file], fixed_tokens[file]) for file in common_files}

# Print bug locations for each file
for file, changes in bug_locations.items():
    if changes:
        print(f"Potential bugs found in {file} at token indices: {changes}")



Buggy version has 1196 Java files.
Fixed version has 1196 Java files.
Comparing 1196 files found in both versions.
Potential bugs found in src/main/java/org/apache/commons/math3/distribution/MultivariateNormalDistribution.java at token indices: [639, 640, 642, 643]


In [9]:
import os
import hashlib
import difflib
import javalang
from typing import List, Dict, Tuple, Set, Optional, Any
from dataclasses import dataclass
import concurrent.futures
from pathlib import Path

@dataclass
class CodeChange:
    """Represents a code change between buggy and fixed versions."""
    file_path: str
    line_number: Optional[int]  # May be None for structural changes
    change_type: str  # 'added', 'deleted', 'modified'
    buggy_code: str
    fixed_code: str
    context_before: str
    context_after: str
    method_name: Optional[str]  # Method containing the change
    class_name: Optional[str]   # Class containing the change

def get_java_files(directory: str) -> List[str]:
    """Recursively get all Java file paths in a directory."""
    return [str(p) for p in Path(directory).glob('**/*.java')]

def get_relative_paths(files: List[str], base_dir: str) -> Dict[str, str]:
    """Convert absolute file paths to relative paths for comparison."""
    return {os.path.relpath(f, base_dir): f for f in files}

def compute_file_hash(file_path: str) -> str:
    """Compute a hash of the file contents."""
    with open(file_path, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()

def find_changed_files(buggy_dir: str, fixed_dir: str) -> Dict[str, Tuple[str, str]]:
    """Find files that differ between buggy and fixed versions based on hash."""
    buggy_files = get_java_files(buggy_dir)
    fixed_files = get_java_files(fixed_dir)
    
    buggy_rel = get_relative_paths(buggy_files, buggy_dir)
    fixed_rel = get_relative_paths(fixed_files, fixed_dir)
    
    common_files = set(buggy_rel.keys()) & set(fixed_rel.keys())
    changed_files = {}
    
    print(f"Checking {len(common_files)} files found in both versions...")
    
    # First filter: use file hash comparison to identify changed files
    for file in common_files:
        buggy_hash = compute_file_hash(buggy_rel[file])
        fixed_hash = compute_file_hash(fixed_rel[file])
        
        if buggy_hash != fixed_hash:
            changed_files[file] = (buggy_rel[file], fixed_rel[file])
    
    print(f"Found {len(changed_files)} files with different hashes.")
    return changed_files

def get_file_content(file_path: str) -> str:
    """Safely read file content handling different encodings."""
    encodings = ['utf-8', 'latin-1', 'cp1252']
    
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
    
    print(f"Warning: Could not decode {file_path} with any of the attempted encodings.")
    return ""

def get_line_mapping(tokens: List) -> Dict[int, int]:
    """Create mapping from token index to line number."""
    return {i: token.position[0] for i, token in enumerate(tokens) if hasattr(token, 'position') and token.position}

def extract_ast_info(file_path: str) -> Dict[int, Dict[str, str]]:
    """Extract class and method information from Java file."""
    content = get_file_content(file_path)
    line_info = {}
    
    try:
        tree = javalang.parse.parse(content)
        
        # Process classes
        for path, node in tree.filter(javalang.tree.ClassDeclaration):
            start_line = node.position[0] if node.position else 0
            end_line = 0  # We'll approximate the end line
            
            for member in node.body:
                if hasattr(member, 'position') and member.position:
                    if member.position[0] > end_line:
                        end_line = member.position[0]
            
            # If we couldn't find an end line, use start + 100 as an approximation
            if end_line <= start_line:
                end_line = start_line + 100
            
            for line in range(start_line, end_line + 1):
                line_info[line] = {
                    'class_name': node.name,
                    'method_name': None
                }
        
        # Process methods
        for path, node in tree.filter(javalang.tree.MethodDeclaration):
            if node.position:
                start_line = node.position[0]
                # Approximate method end based on body
                end_line = start_line + 30  # Reasonable default if we can't determine
                
                # Try to determine class name from path
                class_name = None
                for p in path:
                    if isinstance(p, javalang.tree.ClassDeclaration):
                        class_name = p.name
                        break
                
                for line in range(start_line, end_line + 1):
                    line_info[line] = {
                        'class_name': class_name,
                        'method_name': node.name
                    }
        
        return line_info
    except Exception as e:
        print(f"Error extracting AST info from {file_path}: {e}")
        return {}

def tokenize_java_file(file_path: str) -> Tuple[List[str], List]:
    """Tokenize a Java file and return token values and original tokens."""
    # Rest of the function remains the same
    content = get_file_content(file_path)
    
    try:
        tokens = list(javalang.tokenizer.tokenize(content))
        return [token.value for token in tokens], tokens
    except javalang.tokenizer.LexerError:
        print(f"Lexer error in file: {file_path} (possibly invalid Java syntax)")
        return [], []

def analyze_file_differences(file_rel_path: str, buggy_path: str, fixed_path: str) -> List[CodeChange]:
    """Analyze differences between buggy and fixed versions of a file."""
    buggy_tokens_values, buggy_tokens = tokenize_java_file(buggy_path)
    fixed_tokens_values, fixed_tokens = tokenize_java_file(fixed_path)
    
    if not buggy_tokens_values or not fixed_tokens_values:
        return []
    
    # Get line number mapping
    buggy_line_map = get_line_mapping(buggy_tokens)
    
    # Get AST information
    buggy_ast_info = extract_ast_info(buggy_path)
    
    # Use difflib to find differences
    matcher = difflib.SequenceMatcher(None, buggy_tokens_values, fixed_tokens_values)
    
    changes = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag != 'equal':
            # Determine line number (from buggy version)
            line_number = buggy_line_map.get(i1) if i1 < len(buggy_line_map) else None
            
            # Get context (5 tokens before and after)
            context_before_start = max(0, i1 - 5)
            context_before = " ".join(buggy_tokens_values[context_before_start:i1])
            
            context_after_end = min(len(buggy_tokens_values), i2 + 5)
            context_after = " ".join(buggy_tokens_values[i2:context_after_end])
            
            # Get class and method information
            class_name = None
            method_name = None
            if line_number and line_number in buggy_ast_info:
                info = buggy_ast_info[line_number]
                class_name = info.get('class_name')
                method_name = info.get('method_name')
            
            # Determine change type
            if tag == 'replace':
                change_type = 'modified'
            elif tag == 'delete':
                change_type = 'deleted'
            elif tag == 'insert':
                change_type = 'added'
            
            # Get the actual code snippets
            buggy_code = " ".join(buggy_tokens_values[i1:i2]) if i1 < i2 else ""
            fixed_code = " ".join(fixed_tokens_values[j1:j2]) if j1 < j2 else ""
            
            changes.append(CodeChange(
                file_path=file_rel_path,
                line_number=line_number,
                change_type=change_type,
                buggy_code=buggy_code,
                fixed_code=fixed_code,
                context_before=context_before,
                context_after=context_after,
                method_name=method_name,
                class_name=class_name
            ))
    
    return changes

def analyze_project_differences(buggy_dir: str, fixed_dir: str) -> Dict[str, List[CodeChange]]:
    """Analyze differences between buggy and fixed versions of a project."""
    changed_files = find_changed_files(buggy_dir, fixed_dir)
    
    results = {}
    
    # Sequential processing instead of parallel
    for file_rel, (buggy_path, fixed_path) in changed_files.items():
        try:
            changes = analyze_file_differences(file_rel, buggy_path, fixed_path)
            if changes:
                results[file_rel] = changes
        except Exception as e:
            print(f"Error analyzing {file_rel}: {e}")
    
    return results

def generate_html_report(project_name: str, bug_id: str, changes: Dict[str, List[CodeChange]]) -> str:
    """Generate an HTML report of the changes."""
    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Bug Analysis: {project_name}-{bug_id}</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .file {{ margin-bottom: 30px; border: 1px solid #ddd; border-radius: 5px; overflow: hidden; }}
            .file-header {{ background-color: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }}
            .change {{ margin: 10px; padding: 10px; border: 1px solid #eee; border-radius: 5px; }}
            .change-header {{ font-weight: bold; margin-bottom: 10px; }}
            .deleted {{ background-color: #ffecec; }}
            .added {{ background-color: #eaffea; }}
            .modified {{ background-color: #ececff; }}
            .code {{ font-family: monospace; white-space: pre-wrap; padding: 10px; background-color: #f9f9f9; }}
            .context {{ color: #888; }}
        </style>
    </head>
    <body>
        <h1>Bug Analysis: {project_name}-{bug_id}</h1>
        <p>Total files changed: {len(changes)}</p>
    """
    
    for file_path, file_changes in changes.items():
        html += f"""
        <div class="file">
            <div class="file-header">
                <h2>{file_path}</h2>
                <p>Total changes: {len(file_changes)}</p>
            </div>
        """
        
        for change in file_changes:
            html += f"""
            <div class="change {change.change_type}">
                <div class="change-header">
                    {change.change_type.capitalize()} at line {change.line_number or 'unknown'}
                    {f' in {change.class_name}' if change.class_name else ''}
                    {f'.{change.method_name}()' if change.method_name else ''}
                </div>
                <div class="context">Context before: {change.context_before}</div>
                <div class="code buggy">Buggy code: {change.buggy_code}</div>
                <div class="code fixed">Fixed code: {change.fixed_code}</div>
                <div class="context">Context after: {change.context_after}</div>
            </div>
            """
        
        html += "</div>"
    
    html += """
    </body>
    </html>
    """
    
    return html

def main(project_name: str, bug_id: str, buggy_dir: str, fixed_dir: str):
    """Main function to analyze differences between buggy and fixed versions."""
    print(f"Analyzing {project_name}-{bug_id}...")
    
    # Analyze differences
    changes = analyze_project_differences(buggy_dir, fixed_dir)
    
    # Print summary
    total_changes = sum(len(file_changes) for file_changes in changes.values())
    print(f"Found {len(changes)} files with changes, totaling {total_changes} changes.")
    
    for file_path, file_changes in changes.items():
        print(f"\nFile: {file_path}")
        for i, change in enumerate(file_changes):
            print(f"  Change {i+1}: {change.change_type} at line {change.line_number}")
            if change.class_name:
                print(f"    In class: {change.class_name}")
            if change.method_name:
                print(f"    In method: {change.method_name}()")
            print(f"    Buggy: {change.buggy_code[:50]}{'...' if len(change.buggy_code) > 50 else ''}")
            print(f"    Fixed: {change.fixed_code[:50]}{'...' if len(change.fixed_code) > 50 else ''}")
    
    # Generate HTML report
    html_report = generate_html_report(project_name, bug_id, changes)
    report_file = f"{project_name}-{bug_id}_bug_report.html"
    
    with open(report_file, "w", encoding="utf-8") as f:
        f.write(html_report)
    
    print(f"\nHTML report generated: {report_file}")

if __name__ == "__main__":
    project_name = "Math"
    bug_id = "13"
    buggy_dir = checkout_version(project_name, bug_id, "b")  
    fixed_dir = checkout_version(project_name, bug_id, "f") 
    
    main(project_name, bug_id, buggy_dir, fixed_dir)

Analyzing Math-13...
Checking 1195 files found in both versions...
Found 1 files with different hashes.
Found 1 files with changes, totaling 2 changes.

File: src/main/java/org/apache/commons/math3/optimization/general/AbstractLeastSquaresOptimizer.java
  Change 1: added at line 562
    In class: AbstractLeastSquaresOptimizer
    In method: squareRoot()
    Buggy: 
    Fixed: if ( m instanceof DiagonalMatrix ) { final int dim...
  Change 2: added at line None
    Buggy: 
    Fixed: }

HTML report generated: Math-13_bug_report.html


In [31]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import os
from transformers import RobertaTokenizer, RobertaModel
import re
from tqdm import tqdm

class BugDataset(Dataset):
    def __init__(self, data_df, tokenizer, max_length=512):
        self.data = data_df
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        
        # Get code snippets and context
        code = row['code_snippet']
        context_before = row['context_before'] if not pd.isna(row['context_before']) else ""
        context_after = row['context_after'] if not pd.isna(row['context_after']) else ""
        
        # Combine for full context
        full_snippet = f"{context_before} {code} {context_after}"
        
        # Tokenize
        encoding = self.tokenizer(
            full_snippet,
            return_tensors='pt',
            max_length=self.max_length,
            padding='max_length',
            truncation=True
        )
        
        # Get label (1 for buggy, 0 for fixed)
        label = torch.tensor(1 if row['is_buggy'] else 0, dtype=torch.float)
        
        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'label': label
        }

class BugLocalizationModel(nn.Module):
    def __init__(self, pretrained_model_name='microsoft/codebert-base'):
        super(BugLocalizationModel, self).__init__()
        self.codebert = RobertaModel.from_pretrained(pretrained_model_name)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.codebert.config.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, input_ids, attention_mask):
        outputs = self.codebert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Use the [CLS] token representation
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return self.sigmoid(logits)

def prepare_data_from_changes(change_dict):
    """Convert code changes to training data format"""
    data = []
    
    for file_path, changes in change_dict.items():
        for change in changes:
            # Add buggy version as positive example
            if change.buggy_code.strip():
                data.append({
                    'file_path': file_path,
                    'line_number': change.line_number,
                    'code_snippet': change.buggy_code,
                    'context_before': change.context_before,
                    'context_after': change.context_after,
                    'class_name': change.class_name,
                    'method_name': change.method_name,
                    'is_buggy': True
                })
            
            # Add fixed version as negative example
            if change.fixed_code.strip():
                data.append({
                    'file_path': file_path,
                    'line_number': change.line_number,
                    'code_snippet': change.fixed_code,
                    'context_before': change.context_before,
                    'context_after': change.context_after,
                    'class_name': change.class_name,
                    'method_name': change.method_name,
                    'is_buggy': False
                })
    
    return pd.DataFrame(data)

def collect_training_data(projects_dir, output_file='bug_dataset.csv'):
    """Collect training data from multiple projects and bugs"""
    all_data = []
    
    for project_dir in os.listdir(projects_dir):
        project_path = os.path.join(projects_dir, project_dir)
        if not os.path.isdir(project_path):
            continue
        
        project_name = project_dir.split('-')[0]
        bug_id = project_dir.split('-')[1]
        
        buggy_dir = os.path.join(project_path, 'buggy')
        fixed_dir = os.path.join(project_path, 'fixed')
        
        if os.path.exists(buggy_dir) and os.path.exists(fixed_dir):
            try:
                changes = analyze_project_differences(buggy_dir, fixed_dir)
                df = prepare_data_from_changes(changes)
                df['project'] = project_name
                df['bug_id'] = bug_id
                all_data.append(df)
                print(f"Processed {project_name}-{bug_id}")
            except Exception as e:
                print(f"Error processing {project_name}-{bug_id}: {e}")
    
    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)
        final_df.to_csv(output_file, index=False)
        print(f"Saved dataset with {len(final_df)} records to {output_file}")
        return final_df
    else:
        print("No data collected.")
        return None

def train_model(train_loader, val_loader, model, device, num_epochs=3, learning_rate=2e-5):
    """Train the bug localization model"""
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()
    
    best_val_loss = float('inf')
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0
        
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            
            optimizer.zero_grad()
            
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.view(-1), labels.view(-1))
            
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        avg_train_loss = train_loss / len(train_loader)
        
        # Validation phase
        model.eval()
        val_loss = 0
        val_preds = []
        val_true = []
        
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]"):
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                
                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs.view(-1), labels.view(-1))
                
                val_loss += loss.item()
                
                val_preds.extend(outputs.view(-1).cpu().numpy())
                val_true.extend(labels.view(-1).cpu().numpy())
        
        avg_val_loss = val_loss / len(val_loader)
        
        # Calculate metrics
        val_preds_binary = [1 if p >= 0.5 else 0 for p in val_preds]
        accuracy = sum(p == t for p, t in zip(val_preds_binary, val_true)) / len(val_true)
        
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}")
        
        # Save best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), "best_bug_localization_model.pt")
            print("Saved best model.")
    
    return model

def predict_bugs(model, tokenizer, code_files, device, threshold=0.5):
    """
    Predict potential bugs in code files
    
    Args:
        model: The trained model
        tokenizer: Tokenizer for the model
        code_files: Dictionary of file_path -> code_content
        device: Device to run inference on
        threshold: Probability threshold for bug prediction
    
    Returns:
        Dictionary of file_path -> list of potential bug lines
    """
    model.eval()
    results = {}
    
    for file_path, content in code_files.items():
        # Split code into chunks (e.g., methods or smaller segments)
        lines = content.split('\n')
        chunks = []
        
        # A simple chunking strategy - this could be improved to be more method-aware
        current_chunk = []
        for i, line in enumerate(lines):
            current_chunk.append(line)
            if len(current_chunk) >= 15 or i == len(lines) - 1:  # Chunk size of ~15 lines
                chunks.append(('\n'.join(current_chunk), i - len(current_chunk) + 1))
                current_chunk = []
        
        file_bugs = []
        
        # Process each chunk
        for chunk_text, start_line in chunks:
            encoding = tokenizer(
                chunk_text,
                return_tensors='pt',
                max_length=512,
                padding='max_length',
                truncation=True
            )
            
            with torch.no_grad():
                input_ids = encoding['input_ids'].to(device)
                attention_mask = encoding['attention_mask'].to(device)
                
                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                prob = outputs.squeeze().cpu().item()
                
                if prob >= threshold:
                    # If chunk is predicted as buggy, add it to results
                    file_bugs.append({
                        'start_line': start_line,
                        'end_line': start_line + len(chunk_text.split('\n')) - 1,
                        'bug_probability': prob,
                        'chunk': chunk_text
                    })
        
        if file_bugs:
            results[file_path] = file_bugs
    
    return results



In [10]:
import os
import subprocess
import tempfile
import shutil
from pathlib import Path


def checkout_version(project_name, bug_id, version_type):
    """
    Check out a specific version of a project using Defects4J or similar.
    
    Args:
        project_name: Name of the project (e.g., 'Math', 'Lang')
        bug_id: Bug identifier (e.g., '1', '10')
        version_type: 'b' for buggy version, 'f' for fixed version
    
    Returns:
        Path to the checked out code
    """
    # Create a temporary directory
    temp_dir = tempfile.mkdtemp(prefix=f"{project_name}-{bug_id}-{version_type}")
    
    # Map version type to Defects4J version flag
    version_flag = "b" if version_type == "b" else "f"
    
    try:
        # Run Defects4J checkout command
        cmd = [
            "defects4j", "checkout",
            "-p", project_name,
            "-v", f"{bug_id}{version_flag}",
            "-w", temp_dir
        ]
        
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print(f"Checked out {project_name}-{bug_id} {version_type} version to {temp_dir}")
        
        return temp_dir
    except subprocess.CalledProcessError as e:
        print(f"Error checking out {project_name}-{bug_id} {version_type} version:")
        print(f"Command: {' '.join(cmd)}")
        print(f"Error: {e.stderr}")
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

def collect_training_data_from_defects4j(projects, bug_ids, output_file='bug_dataset.csv'):
    """
    Collect training data from multiple Defects4J projects and bugs
    
    Args:
        projects: List of project names (e.g., ['Math', 'Lang'])
        bug_ids: Dictionary mapping project names to lists of bug IDs
        output_file: Path to save the collected data
    
    Returns:
        DataFrame with collected data
    """
    all_data = []
    
    for project_name in projects:
        for bug_id in bug_ids.get(project_name, []):
            try:
                print(f"Processing {project_name}-{bug_id}...")
                
                # Check out buggy and fixed versions
                buggy_dir = checkout_version(project_name, bug_id, "b")
                fixed_dir = checkout_version(project_name, bug_id, "f")
                
                # Analyze differences
                changes = analyze_project_differences(buggy_dir, fixed_dir)
                
                # Prepare data
                df = prepare_data_from_changes(changes)
                df['project'] = project_name
                df['bug_id'] = bug_id
                all_data.append(df)
                
                print(f"Successfully processed {project_name}-{bug_id}")
                
                # Clean up checkout directories
                shutil.rmtree(buggy_dir, ignore_errors=True)
                shutil.rmtree(fixed_dir, ignore_errors=True)
                
            except Exception as e:
                print(f"Error processing {project_name}-{bug_id}: {e}")
    
    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)
        final_df.to_csv(output_file, index=False)
        print(f"Saved dataset with {len(final_df)} records to {output_file}")
        return final_df
    else:
        print("No data collected.")
        return None

def main():
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # Define projects and bug IDs to use
    projects = ['Math', 'Lang', 'Time', 'Chart']
    bug_ids = {
        'Math': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'],
        'Lang': ['1', '2', '3', '4', '5'],
        'Time': ['1', '2', '3', '4', '5'],
        'Chart': ['1', '2', '3', '4', '5']
    }
    
    # Parameters
    batch_size = 8
    max_length = 512
    num_epochs = 3
    learning_rate = 2e-5
    
    # 1. Collect and prepare data
    print("Collecting training data...")
    if os.path.exists("bug_dataset.csv"):
        print("Loading existing dataset...")
        data_df = pd.read_csv("bug_dataset.csv")
    else:
        data_df = collect_training_data_from_defects4j(projects, bug_ids)
        if data_df is None:
            print("Failed to collect data. Exiting.")
            return
    
    # Rest of the training code remains the same
    # ...
def generate_prediction_report(project_name, bug_id, predictions, actual_changes, output_path):
    """Generate HTML report comparing predictions to actual bugs"""
    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Bug Prediction Report: {project_name}-{bug_id}</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .file {{ margin-bottom: 30px; border: 1px solid #ddd; border-radius: 5px; overflow: hidden; }}
            .file-header {{ background-color: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; }}
            .prediction {{ margin: 10px; padding: 10px; border: 1px solid #eee; border-radius: 5px; background-color: #fff8e6; }}
            .actual {{ margin: 10px; padding: 10px; border: 1px solid #eee; border-radius: 5px; background-color: #e6fff0; }}
            .match {{ background-color: #d1ffdd; border: 2px solid #28a745; }}
            .code {{ font-family: monospace; white-space: pre-wrap; padding: 10px; background-color: #f9f9f9; }}
            .metrics {{ margin-top: 20px; padding: 10px; background-color: #f0f0f0; border-radius: 5px; }}
        </style>
    </head>
    <body>
        <h1>Bug Prediction Report: {project_name}-{bug_id}</h1>
    """
    
    # Collect all files
    all_files = set(predictions.keys())
    for file_path in actual_changes.keys():
        all_files.add(file_path)
    
    # Track metrics
    true_positives = 0
    false_positives = 0
    false_negatives = 0
    
    # Process each file
    for file_path in sorted(all_files):
        html += f"""
        <div class="file">
            <div class="file-header">
                <h2>{file_path}</h2>
            </div>
        """
        
        # Get predicted bugs for this file
        file_predictions = predictions.get(file_path, [])
        predicted_lines = set()
        for pred in file_predictions:
            for line in range(pred['start_line'], pred['end_line'] + 1):
                predicted_lines.add(line)
        
        # Get actual bugs for this file
        file_changes = actual_changes.get(file_path, [])
        actual_lines = set()
        for change in file_changes:
            if change.line_number:
                actual_lines.add(change.line_number)
        
        # Show predictions
        if file_predictions:
            html += "<h3>Predicted Bugs:</h3>"
            for pred in file_predictions:
                # Check if prediction overlaps with actual bug
                matches_actual = any(line in actual_lines for line in range(pred['start_line'], pred['end_line'] + 1))
                match_class = " match" if matches_actual else ""
                
                if matches_actual:
                    true_positives += 1
                else:
                    false_positives += 1
                
                html += f"""
                <div class="prediction{match_class}">
                    <p>Lines {pred['start_line']}-{pred['end_line']} (Confidence: {pred['bug_probability']:.2f})</p>
                    <div class="code">{pred['chunk']}</div>
                </div>
                """
        
        # Show actual bugs
        if file_changes:
            html += "<h3>Actual Bugs:</h3>"
            for change in file_changes:
                if not change.line_number:
                    continue
                    
                # Check if actual bug was predicted
                was_predicted = change.line_number in predicted_lines
                match_class = " match" if was_predicted else ""
                
                if not was_predicted:
                    false_negatives += 1
                
                html += f"""
                <div class="actual{match_class}">
                    <p>Line {change.line_number} ({change.change_type})</p>
                    <p>In {change.class_name}.{change.method_name if change.method_name else ""}</p>
                    <div class="code">Buggy: {change.buggy_code}</div>
                    <div class="code">Fixed: {change.fixed_code}</div>
                </div>
                """
        
        html += "</div>"  # Close file div
    
    # Calculate metrics
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    
    html += f"""
    <div class="metrics">
        <h2>Performance Metrics</h2>
        <p>True Positives: {true_positives}</p>
        <p>False Positives: {false_positives}</p>
        <p>False Negatives: {false_negatives}</p>
        <p>Precision: {precision:.2f}</p>
        <p>Recall: {recall:.2f}</p>
        <p>F1 Score: {f1:.2f}</p>
    </div>
    </body>
    </html>
    """
    
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html)
# Then to use the model on a new project:
def analyze_new_project(project_name, bug_id, model, tokenizer, device):
    """
    Analyze a specific project bug to predict bug locations
    """
    # Check out the buggy version
    buggy_dir = checkout_version(project_name, bug_id, "b")
    
    # Get all Java files
    java_files = get_java_files(buggy_dir)
    
    # Read file contents
    code_files = {}
    for file_path in java_files:
        rel_path = os.path.relpath(file_path, buggy_dir)
        code_files[rel_path] = get_file_content(file_path)
    
    # Predict bugs
    predictions = predict_bugs(model, tokenizer, code_files, device)
    
    # Generate report
    report_path = f"{project_name}-{bug_id}_bug_predictions.html"
    generate_prediction_report(project_name, bug_id, predictions, report_path)
    
    # Clean up
    shutil.rmtree(buggy_dir, ignore_errors=True)
    
    return predictions, report_path

In [33]:
import torch
import os
from transformers import RobertaTokenizer

# Assuming all the functions from previous examples are defined

def run_bug_localization_example():
    """Complete example workflow for bug localization model"""
    print("Starting bug localization example...")
    
    # 1. Define project data
    projects = ['Math']  # Start with just one project for the example
    bug_ids = {'Math': ['13']}  # Using Math-13 as an example
    
    # 2. Set up paths and device
    output_dir = "bug_model_output"
    os.makedirs(output_dir, exist_ok=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # 3. Check for existing dataset or create a new one
    dataset_path = os.path.join(output_dir, "bug_dataset.csv")
    if os.path.exists(dataset_path):
        print(f"Loading existing dataset from {dataset_path}")
        data_df = pd.read_csv(dataset_path)
    else:
        print("Creating new dataset by analyzing projects...")
        data_df = collect_training_data_from_defects4j(projects, bug_ids, dataset_path)
        if data_df is None or len(data_df) == 0:
            print("Failed to collect enough data. Exiting.")
            return
    
    print(f"Dataset contains {len(data_df)} samples")
    print(f"Sample distribution: Buggy={data_df['is_buggy'].sum()}, Non-buggy={len(data_df) - data_df['is_buggy'].sum()}")
    
    # 4. Prepare the tokenizer and model
    print("Initializing tokenizer and model...")
    tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base")
    model = BugLocalizationModel().to(device)
    
    train_df, val_df = train_test_split(data_df, test_size=0.2, random_state=42, 
                                       stratify=data_df['is_buggy'])
    
    print(f"Training set: {len(train_df)} samples")
    print(f"Validation set: {len(val_df)} samples")
    
    train_dataset = BugDataset(train_df, tokenizer)
    val_dataset = BugDataset(val_df, tokenizer)
    
    batch_size = 8
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    
    # 6. Train model or load existing model
    model_path = os.path.join(output_dir, "best_bug_model.pt")
    if os.path.exists(model_path):
        print(f"Loading existing model from {model_path}")
        model.load_state_dict(torch.load(model_path, map_location=device))
    else:
        print("Training new model...")
        model = train_model(
            train_loader,
            val_loader,
            model,
            device,
            num_epochs=3,
            learning_rate=2e-5
        )
    
    # 7. Test on a new bug
    test_project = "Lang"
    test_bug_id = "33"
    
    print(f"\nTesting model on {test_project}-{test_bug_id}...")
    
    # Check out the buggy version
    try:
        buggy_dir = checkout_version(test_project, test_bug_id, "b")
        
        # Get all Java files
        java_files = get_java_files(buggy_dir)
        print(f"Found {len(java_files)} Java files")
        
        # Read file contents
        code_files = {}
        for file_path in java_files:
            rel_path = os.path.relpath(file_path, buggy_dir)
            code_files[rel_path] = get_file_content(file_path)
        
        # Predict bugs
        print("Running predictions...")
        predictions = predict_bugs(model, tokenizer, code_files, device, threshold=0.7)
        
        # Print results
        print("\nPrediction Results:")
        print(f"Found potential bugs in {len(predictions)} files")
        
        for file_path, bugs in predictions.items():
            print(f"\nFile: {file_path}")
            print(f"Number of potential bugs: {len(bugs)}")
            
            # Just show first bug for brevity in example
            if bugs:
                bug = bugs[0]
                print(f"  Lines {bug['start_line']}-{bug['end_line']} (Probability: {bug['bug_probability']:.2f})")
                print(f"  Code excerpt:")
                print(f"  {bug['chunk'][:200]}...")  # Show part of the code
        
        # Check if we found the actual bug 
        # (Requires accessing Defects4J bug information)
        fixed_dir = checkout_version(test_project, test_bug_id, "f")
        changes = analyze_project_differences(buggy_dir, fixed_dir)
        
        print("\nActual bug locations:")
        for file_path, file_changes in changes.items():
            print(f"File: {file_path}")
            for change in file_changes:
                print(f"  Line {change.line_number}: {change.change_type}")
                print(f"  Buggy: {change.buggy_code[:100]}...")
                print(f"  Fixed: {change.fixed_code[:100]}...")
        
        # Generate HTML report
        report_path = os.path.join(output_dir, f"{test_project}-{test_bug_id}_predictions.html")
        generate_prediction_report(test_project, test_bug_id, predictions, changes, report_path)
        print(f"\nPrediction report saved to: {report_path}")
        
        # Clean up
        shutil.rmtree(buggy_dir, ignore_errors=True)
        shutil.rmtree(fixed_dir, ignore_errors=True)
        
    except Exception as e:
        print(f"Error analyzing {test_project}-{test_bug_id}: {e}")



if __name__ == "__main__":
    run_bug_localization_example()

Starting bug localization example...
Using device: cpu
Loading existing dataset from bug_model_output/bug_dataset.csv
Dataset contains 2 samples
Sample distribution: Buggy=0, Non-buggy=2
Initializing tokenizer and model...




Training set: 1 samples
Validation set: 1 samples
Training new model...


Epoch 1/3 [Train]: 100%|██████████| 1/1 [00:01<00:00,  1.08s/it]
Epoch 1/3 [Val]: 100%|██████████| 1/1 [00:00<00:00,  7.53it/s]


Epoch 1/3
Train Loss: 0.5241
Val Loss: 0.4527, Accuracy: 1.0000
Saved best model.


Epoch 2/3 [Train]: 100%|██████████| 1/1 [00:00<00:00,  1.76it/s]
Epoch 2/3 [Val]: 100%|██████████| 1/1 [00:00<00:00,  9.13it/s]


Epoch 2/3
Train Loss: 0.4499
Val Loss: 0.4093, Accuracy: 1.0000
Saved best model.


Epoch 3/3 [Train]: 100%|██████████| 1/1 [00:00<00:00,  1.91it/s]
Epoch 3/3 [Val]: 100%|██████████| 1/1 [00:00<00:00,  7.52it/s]


Epoch 3/3
Train Loss: 0.4080
Val Loss: 0.3678, Accuracy: 1.0000
Saved best model.

Testing model on Lang-33...
Checked out Lang-33 b version to /var/folders/f2/0s8yk3ss055_ygbx2_lw3m9r0000gn/T/Lang-33-bc_qz_7l9
Found 182 Java files
Running predictions...


KeyboardInterrupt: 

In [19]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import subprocess
import re
import javalang
from collections import Counter

class Defects4JBugDetector:
    def __init__(self, defects4j_path, output_dir):
        """
        Initialize the bug detector with paths to Defects4J and output directory
        
        Args:
            defects4j_path: Path to Defects4J installation
            output_dir: Directory to save extracted data and models
        """
        self.defects4j_path = defects4j_path
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        
    def extract_features_from_java(self, java_code):
        """
        Extract code features from Java source code
        
        Args:
            java_code: String containing Java source code
            
        Returns:
            Dictionary of code features
        """
        features = {}
        
        # Basic metrics
        features['loc'] = len(java_code.splitlines())
        features['chars'] = len(java_code)
        
        # Count tokens and keywords
        try:
            tokens = list(javalang.tokenizer.tokenize(java_code))
            token_types = Counter(token.__class__.__name__ for token in tokens)
            
            for token_type, count in token_types.items():
                features[f'token_{token_type}'] = count
                
            # Try to parse and extract AST-based features
            try:
                tree = javalang.parse.parse(java_code)
                
                # Count different node types in AST
                node_types = []
                for path, node in tree:
                    node_types.append(node.__class__.__name__)
                
                node_counts = Counter(node_types)
                for node_type, count in node_counts.items():
                    features[f'ast_{node_type}'] = count
                    
                # Extract cyclomatic complexity approximation
                features['if_statements'] = node_counts.get('IfStatement', 0)
                features['for_loops'] = node_counts.get('ForStatement', 0)
                features['while_loops'] = node_counts.get('WhileStatement', 0)
                features['try_blocks'] = node_counts.get('TryStatement', 0)
                features['catch_blocks'] = node_counts.get('CatchClause', 0)
                features['complexity'] = (1 + features['if_statements'] + 
                                         features['for_loops'] + 
                                         features['while_loops'])
                
            except Exception as e:
                # If AST parsing fails, set these features to 0
                features['if_statements'] = 0
                features['for_loops'] = 0
                features['while_loops'] = 0
                features['try_blocks'] = 0
                features['catch_blocks'] = 0
                features['complexity'] = 0
                
        except Exception as e:
            # If tokenization fails, set token features to 0
            features['token_Identifier'] = 0
            features['token_Keyword'] = 0
            features['token_Operator'] = 0
        
        # Check for common bug indicators
        features['null_checks'] = java_code.count('null')
        features['todo_comments'] = len(re.findall(r'TODO|FIXME', java_code))
        features['exception_handling'] = java_code.count('catch')
        features['magic_numbers'] = len(re.findall(r'[^a-zA-Z0-9_"\']\d+[^a-zA-Z0-9_"\'.]', java_code))
        
        return features
    

    def extract_defects4j_data(self, projects=None):
        """
        Extract data from Defects4J projects

        Args:
            projects: List of project names to process (None for all projects)

        Returns:
            DataFrame with code features and bug labels
        """
        if projects is None:
            # Get list of all projects
            cmd = [os.path.join(self.defects4j_path, "framework/bin/defects4j"), "pids"]
            result = subprocess.run(cmd, capture_output=True, text=True)
            projects = result.stdout.strip().split()

        all_data = []

        for project in projects:
            print(f"Processing project: {project}")

            # Get number of bugs in project
            cmd = [os.path.join(self.defects4j_path, "framework/bin/defects4j"), "bids", "-p", project]
            result = subprocess.run(cmd, capture_output=True, text=True)
            bug_ids = result.stdout.strip().split()

            for bug_id in bug_ids:
                if not bug_id.strip():
                    continue

                print(f"  Processing bug: {project}-{bug_id}")
                work_dir = os.path.join(self.output_dir, f"{project}_{bug_id}")
                os.makedirs(work_dir, exist_ok=True)

                # Checkout buggy version
                checkout_cmd = [
                    os.path.join(self.defects4j_path, "framework/bin/defects4j"),
                    "checkout", 
                    "-p", project, 
                    "-v", f"{bug_id}b", 
                    "-w", work_dir
                ]
                result = subprocess.run(checkout_cmd, capture_output=True, text=True)
                if result.returncode != 0:
                    print(f"    Checkout failed for {project}-{bug_id}: {result.stderr.strip()}")
                    continue

                # Get list of modified classes
                export_cmd = [
                    os.path.join(self.defects4j_path, "framework/bin/defects4j"), 
                    "export", 
                    "-p", "classes.modified", 
                    "-w", work_dir
                ]
                result = subprocess.run(export_cmd, capture_output=True, text=True)
                buggy_classes = set(result.stdout.strip().split())

                # Gather all Java files in the source directory
                src_dir = os.path.join(work_dir, "src")
                for root, _, files in os.walk(src_dir):
                    for file in files:
                        if not file.endswith(".java"):
                            continue

                        java_file = os.path.join(root, file)
                        try:
                            with open(java_file, "r", encoding="utf-8") as f:
                                code = f.read()

                            # Compute class name (Defects4J-style)
                            rel_path = os.path.relpath(java_file, src_dir)
                            class_name = rel_path.replace(os.sep, '.').replace(".java", "")

                            # Determine if this file is buggy
                            is_buggy = any(class_name.endswith(buggy_class) for buggy_class in buggy_classes)

                            # Extract features
                            features = self.extract_features_from_java(code)
                            features.update({
                                'project': project,
                                'bug_id': bug_id,
                                'file': class_name,
                                'is_buggy': int(is_buggy)
                            })

                            all_data.append(features)

                        except Exception as e:
                            print(f"    Error processing {java_file}: {e}")

                # Clean up
                subprocess.run(["rm", "-rf", work_dir])

        if all_data:
            df = pd.DataFrame(all_data)
            df.to_csv(os.path.join(self.output_dir, "defects4j_features.csv"), index=False)
            return df
        else:
            print("No data was collected.")
            return pd.DataFrame()

    
    def prepare_data(self, df=None, test_size=0.2):
        """
        Prepare data for training
        
        Args:
            df: DataFrame with features (if None, loads from disk)
            test_size: Proportion of data to use for testing
            
        Returns:
            X_train, X_test, y_train, y_test, scaler
        """
        if df is None:
            df = pd.read_csv(f"{self.output_dir}/defects4j_features.csv")
        
        # Select features (drop non-feature columns)
        non_features = ['project', 'bug_id', 'file', 'is_buggy']
        X = df.drop(columns=[col for col in non_features if col in df.columns])
        y = df['is_buggy']
        
        # Fill missing values
        X = X.fillna(0)
        
        # Store the feature column names for later use in prediction
        self.feature_columns = list(X.columns)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
        
        # Scale features
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        
        # Save the feature columns to use in prediction
        if not hasattr(scaler, 'feature_names_in_'):
            # For older sklearn versions that don't store feature names
            setattr(scaler, 'feature_names_in_', self.feature_columns)
        
        return X_train, X_test, y_train, y_test, scaler

    
    def build_model(self, input_dim):
        """
        Build neural network model
        
        Args:
            input_dim: Number of input features
            
        Returns:
            Compiled Keras model
        """
        model = Sequential([
            Dense(128, activation='relu', input_dim=input_dim),
            Dropout(0.3),
            Dense(64, activation='relu'),
            Dropout(0.2),
            Dense(32, activation='relu'),
            Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
        )
        
        return model
    
    def train_model(self, X_train, y_train, X_test, y_test, epochs=50, batch_size=32):
        """
        Train the neural network model
        
        Args:
            X_train, y_train: Training data
            X_test, y_test: Test data
            epochs: Number of epochs
            batch_size: Batch size
            
        Returns:
            Trained model and history
        """
        # Build model
        model = self.build_model(X_train.shape[1])
        
        # Early stopping
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )
        
        # Train model
        history = model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_test, y_test),
            callbacks=[early_stopping]
        )
        
        # Save model
        model.save(f"{self.output_dir}/bug_detector_model.h5")
        
        return model, history
    
    def evaluate_model(self, model, X_test, y_test):
        """
        Evaluate model performance
        
        Args:
            model: Trained model
            X_test, y_test: Test data
            
        Returns:
            Evaluation metrics
        """
        # Predict
        y_pred_prob = model.predict(X_test)
        y_pred = (y_pred_prob > 0.5).astype(int).flatten()
        
        # Calculate metrics
        conf_matrix = confusion_matrix(y_test, y_pred)
        report = classification_report(y_test, y_pred)
        
        print("Confusion Matrix:")
        print(conf_matrix)
        print("\nClassification Report:")
        print(report)
        
        return conf_matrix, report

def predict_bugs(self, model, scaler, java_file_path):
    """
    Predict if a Java file contains bugs
    
    Args:
        model: Trained model
        scaler: Feature scaler
        java_file_path: Path to Java file
        
    Returns:
        Bug probability
    """
    # Read file
    with open(java_file_path, 'r', encoding='utf-8') as f:
        code = f.read()
    
    # Extract features
    features = self.extract_features_from_java(code)
    
    # Convert to DataFrame
    df = pd.DataFrame([features])
    
    # Get the original feature names used during training
    # Try to load them from scaler or from a saved instance variable
    if hasattr(scaler, 'feature_names_in_'):
        original_features = list(scaler.feature_names_in_)
    else:
        # If not available, this might be a problem
        print("Warning: Scaler doesn't have feature_names_in_ attribute.")
        if hasattr(self, 'feature_columns'):
            original_features = self.feature_columns
        else:
            raise ValueError("Cannot determine original feature names. Store them during training.")
    
    # Create a new DataFrame with all expected columns from training
    aligned_df = pd.DataFrame(0.0, index=[0], columns=original_features)
    
    # Fill in values for features that exist in our current data
    for col in df.columns:
        if col in aligned_df.columns:
            aligned_df[col] = df[col]
    
    # Make sure all values are numeric and replace NAs
    aligned_df = aligned_df.fillna(0).astype(float)
    
    # Make sure columns are in the exact same order as during training
    aligned_df = aligned_df[original_features]
    
    # Now scale the features
    X = scaler.transform(aligned_df)
    
    # Predict
    bug_prob = model.predict(X)[0][0]
    
    return bug_prob 


# Example usage
if __name__ == "__main__":
    # Initialize detector
    detector = Defects4JBugDetector(
        defects4j_path="/Users/clairecallon/defects4j",
        output_dir="./defects4j_data"
    )
    
    # Extract data (comment out if already done)
    df = detector.extract_defects4j_data(['Lang', 'Math'])
    
    # Load existing data
    df = pd.read_csv("./defects4j_data/defects4j_features.csv")
    
    # Prepare data
    X_train, X_test, y_train, y_test, scaler = detector.prepare_data()
    
    # Train model
    model, history = detector.train_model(X_train, y_train, X_test, y_test)
    
    # Evaluate model
    detector.evaluate_model(model, X_test, y_test)
    


Processing project: Lang
  Processing bug: Lang-1
  Processing bug: Lang-3
  Processing bug: Lang-4
  Processing bug: Lang-5
  Processing bug: Lang-6
  Processing bug: Lang-7
  Processing bug: Lang-8
  Processing bug: Lang-9
  Processing bug: Lang-10
  Processing bug: Lang-11
  Processing bug: Lang-12
  Processing bug: Lang-13
  Processing bug: Lang-14
  Processing bug: Lang-15
  Processing bug: Lang-16
  Processing bug: Lang-17
  Processing bug: Lang-19
  Processing bug: Lang-20
  Processing bug: Lang-21
  Processing bug: Lang-22
  Processing bug: Lang-23
  Processing bug: Lang-24
  Processing bug: Lang-26
  Processing bug: Lang-27
  Processing bug: Lang-28
  Processing bug: Lang-29
  Processing bug: Lang-30
  Processing bug: Lang-31
  Processing bug: Lang-32
  Processing bug: Lang-33
  Processing bug: Lang-34
  Processing bug: Lang-35
  Processing bug: Lang-36
  Processing bug: Lang-37
  Processing bug: Lang-38
  Processing bug: Lang-39
  Processing bug: Lang-40
    Error processing 

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m2646/2646[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 530us/step - accuracy: 0.9954 - loss: 0.0353 - precision_1: 0.0000e+00 - recall_1: 0.0000e+00 - val_accuracy: 0.9983 - val_loss: 0.0116 - val_precision_1: 0.0000e+00 - val_recall_1: 0.0000e+00
Epoch 2/50
[1m2646/2646[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 479us/step - accuracy: 0.9983 - loss: 0.0112 - precision_1: 0.0000e+00 - recall_1: 0.0000e+00 - val_accuracy: 0.9983 - val_loss: 0.0114 - val_precision_1: 0.0000e+00 - val_recall_1: 0.0000e+00
Epoch 3/50
[1m2646/2646[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 483us/step - accuracy: 0.9985 - loss: 0.0107 - precision_1: 0.0000e+00 - recall_1: 0.0000e+00 - val_accuracy: 0.9983 - val_loss: 0.0114 - val_precision_1: 0.0000e+00 - val_recall_1: 0.0000e+00
Epoch 4/50
[1m2646/2646[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 476us/step - accuracy: 0.9982 - loss: 0.0112 - precision_1: 0.0000e+00 - recall_1: 0.0000e+00 - val_accuracy: 0.9983



[1m662/662[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 211us/step
Confusion Matrix:
[[21129     0]
 [   35     0]]

Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00     21129
           1       0.00      0.00      0.00        35

    accuracy                           1.00     21164
   macro avg       0.50      0.50      0.50     21164
weighted avg       1.00      1.00      1.00     21164



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:

# Predict on a new file
prob = detector.predict_bugs(model, scaler, "/tmp/Math_35_b/src/test/java/org/apache/commons/math3/geometry/euclidean/twod/LineTest.java")
print(f"Bug probability: {prob:.2f}")

ValueError: The feature names should match those that were passed during fit.
Feature names unseen at fit time:
- missing_feature_0
- missing_feature_1
- missing_feature_10
- missing_feature_11
- missing_feature_12
- ...
Feature names seen at fit time, yet now missing:
- ast_AnnotationDeclaration
- ast_AnnotationMethod
- ast_ArrayCreator
- ast_ArrayInitializer
- ast_ArraySelector
- ...
