Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 91 additions & 62 deletions cli/src/fuzzforge_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,24 @@
#
# Additional attribution and requirements are provided in the NOTICE file.

import sys
from typing import List, Optional

import typer
from rich.console import Console
from rich.traceback import install
from typing import Optional, List
import sys

from .commands import (
init,
workflows,
workflow_exec,
ai,
findings,
ingest,
init,
monitor,
workflow_exec,
workflows,
)
from .commands import (
config as config_cmd,
ai,
ingest,
)
from .constants import DEFAULT_VOLUME_MODE
from .fuzzy import enhanced_command_not_found_handler
Expand Down Expand Up @@ -78,25 +80,30 @@

# === Top-level commands ===


@app.command()
def init(
name: Optional[str] = typer.Option(
None, "--name", "-n",
help="Project name (defaults to current directory name)"
None, "--name", "-n", help="Project name (defaults to current directory name)"
),
api_url: Optional[str] = typer.Option(
None, "--api-url", "-u",
help="FuzzForge API URL (defaults to http://localhost:8000)"
None,
"--api-url",
"-u",
help="FuzzForge API URL (defaults to http://localhost:8000)",
),
force: bool = typer.Option(
False, "--force", "-f",
help="Force initialization even if project already exists"
)
False,
"--force",
"-f",
help="Force initialization even if project already exists",
),
):
"""
📁 Initialize a new FuzzForge project
"""
from .commands.init import project

project(name=name, api_url=api_url, force=force)


Expand All @@ -106,18 +113,18 @@ def status():
📊 Show project and latest execution status
"""
from .commands.status import show_status

show_status()


@app.command()
def config(
key: Optional[str] = typer.Argument(None, help="Configuration key"),
value: Optional[str] = typer.Argument(None, help="Configuration value to set")
value: Optional[str] = typer.Argument(None, help="Configuration value to set"),
):
"""
⚙️ Manage configuration (show all, get, or set values)
"""
from .commands import config as config_cmd

if key is None:
# No arguments: show all config
Expand All @@ -133,13 +140,11 @@ def config(
@app.command()
def clean(
days: int = typer.Option(
90, "--days", "-d",
help="Remove data older than this many days"
90, "--days", "-d", help="Remove data older than this many days"
),
dry_run: bool = typer.Option(
False, "--dry-run",
help="Show what would be deleted without actually deleting"
)
False, "--dry-run", help="Show what would be deleted without actually deleting"
),
):
"""
🧹 Clean old execution data and findings
Expand All @@ -155,7 +160,9 @@ def clean(
raise typer.Exit(1)

if dry_run:
console.print(f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days")
console.print(
f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days"
)

deleted = db.cleanup_old_runs(keep_days=days)

Expand All @@ -177,35 +184,41 @@ def clean(
workflow_app.command("info")(workflows.workflow_info)
workflow_app.command("params")(workflows.workflow_parameters)


@workflow_app.command("run")
def run_workflow(
workflow: str = typer.Argument(help="Workflow name"),
target: str = typer.Argument(help="Target path"),
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
params: List[str] = typer.Argument(
default=None, help="Parameters as key=value pairs"
),
param_file: Optional[str] = typer.Option(
None, "--param-file", "-f",
help="JSON file containing workflow parameters"
None, "--param-file", "-f", help="JSON file containing workflow parameters"
),
volume_mode: str = typer.Option(
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
help="Volume mount mode: ro (read-only) or rw (read-write)"
DEFAULT_VOLUME_MODE,
"--volume-mode",
"-v",
help="Volume mount mode: ro (read-only) or rw (read-write)",
),
timeout: Optional[int] = typer.Option(
None, "--timeout", "-t",
help="Execution timeout in seconds"
None, "--timeout", "-t", help="Execution timeout in seconds"
),
interactive: bool = typer.Option(
True, "--interactive/--no-interactive", "-i/-n",
help="Interactive parameter input for missing required parameters"
True,
"--interactive/--no-interactive",
"-i/-n",
help="Interactive parameter input for missing required parameters",
),
wait: bool = typer.Option(
False, "--wait", "-w",
help="Wait for execution to complete"
False, "--wait", "-w", help="Wait for execution to complete"
),
live: bool = typer.Option(
False, "--live", "-l",
help="Start live monitoring after execution (useful for fuzzing workflows)"
)
False,
"--live",
"-l",
help="Start live monitoring after execution (useful for fuzzing workflows)",
),
):
"""
🚀 Execute a security testing workflow
Expand All @@ -221,9 +234,10 @@ def run_workflow(
timeout=timeout,
interactive=interactive,
wait=wait,
live=live
live=live,
)


@workflow_app.callback()
def workflow_main():
"""
Expand All @@ -239,17 +253,18 @@ def workflow_main():

# === Finding commands (singular) ===


@finding_app.command("export")
def export_finding(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"),
execution_id: Optional[str] = typer.Argument(
None, help="Execution ID (defaults to latest)"
),
format: str = typer.Option(
"sarif", "--format", "-f",
help="Export format: sarif, json, csv"
"sarif", "--format", "-f", help="Export format: sarif, json, csv"
),
output: Optional[str] = typer.Option(
None, "--output", "-o",
help="Output file (defaults to stdout)"
)
None, "--output", "-o", help="Output file (defaults to stdout)"
),
):
"""
📤 Export findings to file
Expand All @@ -270,7 +285,9 @@ def export_finding(
execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}")
else:
console.print("⚠️ No findings found in project database", style="yellow")
console.print(
"⚠️ No findings found in project database", style="yellow"
)
return
else:
console.print("❌ No project database found", style="red")
Expand All @@ -283,14 +300,16 @@ def export_finding(

@finding_app.command("analyze")
def analyze_finding(
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze")
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze"),
):
"""
🤖 AI analysis of a finding
"""
from .commands.ai import analyze_finding as ai_analyze

ai_analyze(finding_id)


@finding_app.callback(invoke_without_command=True)
def finding_main(
ctx: typer.Context,
Expand All @@ -309,7 +328,7 @@ def finding_main(
return

# Get remaining arguments for direct viewing
args = ctx.args if hasattr(ctx, 'args') else []
args = ctx.args if hasattr(ctx, "args") else []
finding_id = args[0] if args else None

# Direct viewing: fuzzforge finding [id]
Expand All @@ -329,7 +348,9 @@ def finding_main(
finding_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {finding_id}")
else:
console.print("⚠️ No findings found in project database", style="yellow")
console.print(
"⚠️ No findings found in project database", style="yellow"
)
return
else:
console.print("❌ No project database found", style="red")
Expand All @@ -355,6 +376,7 @@ def finding_main(
app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")


# Help and utility commands
@app.command()
def examples():
Expand All @@ -372,7 +394,6 @@ def examples():
[bold]Execute Workflows:[/bold]
ff workflow afl-fuzzing ./target # Run fuzzing on target
ff workflow afl-fuzzing . --live # Run with live monitoring
ff workflow scan-c ./src timeout=300 threads=4 # With parameters

[bold]Monitor Execution:[/bold]
ff status # Check latest execution
Expand All @@ -399,16 +420,16 @@ def version():
📦 Show version information
"""
from . import __version__

console.print(f"FuzzForge CLI v{__version__}")
console.print(f"Short command: ff")
console.print("Short command: ff")


@app.callback()
def main_callback(
ctx: typer.Context,
version: Optional[bool] = typer.Option(
None, "--version", "-v",
help="Show version information"
None, "--version", "-v", help="Show version information"
),
):
"""
Expand All @@ -422,6 +443,7 @@ def main_callback(
"""
if version:
from . import __version__

console.print(f"FuzzForge CLI v{__version__}")
raise typer.Exit()

Expand All @@ -432,12 +454,11 @@ def main():
if len(sys.argv) > 1:
args = sys.argv[1:]


# Handle finding command with pattern recognition
if len(args) >= 2 and args[0] == 'finding':
finding_subcommands = ['export', 'analyze']
if len(args) >= 2 and args[0] == "finding":
finding_subcommands = ["export", "analyze"]
# Skip custom dispatching if help flags are present
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args):
if not any(arg in ["--help", "-h", "--version", "-v"] for arg in args):
if args[1] not in finding_subcommands:
# Direct finding display: ff finding <id>
from .commands.findings import get_findings
Expand All @@ -457,18 +478,26 @@ def main():
app()
except SystemExit as e:
# Enhanced error handling for command not found
if hasattr(e, 'code') and e.code != 0 and len(sys.argv) > 1:
if hasattr(e, "code") and e.code != 0 and len(sys.argv) > 1:
command_parts = sys.argv[1:]
clean_parts = [part for part in command_parts if not part.startswith('-')]
clean_parts = [part for part in command_parts if not part.startswith("-")]

if clean_parts:
main_cmd = clean_parts[0]
valid_commands = [
'init', 'status', 'config', 'clean',
'workflows', 'workflow',
'findings', 'finding',
'monitor', 'ai', 'ingest',
'examples', 'version'
"init",
"status",
"config",
"clean",
"workflows",
"workflow",
"findings",
"finding",
"monitor",
"ai",
"ingest",
"examples",
"version",
]

if main_cmd not in valid_commands:
Expand Down