Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 70 additions & 27 deletions src/pyspector/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,40 +125,75 @@ def should_skip_file(file_path: Path) -> bool:
return False


def get_python_file_asts(
    path: Path, enable_syntax_warnings: bool = False
) -> List[Dict[str, Any]]:
    """Recursively find Python files and return their content and AST.

    Args:
        path: Directory to scan recursively for ``*.py`` files, or a single
            file to parse.
        enable_syntax_warnings: When False (the default), ``SyntaxWarning``s
            emitted while parsing are ignored. When True they are escalated
            to errors, so the offending file is reported and left out of the
            result.

    Returns:
        One dict per successfully parsed file with the keys ``file_path``
        (relative to ``path`` for a directory scan, otherwise the bare file
        name), ``content`` (the decoded source) and ``ast_json`` (the AST
        serialized with ``AstEncoder``). Files that are skipped or fail to
        read/parse are reported via ``click.echo`` and omitted.
    """
    results: List[Dict[str, Any]] = []
    # Loop-invariant: decide once whether paths are reported relative to a root.
    scanning_dir = path.is_dir()
    files_to_scan = list(path.glob("**/*.py")) if scanning_dir else [path]

    # The filter change is scoped to this block; catch_warnings restores the
    # previous warning configuration on exit.
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "error" if enable_syntax_warnings else "ignore",
            category=SyntaxWarning,
        )

        for py_file in files_to_scan:
            if not py_file.is_file():
                continue

            # Computed once so every message for this file uses the same path.
            display_path = py_file.relative_to(path) if scanning_dir else py_file.name

            # Skip test fixtures and notify the user.
            if should_skip_file(py_file):
                click.echo(
                    click.style(
                        f"Info: Skipped {display_path} (test file or fixture)",
                        fg="blue",
                    )
                )
                continue

            try:
                content = py_file.read_text(encoding="utf-8")
                parsed_ast = ast.parse(content, filename=str(py_file))
                ast_json = json.dumps(parsed_ast, cls=AstEncoder)
                results.append(
                    {
                        "file_path": str(display_path),
                        "content": content,
                        "ast_json": ast_json,
                    }
                )
            except SyntaxWarning as e:
                # SyntaxWarning is a Warning, not a SyntaxError: it does not
                # define ``msg``/``lineno``, so read them defensively instead
                # of raising AttributeError inside the handler.
                # NOTE(review): CPython appears to re-raise parser warnings
                # escalated by the "error" filter as SyntaxError, in which
                # case the SyntaxError branch below reports them - confirm.
                detail = getattr(e, "msg", None) or str(e)
                lineno = getattr(e, "lineno", None)
                location = f" (line {lineno})" if lineno is not None else ""
                click.echo(
                    click.style(
                        f"SyntaxWarning: there is a syntax warning in {display_path} - {detail}{location}",
                        fg="yellow",
                    )
                )
            except SyntaxError as e:
                # Log an error when AST parsing fails due to invalid Python syntax.
                click.echo(
                    click.style(
                        f"SyntaxError: Could not parse {display_path} - {e.msg} (line {e.lineno})",
                        fg="red",
                    )
                )
            except UnicodeDecodeError as e:
                # Log a warning when a file cannot be read as UTF-8.
                click.echo(
                    click.style(
                        f"Warning: Could not read {display_path} - Invalid UTF-8 encoding ({e.reason})",
                        fg="yellow",
                    )
                )
            except Exception as e:
                # Deliberate best-effort boundary: one unreadable file must
                # not abort the scan of the remaining files.
                click.echo(
                    click.style(
                        f"Warning: Could not read {display_path} - {e}", fg="yellow"
                    )
                )

    return results


Expand Down Expand Up @@ -308,6 +343,8 @@ def run_wizard():

supply_chain = click.confirm("Check dependencies for CVE vulnerabilities?", default=False)

syntax_warnings = click.confirm("Treat Python SyntaxWarnings as errors?", default=False)


output_file = None
if report_format != "console":
Expand All @@ -325,6 +362,7 @@ def run_wizard():
"report_format": report_format,
"output_file": output_file,
"supply_chain_scan": supply_chain,
"syntax_warnings": syntax_warnings,
}


Expand All @@ -342,6 +380,7 @@ def run_wizard():
@click.option('--plugin-config', 'plugin_config_file', type=click.Path(exists=True, path_type=Path), help="Path to plugin configuration JSON file")
@click.option('--list-plugins', 'list_plugins', is_flag=True, help="List available plugins and exit")
@click.option('--supply-chain', is_flag=True, default=False, help="Scan dependencies for known CVE vulnerabilities.")
@click.option('--syntax-warnings', is_flag=True, default=False, help="Treat SyntaxWarning as errors during parsing.")
@click.option('--wizard', is_flag=True, help="Interactive guided scan for first-time users")
def run_scan_command(
path: Optional[Path],
Expand All @@ -355,6 +394,7 @@ def run_scan_command(
plugin_config_file: Optional[Path],
list_plugins: bool,
supply_chain: bool,
syntax_warnings: bool,
wizard: bool
):
"""The main scan command with plugin support."""
Expand Down Expand Up @@ -391,7 +431,8 @@ def run_scan_command(
params["ai_scan"],
plugins=(),
plugin_config={},
supply_chain_scan=params["supply_chain_scan"]
supply_chain_scan=params["supply_chain_scan"],
syntax_warnings=params["syntax_warnings"]
)
else:
_execute_scan(
Expand All @@ -403,7 +444,8 @@ def run_scan_command(
params["ai_scan"],
plugins=(),
plugin_config={},
supply_chain_scan=params["supply_chain_scan"]
supply_chain_scan=params["supply_chain_scan"],
syntax_warnings=params["syntax_warnings"]
)
return

Expand Down Expand Up @@ -468,7 +510,7 @@ def run_scan_command(
)
scan_path = Path(temp_dir)
scan_path = Path(temp_dir)
_execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain)
_execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain, syntax_warnings)
except subprocess.CalledProcessError as e:
click.echo(click.style(f"Error: Failed to clone repository.\n{e.stderr}", fg="red"))
sys.exit(1)
Expand All @@ -479,7 +521,7 @@ def run_scan_command(
# Handle local path scan
scan_path = path
scan_path = path
_execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain)
_execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain, syntax_warnings)
return


Expand All @@ -492,7 +534,8 @@ def _execute_scan(
ai_scan: bool,
plugins: tuple,
plugin_config: dict,
supply_chain_scan: bool = False
supply_chain_scan: bool = False,
syntax_warnings: bool = False
):
"""Helper function to run the actual scan and reporting."""
start_time = time.time()
Expand All @@ -515,7 +558,7 @@ def _execute_scan(
click.echo(click.style(f"Warning: Could not parse baseline file '{baseline_path}'.", fg="yellow"))

# --- AST Generation for Python files ---
python_files_data = get_python_file_asts(scan_path)
python_files_data = get_python_file_asts(scan_path, enable_syntax_warnings=syntax_warnings)
click.echo(f"[*] Successfully parsed {len(python_files_data)} Python files")

# --- Supply Chain Scanning ---
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/test_get_asts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import unittest
import tempfile
import json
from pathlib import Path
from unittest.mock import patch, call

from pyspector.cli import get_python_file_asts


class TestGetPythonFileAsts(unittest.TestCase):
    """Exercise get_python_file_asts against a small on-disk tree of files."""

    def setUp(self):
        # Create a temporary directory structure for tests
        self.test_dir = tempfile.TemporaryDirectory()
        self.base_path = Path(self.test_dir.name)

        # Valid python file
        self.valid_file = self.base_path / "valid.py"
        self.valid_file.write_text("x = 10", encoding="utf-8")

        # Syntax warning file. A raw bytes literal is used so that the
        # backslash-w pair is written to disk verbatim without this test
        # module itself containing an invalid escape sequence.
        self.warning_syntax = self.base_path / "warning_err.py"
        self.warning_syntax.write_bytes(rb'path = "c:\windows"')

        # Invalid syntax file
        self.invalid_syntax = self.base_path / "syntax_err.py"
        self.invalid_syntax.write_text("def broken_function(:", encoding="utf-8")

        # Encoding error file
        self.encoding_err = self.base_path / "encoding_err.py"
        self.encoding_err.write_bytes(b"\xff\xfe\x00\x00")

        # Fixture file (should be skipped)
        self.fixture_dir = self.base_path / "tests" / "fixtures"
        self.fixture_dir.mkdir(parents=True)
        self.fixture_file = self.fixture_dir / "fixture_file.py"
        self.fixture_file.write_text("y = 20", encoding="utf-8")

    def tearDown(self):
        self.test_dir.cleanup()

    def test_get_python_file_asts_handling_default(self):
        """Test that by default SyntaxWarnings are ignored and files are included."""
        # Run function with default (enable_syntax_warnings=False)
        results = get_python_file_asts(self.base_path)

        # We expect BOTH the valid python file AND the warning file to be in the result
        # because the warning is ignored and parsing proceeds.
        self.assertEqual(len(results), 2)
        filenames = [r["file_path"] for r in results]
        self.assertIn("valid.py", filenames)
        self.assertIn("warning_err.py", filenames)

    def test_get_python_file_asts_handling_enabled(self):
        """Test that when enabled, SyntaxWarnings are treated as errors and files are excluded."""
        import sys

        # Invalid escape sequences are reported as SyntaxWarning only from
        # Python 3.12; earlier versions emit DeprecationWarning, which the
        # SyntaxWarning filter under test does not match.
        if sys.version_info < (3, 12):
            self.skipTest("invalid escapes are a SyntaxWarning only on Python 3.12+")

        # Run function with enable_syntax_warnings=True
        results = get_python_file_asts(self.base_path, enable_syntax_warnings=True)

        # We expect ONLY the valid python file to be in the result
        # because the warning_err.py triggers an exception and is caught.
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["file_path"], "valid.py")
        self.assertEqual(results[0]["content"], "x = 10")
        self.assertIn("ast_json", results[0])

        # Verify JSON properties exist
        ast_obj = json.loads(results[0]["ast_json"])
        self.assertEqual(ast_obj["node_type"], "Module")


if __name__ == "__main__":
    unittest.main()
Loading