diff --git a/scripts/version_scanner/.gitignore b/scripts/version_scanner/.gitignore
new file mode 100644
index 000000000000..3d90478f0355
--- /dev/null
+++ b/scripts/version_scanner/.gitignore
@@ -0,0 +1,2 @@
+.conductor/
+scanner_report.csv
diff --git a/scripts/version_scanner/.scannerignore b/scripts/version_scanner/.scannerignore
new file mode 100644
index 000000000000..43ca102e0333
--- /dev/null
+++ b/scripts/version_scanner/.scannerignore
@@ -0,0 +1,16 @@
+# Directories and files for the version scanner to ignore
+.git
+__pycache__
+.tox
+.nox
+venv
+.venv
+.conductor
+version_scanner
+docs
+samples
+changelog.md
+.librarian
+goldens
+# Ignore pandoc references in repositories.bzl
+repositories.bzl
diff --git a/scripts/version_scanner/README.md b/scripts/version_scanner/README.md
new file mode 100644
index 000000000000..85d59aa6dc45
--- /dev/null
+++ b/scripts/version_scanner/README.md
@@ -0,0 +1,31 @@
+# Automated Dependency Version Scanner
+
+This tool scans the repository for hardcoded references to specific dependency versions (like Python 3.7) that need to be upgraded or removed.
+
+## Usage
+
+Run the script from the repository root:
+
+```bash
+python3 scripts/version_scanner/version_scanner.py -d <dependency> -v <version> [options]
+```
+
+### Options
+
+* `-d`, `--dependency`: Name of the dependency (e.g., python, protobuf) (required)
+* `-v`, `--version`: Specific version to search for (e.g., 3.7, 4.25.8) (required)
+* `-p`, `--path`: Root directory to scan (defaults to current directory)
+* `--package`: Specific subdirectory filter (useful for monorepos)
+* `--package-file`: Path to a file containing a list of package directories to scan
+* `--config`: Path to the regex configuration file (defaults to scripts/version_scanner/regex_config.yaml)
+* `-o`, `--output`: Path to the output CSV file (defaults to `<dependency>-<version>-<timestamp>.csv` under `scripts/version_scanner/results/`)
+* `--github-repo`: GitHub repository URL base (defaults to https://github.com/googleapis/google-cloud-python)
+* `--branch`: GitHub branch for links (defaults to main)
+
+## Configuration
+
+The scanner uses a YAML configuration file (`regex_config.yaml`) to define rules and regex patterns.
+
+## Ignoring Directories
+
+To skip directories, list them one per line in the `.scannerignore` file in `scripts/version_scanner/`; the scanner reads this file from its own directory rather than from the path being scanned.
diff --git a/scripts/version_scanner/benchmark.py b/scripts/version_scanner/benchmark.py
new file mode 100644
index 000000000000..36179af1dc4b
--- /dev/null
+++ b/scripts/version_scanner/benchmark.py
@@ -0,0 +1,166 @@
+import argparse
+import os
+import random
+import subprocess
+import sys
+import tempfile
+import time
+from typing import List, Dict
+
+def get_package_subset(packages_dir: str, count: int) -> List[str]:
+    """
+    Get a randomized subset of package names from the specified directory.
+
+    Args:
+        packages_dir: Path to the directory containing packages.
+        count: Number of packages to return.
+
+    Returns:
+        A list of package directory names.
+    """
+    all_packages = [d for d in os.listdir(packages_dir) if os.path.isdir(os.path.join(packages_dir, d))]
+
+    if count >= len(all_packages):
+        return all_packages
+
+    return random.sample(all_packages, count)
+
+def run_benchmark(
+    scanner_path: str,
+    root_path: str,
+    package_file: str,
+    dependency: str,
+    version: str
+) -> float:
+    """
+    Run the scanner and return the duration in seconds.
+ """ + cmd = [ + "python3", scanner_path, + "-d", dependency, + "-v", version, + "-p", root_path, + "--package-file", package_file + ] + + start_time = time.perf_counter() + + try: + result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + except subprocess.CalledProcessError as e: + print(f"Error running benchmark: {e}") + return -1.0 + + duration = time.perf_counter() - start_time + return duration + +def run_benchmarks( + scanner_path: str, + root_path: str, + packages_dir: str, + counts: List[int], + dependency: str, + version: str +) -> Dict[int, float]: + """Runs benchmarks for specified counts and returns a dict of results.""" + results = {} + + for count in counts: + subset = get_package_subset(packages_dir, count) + print(f" Testing {len(subset)} packages (e.g., {subset[:3]}...)") + + # Create temp package file + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + for pkg in subset: + f.write(f"packages/{pkg}\n") + pkg_file = f.name + + try: + duration = run_benchmark(scanner_path, root_path, pkg_file, dependency, version) + results[count] = duration + finally: + # Clean up + if os.path.exists(pkg_file): + os.remove(pkg_file) + + return results + +def main(): + parser = argparse.ArgumentParser(description="Benchmark the version scanner.") + + parser.add_argument( + "-s", "--scanner-path", + default="version_scanner.py", + help="Path to version_scanner.py" + ) + + parser.add_argument( + "-r", "--root-path", + required=True, + help="Path to the monorepo root directory" + ) + + parser.add_argument( + "-p", "--packages-dir", + help="Path to packages directory (defaults to /packages)" + ) + + parser.add_argument( + "-d", "--dependency", + default="python", + help="Dependency to search for" + ) + + parser.add_argument( + "-v", "--version", + default="3.7", + help="Version to search for" + ) + + parser.add_argument( + "-c", "--counts", + default="1,10,50", + help="Comma-separated list of package counts to test" + ) + + args = parser.parse_args() + + packages_dir = args.packages_dir or os.path.join(args.root_path, "packages") + + if not os.path.exists(packages_dir): + print(f"Error: Packages directory not found: {packages_dir}", file=sys.stderr) + sys.exit(1) + + counts = [int(c) for c in args.counts.split(',')] + + all_packages = [d for d in os.listdir(packages_dir) if os.path.isdir(os.path.join(packages_dir, d))] + + total_packages = len(all_packages) + + print(f"Found {total_packages} packages in {packages_dir}") + + # Filter counts that are greater than total packages + counts = [c for c in counts if c <= total_packages] + # Add total if not already there + if total_packages not in counts: + counts.append(total_packages) + + print(f"Running benchmarks for counts: {counts}") + + results = run_benchmarks( + scanner_path=args.scanner_path, + root_path=args.root_path, + packages_dir=packages_dir, + counts=counts, + dependency=args.dependency, + version=args.version + ) + + print("\nBenchmark Results:") + print(f"{'Packages':<10} | {'Time (seconds)':<15}") + print("-" * 30) + for count, duration in results.items(): + print(f"{count:<10} | {duration:<15.4f}") + +if __name__ == "__main__": + main() diff --git a/scripts/version_scanner/regex_config.yaml b/scripts/version_scanner/regex_config.yaml new file mode 100644 index 000000000000..07196c63edeb --- /dev/null +++ b/scripts/version_scanner/regex_config.yaml @@ -0,0 +1,102 @@ +description: Search rules for identifying dependency versions +rules: + - name: explicit_version_string + 
description: Finds explicit version strings in code or configs. + examples: + - "'3.7'" + - '"3.7.1"' + - "'3.7.12'" + - "Python 3.7" + rules: + - | + \b{major}\.{minor}(\.\d+)?\b + + - name: python_requires + description: Finds various forms of python_requires declarations. + applies_to: [python] + examples: + - "python_requires = '==3.7'" + - "python_requires = '>=3.7'" + - "python_requires = '<=3.7'" + - "python_requires = '>3.6'" + - "python_requires = '<3.8'" + rules: + - | + python_requires\s*=\s*['"]==3\.{minor}['"] + - | + python_requires\s*=\s*['"]>=3\.{minor}['"] + - | + python_requires\s*=\s*['"]<=3\.{minor}['"] + - | + python_requires\s*=\s*['"]>3\.{minor_minus_one}['"] + - | + python_requires\s*=\s*['"]<3\.{minor_plus_one}['"] + + - name: sys_version_info + description: Finds sys.version_info checks in code. + applies_to: [python] + examples: + - "sys.version_info == (3, 7)" + - "sys.version_info >= (3, 7)" + - "sys.version_info <= (3, 7)" + - "sys.version_info > (3, 6)" + - "sys.version_info < (3, 8)" + - "sys.version_info.minor == 7" + - "sys.version_info.minor >= 7" + - "sys.version_info.minor <= 7" + - "sys.version_info.minor > 6" + - "sys.version_info.minor < 8" + rules: + - | + sys\.version_info\s*==\s*\(3,\s*{minor}\) + - | + sys\.version_info\s*>=\s*\(3,\s*{minor}\) + - | + sys\.version_info\s*<=\s*\(3,\s*{minor}\) + - | + sys\.version_info\s*>\s*\(3,\s*{minor_minus_one}\) + - | + sys\.version_info\s*<\s*\(3,\s*{minor_plus_one}\) + - | + sys\.version_info\.minor\s*==\s*{minor} + - | + sys\.version_info\.minor\s*>=\s*{minor} + - | + sys\.version_info\.minor\s*<=\s*{minor} + - | + sys\.version_info\.minor\s*>\s*{minor_minus_one} + - | + sys\.version_info\.minor\s*<\s*{minor_plus_one} + + - name: python_env_short + description: Finds short python environment names often used in tox or nox. + applies_to: [python] + examples: + - "py37" + - "py37-cover" + rules: + - | + \bpy3{minor}\b + + - name: explicit_python_command + description: Finds explicit python commands with version. + applies_to: [python] + examples: + - "python3.7" + - "python3.7 -m pip" + - "Python3.7" + rules: + - | + python3\.{minor} + + - name: combined_version_string + description: Finds combined version strings often used in class or variable names. 
+ applies_to: [python] + examples: + - "Python37" + - "Python37DeprecationWarning" + rules: + - | + Python{major}{minor} + + diff --git a/scripts/version_scanner/small_package_list.txt b/scripts/version_scanner/small_package_list.txt new file mode 100644 index 000000000000..06109994b7fa --- /dev/null +++ b/scripts/version_scanner/small_package_list.txt @@ -0,0 +1,5 @@ +packages/google-cloud-access-context-manager +packages/google-cloud-bigtable +packages/google-cloud-biglake-hive +packages/google-cloud-documentai-toolbox +packages/google-cloud-core diff --git a/scripts/version_scanner/tests/data/.kokoro/build.sh b/scripts/version_scanner/tests/data/.kokoro/build.sh new file mode 100644 index 000000000000..a3079c597bd1 --- /dev/null +++ b/scripts/version_scanner/tests/data/.kokoro/build.sh @@ -0,0 +1 @@ +python3.7 diff --git a/scripts/version_scanner/tests/data/packages/pkg_a/setup.py b/scripts/version_scanner/tests/data/packages/pkg_a/setup.py new file mode 100644 index 000000000000..a5ff7d1dc955 --- /dev/null +++ b/scripts/version_scanner/tests/data/packages/pkg_a/setup.py @@ -0,0 +1 @@ +python_requires = '>=3.7' diff --git a/scripts/version_scanner/tests/data/packages/pkg_b/clean.py b/scripts/version_scanner/tests/data/packages/pkg_b/clean.py new file mode 100644 index 000000000000..2f9a147db12e --- /dev/null +++ b/scripts/version_scanner/tests/data/packages/pkg_b/clean.py @@ -0,0 +1 @@ +print("Hello") diff --git a/scripts/version_scanner/tests/integration/test_scanner_integration.py b/scripts/version_scanner/tests/integration/test_scanner_integration.py new file mode 100644 index 000000000000..daa3ef19c7b9 --- /dev/null +++ b/scripts/version_scanner/tests/integration/test_scanner_integration.py @@ -0,0 +1,34 @@ +import csv +import os +import subprocess +import pytest + +def test_integration_scan(tmp_path): + # Paths to real tools + scanner_path = os.path.abspath("version_scanner.py") + config_path = os.path.abspath("regex_config.yaml") + + # Static data directory + data_dir = os.path.abspath("tests/data") + + # Run the scanner in the tmp_path so the output file is created there + cmd = [ + "python3", scanner_path, + "-d", "python", + "-v", "3.7", + "-p", data_dir, + "--config", config_path, + "-o", "scanner_report.csv" + ] + + result = subprocess.run(cmd, cwd=tmp_path, capture_output=True, text=True, check=True) + + report_file = tmp_path / "scanner_report.csv" + assert report_file.exists(), f"Report file not found. 
Stderr: {result.stderr}" + + with open(report_file, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + rows = list(reader) + + # We expect at least some matches when we build the data directory + assert len(rows) > 0 diff --git a/scripts/version_scanner/tests/unit/test_benchmark.py b/scripts/version_scanner/tests/unit/test_benchmark.py new file mode 100644 index 000000000000..128493342ae1 --- /dev/null +++ b/scripts/version_scanner/tests/unit/test_benchmark.py @@ -0,0 +1,77 @@ +import os +import pytest +from unittest.mock import patch +from benchmark import get_package_subset, run_benchmark, run_benchmarks + +def test_get_package_subset(tmp_path): + # Create mock packages directory + packages_dir = tmp_path / "packages" + packages_dir.mkdir() + + for i in range(10): + (packages_dir / f"pkg_{i}").mkdir() + + # Test getting a subset of 5 + subset = get_package_subset(str(packages_dir), 5) + assert len(subset) == 5 + for pkg in subset: + assert pkg.startswith("pkg_") + +def test_get_package_subset_all(tmp_path): + packages_dir = tmp_path / "packages" + packages_dir.mkdir() + + for i in range(5): + (packages_dir / f"pkg_{i}").mkdir() + + # Test getting all + subset = get_package_subset(str(packages_dir), 10) # Request more than available + assert len(subset) == 5 # Should return all available + +def test_run_benchmark(tmp_path): + # Create a dummy package file + package_file = tmp_path / "packages.txt" + package_file.write_text("pkg1\n") + + # Create dummy package directory + packages_dir = tmp_path / "packages" + packages_dir.mkdir() + (packages_dir / "pkg1").mkdir() + (packages_dir / "pkg1" / "test.py").write_text("version = '3.7'\n") + + scanner_path = "version_scanner.py" + + duration = run_benchmark( + scanner_path=scanner_path, + root_path=str(tmp_path), + package_file=str(package_file), + dependency="python", + version="3.7" + ) + + assert isinstance(duration, float) + assert duration >= 0 + +# Test run_benchmarks +@patch('benchmark.run_benchmark') +def test_run_benchmarks(mock_run, tmp_path): + mock_run.return_value = 1.5 + + packages_dir = tmp_path / "packages" + packages_dir.mkdir() + for i in range(5): + (packages_dir / f"pkg_{i}").mkdir() + + results = run_benchmarks( + scanner_path="dummy.py", + root_path=str(tmp_path), + packages_dir=str(packages_dir), + counts=[1, 3], + dependency="python", + version="3.7" + ) + + assert len(results) == 2 + assert results[1] == 1.5 + assert results[3] == 1.5 + assert mock_run.call_count == 2 diff --git a/scripts/version_scanner/tests/unit/test_version_scanner.py b/scripts/version_scanner/tests/unit/test_version_scanner.py new file mode 100644 index 000000000000..32042d34b137 --- /dev/null +++ b/scripts/version_scanner/tests/unit/test_version_scanner.py @@ -0,0 +1,466 @@ +import csv +import os +import re +from unittest import mock +from unittest.mock import patch +import pytest +import yaml +from version_scanner import ConfigManager, scan_file, write_csv_report + +# Test ConfigManager +@pytest.mark.parametrize("dependency, version, expected", [ + ( + "python", + "3.7", + {"name": "python", "version": "3.7", "major": "3", "minor": "7", "minor_plus_one": "8", "minor_minus_one": "6"} + ), + ( + "protobuf", + "4.25.8", + {"name": "protobuf", "version": "4.25.8", "major": "4", "minor": "25", "patch": "8", "minor_plus_one": "26", "minor_minus_one": "24"} + ), + ( + "foo", + "3", + {"name": "foo", "version": "3", "major": "3"} + ), +]) +def test_compute_variables(dependency, version, expected): + cm = ConfigManager("dummy_path", dependency, 
version) + vars = cm._compute_variables() + assert vars == expected + +# Test scan_file +def test_scan_file_positive(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.7'\n") + + rules = [ + {"name": "python_requires_check", "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]")} + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 1 + assert results[0]["rule_name"] == "python_requires_check" + assert results[0]["line_number"] == 1 + assert results[0]["matched_string"] == "python_requires = '>=3.7'" + +def test_scan_file_negative(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.8'\n") + + rules = [ + {"name": "python_requires_check", "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]")} + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 0 + +def test_scan_file_ignores_pragma(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("python_requires = '>=3.7' # version-scanner: ignore\n") + + rules = [ + {"name": "python_requires_check", "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]")} + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 0 + +def test_scan_file_ignores_next_line(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("# version-scanner: ignore-next-line\npython_requires = '>=3.7'\n") + + rules = [ + {"name": "python_requires_check", "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]")} + ] + + results = scan_file(str(test_file), rules) + assert len(results) == 0 + +def test_scan_repository_flags_filename(tmp_path): + test_file = tmp_path / "test-3.9.txt" + test_file.write_text("clean content\n") + + rules = [] + + from version_scanner import scan_repository + results = scan_repository(str(tmp_path), rules, version_string="3.9") + + assert len(results) == 1 + assert results[0]["rule_name"] == "filename_match" + assert results[0]["matched_string"] == "3.9" + +# Test directory scan simulation +def test_directory_scan(tmp_path): + # Create dummy files + p1 = tmp_path / "pkg1" + p1.mkdir() + f1 = p1 / "setup.py" + f1.write_text("python_requires = '>=3.7'\n") + + p2 = tmp_path / "pkg2" + p2.mkdir() + f2 = p2 / "clean.py" + f2.write_text("print('Hello')\n") + + rules = [ + {"name": "python_requires_check", "pattern": re.compile(r"python_requires\s*=\s*['\"]>=3\.7['\"]")} + ] + + results = [] + for root, dirs, files in os.walk(tmp_path): + for file in files: + file_path = os.path.join(root, file) + results.extend(scan_file(file_path, rules)) + + assert len(results) == 1 + assert results[0]["rule_name"] == "python_requires_check" + +# Test write_csv_report +def test_write_csv_report(tmp_path): + output_file = tmp_path / "report.csv" + matches = [ + { + "file_path": "./setup.py", + "rule_name": "python_requires_check", + "line_number": 1, + "matched_string": "python_requires = '>=3.7'", + "context_line": "python_requires = '>=3.7'" + } + ] + + write_csv_report(str(output_file), matches) + + assert output_file.exists() + + with open(output_file, 'r', encoding='utf-8', newline='') as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 1 + assert rows[0]["file_path"] == "./setup.py" + assert rows[0]["rule_name"] == "python_requires_check" + assert rows[0]["line_number"] == "1" + assert rows[0]["matched_string"] == "python_requires = '>=3.7'" + assert rows[0]["context_line"] == "python_requires = '>=3.7'" + + +def 
test_load_config(tmp_path): + config_file = tmp_path / "config.yaml" + config_file.write_text(""" +rules: + - name: test_rule + rules: + - python{version} +""") + + cm = ConfigManager(str(config_file), "python", "3.7") + rules = cm.load_config() + + assert len(rules) == 1 + assert rules[0]["name"] == "test_rule" + assert rules[0]["pattern"] == "python3.7" + + +@pytest.mark.parametrize("template, expected_warning", [ + ("python{missing_var}", "Warning: Missing variable for interpolation"), + ("python{version", "Warning: Invalid format string"), +]) +def test_load_config_error_handling(tmp_path, capsys, template, expected_warning): + config_file = tmp_path / "config.yaml" + config_file.write_text(f""" +rules: + - name: test_rule + rules: + - {template} +""") + + cm = ConfigManager(str(config_file), "python", "3.7") + rules = cm.load_config() + + assert len(rules) == 0 + + captured = capsys.readouterr() + assert expected_warning in captured.err + + +def test_load_config_permission_error(tmp_path, capsys): + config_file = tmp_path / "config.yaml" + config_file.write_text("rules: []") + + cm = ConfigManager(str(config_file), "python", "3.7") + + with patch("builtins.open", side_effect=PermissionError("Permission denied")): + with pytest.raises(SystemExit) as excinfo: + cm.load_config() + + assert excinfo.value.code == 1 + captured = capsys.readouterr() + assert "Error: Permission denied reading config file" in captured.err +def test_main_package_file_permission_error(tmp_path, capsys): + package_file = tmp_path / "packages.txt" + package_file.write_text("packages/pkg_a") + + import sys + test_args = ["version_scanner.py", "-d", "python", "-v", "3.7", "--package-file", str(package_file)] + + real_open = open + def side_effect(file, *args, **kwargs): + if str(file) == str(package_file): + raise PermissionError("Permission denied") + return real_open(file, *args, **kwargs) + + with patch("sys.argv", test_args): + with patch("builtins.open", side_effect=side_effect): + with pytest.raises(SystemExit) as excinfo: + from version_scanner import main + main() + + assert excinfo.value.code == 1 + captured = capsys.readouterr() + assert "Error: Permission denied reading package file" in captured.err +def test_main_package_file_not_found(capsys): + import sys + test_args = ["version_scanner.py", "-d", "python", "-v", "3.7", "--package-file", "non_existent_file.txt"] + + with patch("sys.argv", test_args): + with pytest.raises(SystemExit) as excinfo: + from version_scanner import main + main() + + assert excinfo.value.code == 1 + captured = capsys.readouterr() + assert "Error: Package file not found" in captured.err +def test_format_match_for_csv(): + from version_scanner import format_match_for_csv + match = { + "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", + "repo_path": "packages/pkg_a/setup.py", + "line_number": 123, + "rule_name": "test_rule" + } + + # Test without github_repo + formatted = format_match_for_csv(match) + assert formatted["line_number"] == 123 + + # Test with github_repo + formatted = format_match_for_csv(match, github_repo="https://github.com/user/repo", branch="main") + expected_url = "https://github.com/user/repo/blob/main/packages/pkg_a/setup.py#L123" + assert formatted["line_number"] == f'=HYPERLINK("{expected_url}", "123")' + + +def test_format_match_for_csv_truncates_long_line(): + from version_scanner import format_match_for_csv + + long_line = "a" * 1000 + "PY37" + "b" * 1000 + match = { + "file_path": "test.py", + "line_number": 1, + "rule_name": "test_rule", 
+ "matched_string": "PY37", + "context_line": long_line + } + + formatted = format_match_for_csv(match) + context = formatted["context_line"] + + assert len(context) <= 600 + assert "PY37" in context + assert "..." in context + + +def test_get_match_counts(): + from version_scanner import get_match_counts + + matches = [ + {"rule_name": "rule1", "package_name": "pkg1"}, + {"rule_name": "rule1", "package_name": "pkg2"}, + {"rule_name": "rule2", "package_name": "pkg1"}, + ] + + rule_counts, package_counts = get_match_counts(matches) + + assert rule_counts == {"rule1": 2, "rule2": 1} + assert package_counts == {"pkg1": 2, "pkg2": 1} + + +def test_scan_file_removes_newline_from_match(tmp_path): + test_file = tmp_path / "test.py" + test_file.write_text("Python 3.7\n") + + rules = [ + {"name": "explicit_version_string", "pattern": re.compile(r"(?:['\"]|\s|^)3\.7(\.\d+)?(?:['\"]|\s|$)")} + ] + + from version_scanner import scan_file + results = scan_file(str(test_file), rules) + + assert len(results) == 1 + assert "\n" not in results[0]["matched_string"] + + +def test_write_csv_report_with_links(tmp_path): + output_file = tmp_path / "report.csv" + matches = [ + { + "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", + "repo_path": "packages/pkg_a/setup.py", + "line_number": 1, + "rule_name": "python_requires_check", + "matched_string": "python_requires = '>=3.7'", + "context_line": "python_requires = '>=3.7'" + } + ] + + from version_scanner import write_csv_report + write_csv_report(str(output_file), matches, github_repo="https://github.com/user/repo", branch="main") + + assert output_file.exists() + + with open(output_file, 'r', encoding='utf-8', newline='') as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 1 + assert "HYPERLINK" in rows[0]["line_number"] +def test_scan_repository_ignores_version_scanner(tmp_path): + vs_dir = tmp_path / "version_scanner" + vs_dir.mkdir() + f = vs_dir / "test.py" + f.write_text("python_requires = '>=3.7'\n") + + rules = [ + {"name": "python_requires_check", "pattern": "python_requires\\s*=\\s*['\"]>=3\\.7['\"]"} + ] + + from version_scanner import scan_repository + results = scan_repository(str(tmp_path), rules, ignore_dirs=['version_scanner']) + + assert len(results) == 0 + + +def test_load_ignore_file(tmp_path): + from version_scanner import load_ignore_file + + ignore_file = tmp_path / ".scannerignore" + ignore_file.write_text("dir1\n# comment\n \ndir2\n") + + ignore_dirs = load_ignore_file(str(ignore_file)) + + assert ignore_dirs == ["dir1", "dir2"] + +@mock.patch('version_scanner.load_ignore_file') +@mock.patch('version_scanner.scan_repository') +def test_main_loads_ignore_from_script_dir(mock_scan, mock_load_ignore): + mock_load_ignore.return_value = [] + mock_scan.return_value = [] + + import sys + test_args = ["version_scanner.py", "-d", "python", "-v", "3.7"] + + with mock.patch('sys.argv', test_args): + from version_scanner import main + main() + + mock_load_ignore.assert_called_once() + args, kwargs = mock_load_ignore.call_args + path = args[0] + assert ".scannerignore" in path + assert "scripts/version_scanner" in path + + +@mock.patch('version_scanner.build') +@mock.patch('google.auth.default') +def test_upload_to_drive(mock_auth, mock_build): + from unittest import mock + + mock_creds = mock.Mock() + mock_creds.universe_domain = "googleapis.com" + mock_creds.create_scoped.return_value = mock_creds + + mock_auth_http = mock.Mock() + mock_auth_http.credentials = mock_creds + 
mock_creds.authorize.return_value = mock_auth_http + + mock_auth.return_value = (mock_creds, "project-id") + + mock_sheets = mock.Mock() + mock_build.return_value = mock_sheets + + mock_spreadsheets = mock.Mock() + mock_sheets.spreadsheets.return_value = mock_spreadsheets + + mock_create = mock.Mock() + mock_spreadsheets.create.return_value = mock_create + mock_create.execute.return_value = {"spreadsheetUrl": "http://example.com"} + + mock_values = mock.Mock() + mock_spreadsheets.values.return_value = mock_values + mock_update = mock.Mock() + mock_values.update.return_value = mock_update + mock_update.execute.return_value = {} + + from version_scanner import upload_to_drive + + matches = [{"rule_name": "r1", "package_name": "p1", "file_path": "f1", "line_number": 1, "matched_string": "s1", "context_line": "c1"}] + + url = upload_to_drive("test.csv", matches, github_repo="https://github.com/user/repo") + + assert url == "http://example.com" + mock_spreadsheets.create.assert_called_once() + + # Verify that update was called with hyperlink formula + mock_values.update.assert_called_once() + args, kwargs = mock_values.update.call_args + body = kwargs.get('body', {}) + values = body.get('values', []) + assert len(values) > 1 + assert "HYPERLINK" in values[1][3] # line_number is at index 3 + + +def test_regex_examples_from_config(): + """Test that examples in config match at least one rule in the group.""" + config_path = "regex_config.yaml" + + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + except FileNotFoundError: + pytest.fail(f"Config file not found: {config_path}") + + rules_list = config.get("rules", []) + + # Variables for interpolation (simulate Python 3.7) + vars = { + "major": "3", + "minor": "7", + "version": "3.7", + "minor_plus_one": "8", + "minor_minus_one": "6" + } + + for rule_group in rules_list: + name = rule_group.get("name") + examples = rule_group.get("examples", []) + templates = rule_group.get("rules", []) + + if not examples or not templates: + continue + + compiled_patterns = [] + for template in templates: + try: + resolved = template.strip().format(**vars) + compiled_patterns.append(re.compile(resolved, re.IGNORECASE)) + except KeyError: + continue + + for example in examples: + matched = False + for pattern in compiled_patterns: + if pattern.search(example): + matched = True + break + assert matched, f"Example '{example}' in group '{name}' did not match any pattern." diff --git a/scripts/version_scanner/version_scanner.py b/scripts/version_scanner/version_scanner.py new file mode 100644 index 000000000000..1d3916973467 --- /dev/null +++ b/scripts/version_scanner/version_scanner.py @@ -0,0 +1,577 @@ +#!/usr/bin/env python3 +""" +Automated Dependency Version Scanner +Scans a repository for references to specific dependency versions. 
+""" + +import argparse +import csv +import datetime +import os +import re +import sys +from typing import Dict, List, Tuple +import yaml +import google.auth +from googleapiclient.discovery import build + +class ConfigManager: + """Handles loading and interpolation of regex configurations.""" + + def __init__(self, config_path: str, dependency: str, version: str): + self.config_path = config_path + self.dependency = dependency + self.version = version + self.variables = self._compute_variables() + + def _compute_variables(self) -> Dict[str, str]: + """Compute variables for interpolation from version string.""" + vars = { + "name": self.dependency, + "version": self.version, + } + + parts = self.version.split('.') + if len(parts) >= 1: + vars["major"] = parts[0] + if len(parts) >= 2: + vars["minor"] = parts[1] + try: + vars["minor_plus_one"] = str(int(parts[1]) + 1) + except ValueError: + vars["minor_plus_one"] = parts[1] + try: + vars["minor_minus_one"] = str(int(parts[1]) - 1) + except ValueError: + vars["minor_minus_one"] = parts[1] + if len(parts) >= 3: + vars["patch"] = parts[2] + + return vars + + def load_config(self) -> List[Dict[str, str]]: + """Load and resolve rules from config.""" + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + except FileNotFoundError: + print(f"Error: Config file not found: {self.config_path}", file=sys.stderr) + sys.exit(1) + except PermissionError: + print(f"Error: Permission denied reading config file: {self.config_path}", file=sys.stderr) + sys.exit(1) + except yaml.YAMLError as e: + print(f"Error parsing config file: {e}", file=sys.stderr) + sys.exit(1) + + rules_list = config.get("rules", []) + resolved_rules = [] + + for rule_group in rules_list: + name = rule_group.get("name") + applies_to = rule_group.get("applies_to", []) + + # Filter by dependency + if applies_to and self.dependency not in applies_to: + continue + + templates = rule_group.get("rules", []) + + for template in templates: + try: + resolved_pattern = template.strip().format(**self.variables) + resolved_rules.append({ + "name": name, + "pattern": resolved_pattern + }) + except KeyError as e: + print(f"Warning: Missing variable for interpolation in rule {name}: {e}", file=sys.stderr) + except ValueError as e: + print(f"Warning: Invalid format string in rule {name}: {e}", file=sys.stderr) + + return resolved_rules + +def scan_file(file_path: str, compiled_rules: List[Dict[str, re.Pattern]]) -> List[Dict[str, str]]: + """ + Scan a single file for matching patterns. + + Args: + file_path: Path to the file to scan. + compiled_rules: A list of dictionaries containing 'name' and 'pattern' (compiled regex). + + Returns: + A list of dictionaries containing match details. 
+ """ + results = [] + + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + skip_next = False + for line_num, line in enumerate(f, 1): + if skip_next: + skip_next = False + continue + if "version-scanner: ignore-next-line" in line: + skip_next = True + continue + if "version-scanner: ignore" in line: + continue + for rule in compiled_rules: + match = rule["pattern"].search(line) + if match: + results.append({ + "rule_name": rule["name"], + "line_number": line_num, + "matched_string": match.group(0).strip(), + "context_line": line.strip() + }) + except IOError as e: + print(f"Warning: Could not read file {file_path}: {e}", file=sys.stderr) + + return results + + +def format_match_for_csv( + match: Dict[str, str], + github_repo: str = None, + branch: str = "main" +) -> Dict[str, str]: + """ + Format a match result for CSV output, adding GitHub links if requested. + """ + formatted = match.copy() + + if github_repo: + # Use repo_path if available, fallback to file_path + file_path = match.get("repo_path", match.get("file_path", "")) + line_number = match.get("line_number", "") + + # Construct URL + url = f"{github_repo}/blob/{branch}/{file_path}#L{line_number}" + + # Format as Google Sheets formula + formatted["line_number"] = f'=HYPERLINK("{url}", "{line_number}")' + + context = formatted.get("context_line", "") + matched = formatted.get("matched_string", "") + + if len(context) > 500: + match_start = context.find(matched) + if match_start != -1: + start = max(0, match_start - 200) + end = min(len(context), match_start + len(matched) + 200) + + prefix = "..." if start > 0 else "" + suffix = "..." if end < len(context) else "" + + formatted["context_line"] = prefix + context[start:end] + suffix + else: + formatted["context_line"] = context[:500] + "..." + + return formatted + + +def get_match_counts(matches: List[Dict[str, str]]) -> Tuple[Dict[str, int], Dict[str, int]]: + """ + Aggregate matches by rule and by package. + """ + rule_counts = {} + package_counts = {} + for m in matches: + r = m.get("rule_name") + p = m.get("package_name") + rule_counts[r] = rule_counts.get(r, 0) + 1 + package_counts[p] = package_counts.get(p, 0) + 1 + return rule_counts, package_counts + + +def print_summary_table(rule_counts: Dict[str, int], package_counts: Dict[str, int]) -> None: + """ + Print a summary table to the console. + """ + print("\n=== Scan Summary ===") + print(f"{'Rule Name':<30} {'Matches':<10}") + print("-" * 42) + for rule, count in sorted(rule_counts.items(), key=lambda x: x[1], reverse=True): + print(f"{rule:<30} {count:<10}") + + print(f"\n{'Package Name':<40} {'Matches':<10}") + print("-" * 52) + sorted_packages = sorted(package_counts.items(), key=lambda x: x[1], reverse=True) + for pkg, count in sorted_packages[:10]: + display_name = pkg if pkg else '[Root/None]' + print(f"{display_name:<40} {count:<10}") + + if len(sorted_packages) > 10: + print(f'... and {len(sorted_packages) - 10} more packages.') + + +def load_ignore_file(file_path: str) -> List[str]: + """ + Read ignore paths from a file. + """ + ignore_dirs = [] + try: + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + ignore_dirs.append(line) + except FileNotFoundError: + pass + return ignore_dirs + + +def write_csv_report( + output_path: str, + matches: List[Dict[str, str]], + github_repo: str = None, + branch: str = "main" +) -> None: + """ + Write the collected matches to a CSV file. 
+ + Args: + output_path: Path to the output CSV file. + matches: A list of dictionaries containing match details. + github_repo: Optional GitHub repository URL base. + branch: GitHub branch for links (defaults to main). + """ + fieldnames = ["file_path", "package_name", "rule_name", "line_number", "matched_string", "context_line"] + + try: + with open(output_path, 'w', encoding='utf-8', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + + for match in matches: + formatted_match = format_match_for_csv(match, github_repo, branch) + # Ensure only specified fields are written + row = {field: formatted_match.get(field, "") for field in fieldnames} + writer.writerow(row) + + print(f"\nReport written to: {output_path}") + except IOError as e: + print(f"Error writing CSV report: {e}", file=sys.stderr) + + +def upload_to_drive(csv_path: str, matches: List[Dict[str, str]], github_repo: str = None, branch: str = "main") -> str: + """ + Upload matches to a Google Sheet in Drive. + """ + print("\nUploading to Google Drive...") + try: + credentials, project = google.auth.default( + scopes=['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/spreadsheets'] + ) + + service = build('sheets', 'v4', credentials=credentials) + + # Create a new spreadsheet + title = os.path.basename(csv_path).replace('.csv', '') + spreadsheet = { + 'properties': { + 'title': title + } + } + spreadsheet = service.spreadsheets().create(body=spreadsheet, fields='spreadsheetUrl,spreadsheetId').execute() + url = spreadsheet.get('spreadsheetUrl') + spreadsheet_id = spreadsheet.get('spreadsheetId') + + # Prepare data + values = [["file_path", "package_name", "rule_name", "line_number", "matched_string", "context_line"]] + for m in matches: + formatted_m = format_match_for_csv(m, github_repo=github_repo, branch=branch) + values.append([ + formatted_m.get("file_path", ""), + formatted_m.get("package_name", ""), + formatted_m.get("rule_name", ""), + str(formatted_m.get("line_number", "")), + formatted_m.get("matched_string", ""), + formatted_m.get("context_line", "") + ]) + + body = { + 'values': values + } + + # Update values + service.spreadsheets().values().update( + spreadsheetId=spreadsheet_id, + range='Sheet1!A1', + valueInputOption='USER_ENTERED', + body=body + ).execute() + + print(f"Successfully uploaded to Google Sheet: {url}") + return url + + except Exception as e: + import traceback + traceback.print_exc() + print(f"Error uploading to Google Drive: {e}", file=sys.stderr) + return "" + + +def read_package_file(file_path: str) -> List[str]: + """ + Read package paths from a file. + + Args: + file_path: Path to the package file. + + Returns: + A list of package paths. + """ + packages = [] + try: + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + packages.append(line) + except FileNotFoundError: + print(f"Error: Package file not found: {file_path}", file=sys.stderr) + sys.exit(1) + except PermissionError: + print(f"Error: Permission denied reading package file: {file_path}", file=sys.stderr) + sys.exit(1) + except IOError as e: + print(f"Error reading package file: {e}", file=sys.stderr) + sys.exit(1) + return packages + + +def scan_repository( + root_path: str, + rules: List[Dict[str, str]], + target_packages: List[str] = None, + ignore_dirs: List[str] = None, + version_string: str = None +) -> List[Dict[str, str]]: + """ + Scan repository for matching patterns. 
+ + Args: + root_path: Path to the repository root. + rules: A list of dictionaries containing 'name' and 'pattern'. + target_packages: A list of package paths to include (e.g., ['packages/pkg_a']). + If None or empty, all packages are scanned. + + Returns: + A list of match details. + """ + ignore_lower = {i.lower() for i in ignore_dirs} if ignore_dirs else set() + results = [] + + # Compile patterns once here + compiled_rules = [] + for rule in rules: + try: + compiled_rules.append({ + "name": rule["name"], + "pattern": re.compile(rule["pattern"], re.IGNORECASE) + }) + except re.error as e: + print(f"Error compiling regex for rule {rule['name']}: {e}", file=sys.stderr) + continue + + print(f"\nScanning repository: {root_path}") + if target_packages: + print(f"Filtering for packages: {target_packages}") + + for root, dirs, files in os.walk(root_path): + # Prune ignore directories (case-insensitive) + dirs[:] = [d for d in dirs if d.lower() not in ignore_lower] + + # Filter ignore files (case-insensitive) + files = [f for f in files if f.lower() not in ignore_lower] + + rel_root = os.path.relpath(root, root_path) + parts = rel_root.split(os.sep) + + # Monorepo filtering + if target_packages and parts[0] == "packages": + if len(parts) >= 2: + current_package_path = os.path.join(parts[0], parts[1]) + if current_package_path not in target_packages: + # Skip this directory and all subdirectories + dirs[:] = [] + continue + else: + # We are in the "packages" directory itself. Continue to walk. + pass + + for file in files: + file_path = os.path.join(root, file) + matches = scan_file(file_path, compiled_rules) + + # Add filename match if applicable + if version_string and version_string in file: + matches.append({ + "rule_name": "filename_match", + "line_number": 0, + "matched_string": version_string, + "context_line": f"Filename contains {version_string}" + }) + + # Compute display path and package name + rel_file_path = os.path.relpath(file_path, root_path) + + package_name = "" + path_parts = rel_file_path.split(os.sep) + if len(path_parts) >= 2 and path_parts[0] == "packages": + package_name = path_parts[1] + + root_parts = os.path.abspath(root_path).split(os.sep) + if len(root_parts) >= 2: + prefix = os.path.join(root_parts[-2], root_parts[-1]) + display_path = os.path.join(prefix, rel_file_path) + else: + display_path = rel_file_path + + for m in matches: + m["file_path"] = display_path + m["repo_path"] = rel_file_path + m["package_name"] = package_name + results.append(m) + + return results + + +def main(): + script_dir = os.path.dirname(os.path.abspath(__file__)) + default_config = os.path.join(script_dir, "regex_config.yaml") + + parser = argparse.ArgumentParser( + description="Scan repository for references to specific dependency versions." 
+ ) + + parser.add_argument( + "-d", "--dependency", + required=True, + help="Name of the dependency (e.g., python, protobuf)" + ) + + parser.add_argument( + "-v", "--version", + required=True, + help="Specific version to search for (e.g., 3.7, 4.25.8)" + ) + + parser.add_argument( + "-p", "--path", + default=".", + help="Root directory to scan (defaults to current directory)" + ) + + + + package_group = parser.add_mutually_exclusive_group() + + package_group.add_argument( + "--package", + help="Specific subdirectory filter (useful for monorepos)" + ) + + package_group.add_argument( + "--package-file", + help="Path to a file containing a list of package directories to scan" + ) + + parser.add_argument( + "--config", + default=default_config, + help="Path to the regex configuration file (defaults to scripts/version_scanner/regex_config.yaml)" + ) + + parser.add_argument( + "-o", "--output", + help="Path to the output CSV file (defaults to --.csv)" + ) + + parser.add_argument( + "--github-repo", + default="https://github.com/googleapis/google-cloud-python", + help="GitHub repository URL base (defaults to https://github.com/googleapis/google-cloud-python)" + ) + + parser.add_argument( + "--branch", + default="main", + help="GitHub branch for links (defaults to main)" + ) + + parser.add_argument( + "--upload", + action="store_true", + help="Upload results to a Google Sheet in Drive" + ) + + args = parser.parse_args() + + # Resolve target packages if filtering is requested + target_packages = [] + if args.package: + target_packages.append(os.path.join("packages", args.package)) + elif args.package_file: + target_packages = read_package_file(args.package_file) + + print(f"Starting scan for dependency: {args.dependency} version: {args.version}") + print(f"Root path: {args.path}") + print(f"Targets to scan:") + if target_packages: + for pkg in target_packages: + print(f" - {os.path.join(args.path, pkg)}") + else: + print(f" - {args.path} (all packages)") + print(f"Using config: {args.config}") + + # Load and resolve rules + config_manager = ConfigManager(args.config, args.dependency, args.version) + rules = config_manager.load_config() + + print(f"\nLoaded {len(rules)} rules:") + for rule in rules: + print(f" - {rule['name']}: {rule['pattern']}") + + + + # Load ignore file from script directory (Option A) + script_dir = os.path.dirname(os.path.abspath(__file__)) + ignore_file_path = os.path.join(script_dir, ".scannerignore") + ignore_dirs = load_ignore_file(ignore_file_path) + if ignore_dirs: + print(f"Loaded {len(ignore_dirs)} ignore patterns from {ignore_file_path}") + + # Scan repository + all_matches = scan_repository(args.path, rules, target_packages, ignore_dirs, version_string=args.version) + + print(f"\nFound {len(all_matches)} matches.") + for m in all_matches[:10]: # Show first 10 + print(f" {m['file_path']}:{m['line_number']} [{m['rule_name']}] {m['matched_string']}") + + if len(all_matches) > 10: + print(f" ... 
and {len(all_matches) - 10} more matches.") + + # Get and print summary counts + rule_counts, package_counts = get_match_counts(all_matches) + print_summary_table(rule_counts, package_counts) + + # Write report + if args.output: + output_path = args.output + else: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + script_dir = os.path.dirname(os.path.abspath(__file__)) + results_dir = os.path.join(script_dir, "results") + os.makedirs(results_dir, exist_ok=True) + output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") + + write_csv_report(output_path, all_matches, github_repo=args.github_repo, branch=args.branch) + + if args.upload: + upload_to_drive(output_path, all_matches, github_repo=args.github_repo, branch=args.branch) + +if __name__ == "__main__": + main()