4 changes: 3 additions & 1 deletion backend/toolbox/modules/analyzer/__init__.py
@@ -10,5 +10,7 @@
# Additional attribution and requirements are provided in the NOTICE file.

from .security_analyzer import SecurityAnalyzer
from .bandit_analyzer import BanditAnalyzer
from .mypy_analyzer import MypyAnalyzer

__all__ = ["SecurityAnalyzer"]
__all__ = ["SecurityAnalyzer", "BanditAnalyzer", "MypyAnalyzer"]
328 changes: 328 additions & 0 deletions backend/toolbox/modules/analyzer/bandit_analyzer.py
@@ -0,0 +1,328 @@
"""
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
"""

# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List

try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding

logger = logging.getLogger(__name__)


class BanditAnalyzer(BaseModule):
"""
Analyzes Python code for security issues using Bandit.

This module:
- Runs Bandit security linter on Python files
- Detects common security issues (SQL injection, hardcoded secrets, etc.)
- Reports findings with severity levels
"""

# Severity mapping from Bandit levels to our standard
SEVERITY_MAP = {
"LOW": "low",
"MEDIUM": "medium",
"HIGH": "high"
}

def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="bandit_analyzer",
version="1.0.0",
description="Analyzes Python code for security issues using Bandit",
author="FuzzForge Team",
category="analyzer",
tags=["python", "security", "bandit", "sast"],
input_schema={
"severity_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum severity level to report",
"default": "low"
},
"confidence_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum confidence level to report",
"default": "medium"
},
"exclude_tests": {
"type": "boolean",
"description": "Exclude test files from analysis",
"default": True
},
"skip_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of Bandit test IDs to skip",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of security issues found by Bandit"
}
},
requires_workspace=True
)
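
    # Example configuration accepted by this module (illustrative values only;
    # the keys follow the input_schema declared above):
    #
    #   {
    #       "severity_level": "medium",
    #       "confidence_level": "medium",
    #       "exclude_tests": True,
    #       "skip_ids": ["B101", "B601"]
    #   }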

def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
severity = config.get("severity_level", "low")
if severity not in ["low", "medium", "high"]:
raise ValueError("severity_level must be one of: low, medium, high")

confidence = config.get("confidence_level", "medium")
if confidence not in ["low", "medium", "high"]:
raise ValueError("confidence_level must be one of: low, medium, high")

skip_ids = config.get("skip_ids", [])
if not isinstance(skip_ids, list):
raise ValueError("skip_ids must be a list")

return True
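
        # Illustrative example: validate_config({"severity_level": "critical"})
        # raises ValueError because "critical" is not one of low/medium/high.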

async def _run_bandit(
self,
workspace: Path,
severity_level: str,
confidence_level: str,
exclude_tests: bool,
skip_ids: List[str]
) -> Dict[str, Any]:
"""
Run Bandit on the workspace.

Args:
workspace: Path to workspace
            severity_level: Minimum severity to report (applied during post-filtering)
            confidence_level: Minimum confidence to report (applied during post-filtering)
exclude_tests: Whether to exclude test files
skip_ids: List of test IDs to skip

Returns:
Bandit JSON output as dict
"""
try:
            # Build the bandit command. No severity flag is passed so Bandit
            # reports all findings; severity/confidence filtering is applied
            # afterwards in _should_include_finding.
            cmd = [
                "bandit",
                "-r", str(workspace),
                "-f", "json",
            ]

# Add exclude patterns for test files
if exclude_tests:
cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])

# Add skip IDs if specified
if skip_ids:
cmd.extend(["-s", ",".join(skip_ids)])
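
            # Illustrative example of the resulting command line (workspace path
            # and skipped IDs are placeholders):
            #   bandit -r /workspace -f json -x "*/test_*.py,*/tests/*,*_test.py" -s B101,B601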

logger.info(f"Running Bandit on: {workspace}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await process.communicate()

# Bandit returns non-zero if issues found, which is expected
if process.returncode not in [0, 1]:
logger.error(f"Bandit failed: {stderr.decode()}")
return {"results": []}

# Parse JSON output
result = json.loads(stdout.decode())
return result

except Exception as e:
logger.error(f"Error running Bandit: {e}")
return {"results": []}

def _should_include_finding(
self,
issue: Dict[str, Any],
min_severity: str,
min_confidence: str
) -> bool:
"""
Determine if a Bandit issue should be included based on severity/confidence.

Args:
issue: Bandit issue dict
min_severity: Minimum severity threshold
min_confidence: Minimum confidence threshold

Returns:
True if issue should be included
"""
severity_order = ["low", "medium", "high"]
issue_severity = issue.get("issue_severity", "LOW").lower()
issue_confidence = issue.get("issue_confidence", "LOW").lower()

severity_meets_threshold = severity_order.index(issue_severity) >= severity_order.index(min_severity)
confidence_meets_threshold = severity_order.index(issue_confidence) >= severity_order.index(min_confidence)

return severity_meets_threshold and confidence_meets_threshold
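
        # Worked example (illustrative): an issue with issue_severity "MEDIUM" and
        # issue_confidence "HIGH", checked against min_severity="low" and
        # min_confidence="medium", yields index 1 >= 0 and index 2 >= 1, so it is kept.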

def _convert_to_findings(
self,
bandit_result: Dict[str, Any],
workspace: Path,
min_severity: str,
min_confidence: str
) -> List[ModuleFinding]:
"""
Convert Bandit results to ModuleFindings.

Args:
bandit_result: Bandit JSON output
workspace: Workspace path for relative paths
min_severity: Minimum severity to include
min_confidence: Minimum confidence to include

Returns:
List of ModuleFindings
"""
findings = []

for issue in bandit_result.get("results", []):
# Filter by severity and confidence
if not self._should_include_finding(issue, min_severity, min_confidence):
continue

# Extract issue details
test_id = issue.get("test_id", "B000")
test_name = issue.get("test_name", "unknown")
issue_text = issue.get("issue_text", "No description")
severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")

# File location
filename = issue.get("filename", "")
line_number = issue.get("line_number", 0)
code = issue.get("code", "")

# Try to get relative path
try:
file_path = Path(filename)
rel_path = file_path.relative_to(workspace)
except (ValueError, TypeError):
rel_path = Path(filename).name

# Create finding
finding = self.create_finding(
title=f"{test_name} ({test_id})",
description=issue_text,
severity=severity,
category="security-issue",
file_path=str(rel_path),
line_start=line_number,
line_end=line_number,
code_snippet=code.strip() if code else None,
recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
metadata={
"test_id": test_id,
"test_name": test_name,
"confidence": issue.get("issue_confidence", "LOW").lower(),
"cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
"more_info": issue.get("more_info", "")
}
)
findings.append(finding)

return findings

async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the Bandit analyzer module.

Args:
config: Module configuration
workspace: Path to workspace

Returns:
ModuleResult with security findings
"""
start_time = time.time()
metadata = self.get_metadata()

# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)

# Get configuration
severity_level = config.get("severity_level", "low")
confidence_level = config.get("confidence_level", "medium")
exclude_tests = config.get("exclude_tests", True)
skip_ids = config.get("skip_ids", [])

# Run Bandit
logger.info("Starting Bandit analysis...")
bandit_result = await self._run_bandit(
workspace,
severity_level,
confidence_level,
exclude_tests,
skip_ids
)

# Convert to findings
findings = self._convert_to_findings(
bandit_result,
workspace,
severity_level,
confidence_level
)

# Calculate summary
severity_counts = {}
for finding in findings:
sev = finding.severity
severity_counts[sev] = severity_counts.get(sev, 0) + 1

execution_time = time.time() - start_time

return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=findings,
summary={
"total_issues": len(findings),
"by_severity": severity_counts,
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
},
metadata={
"bandit_version": bandit_result.get("generated_at", "unknown"),
"metrics": bandit_result.get("metrics", {})
}
)
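

# Illustrative standalone usage sketch (assumes BanditAnalyzer can be
# instantiated without arguments and run outside the FuzzForge workflow engine):
#
#   import asyncio
#   from pathlib import Path
#
#   analyzer = BanditAnalyzer()
#   result = asyncio.run(
#       analyzer.execute({"severity_level": "medium"}, Path("/path/to/workspace"))
#   )
#   print(result.summary)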