# HPC Fuzzing Pipeline with Dynamic Image Building

End-to-end fuzzing analysis pipeline that builds Singularity containers on demand for HPC clusters.

## Pipeline Overview:

### STATIC ANALYSIS (Host)
1. Patch Loading
2. Repository Setup  
3. Static Analysis (Pylint, Flake8, Radon, Mypy, Bandit)

### DYNAMIC ANALYSIS (Container)
4. Build Instance-Specific Singularity Container (Dynamic)
5. Install Dependencies
6. Run Existing Tests
7. Patch Analysis
8. Generate Hypothesis Tests
9. Execute Tests
10. Coverage Analysis
11. Final Verdict

## Key Features:
- **Dynamic Image Building**: Automatically builds a container for each SWE-bench instance
- **Docker Resolution**: Finds appropriate Docker images from multiple sources
- **Caching**: Reuses built images for efficiency
- **HPC Optimized**: Designed for cluster environments without Docker

## Imports

In [None]:
import sys, subprocess
from pathlib import Path
import json, time, ast
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

PROJECT_ROOT = Path.cwd()
sys.path.append(str(PROJECT_ROOT))

# Dataset and patch handling
from swebench_integration import DatasetLoader, PatchLoader

# NEW: Dynamic container building system
from swebench_singularity import Config, SingularityBuilder, DockerImageResolver

# Existing analysis modules
from verifier.dynamic_analyzers.patch_analyzer import PatchAnalyzer
from verifier.dynamic_analyzers.test_generator import HypothesisTestGenerator
from verifier.dynamic_analyzers.singularity_executor import SingularityTestExecutor
from verifier.dynamic_analyzers.coverage_analyzer import CoverageAnalyzer
from verifier.dynamic_analyzers.test_patch_singularity import (
    install_package_in_singularity, 
    run_tests_in_singularity
)

# Static analysis
import streamlit.modules.static_eval.static_modules.code_quality as code_quality
import streamlit.modules.static_eval.static_modules.syntax_structure as syntax_structure
from verifier.utils.diff_utils import parse_unified_diff, filter_paths_to_py

sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
print("✓ Imports OK")

## Configuration Setup

Configure the dynamic container building system for HPC environment.

In [None]:
# Initialize configuration
config = Config()

# Override defaults for HPC cluster
config.set("singularity.cache_dir", "/fs/nexus-scratch/ihbas/.cache/swebench_singularity")
config.set("singularity.tmp_dir", "/fs/nexus-scratch/ihbas/.tmp/singularity_build")
config.set("singularity.cache_internal_dir", "/fs/nexus-scratch/ihbas/.singularity/cache")
config.set("singularity.build_timeout", 1800)  # 30 minutes
config.set("docker.max_retries", 3)

# Initialize builder
builder = SingularityBuilder(config)
resolver = DockerImageResolver(config)

print("✓ Configuration initialized")
print(f"  Cache: {config.singularity_cache_dir}")
print(f"  Tmp: {config.singularity_tmp_dir}")
print(f"  Singularity available: {builder.check_singularity_available()}")
print(f"  Docker available: {builder.check_docker_available()}")
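
Singularity itself reads the `SINGULARITY_CACHEDIR` and `SINGULARITY_TMPDIR` environment variables. Whether `Config` or `SingularityBuilder` exports them is not shown here, so the following is only a sketch of how the same scratch locations could be handed to a manual `subprocess` call. `singularity_env` is a hypothetical helper, not part of `swebench_singularity`, and the paths are placeholders.

```python
import os


def singularity_env(cache_dir, tmp_dir, base_env=None):
    """Copy an environment and point Singularity's cache/tmp at scratch space."""
    env = dict(os.environ if base_env is None else base_env)
    env["SINGULARITY_CACHEDIR"] = str(cache_dir)
    env["SINGULARITY_TMPDIR"] = str(tmp_dir)
    return env


# Placeholder paths; pass the result as subprocess.run(..., env=env)
env = singularity_env("/scratch/example/cache", "/scratch/example/tmp")
print(env["SINGULARITY_CACHEDIR"], env["SINGULARITY_TMPDIR"])
```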

---
# STATIC ANALYSIS
---

## Stage 1: Load Patch

In [None]:
REPO_FILTER = "scikit-learn/scikit-learn"  # Example: pytest-dev/pytest, pylint-dev/pylint

loader = DatasetLoader("princeton-nlp/SWE-bench_Verified", hf_mode=True, split="test")
sample = next(loader.iter_samples(limit=1, filter_repo=REPO_FILTER), None)

if sample:
    instance_id = sample.get('metadata', {}).get('instance_id', 'unknown')
    print(f"✓ {instance_id}")
    print(f"  Repo: {sample['repo']}")
    print(f"\nPatch preview:\n{sample['patch'][:400]}...")
else:
    raise RuntimeError(f"No sample found for {REPO_FILTER}")

## Stage 2: Setup Repository

In [None]:
patcher = PatchLoader(sample=sample, repos_root="./repos_temp")
repo_path = patcher.clone_repository()
patch_result = patcher.apply_patch()

print(f"✓ Repo: {repo_path}")
print(f"✓ Patch: {'Applied' if patch_result['applied'] else 'FAILED'}")

## Stage 2b: Apply Test Patch (if exists)

In [None]:
# In SWE-bench, test_patch contains additional tests needed to validate the fix
test_patch = sample.get('metadata', {}).get('test_patch', '')

if test_patch and test_patch.strip():
    print("📝 Applying test_patch...")
    try:
        test_patch_result = patcher.apply_additional_patch(test_patch)
        print(f"✓ Test patch applied: {test_patch_result.get('log', 'success')}")
    except Exception as e:
        print(f"⚠️ Test patch application failed: {e}")
else:
    print("ℹ️  No test_patch in metadata (tests already exist in repo)")

## Stage 3: Static Analysis

In [None]:
static_config = {
    'checks': {'pylint': True, 'flake8': True, 'radon': True, 'mypy': True, 'bandit': True},
    'weights': {'pylint': 0.5, 'flake8': 0.15, 'radon': 0.25, 'mypy': 0.05, 'bandit': 0.05}
}

print("🔍 Static analysis...")
cq_results = code_quality.analyze(str(repo_path), sample['patch'], static_config)
ss_results = syntax_structure.run_syntax_structure_analysis(str(repo_path), sample['patch'])

sqi_data = cq_results.get('sqi', {})
print(f"✓ SQI: {sqi_data.get('SQI', 0)}/100 ({sqi_data.get('classification', 'Unknown')})")
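
How `code_quality.analyze` combines the per-tool results into the SQI is not shown in this notebook. As a rough illustration of what the `weights` in `static_config` could mean, here is a minimal sketch of a weighted mean over the enabled checks. The function name and the per-tool scores are made up for the example.

```python
def weighted_quality_score(scores, weights, checks):
    """Weighted mean of per-tool scores (0-100), over enabled checks only."""
    enabled = [name for name, on in checks.items() if on and name in scores]
    total_weight = sum(weights.get(name, 0.0) for name in enabled)
    if total_weight == 0:
        return 0.0
    weighted_sum = sum(scores[name] * weights.get(name, 0.0) for name in enabled)
    # Dividing by the enabled weight keeps the result on a 0-100 scale
    # even when some checks are switched off.
    return weighted_sum / total_weight


checks = {'pylint': True, 'flake8': True, 'radon': True, 'mypy': True, 'bandit': True}
weights = {'pylint': 0.5, 'flake8': 0.15, 'radon': 0.25, 'mypy': 0.05, 'bandit': 0.05}
# Hypothetical per-tool scores, for illustration only
example_scores = {'pylint': 80.0, 'flake8': 90.0, 'radon': 70.0, 'mypy': 100.0, 'bandit': 100.0}
print(round(weighted_quality_score(example_scores, weights, checks), 2))
```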

---
# DYNAMIC ANALYSIS (Container)
---

## Stage 4: Build Instance-Specific Container (Dynamic)

This is the key improvement: dynamically build a Singularity container based on the SWE-bench instance.
The system will:
1. Resolve the appropriate Docker image for this instance
2. Check the cache for an existing build
3. Build the Singularity container if needed
4. Cache the result for future use
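
The cache-then-build flow in the steps above can be sketched as follows. This is not the `SingularityBuilder` implementation, whose internals are not shown here. `sif_cache_path`, `get_or_build`, and `fake_build` are hypothetical names, and the build step is replaced by a stand-in that writes a placeholder file.

```python
import tempfile
from pathlib import Path


def sif_cache_path(cache_dir, instance_id):
    """Map an instance ID to a .sif path inside the cache directory."""
    safe_name = instance_id.replace("/", "_")
    return Path(cache_dir) / f"{safe_name}.sif"


def get_or_build(cache_dir, instance_id, build_fn, force_rebuild=False):
    """Return (sif_path, from_cache), calling build_fn(path) only on a cache miss."""
    target = sif_cache_path(cache_dir, instance_id)
    if target.exists() and not force_rebuild:
        return target, True
    target.parent.mkdir(parents=True, exist_ok=True)
    build_fn(target)
    return target, False


def fake_build(path):
    # Stand-in for the real build, e.g. `singularity build <sif> docker://<image>`
    path.write_bytes(b"placeholder")


with tempfile.TemporaryDirectory() as cache:
    for _ in range(2):
        path, from_cache = get_or_build(cache, "example__repo-1", fake_build)
        print(path.name, "(from cache)" if from_cache else "(newly built)")
```

Keying the cache on the instance ID alone is the simplest choice; a real cache might also key on the resolved Docker image tag so that a changed upstream image triggers a rebuild.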

In [None]:
print(f"🐳 Building container for {instance_id}...\n")

# Check what Docker image will be used
docker_image = resolver.find_available_image(instance_id, check_existence=False)
if docker_image:
    print(f"  Docker image: {docker_image.full_name}")
    print(f"  Registry: {docker_image.registry}")
    print(f"  Tag: {docker_image.tag}\n")

# Build the container (will use cache if available)
build_result = builder.build_instance(
    instance_id=instance_id,
    force_rebuild=False,  # Set to True to force rebuild
    check_docker_exists=False  # Set to True to verify Docker image exists
)

if build_result.success:
    CONTAINER_IMAGE_PATH = build_result.sif_path
    cache_status = "(from cache)" if build_result.from_cache else "(newly built)"
    print(f"✓ Container ready {cache_status}")
    print(f"  Path: {CONTAINER_IMAGE_PATH}")
    print(f"  Build time: {build_result.build_time_seconds:.1f}s")
    
    # Verify container works
    result = subprocess.run(
        ["singularity", "exec", str(CONTAINER_IMAGE_PATH), "python", "--version"],
        capture_output=True, text=True
    )
    print(f"  Python: {result.stdout.strip()}")
else:
    print(f"❌ Container build failed: {build_result.error_message}")
    raise RuntimeError("Cannot proceed without container")

## Stage 5: Install Dependencies

In [None]:
print("📦 Installing dependencies...\n")

install_result = install_package_in_singularity(
    repo_path=Path(repo_path),
    image_path=str(CONTAINER_IMAGE_PATH)
)

if install_result.get("installed"):
    print("✓ Dependencies installed")
elif install_result.get("returncode") != 0:
    print(f"⚠️ Install issues (code {install_result.get('returncode')})")
    print(install_result.get('stderr', '')[-500:])
else:
    print("⚠️ No setup.py/pyproject.toml (will use PYTHONPATH mode)")

## Stage 6: Run Existing Tests

In [None]:
print("🧪 Running existing tests...\n")

# Get tests from metadata
fail_to_pass = sample.get('metadata', {}).get('FAIL_TO_PASS', '[]')
pass_to_pass = sample.get('metadata', {}).get('PASS_TO_PASS', '[]')

print(f"  FAIL_TO_PASS: {fail_to_pass}")
print(f"  PASS_TO_PASS: {pass_to_pass}\n")

# Parse test lists
try:
    f2p = ast.literal_eval(fail_to_pass) if isinstance(fail_to_pass, str) else fail_to_pass
    p2p = ast.literal_eval(pass_to_pass) if isinstance(pass_to_pass, str) else pass_to_pass
except (ValueError, SyntaxError):
    # Malformed metadata: fall back to empty test lists
    f2p, p2p = [], []

all_tests = f2p + p2p

# Run tests using the dynamically built container
test_result = run_tests_in_singularity(
    repo_path=Path(repo_path),
    tests=all_tests,
    image_path=str(CONTAINER_IMAGE_PATH)
)

print(f"Exit code: {test_result['returncode']}")
print((test_result['stdout'] + test_result['stderr'])[-1500:])
print(f"\n{'✓' if test_result['returncode'] == 0 else '⚠️'} Tests {'passed' if test_result['returncode'] == 0 else 'had issues'}")
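
The `FAIL_TO_PASS` and `PASS_TO_PASS` fields may arrive either as lists or as strings holding a JSON or Python list literal. A slightly more defensive parser than the inline `ast.literal_eval` call could look like the sketch below; `parse_test_list` is a hypothetical helper, and the test IDs are placeholders.

```python
import ast
import json


def parse_test_list(raw):
    """Return a list of test IDs from a list, a JSON string, or a Python literal."""
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        return [str(t) for t in raw]
    text = str(raw).strip()
    if not text:
        return []
    # Try JSON first, then fall back to a Python literal (single-quoted lists).
    for parser in (json.loads, ast.literal_eval):
        try:
            value = parser(text)
        except (ValueError, SyntaxError, TypeError):
            continue
        if isinstance(value, (list, tuple)):
            return [str(t) for t in value]
    return []


print(parse_test_list('["tests/test_a.py::test_x", "tests/test_b.py::test_y"]'))
print(parse_test_list("['tests/test_a.py::test_x']"))
```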

## Stage 7: Analyze Patch

In [None]:
print("🔍 Analyzing patch...")

patch_analyzer = PatchAnalyzer()
modified_files = filter_paths_to_py(list(parse_unified_diff(sample['patch']).keys()))

if modified_files:
    first_file_path = modified_files[0]
    first_file = Path(repo_path) / first_file_path
    patched_code = first_file.read_text(encoding='utf-8')
    
    patch_analysis = patch_analyzer.parse_patch(sample['patch'], patched_code, file_path=first_file_path)
    
    print(f"✓ Files: {len(modified_files)}")
    print(f"  Module: {patch_analysis.module_path}")
    print(f"  Functions: {patch_analysis.changed_functions}")
    if patch_analysis.class_context:
        print(f"  Classes: {list(patch_analysis.class_context.values())}")
    print(f"  Lines: {len(patch_analysis.all_changed_lines)}")
else:
    patch_analysis = None
    patched_code = None
    print("⚠️ No Python files modified")

## Stage 8: Generate Hypothesis Tests

In [None]:
if patch_analysis and patch_analysis.changed_functions:
    print("🧬 Generating change-aware fuzzing tests...")
    
    test_generator = HypothesisTestGenerator()
    test_code = test_generator.generate_tests(patch_analysis, patched_code)
    test_count = test_code.count('def test_')
    
    print(f"✓ Generated {test_count} tests")
    print(f"\nPreview:\n{test_code[:600]}...")
else:
    test_code = None
    test_count = 0
    print("ℹ️  No testable changes found")
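
The output format of `HypothesisTestGenerator` is not shown in this notebook. To give a feel for what change-aware test generation can produce, the sketch below renders one property-based test per changed function from a string template and checks that the result is syntactically valid. The template, `render_tests`, and the module and function names are all assumptions made for illustration; a real generator would pick input strategies from the function signatures rather than always using integers.

```python
import textwrap

HEADER = "from hypothesis import given, strategies as st\nimport {module} as target\n"

TEST_TEMPLATE = textwrap.dedent('''
    @given(st.integers())
    def test_{name}_accepts_integers(value):
        # Property: the changed function should not raise on arbitrary integers.
        target.{name}(value)
''')


def render_tests(module, function_names):
    """Render Hypothesis test source for the given module-level functions."""
    body = "".join(TEST_TEMPLATE.format(name=name) for name in function_names)
    return HEADER.format(module=module) + body


source = render_tests("mypkg.utils", ["clip", "scale"])
compile(source, "<generated>", "exec")  # syntax check only; nothing is imported or run
print(source)
print("tests generated:", source.count("def test_"))
```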

## Stage 9: Execute Fuzzing Tests in Container

In [None]:
if test_code:
    print("🐳 Executing change-aware fuzzing tests in container...\n")
    
    executor = SingularityTestExecutor(str(CONTAINER_IMAGE_PATH), timeout=120)
    start = time.time()
    
    try:
        module_name = patch_analysis.module_path if patch_analysis else None
        
        success, output, coverage_data = executor.run_tests_with_existing_infrastructure(
            Path(repo_path), 
            test_code,
            module_name=module_name
        )
        
        elapsed = time.time() - start
        print(f"{'✓ PASSED' if success else '❌ FAILED'} ({elapsed:.1f}s)\n")
        
        # Show output
        print("=== TEST OUTPUT ===")
        print(output[-2000:])
        print("=== END OUTPUT ===\n")
        
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        success = False
        coverage_data = {}
else:
    success = True
    coverage_data = {}
    print("ℹ️  No tests to execute")

## Stage 10: Coverage Analysis

In [None]:
if patch_analysis and coverage_data:
    if coverage_data.get('_coverage_skipped'):
        print(f"ℹ️  Coverage skipped: {coverage_data.get('_skip_reason', 'N/A')}")
        coverage_result = {'overall_coverage': None, 'total_changed_lines': 0, 'total_covered_lines': 0, 'skipped': True}
    else:
        coverage_analyzer = CoverageAnalyzer()
        coverage_result = coverage_analyzer.calculate_changed_line_coverage(
            coverage_data, patch_analysis.changed_lines, patch_analysis.all_changed_lines
        )
        print(f"📊 Coverage: {coverage_result['overall_coverage']:.1%}")
        print(f"   {coverage_result['total_covered_lines']}/{coverage_result['total_changed_lines']} lines")
else:
    coverage_result = {'overall_coverage': 0.0, 'total_changed_lines': 0, 'total_covered_lines': 0}
    print("ℹ️  No coverage data available")
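
Changed-line coverage is the fraction of lines touched by the patch that the generated tests actually executed. The internals of `CoverageAnalyzer.calculate_changed_line_coverage` are not shown here, so the following is only a minimal sketch of the idea. It assumes both inputs are dictionaries keyed by file path, and it reuses the result keys that the cell above reads.

```python
def changed_line_coverage(executed, changed):
    """Compute coverage restricted to changed lines.

    executed: {file_path: iterable of executed line numbers}
    changed:  {file_path: iterable of changed line numbers}
    """
    total = 0
    covered = 0
    for path, lines in changed.items():
        changed_set = set(lines)
        executed_set = set(executed.get(path, ()))
        total += len(changed_set)
        covered += len(changed_set & executed_set)
    return {
        'overall_coverage': covered / total if total else 0.0,
        'total_changed_lines': total,
        'total_covered_lines': covered,
    }


# Hypothetical data: four changed lines, two of which were executed
result = changed_line_coverage({'pkg/mod.py': [2, 3, 10]}, {'pkg/mod.py': [1, 2, 3, 4]})
print(f"{result['overall_coverage']:.1%}")
```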

## Stage 11: Final Verdict

In [None]:
sqi_score = sqi_data.get('SQI', 0) / 100.0
coverage_score = coverage_result.get('overall_coverage', 0.0)
coverage_skipped = coverage_result.get('skipped', False)

# Determine verdict
if sqi_score < 0.5:
    verdict = 'REJECT'
    reason = f'Poor SQI ({sqi_score:.2f})'
elif not success:
    verdict = 'REJECT'
    reason = 'Fuzzing tests failed'
elif coverage_skipped:
    verdict = 'ACCEPT' if test_count > 0 else 'WARNING'
    reason = 'Coverage N/A (internal module)' if test_count > 0 else 'No tests generated'
elif coverage_score is not None and coverage_score < 0.5:
    verdict = 'WARNING'
    reason = f'Low coverage ({coverage_score:.1%})'
else:
    verdict = 'ACCEPT'
    reason = 'All checks passed'

# Display verdict
print("\n" + "="*80)
print("FINAL VERDICT")
print("="*80)
print(f"{verdict}: {reason}")
print(f"\nInstance: {instance_id}")
print(f"Container: {Path(CONTAINER_IMAGE_PATH).name}")
if coverage_score is not None:
    print(f"\nSQI: {sqi_score:.2%} | Tests: {test_count} | Coverage: {coverage_score:.1%}")
else:
    print(f"\nSQI: {sqi_score:.2%} | Tests: {test_count} | Coverage: N/A")
print("="*80)
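
The branching in the cell above can also be packaged as a pure function, which makes the thresholds explicit and easy to unit test. This sketch mirrors the same rules; `decide_verdict` and its parameter names are illustrative, and the two `0.5` cut-offs are taken from the cell.

```python
def decide_verdict(sqi_score, fuzz_passed, coverage_score, coverage_skipped,
                   test_count, min_sqi=0.5, min_coverage=0.5):
    """Return (verdict, reason) using the same rule order as the notebook cell."""
    if sqi_score < min_sqi:
        return 'REJECT', f'Poor SQI ({sqi_score:.2f})'
    if not fuzz_passed:
        return 'REJECT', 'Fuzzing tests failed'
    if coverage_skipped:
        if test_count > 0:
            return 'ACCEPT', 'Coverage N/A (internal module)'
        return 'WARNING', 'No tests generated'
    if coverage_score is not None and coverage_score < min_coverage:
        return 'WARNING', f'Low coverage ({coverage_score:.1%})'
    return 'ACCEPT', 'All checks passed'


# Hypothetical inputs: good SQI, passing tests, low changed-line coverage
print(decide_verdict(0.82, True, 0.35, False, 4))
```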

## Pipeline Summary

This notebook demonstrates the complete fuzzing pipeline with dynamic container building:

### Key Improvements:
1. **Dynamic Image Building**: Automatically builds instance-specific containers
2. **Docker Resolution**: Finds images from multiple registries (aorwall, swebench, ghcr.io)
3. **Intelligent Caching**: Reuses containers when possible
4. **HPC Optimized**: Works without Docker daemon using Singularity native auth

### Next Steps:
- Scale to multiple instances using batch processing
- Integrate with SLURM for parallel execution
- Export results for analysis
- Monitor cache usage and performance
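
As a starting point for the SLURM step, one option is a job array in which each task handles one instance ID. The sketch below only renders the `sbatch` script text. The runner script `run_pipeline.py`, its `--instance-id` flag, the ID file name, and the job name are all hypothetical and would need to match however this notebook is turned into a script.

```python
def render_array_job(n_instances, ids_file="instance_ids.txt",
                     runner="run_pipeline.py", time_limit="01:00:00"):
    """Render an sbatch script that processes one instance ID per array task."""
    if n_instances < 1:
        raise ValueError("need at least one instance")
    lines = [
        "#!/bin/bash",
        "#SBATCH --job-name=fuzz-pipeline",
        f"#SBATCH --array=0-{n_instances - 1}",
        f"#SBATCH --time={time_limit}",
        "#SBATCH --output=logs/fuzz_%A_%a.out",
        "",
        "# Array task N reads line N+1 of the ID file",
        f'INSTANCE_ID=$(sed -n "$((SLURM_ARRAY_TASK_ID + 1))p" {ids_file})',
        f'python {runner} --instance-id "$INSTANCE_ID"',
    ]
    return "\n".join(lines) + "\n"


print(render_array_job(3))
```

The `logs/` directory needs to exist before the job is submitted. Because built images are cached, array tasks that share an instance would reuse the same `.sif` file, so concurrent builds of the same instance are worth guarding against.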