diff --git a/CHANGELOG.md b/CHANGELOG.md index 26f0f9b66..7d39311d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +* Circular symlinks no longer cause infinite loops when syncing a folder + ## [1.21.0] - 2023-04-17 ### Added diff --git a/b2sdk/scan/folder.py b/b2sdk/scan/folder.py index c57f3698e..dfba23594 100644 --- a/b2sdk/scan/folder.py +++ b/b2sdk/scan/folder.py @@ -10,12 +10,13 @@ import logging import os +from pathlib import Path import platform import re import sys from abc import ABCMeta, abstractmethod -from typing import Iterator, Optional +from typing import Iterator, Optional, Set from ..utils import fix_windows_path_limit, get_file_mtime, is_file_readable from .exception import EmptyDirectory, EnvironmentEncodingError, NotADirectory, UnableToCreateDirectory, UnsupportedFilename @@ -132,7 +133,8 @@ def all_files(self, reporter: Optional[ProgressReport], :param reporter: a place to report errors :param policies_manager: a policy manager object, default is DEFAULT_SCAN_MANAGER """ - yield from self._walk_relative_paths(self.root, '', reporter, policies_manager) + root_path = Path(self.root) + yield from self._walk_relative_paths(root_path, Path(''), reporter, policies_manager) def make_full_path(self, file_name): """ @@ -178,17 +180,23 @@ def ensure_non_empty(self): raise EmptyDirectory(self.root) def _walk_relative_paths( - self, local_dir: str, relative_dir_path: str, reporter, - policies_manager: ScanPoliciesManager + self, + local_dir: Path, + relative_dir_path: Path, + reporter: ProgressReport, + policies_manager: ScanPoliciesManager, + visited_symlinks: Optional[Set[int]] = None, ): """ Yield a File object for each of the files anywhere under this folder, in the order they would appear in B2, unless the path is excluded by policies manager. - :param relative_dir_path: the path of this dir relative to the scan point, or '' if at scan point + :param local_dir: the path to the local directory that we are currently inspecting + :param relative_dir_path: the path of this dir relative to the scan point, or Path('') if at scan point + :param reporter: a reporter object to report errors and warnings + :param policies_manager: a policies manager object + :param visited_symlinks: a set of paths to symlinks that have already been visited. Using inode numbers to reduce memory usage """ - if not isinstance(local_dir, str): - raise ValueError('folder path should be unicode: %s' % repr(local_dir)) # Collect the names. We do this before returning any results, because # directories need to sort as if their names end in '/'. @@ -204,12 +212,28 @@ def _walk_relative_paths( # # This is because in Unicode '.' comes before '/', which comes before '0'. names = [] # list of (name, local_path, relative_file_path) - for name in os.listdir(local_dir): - # We expect listdir() to return unicode if dir_path is unicode. - # If the file name is not valid, based on the file system - # encoding, then listdir() will return un-decoded str/bytes. - if not isinstance(name, str): - name = self._handle_non_unicode_file_name(name) + + visited_symlinks = visited_symlinks or set() + + if local_dir.is_symlink(): + real_path = local_dir.resolve() + inode_number = real_path.stat().st_ino + + visited_symlinks_count = len(visited_symlinks) + + # Add symlink to visited_symlinks to prevent infinite symlink loops + visited_symlinks.add(inode_number) + + # Check if set size has changed, if not, symlink has already been visited + if len(visited_symlinks) == visited_symlinks_count: + # Infinite symlink loop detected, report warning and skip symlink + if reporter is not None: + reporter.circular_symlink_skipped(str(local_dir)) + return + + visited_symlinks.add(inode_number) + + for name in (x.name for x in local_dir.iterdir()): if '/' in name: raise UnsupportedFilename( @@ -217,26 +241,30 @@ def _walk_relative_paths( "%s in dir %s" % (name, local_dir) ) - local_path = os.path.join(local_dir, name) + local_path = local_dir / name relative_file_path = join_b2_path( - relative_dir_path, name + str(relative_dir_path), name ) # file path relative to the scan point # Skip broken symlinks or other inaccessible files - if not is_file_readable(local_path, reporter): + if not is_file_readable(str(local_path), reporter): continue - if policies_manager.exclude_all_symlinks and os.path.islink(local_path): + if policies_manager.exclude_all_symlinks and local_path.is_symlink(): if reporter is not None: - reporter.symlink_skipped(local_path) + reporter.symlink_skipped(str(local_path)) continue - if os.path.isdir(local_path): + if local_path.is_dir(): name += '/' - if policies_manager.should_exclude_local_directory(relative_file_path): + if policies_manager.should_exclude_local_directory(str(relative_file_path)): continue - names.append((name, local_path, relative_file_path)) + # remove the leading './' from the relative path to ensure backward compatibility + relative_file_path_str = str(relative_file_path) + if relative_file_path_str.startswith("./"): + relative_file_path_str = relative_file_path_str[2:] + names.append((name, local_path, relative_file_path_str)) # Yield all of the answers. # @@ -245,19 +273,23 @@ def _walk_relative_paths( for (name, local_path, relative_file_path) in sorted(names): if name.endswith('/'): for subdir_file in self._walk_relative_paths( - local_path, relative_file_path, reporter, policies_manager + local_path, + relative_file_path, + reporter, + policies_manager, + visited_symlinks, ): yield subdir_file else: # Check that the file still exists and is accessible, since it can take a long time # to iterate through large folders - if is_file_readable(local_path, reporter): - file_mod_time = get_file_mtime(local_path) - file_size = os.path.getsize(local_path) + if is_file_readable(str(local_path), reporter): + file_mod_time = get_file_mtime(str(local_path)) + file_size = local_path.stat().st_size local_scan_path = LocalPath( - absolute_path=self.make_full_path(relative_file_path), - relative_path=relative_file_path, + absolute_path=self.make_full_path(str(relative_file_path)), + relative_path=str(relative_file_path), mod_time=file_mod_time, size=file_size, ) diff --git a/b2sdk/scan/folder_parser.py b/b2sdk/scan/folder_parser.py index 1ee3b47af..e67c3f2a5 100644 --- a/b2sdk/scan/folder_parser.py +++ b/b2sdk/scan/folder_parser.py @@ -51,4 +51,4 @@ def _parse_bucket_and_folder(bucket_and_path, api, b2_folder_class): (bucket_name, folder_name) = bucket_and_path.split('/', 1) if folder_name.endswith('/'): folder_name = folder_name[:-1] - return b2_folder_class(bucket_name, folder_name, api) \ No newline at end of file + return b2_folder_class(bucket_name, folder_name, api) diff --git a/b2sdk/scan/path.py b/b2sdk/scan/path.py index ac9da313e..50ca2c2e2 100644 --- a/b2sdk/scan/path.py +++ b/b2sdk/scan/path.py @@ -89,4 +89,4 @@ def __eq__(self, other): self.relative_path == other.relative_path and self.selected_version == other.selected_version and self.all_versions == other.all_versions - ) \ No newline at end of file + ) diff --git a/b2sdk/scan/report.py b/b2sdk/scan/report.py index 57b53813b..8d4dbb47a 100644 --- a/b2sdk/scan/report.py +++ b/b2sdk/scan/report.py @@ -66,29 +66,27 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def error(self, message): + def error(self, message: str) -> None: """ Print an error, gracefully interleaving it with a progress bar. :param message: an error message - :type message: str """ self.print_completion(message) - def print_completion(self, message): + def print_completion(self, message: str) -> None: """ Remove the progress bar, prints a message, and puts the progress bar back. :param message: an error message - :type message: str """ with self.lock: self._print_line(message, True) self._last_update_time = 0 self._update_progress() - def update_count(self, delta: int): + def update_count(self, delta: int) -> None: """ Report that items have been processed. """ @@ -117,14 +115,12 @@ def _update_progress(self): self._print_line(message, False) - def _print_line(self, line, newline): + def _print_line(self, line: str, newline: bool) -> None: """ Print a line to stdout. :param line: a string without a \r or \n in it. - :type line: str :param newline: True if the output should move to a new line after this one. - :type newline: bool """ if len(line) < len(self.current_line): line += ' ' * (len(self.current_line) - len(line)) @@ -150,18 +146,17 @@ def _print_line(self, line, newline): self.current_line = line self.stdout.flush() - def update_total(self, delta): + def update_total(self, delta: int) -> None: """ Report that more files have been found for comparison. :param delta: number of files found since the last check - :type delta: int """ with self.lock: self.total_count += delta self._update_progress() - def end_total(self): + def end_total(self) -> None: """ Total files count is done. Can proceed to step 2. """ @@ -169,29 +164,37 @@ def end_total(self): self.total_done = True self._update_progress() - def local_access_error(self, path): + def local_access_error(self, path: str) -> None: """ Add a file access error message to the list of warnings. :param path: file path - :type path: str """ self.warnings.append('WARNING: %s could not be accessed (broken symlink?)' % (path,)) - def local_permission_error(self, path): + def local_permission_error(self, path: str) -> None: """ Add a permission error message to the list of warnings. :param path: file path - :type path: str """ self.warnings.append( 'WARNING: %s could not be accessed (no permissions to read?)' % (path,) ) - def symlink_skipped(self, path): + def symlink_skipped(self, path: str) -> None: pass + def circular_symlink_skipped(self, path: str) -> None: + """ + Add a circular symlink error message to the list of warnings. + + :param path: file path + """ + self.warnings.append( + 'WARNING: %s is a circular symlink, which was already visited. Skipping.' % (path,) + ) + def sample_report_run(): """ diff --git a/noxfile.py b/noxfile.py index 7fd5d27e5..e35909e1f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -39,6 +39,7 @@ 'pytest-lazy-fixture==0.6.3', 'pyfakefs==4.5.6', 'pytest-xdist==2.5.0', + 'pytest-timeout==2.1.0', ] REQUIREMENTS_BUILD = ['setuptools>=20.2'] diff --git a/test/unit/scan/test_folder_traversal.py b/test/unit/scan/test_folder_traversal.py new file mode 100644 index 000000000..6365b291b --- /dev/null +++ b/test/unit/scan/test_folder_traversal.py @@ -0,0 +1,509 @@ +###################################################################### +# +# File: test/unit/scan/test_folder_traversal.py +# +# Copyright 2023 Backblaze Inc. All Rights Reserved. +# +# License https://www.backblaze.com/using_b2_code.html +# +###################################################################### +from pathlib import Path +import platform +from unittest.mock import MagicMock +from b2sdk.scan.folder import LocalFolder +from b2sdk.utils import fix_windows_path_limit +import pytest +import os +import re +import sys + + +class TestFolderTraversal: + def test_flat_folder(self, tmp_path): + + # Create a directory structure below with initial scannig point at tmp_path/dir: + # tmp_path + # └── dir + # ├── file1.txt + # ├── file2.txt + # └── file3.txt + + (tmp_path / "dir").mkdir(parents=True) + + (tmp_path / "dir" / "file1.txt").write_text("content1") + (tmp_path / "dir" / "file2.txt").write_text("content2") + (tmp_path / "dir" / "file3.txt").write_text("content3") + + folder = LocalFolder(str(tmp_path / "dir")) + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "dir" / "file1.txt")), + fix_windows_path_limit(str(tmp_path / "dir" / "file2.txt")), + fix_windows_path_limit(str(tmp_path / "dir" / "file3.txt")), + ] + + def test_folder_with_subfolders(self, tmp_path): + + # Create a directory structure below with initial scannig point at tmp_path: + # tmp_path + # ├── dir1 + # │ ├── file1.txt + # │ └── file2.txt + # └── dir2 + # ├── file3.txt + # └── file4.txt + + d1 = tmp_path / "dir1" + d1.mkdir() + (d1 / "file1.txt").write_text("content1") + (d1 / "file2.txt").write_text("content2") + + d2 = tmp_path / "dir2" + d2.mkdir() + (d2 / "file3.txt").write_text("content3") + (d2 / "file4.txt").write_text("content4") + + folder = LocalFolder(str(tmp_path)) + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(d1 / "file1.txt")), + fix_windows_path_limit(str(d1 / "file2.txt")), + fix_windows_path_limit(str(d2 / "file3.txt")), + fix_windows_path_limit(str(d2 / "file4.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + def test_folder_with_symlink_to_file(self, tmp_path): + + # Create a directory structure below with initial scannig point at tmp_path: + # tmp_path + # ├── dir + # │ └── file.txt + # └── symlink_file.txt -> dir/file.txt + + (tmp_path / "dir").mkdir() + + file = tmp_path / "dir" / "file.txt" + file.write_text("content") + + symlink_file = tmp_path / "symlink_file.txt" + symlink_file.symlink_to(file) + + folder = LocalFolder(str(tmp_path)) + local_paths = folder.all_files(reporter=MagicMock()) + + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(file)), + fix_windows_path_limit(str(symlink_file)) + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + @pytest.mark.timeout(5) + def test_folder_with_circular_symlink(self, tmp_path): + + # Create a directory structure below with initial scannig point at tmp_path: + # tmp_path + # ├── dir + # │ └── file.txt + # └── symlink_dir -> dir + + (tmp_path / "dir").mkdir() + (tmp_path / "dir" / "file1.txt").write_text("content1") + symlink_dir = tmp_path / "dir" / "symlink_dir" + symlink_dir.symlink_to(tmp_path / "dir", target_is_directory=True) + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "dir" / "file1.txt")), + fix_windows_path_limit(str(tmp_path / "dir" / "symlink_dir" / "file1.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + @pytest.mark.timeout(5) + def test_folder_with_symlink_to_parent(self, tmp_path): + + # Create a directory structure below with the scannig point at tmp_path/parent/child/: + # tmp_path + # ├── parent + # │ ├── child + # │ │ ├── file4.txt + # │ │ └── grandchild + # │ │ ├── file5.txt + # │ │ └── symlink_dir -> ../../.. (symlink to tmp_path/parent) + # │ └── file3.txt + # ├── file1.txt + # └── file2.txt + + (tmp_path / "parent" / "child" / "grandchild").mkdir(parents=True) + (tmp_path / "file1.txt").write_text("content1") + (tmp_path / "file2.txt").write_text("content2") + (tmp_path / "parent" / "file3.txt").write_text("content3") + (tmp_path / "parent" / "child" / "file4.txt").write_text("content4") + (tmp_path / "parent" / "child" / "grandchild" / "file5.txt").write_text("content5") + symlink_dir = tmp_path / "parent" / "child" / "grandchild" / "symlink_dir" + symlink_dir.symlink_to(tmp_path / "parent", target_is_directory=True) + + folder = LocalFolder(str(tmp_path / "parent" / "child")) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "parent" / "child" / "file4.txt")), + fix_windows_path_limit(str(tmp_path / "parent" / "child" / "grandchild" / "file5.txt")), + fix_windows_path_limit(str(tmp_path / "parent" / "child" / "grandchild" / "symlink_dir" / "child" / "file4.txt")), + fix_windows_path_limit(str(tmp_path / "parent" / "child" / "grandchild" / "symlink_dir" / "child" / "grandchild" / "file5.txt")), + fix_windows_path_limit(str(tmp_path / "parent" / "child" / "grandchild" / "symlink_dir" / "file3.txt")), + ] # yapf: disable + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + @pytest.mark.timeout(5) + def test_root_short_loop(self, tmp_path): + + # Create a symlink to the tmp_path directory itself + # tmp_path + # └── tmp_path_symlink -> tmp_path + + tmp_path_symlink = tmp_path / "tmp_path_symlink" + tmp_path_symlink.symlink_to(tmp_path, target_is_directory=True) + + folder = LocalFolder(str(tmp_path_symlink)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + @pytest.mark.timeout(5) + def test_root_parent_loop(self, tmp_path): + + # Create a symlink that points to the parent of the initial scanning point + # tmp_path + # └── start + # ├── file.txt + # └── symlink -> tmp_path + + (tmp_path / "start").mkdir() + (tmp_path / "start" / "file.txt").write_text("content") + (tmp_path / "start" / "symlink").symlink_to(tmp_path, target_is_directory=True) + + folder = LocalFolder(str(tmp_path / "start")) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "start" / "file.txt")), + fix_windows_path_limit(str(tmp_path / "start" / "symlink" / "start" / "file.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + def test_symlink_that_points_deeper(self, tmp_path): + + # Create a directory structure with a symlink that points to a deeper directory + # tmp_path + # ├── a + # │ └── a.txt + # └── b + # ├── c + # │ └── c.txt + # └── d + # ├── d.txt + # └── e + # └── e.txt + # ├── f + # │ └── f.txt + # └── symlink -> b/d/e + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b" / "c").mkdir(parents=True) + (tmp_path / "b" / "c" / "c.txt").write_text("c") + (tmp_path / "b" / "d" / "e").mkdir(parents=True) + (tmp_path / "b" / "d" / "d.txt").write_text("d") + (tmp_path / "b" / "d" / "e" / "e.txt").write_text("e") + (tmp_path / "f").mkdir() + (tmp_path / "f" / "f.txt").write_text("f") + (tmp_path / "symlink").symlink_to(tmp_path / "b" / "d" / "e", target_is_directory=True) + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "c" / "c.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "d" / "d.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "d" / "e" / "e.txt")), + fix_windows_path_limit(str(tmp_path / "f" / "f.txt")), + fix_windows_path_limit(str(tmp_path / "symlink" / "e.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows" + ) + def test_symlink_that_points_up(self, tmp_path): + + # Create a directory structure with a symlink that points to a upper directory + # tmp_path + # ├── a + # │ └── a.txt + # └── b + # ├── c + # │ └── c.txt + # └── d + # ├── d.txt + # └── e + # ├── symlink -> ../../a + # └── e.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b" / "c").mkdir(parents=True) + (tmp_path / "b" / "c" / "c.txt").write_text("c") + (tmp_path / "b" / "d" / "e").mkdir(parents=True) + (tmp_path / "b" / "d" / "d.txt").write_text("d") + (tmp_path / "b" / "d" / "e" / "e.txt").write_text("e") + (tmp_path / "b" / "d" / "e" / "symlink").symlink_to(tmp_path / "a", target_is_directory=True) # yapf: disable + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "c" / "c.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "d" / "d.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "d" / "e" / "e.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "d" / "e" / "symlink" / "a.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows" + ) + @pytest.mark.timeout(5) + def test_elaborate_infinite_loop(self, tmp_path): + + # Create a directory structure with an elaborate infinite loop of symlinks + # tmp_path + # ├── a + # │ └── a.txt + # ├── b -> c + # ├── c -> d + # ├── d -> e + # ├── e -> b + # └── f + # └── f.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b").symlink_to("c") + (tmp_path / "c").symlink_to("d") + (tmp_path / "d").symlink_to("e") + (tmp_path / "e").symlink_to("b") + (tmp_path / "f").mkdir() + (tmp_path / "f" / "f.txt").write_text("f") + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "f" / "f.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + def test_valid_symlink_pattern_where_the_link_goes_down_and_up(self, tmp_path): + + # tmp_path + # ├── a + # │ └── a.txt + # ├── b -> c/d + # ├── c + # │ └── d + # │ └── b.txt + # ├── d -> e + # ├── e + # │ └── e.txt + # └── f + # └── f.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b").symlink_to(tmp_path / "c" / "d", target_is_directory=True) # yapf: disable + (tmp_path / "c").mkdir() + (tmp_path / "c" / "d").mkdir() + (tmp_path / "c" / "d" / "b.txt").write_text("b") + (tmp_path / "d").symlink_to(tmp_path / "e", target_is_directory=True) + (tmp_path / "e").mkdir() + (tmp_path / "e" / "e.txt").write_text("e") + (tmp_path / "f").mkdir() + (tmp_path / "f" / "f.txt").write_text("f") + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "b.txt")), + fix_windows_path_limit(str(tmp_path / "c" / "d" / "b.txt")), + fix_windows_path_limit(str(tmp_path / "d" / "e.txt")), + fix_windows_path_limit(str(tmp_path / "e" / "e.txt")), + fix_windows_path_limit(str(tmp_path / "f" / "f.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + def test_valid_symlink_pattern_where_the_link_goes_up_and_down(self, tmp_path): + + # Create a directory structure with a valid symlink pattern where the link goes up and down + # tmp_path + # ├── a + # │ └── a.txt + # ├── b + # │ └── c -> ../d + # ├── d + # │ └── e + # │ └── f + # │ └── f.txt + # └── t.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b").mkdir() + (tmp_path / "b" / "c").symlink_to(tmp_path / "d", target_is_directory=True) + (tmp_path / "d").mkdir() + (tmp_path / "d" / "e").mkdir() + (tmp_path / "d" / "e" / "f").mkdir() + (tmp_path / "d" / "e" / "f" / "f.txt").write_text("f") + (tmp_path / "t.txt").write_text("t") + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "b" / "c" / "e" / "f" / "f.txt")), + fix_windows_path_limit(str(tmp_path / "d" / "e" / "f" / "f.txt")), + fix_windows_path_limit(str(tmp_path / "t.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows", + ) + @pytest.mark.timeout(5) + def test_loop_that_goes_down_and_up(self, tmp_path): + + # Create a directory structure with a loop that goes down and up + # tmp_path + # ├── a + # │ └── a.txt + # ├── b -> c/d + # ├── c + # │ └── d -> ../e + # ├── e -> b + # └── f + # └── f.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b").symlink_to(tmp_path / "c" / "d", target_is_directory=True) + (tmp_path / "c").mkdir() + (tmp_path / "c" / "d").symlink_to(tmp_path / "e", target_is_directory=True) + (tmp_path / "e").symlink_to("b") + (tmp_path / "f").mkdir() + (tmp_path / "f" / "f.txt").write_text("f") + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "f" / "f.txt")), + ] + + @pytest.mark.skipif( + platform.system() == 'Windows' and platform.python_implementation() == 'PyPy', + reason="Symlinks not supported on PyPy/Windows" + ) + @pytest.mark.timeout(5) + def test_loop_that_goes_up_and_down(self, tmp_path): + + # Create a directory structure with a loop that goes up and down + # tmp_path + # ├── a + # │ └── a.txt + # ├── b + # │ └── c -> ../d + # ├── d + # │ └── e + # │ └── f -> ../../b/c + # └── g + # └── g.txt + + (tmp_path / "a").mkdir() + (tmp_path / "a" / "a.txt").write_text("a") + (tmp_path / "b").mkdir() + (tmp_path / "b" / "c").symlink_to(tmp_path / "d", target_is_directory=True) + (tmp_path / "d").mkdir() + (tmp_path / "d" / "e").mkdir() + (tmp_path / "d" / "e" / "f").symlink_to(tmp_path / "b" / "c", target_is_directory=True) + (tmp_path / "g").mkdir() + (tmp_path / "g" / "g.txt").write_text("g") + + folder = LocalFolder(str(tmp_path)) + + local_paths = folder.all_files(reporter=MagicMock()) + absolute_paths = [path.absolute_path for path in list(local_paths)] + + assert absolute_paths == [ + fix_windows_path_limit(str(tmp_path / "a" / "a.txt")), + fix_windows_path_limit(str(tmp_path / "g" / "g.txt")), + ] diff --git a/test/unit/scan/test_scan_policies.py b/test/unit/scan/test_scan_policies.py index b4ab02efd..3adb80f00 100644 --- a/test/unit/scan/test_scan_policies.py +++ b/test/unit/scan/test_scan_policies.py @@ -8,7 +8,6 @@ # ###################################################################### import re - import pytest from apiver_deps import ScanPoliciesManager