diff --git a/Build.py b/Build.py index 0d00452..89efde2 100644 --- a/Build.py +++ b/Build.py @@ -54,7 +54,7 @@ def list_commands(self, *args, **kwargs): # pylint: disable=unused-argument this_dir, package_dir.name, app, - default_min_coverage=75.0, + default_min_coverage=87.0, # TODO: Increase this to 90% ) UpdateVersion = RepoBuildTools.UpdateVersionFuncFactory( diff --git a/src/FileBackup/CommandLine/EntryPoint.py b/src/FileBackup/CommandLine/EntryPoint.py index 21e5048..c5077b1 100644 --- a/src/FileBackup/CommandLine/EntryPoint.py +++ b/src/FileBackup/CommandLine/EntryPoint.py @@ -4,7 +4,7 @@ # | Distributed under the MIT License. # | # ---------------------------------------------------------------------- -"""This file serves as an example of how to create scripts that can be invoked from the command line once the package is installed.""" +"""Tools to backup and restore files and directories.""" import sys @@ -14,6 +14,7 @@ from FileBackup import __version__ from FileBackup.CommandLine import MirrorEntryPoint +from FileBackup.CommandLine import OffsiteEntryPoint # ---------------------------------------------------------------------- @@ -21,7 +22,7 @@ class NaturalOrderGrouper(TyperGroup): # pylint: disable=missing-class-docstring # ---------------------------------------------------------------------- def list_commands(self, *args, **kwargs): # pylint: disable=unused-argument - return self.commands.keys() + return self.commands.keys() # pragma: no cover # ---------------------------------------------------------------------- @@ -35,10 +36,13 @@ def list_commands(self, *args, **kwargs): # pylint: disable=unused-argument app.add_typer(MirrorEntryPoint.app, name="mirror", help=MirrorEntryPoint.__doc__) +app.add_typer(OffsiteEntryPoint.app, name="offsite", help=OffsiteEntryPoint.__doc__) @app.command("version", no_args_is_help=False) def Version(): + """Displays the current version and exits.""" + sys.stdout.write(f"FileBackup v{__version__}\n") diff --git 
a/src/FileBackup/CommandLine/MirrorEntryPoint.py b/src/FileBackup/CommandLine/MirrorEntryPoint.py index 524d665..bc3f38d 100644 --- a/src/FileBackup/CommandLine/MirrorEntryPoint.py +++ b/src/FileBackup/CommandLine/MirrorEntryPoint.py @@ -38,7 +38,7 @@ class NaturalOrderGrouper(TyperGroup): # pylint: disable=missing-class-docstring # ---------------------------------------------------------------------- def list_commands(self, *args, **kwargs): # pylint: disable=unused-argument - return self.commands.keys() + return self.commands.keys() # pragma: no cover # ---------------------------------------------------------------------- @@ -153,7 +153,7 @@ def Validate( bool, CommandLineArguments.debug_option ] = CommandLineArguments.debug_option_default, ) -> None: - """Validates perviously mirrored content in the backup data store.""" + """Validates previously mirrored content in the backup data store.""" with DoneManager.CreateCommandLine( flags=DoneManagerFlags.Create(verbose=verbose, debug=debug), diff --git a/src/FileBackup/CommandLine/OffsiteEntryPoint.py b/src/FileBackup/CommandLine/OffsiteEntryPoint.py new file mode 100644 index 0000000..d10cb37 --- /dev/null +++ b/src/FileBackup/CommandLine/OffsiteEntryPoint.py @@ -0,0 +1,335 @@ +# ---------------------------------------------------------------------- +# | +# | OffsiteEntryPoint.py +# | +# | David Brownell +# | 2024-07-04 12:51:52 +# | +# ---------------------------------------------------------------------- +# | +# | Copyright David Brownell 2024 +# | Distributed under the MIT License. +# | +# ---------------------------------------------------------------------- +"""\ +Copies content to an offsite location: a snapshot is saved after the initial backup and +deltas are applied to that snapshot for subsequent backups. 
+""" + +import datetime +import shutil + +from contextlib import contextmanager +from pathlib import Path +from typing import Annotated, cast, Iterator, Optional, Pattern + +import typer + +from dbrownell_Common import PathEx # type: ignore[import-untyped] +from dbrownell_Common.Streams.DoneManager import DoneManager, Flags as DoneManagerFlags # type: ignore[import-untyped] +from dbrownell_Common import TyperEx +from typer.core import TyperGroup + +from FileBackup.CommandLine import CommandLineArguments +from FileBackup.Impl import Common +from FileBackup import Offsite + + +# ---------------------------------------------------------------------- +class NaturalOrderGrouper(TyperGroup): + # pylint: disable=missing-class-docstring + # ---------------------------------------------------------------------- + def list_commands(self, *args, **kwargs): # pylint: disable=unused-argument + return self.commands.keys() # pragma: no cover + + +# ---------------------------------------------------------------------- +app = typer.Typer( + cls=NaturalOrderGrouper, + help=__doc__, + no_args_is_help=True, + pretty_exceptions_show_locals=False, + pretty_exceptions_enable=False, +) + + +# ---------------------------------------------------------------------- +_backup_name_argument = typer.Argument( + ..., + help="Unique name of the backup; this value allows for multiple distinct backups on the same machine.", +) +_destination_argument = typer.Argument( + ..., + help="Destination data store used to backup content; This value can be 'None' if the backup content should be created locally but manually distributed to the data store (this can be helpful when initially creating backups that are hundreds of GB in size). 
See the comments below for information on the different data store destination formats.", +) + + +# ---------------------------------------------------------------------- +@app.command( + "execute", + epilog=Common.GetDestinationHelp(), + no_args_is_help=True, +) +def Execute( + backup_name: Annotated[str, _backup_name_argument], + destination: Annotated[str, _destination_argument], + input_filename_or_dirs: Annotated[ + list[Path], + CommandLineArguments.input_filename_or_dirs_argument, + ], + encryption_password: Annotated[ + Optional[str], + typer.Option( + "--encryption-password", + help="Encrypt the contents for backup prior to transferring them to the destination data store.", + ), + ] = None, + compress: Annotated[ + bool, + typer.Option( + "--compress", + help="Compress the contents to backup prior to transferring them to the destination data store.", + ), + ] = False, + ssd: Annotated[bool, CommandLineArguments.ssd_option] = CommandLineArguments.ssd_option_default, + force: Annotated[ + bool, CommandLineArguments.force_option + ] = CommandLineArguments.force_option_default, + verbose: Annotated[ + bool, CommandLineArguments.verbose_option + ] = CommandLineArguments.verbose_option_default, + quiet: Annotated[ + bool, CommandLineArguments.quiet_option + ] = CommandLineArguments.quiet_option_default, + debug: Annotated[ + bool, CommandLineArguments.debug_option + ] = CommandLineArguments.debug_option_default, + working_dir: Annotated[ + Optional[Path], + typer.Option( + "--working-dir", + file_okay=False, + resolve_path=True, + help="Local directory used to stage files prior to transferring them to the destination data store.", + ), + ] = None, + archive_volume_size: Annotated[ + int, + typer.Option( + "--archive-volume-size", + min=1024, + help="Compressed/encrypted data will be converted to volumes of this size for easier transmission to the data store; value expressed in terms of bytes.", + ), + ] = Offsite.DEFAULT_ARCHIVE_VOLUME_SIZE, + 
ignore_pending_snapshot: Annotated[ + bool, + typer.Option( + "--ignore-pending-snapshot", help="Disable the pending snapshot warning and continue." + ), + ] = False, + file_include_params: Annotated[ + list[str], + CommandLineArguments.file_include_option, + ] = CommandLineArguments.file_include_option_default, + file_exclude_params: Annotated[ + list[str], + CommandLineArguments.file_exclude_option, + ] = CommandLineArguments.file_exclude_option_default, +) -> None: + """Prepares local changes for offsite backup.""" + + file_includes = cast(list[Pattern], file_include_params) + file_excludes = cast(list[Pattern], file_exclude_params) + + del file_include_params + del file_exclude_params + + with DoneManager.CreateCommandLine( + flags=DoneManagerFlags.Create(verbose=verbose, debug=debug), + ) as dm: + dm.WriteVerbose(str(datetime.datetime.now()) + "\n\n") + + destination_value = None if destination.lower() == "none" else destination + + with _ResolveWorkingDir( + dm, + working_dir, + always_preserve=destination_value is None, + ) as resolved_working_dir: + Offsite.Backup( + dm, + backup_name, + destination_value, + input_filename_or_dirs, + encryption_password, + resolved_working_dir, + compress=compress, + ssd=ssd, + force=force, + quiet=quiet, + file_includes=file_includes, + file_excludes=file_excludes, + archive_volume_size=archive_volume_size, + ignore_pending_snapshot=ignore_pending_snapshot, + ) + + +# ---------------------------------------------------------------------- +@app.command("commit", no_args_is_help=True) +def Commit( + backup_name: Annotated[str, _backup_name_argument], + verbose: Annotated[ + bool, CommandLineArguments.verbose_option + ] = CommandLineArguments.verbose_option_default, + debug: Annotated[ + bool, CommandLineArguments.debug_option + ] = CommandLineArguments.debug_option_default, +) -> None: + """Commits a pending snapshot after the changes have been transferred to an offsite data store.""" + + with DoneManager.CreateCommandLine( 
+ flags=DoneManagerFlags.Create(verbose=verbose, debug=debug), + ) as dm: + dm.WriteVerbose(str(datetime.datetime.now()) + "\n\n") + + Offsite.Commit(dm, backup_name) + + +# ---------------------------------------------------------------------- +@app.command( + "restore", + epilog=Common.GetDestinationHelp(), + no_args_is_help=True, +) +def Restore( # pylint: disable=dangerous-default-value + backup_name: Annotated[str, _backup_name_argument], + backup_source: Annotated[ + str, typer.Argument(help="Data store location containing content that has been backed up.") + ], + encryption_password: Annotated[ + Optional[str], + typer.Option( + "--encryption-password", + help="Password used when creating the backups.", + ), + ] = None, + dir_substitution_key_value_args: Annotated[ + list[str], + TyperEx.TyperDictOption( + {}, + "--dir-substitution", + allow_any__=True, + help='A key-value-pair consisting of a string to replace and its replacement value within a posix string; this can be used when restoring to a location that is different from the location used to create the backup. Example: \'--dir-substitution "C\\:/=C\\:/Restore/"\' will cause files backed-up as "C:/Foo/Bar.txt" to be restored as "C:/Restore/Foo/Bar.txt". 
This value can be provided multiple times on the command line when supporting multiple substitutions.', + ), + ] = [], + dry_run: Annotated[ + bool, + typer.Option( + "--dry-run", + help="Show the changes that would be made during the restoration process, but do not modify the local file system.", + ), + ] = False, + overwrite: Annotated[ + bool, + typer.Option( + "--overwrite", + help="By default, the restoration process will not overwrite existing files on the local file system; this flag indicates that files should be overwritten as they are restored.", + ), + ] = False, + ssd: Annotated[bool, CommandLineArguments.ssd_option] = CommandLineArguments.ssd_option_default, + verbose: Annotated[ + bool, CommandLineArguments.verbose_option + ] = CommandLineArguments.verbose_option_default, + quiet: Annotated[ + bool, CommandLineArguments.quiet_option + ] = CommandLineArguments.quiet_option_default, + debug: Annotated[ + bool, CommandLineArguments.debug_option + ] = CommandLineArguments.debug_option_default, + working_dir: Annotated[ + Optional[Path], + typer.Option( + "--working-dir", + file_okay=False, + resolve_path=True, + help="Working directory to use when decompressing archives; provide this value during a dry run and subsequent execution to only download and extract the backup content once.", + ), + ] = None, +) -> None: + """Restores content from an offsite data store.""" + + with DoneManager.CreateCommandLine( + flags=DoneManagerFlags.Create(verbose=verbose, debug=debug), + ) as dm: + dm.WriteVerbose(str(datetime.datetime.now()) + "\n\n") + + dir_substitutions = TyperEx.PostprocessDictArgument(dir_substitution_key_value_args) + + with _ResolveWorkingDir(dm, working_dir) as resolved_working_dir: + Offsite.Restore( + dm, + backup_name, + backup_source, + encryption_password, + resolved_working_dir, + dir_substitutions, + ssd=ssd, + quiet=quiet, + dry_run=dry_run, + overwrite=overwrite, + ) + + +# 
---------------------------------------------------------------------- +# ---------------------------------------------------------------------- +# ---------------------------------------------------------------------- +@contextmanager +def _ResolveWorkingDir( + dm: DoneManager, + working_dir: Path | None, + *, + always_preserve: bool = False, +) -> Iterator[Path]: + if working_dir is None: + delete_dir = not always_preserve + working_dir = PathEx.CreateTempDirectory() + else: + delete_dir = False + + was_successful = True + + try: + assert working_dir is not None + yield working_dir + + except: + was_successful = False + raise + + finally: + assert working_dir is not None + + if delete_dir: + was_successful = was_successful and dm.result == 0 + + if was_successful: + shutil.rmtree(working_dir) + else: + if dm.result <= 0: + # dm.result can be 0 if an exception was raised + type_desc = "errors" + elif dm.result > 0: + type_desc = "warnings" + else: + assert False, dm.result # pragma: no cover + + dm.WriteInfo( + f"The temporary directory '{working_dir}' was preserved due to {type_desc}.\n" + ) + + +# ---------------------------------------------------------------------- +# ---------------------------------------------------------------------- +# ---------------------------------------------------------------------- +if __name__ == "__main__": + app() diff --git a/src/FileBackup/Impl/Common.py b/src/FileBackup/Impl/Common.py index ce65b9f..a9ec371 100644 --- a/src/FileBackup/Impl/Common.py +++ b/src/FileBackup/Impl/Common.py @@ -358,8 +358,7 @@ def YieldDataStore( ), ) - is_local_filesystem_override_value_for_testing = True - destination = destination[len("[nonlocal]") :] + is_local_filesystem_override_value_for_testing = False yield FileSystemDataStore( Path(destination), diff --git a/src/FileBackup/Offsite.py b/src/FileBackup/Offsite.py new file mode 100644 index 0000000..8e99f77 --- /dev/null +++ b/src/FileBackup/Offsite.py @@ -0,0 +1,1552 @@ +# 
---------------------------------------------------------------------- +# | +# | Offsite.py +# | +# | David Brownell +# | 2024-07-04 11:06:08 +# | +# ---------------------------------------------------------------------- +# | +# | Copyright David Brownell 2024 +# | Distributed under the MIT License. +# | +# ---------------------------------------------------------------------- +"""\ +Copies content to an offsite location: a snapshot is saved after the initial backup and +deltas are applied to that snapshot for subsequent backups. +""" + +import datetime +import itertools +import json +import os +import re +import shutil +import sys +import textwrap +import threading +import uuid + +from contextlib import contextmanager +from dataclasses import dataclass +from enum import auto, Enum +from pathlib import Path +from typing import Any, Callable, cast, Iterator, Pattern + +from dbrownell_Common.ContextlibEx import ExitStack # type: ignore[import-untyped] +from dbrownell_Common import ExecuteTasks # type: ignore[import-untyped] +from dbrownell_Common.InflectEx import inflect # type: ignore[import-untyped] +from dbrownell_Common import PathEx # type: ignore[import-untyped] +from dbrownell_Common.Streams.DoneManager import DoneManager # type: ignore[import-untyped] +from dbrownell_Common import SubprocessEx # type: ignore[import-untyped] +from dbrownell_Common import TextwrapEx # type: ignore[import-untyped] + +from FileBackup.DataStore.FileSystemDataStore import FileSystemDataStore +from FileBackup.DataStore.Interfaces.BulkStorageDataStore import BulkStorageDataStore +from FileBackup.DataStore.Interfaces.FileBasedDataStore import FileBasedDataStore +from FileBackup.Impl import Common +from FileBackup.Snapshot import Snapshot + + +# ---------------------------------------------------------------------- +# | +# | Public Types +# | +# ---------------------------------------------------------------------- +DEFAULT_ARCHIVE_VOLUME_SIZE = 250 * 1024 * 1024 # 250MB + 
+INDEX_FILENAME = "index.json" +INDEX_HASH_FILENAME = f"{INDEX_FILENAME}.hash" + +ARCHIVE_FILENAME = "data.7z" +DELTA_SUFFIX = ".delta" + + +# ---------------------------------------------------------------------- +@dataclass(frozen=True) +class SnapshotFilenames: + """Filenames used to store snapshot information.""" + + backup_name: str + standard: Path + pending: Path + + # ---------------------------------------------------------------------- + @classmethod + def Create( + cls, + backup_name: str, + ) -> "SnapshotFilenames": + snapshot_filename = PathEx.GetUserDirectory() / f"OffsiteFileBackup.{backup_name}.json" + + return cls( + backup_name, + snapshot_filename, + snapshot_filename.parent + / f"{snapshot_filename.stem}.__pending__{snapshot_filename.suffix}", + ) + + +# ---------------------------------------------------------------------- +# | +# | Public Functions +# | +# ---------------------------------------------------------------------- +def Backup( + dm: DoneManager, + backup_name: str, + destination: str | None, + input_filenames_or_dirs: list[Path], + encryption_password: str | None, + working_dir: Path, + *, + ssd: bool, + force: bool, + quiet: bool, + file_includes: list[Pattern] | None, + file_excludes: list[Pattern] | None, + compress: bool, + archive_volume_size: int = DEFAULT_ARCHIVE_VOLUME_SIZE, + ignore_pending_snapshot: bool = False, + commit_pending_snapshot: bool = True, +) -> None: + # Process the inputs + for input_filename_or_dir in input_filenames_or_dirs: + if not input_filename_or_dir.exists(): + raise Exception(f"'{input_filename_or_dir}' is not a valid filename or directory.") + + if compress or encryption_password: + zip_binary = _GetZipBinary() + else: + zip_binary = None + + snapshot_filenames = SnapshotFilenames.Create(backup_name) + + if snapshot_filenames.pending.is_file(): + if not ignore_pending_snapshot: + dm.WriteError( + textwrap.dedent( + f"""\ + + A pending snapshot exists for the backup '{backup_name}'; this snapshot 
should be committed before creating updates + to the backup. + + To commit the pending snapshot, run this script with the 'commit' command. + + To ignore this error and delete the pending snapshot, run this script with the '--ignore-pending-snapshot' + argument. + + + """, + ), + ) + + return + + snapshot_filenames.pending.unlink() + + elif ignore_pending_snapshot: + dm.WriteError( + f"A pending snapshot for '{snapshot_filenames.backup_name}' was not found.\n", + ) + return + + # Create the local snapshot + with dm.Nested("Creating the local snapshot...") as local_dm: + local_snapshot = Snapshot.Calculate( + local_dm, + input_filenames_or_dirs, + FileSystemDataStore(), + run_in_parallel=ssd, + quiet=quiet, + filter_filename_func=Common.CreateFilterFunc(file_includes, file_excludes), + ) + + if local_dm.result != 0: + return + + if force or not snapshot_filenames.standard.is_file(): + force = True + + offsite_snapshot = Snapshot( + Snapshot.Node( + None, + None, + Common.DirHashPlaceholder(explicitly_added=False), + None, + ), + ) + else: + with dm.Nested("\nReading the most recent offsite snapshot...") as destination_dm: + offsite_snapshot = Snapshot.LoadPersisted( + destination_dm, + FileSystemDataStore(), + snapshot_filename=snapshot_filenames.standard, + ) + + if destination_dm.result != 0: + return + + # Calculate the differences + diffs: dict[Common.DiffOperation, list[Common.DiffResult]] = Common.CalculateDiffs( + dm, + local_snapshot, + offsite_snapshot, + ) + + if not any(diff_items for diff_items in diffs.values()): + return + + # Capture all of the changes in a temp directory + now = datetime.datetime.now() + + file_content_root = ( + working_dir + / f"{now.year:04}.{now.month:02}.{now.day:02}.{now.hour:02}.{now.minute:02}.{now.second:02}-{now.microsecond:06}{DELTA_SUFFIX if not force else ''}" + ) + + file_content_root.mkdir(parents=True) + file_content_data_store = FileSystemDataStore(file_content_root) + + # 
---------------------------------------------------------------------- + def OnExit(): + if destination is None: + template = textwrap.dedent( + f"""\ + + + Content has been written to '{{}}', + however the changes have not been committed yet. + + After the generated content is transferred to an offsite location, run this script + again with the 'commit' command using the backup name '{backup_name}' to ensure that + these changes are not processed when this offsite backup is run again. + + + """, + ) + + elif dm.result == 0: + shutil.rmtree(file_content_root) + return + + else: + if dm.result < 0: + type_desc = "errors" + elif dm.result > 0: + type_desc = "warnings" + else: + assert False, dm.result # pragma: no cover + + template = f"The temporary directory '{{}}' was preserved due to {type_desc}." + + dm.WriteInfo( + "\n" + + template.format( + ( + file_content_root + if dm.capabilities.is_headless + else TextwrapEx.CreateAnsiHyperLink( + f"file:///{working_dir.as_posix()}", + str(working_dir), + ) + ), + ), + ) + + # ---------------------------------------------------------------------- + + with ExitStack(OnExit): + with dm.Nested( + "Preparing file content...", + suffix="\n", + ) as prepare_dm: + if diffs[Common.DiffOperation.add] or diffs[Common.DiffOperation.modify]: + # Create a lookup for the hash values of all existing files at the offsite. + # We will use this information to only copy those files that do not already + # exist at the offsite. 
+ offsite_file_lookup: set[str] = set() + + for node in offsite_snapshot.node.Enum(): + if not node.is_file: + continue + + assert isinstance(node.hash_value, str), node.hash_value + offsite_file_lookup.add(node.hash_value) + + # Gather all the diffs associated with the files that need to be transferred + diffs_to_process: list[Common.DiffResult] = [] + + for diff in itertools.chain( + diffs[Common.DiffOperation.add], + diffs[Common.DiffOperation.modify], + ): + if not diff.path.is_file(): + continue + + assert isinstance(diff.this_hash, str), diff.this_hash + if diff.this_hash in offsite_file_lookup: + continue + + diffs_to_process.append(diff) + offsite_file_lookup.add(diff.this_hash) + + if diffs_to_process: + # Calculate the size requirements + Common.ValidateSizeRequirements( + prepare_dm, + file_content_data_store, + file_content_data_store, + diffs_to_process, + ) + + if prepare_dm.result != 0: + return + + # Preserve the files + with prepare_dm.Nested("\nPreserving files...") as preserve_dm: + # ---------------------------------------------------------------------- + def PrepareTask( + context: Any, + on_simple_status_func: Callable[ # pylint: disable=unused-argument + [str], None + ], + ) -> tuple[int, ExecuteTasks.TransformTasksExTypes.TransformFuncType]: + diff = cast(Common.DiffResult, context) + del context + + # ---------------------------------------------------------------------- + def TransformTask( + status: ExecuteTasks.Status, + ) -> Path: + if not diff.path.is_file(): + raise Exception(f"The file '{diff.path}' was not found.") + + assert isinstance(diff.this_hash, str), diff.this_hash + dest_filename = ( + Path(diff.this_hash[:2]) / diff.this_hash[2:4] / diff.this_hash + ) + + Common.WriteFile( + file_content_data_store, + diff.path, + dest_filename, + lambda bytes_written: cast( + None, status.OnProgress(bytes_written, None) + ), + ) + + return dest_filename + + # ---------------------------------------------------------------------- + + 
content_size = 0 + if diff.path.is_file(): + content_size = diff.path.stat().st_size + + return content_size, TransformTask + + # ---------------------------------------------------------------------- + + ExecuteTasks.TransformTasksEx( + preserve_dm, + "Processing", + [ + ExecuteTasks.TaskData(str(diff.path), diff) + for diff in diffs_to_process + ], + PrepareTask, + quiet=quiet, + max_num_threads=( + None if file_content_data_store.ExecuteInParallel() else 1 + ), + refresh_per_second=Common.EXECUTE_TASKS_REFRESH_PER_SECOND, + ) + + if preserve_dm.result != 0: + return + + with prepare_dm.Nested( + "\nPreserving index...", + suffix="\n", + ): + index_filename_path = Path(INDEX_FILENAME) + + with file_content_data_store.Open(index_filename_path, "w") as f: + json_diffs: list[dict[str, Any]] = [] + + for these_diffs in diffs.values(): + these_diffs.sort(key=lambda value: str(value.path)) + + for diff in these_diffs: + json_diffs.append(diff.ToJson()) + + json.dump(json_diffs, f) + + with file_content_data_store.Open(Path(INDEX_HASH_FILENAME), "w") as f: + f.write( + Common.CalculateHash( + file_content_data_store, + index_filename_path, + lambda _: None, + ), + ) + + if encryption_password and compress: + heading = "Compressing and encrypting..." + encryption_arg = f' "-p{encryption_password}"' + compression_level = 9 + elif encryption_password: + heading = "Encrypting..." + encryption_arg = f' "-p{encryption_password}"' + compression_level = 0 + elif compress: + heading = "Compressing..." 
+ encryption_arg = "" + compression_level = 9 + else: + heading = None + encryption_arg = None + compression_level = None + + if heading: + with prepare_dm.Nested( + heading, + suffix="\n", + ) as zip_dm: + assert zip_binary is not None + + command_line = f'{zip_binary} a -t7z -mx{compression_level} -ms=on -mhe=on -sccUTF-8 -scsUTF-8 -ssw -v{archive_volume_size} "{ARCHIVE_FILENAME}" {encryption_arg}' + + zip_dm.WriteVerbose(f"Command Line: {_ScrubZipCommandLine(command_line)}\n\n") + + with zip_dm.YieldStream() as stream: + zip_dm.result = SubprocessEx.Stream( + command_line, + stream, + cwd=file_content_root, + ) + + if zip_dm.result != 0: + return + + with prepare_dm.Nested( + "Validating archive...", + suffix="\n", + ) as validate_dm: + assert zip_binary is not None + + command_line = f'{zip_binary} t "{file_content_root / ARCHIVE_FILENAME}.001"{encryption_arg}' + + validate_dm.WriteVerbose( + f"Command Line: {_ScrubZipCommandLine(command_line)}\n\n" + ) + + with validate_dm.YieldStream() as stream: + validate_dm.result = SubprocessEx.Stream(command_line, stream) + + if validate_dm.result != 0: + return + + with prepare_dm.Nested("Cleaning content...") as clean_dm: + for item in file_content_root.iterdir(): + if item.name.startswith(ARCHIVE_FILENAME): + continue + + with clean_dm.VerboseNested(f"Removing '{item}'..."): + if item.is_file(): + item.unlink() + elif item.is_dir(): + shutil.rmtree(item) + else: + assert False, item # pragma: no cover + + if not destination: + with dm.Nested("Preserving the pending snapshot...") as pending_dm: + local_snapshot.Persist( + pending_dm, + FileSystemDataStore(snapshot_filenames.pending), + snapshot_filename=snapshot_filenames.pending, + ) + + if pending_dm.result != 0: + return + + return + + with Common.YieldDataStore( + dm, + destination, + ssd=ssd, + ) as destination_data_store: + if isinstance(destination_data_store, BulkStorageDataStore): + _CommitBulkStorageDataStore( + dm, + file_content_data_store, + 
destination_data_store, + ) + elif isinstance(destination_data_store, FileBasedDataStore): + _CommitFileBasedDataStore( + dm, + snapshot_filenames, + file_content_data_store, + destination_data_store, + quiet=quiet, + ssd=ssd, + ) + else: + assert False, destination_data_store # pragma: no cover + + if dm.result != 0: + return + + if commit_pending_snapshot: + with dm.Nested("Committing snapshot locally...") as commit_dm: + local_snapshot.Persist( + commit_dm, + FileSystemDataStore(snapshot_filenames.standard.parent), + snapshot_filename=snapshot_filenames.standard, + ) + + +# ---------------------------------------------------------------------- +def Commit( + dm: DoneManager, + backup_name: str, +) -> None: + snapshot_filenames = SnapshotFilenames.Create(backup_name) + + if not snapshot_filenames.pending.is_file(): + dm.WriteError(f"A pending snapshot for the backup '{backup_name}' was not found.\n") + return + + with dm.Nested(f"Committing the pending snapshot for the backup '{backup_name}'..."): + snapshot_filenames.standard.unlink(missing_ok=True) + shutil.move(snapshot_filenames.pending, snapshot_filenames.standard) + + +# ---------------------------------------------------------------------- +def Restore( + dm: DoneManager, + backup_name: str, + data_store_connection_string: str, + encryption_password: str | None, + working_dir: Path, + dir_substitutions: dict[str, str], + *, + ssd: bool, + quiet: bool, + dry_run: bool, + overwrite: bool, +) -> None: + with Common.YieldDataStore( + dm, + data_store_connection_string, + ssd=ssd, + ) as data_store: + if not isinstance(data_store, FileBasedDataStore): + dm.WriteError( + textwrap.dedent( + f"""\ + '{data_store_connection_string}' does not resolve to a file-based data store, which is required when restoring content. + + Most often, this error is encountered when attempting to restore an offsite backup that was + originally transferred to a cloud-based data store. 
+ + To restore these types of offsite backups, copy the content from the original data store + to your local file system and run this script again while pointing to that + location on your file system. This local directory should contain the primary directory + created during the initial backup and all directories created as a part of subsequent backups. + + """, + ), + ) + return + + with _YieldTempDirectory("staging content") as staging_directory: + # ---------------------------------------------------------------------- + @dataclass(frozen=True) + class Instruction: + # ---------------------------------------------------------------------- + operation: Common.DiffOperation + file_content_path: Path | None + original_filename: str + local_filename: Path + + # ---------------------------------------------------------------------- + def __post_init__(self): + assert self.file_content_path is None or self.operation in [ + Common.DiffOperation.add, + Common.DiffOperation.modify, + ] + + # ---------------------------------------------------------------------- + + instructions: dict[str, list[Instruction]] = {} + + # ---------------------------------------------------------------------- + def CountInstructions() -> int: + total = 0 + + for these_instructions in instructions.values(): + total += len(these_instructions) + + return total + + # ---------------------------------------------------------------------- + + with dm.Nested( + "Processing file content...", + lambda: "{} found".format(inflect.no("instruction", CountInstructions())), + ) as preprocess_dm: + backup_name_path = Path(backup_name) + + if data_store.GetItemType(backup_name_path) == Common.ItemType.Dir: + data_store.SetWorkingDir(backup_name_path) + + # We should have a bunch of dirs organized by datetime + offsite_directories: dict[str, list[tuple[str, bool]]] = {} + + for _, directories, filenames in data_store.Walk(): + if filenames: + preprocess_dm.WriteError( + textwrap.dedent( + """\ + Files were 
not expected: + + {} + + """, + ).format("\n".join(f" - {filename}" for filename in filenames)), + ) + return + + dir_regex = re.compile( + textwrap.dedent( + r"""(?# + Year )(?P<year>\d{{4}})(?# + Month )\.(?P<month>\d{{2}})(?# + Day )\.(?P<day>\d{{2}})(?# + Hour )\.(?P<hour>\d{{2}})(?# + Minute )\.(?P<minute>\d{{2}})(?# + Second )\.(?P<second>\d{{2}})(?# + Index )-(?P<index>\d+)(?# + Suffix )(?P<suffix>{})?(?# + )""", + ).format(re.escape(DELTA_SUFFIX)), + ) + + for directory in directories: + match = dir_regex.match(directory) + if not match: + preprocess_dm.WriteError( + f"'{directory}' is not a recognized directory name.\n" + ) + return + + offsite_directories.setdefault(directory, []).append( + ( + directory, + not match.group("suffix"), + ), + ) + + # Only process top-level items + break + + if not offsite_directories: + preprocess_dm.WriteError("No directories were found.\n") + return + + # Sort the directories + keys = list(offsite_directories.keys()) + keys.sort() + + all_directories: list[tuple[str, bool]] = [] + + for key in keys: + all_directories += offsite_directories[key] + + # Ensure that we start processing at the latest primary directory + primary_indexes: list[int] = [] + + for index, (directory, is_primary) in enumerate(all_directories): + if is_primary: + primary_indexes.append(index) + + if not primary_indexes: + preprocess_dm.WriteError("No primary directories were found.\n") + return + + if len(primary_indexes) > 1: + preprocess_dm.WriteError( + textwrap.dedent( + """\ + Multiple primary directories were found. 
+ + {} + + """, + ).format( + "\n".join( + f" - {all_directories[primary_index][0]}" + for primary_index in primary_indexes + ), + ), + ) + return + + directories = [data[0] for data in all_directories[primary_indexes[-1] :]] + + # Process each directory + + # ---------------------------------------------------------------------- + class ProcessDirectoryState(Enum): + Transferring = 0 + Extracting = auto() + Verifying = auto() + Moving = auto() + + # ---------------------------------------------------------------------- + def PrepareTask( + context: Any, + on_simple_status_func: Callable[[str], None], # pylint: disable=unused-argument + ) -> tuple[int, ExecuteTasks.TransformTasksExTypes.TransformFuncType]: + directory = cast(str, context) + del context + + # ---------------------------------------------------------------------- + def ExecuteTask( + status: ExecuteTasks.Status, + ) -> Path: + destination_dir = working_dir / directory + + if destination_dir.is_dir(): + # The destination already exists, no need to process it further + return destination_dir + + with _YieldRestoredArchive( + data_store, # type: ignore + directory, + lambda bytes_transferred: cast( + None, + status.OnProgress( + ProcessDirectoryState.Transferring.value + 1, + bytes_transferred, + ), + ), + ) as (archive_directory, archive_directory_is_temporary): + with _YieldRestoredFiles( + directory, + archive_directory, + encryption_password, + lambda message: cast( + None, + status.OnProgress( + ProcessDirectoryState.Extracting.value + 1, + message, + ), + ), + ) as (contents_dir, contents_dir_is_temporary): + # Validate the contents + _VerifyRestoredFiles( + directory, + contents_dir, + lambda message: cast( + None, + status.OnProgress( + ProcessDirectoryState.Verifying.value + 1, + message, + ), + ), + ) + + # Move/Copy the content. Note that the code assumes a flat + # directory structure and doesn't do anything to account for + # nested dirs. 
This assumption matches the current archive + # format. + if archive_directory_is_temporary or contents_dir_is_temporary: + func = cast(Callable[[Path, Path], None], shutil.move) + else: + # ---------------------------------------------------------------------- + def CreateSymLink( + source: Path, + dest: Path, + ) -> None: + dest /= source.name + os.symlink( + source, dest, target_is_directory=source.is_dir() + ) + + # ---------------------------------------------------------------------- + + func = CreateSymLink + + temp_dest_dir = destination_dir.parent / ( + destination_dir.name + "__temp__" + ) + + shutil.rmtree(temp_dest_dir, ignore_errors=True) + temp_dest_dir.mkdir(parents=True) + + items = [ + item + for item in contents_dir.iterdir() + if item.name != INDEX_HASH_FILENAME + ] + + for item_index, item in enumerate(items): + status.OnProgress( + ProcessDirectoryState.Moving.value + 1, + f"Moving {item_index + 1} of {len(items)}...", + ) + + func(item, temp_dest_dir) + + shutil.move(temp_dest_dir, destination_dir) + + return destination_dir + + # ---------------------------------------------------------------------- + + return len(ProcessDirectoryState), ExecuteTask + + # ---------------------------------------------------------------------- + + directory_working_dirs: list[Path | None | Exception] = ( + ExecuteTasks.TransformTasksEx( + preprocess_dm, + "Processing", + [ + ExecuteTasks.TaskData(str(directory), directory) + for directory in directories + ], + PrepareTask, + quiet=quiet, + max_num_threads=None if ssd and data_store.ExecuteInParallel() else 1, + refresh_per_second=Common.EXECUTE_TASKS_REFRESH_PER_SECOND, + ) + ) + + if preprocess_dm.result != 0: + return + + assert all( + isinstance(working_dir, Path) for working_dir in directory_working_dirs + ), directory_working_dirs + + with preprocess_dm.Nested("Staging working content...") as stage_dm: + # ---------------------------------------------------------------------- + def HashToFilename( + 
hash_value: str, + ) -> Path: + return staging_directory / hash_value[:2] / hash_value[2:4] / hash_value + + # ---------------------------------------------------------------------- + def PathToFilename( + path: str, + ) -> Path: + for source_text, dest_text in dir_substitutions.items(): + path = path.replace(source_text, dest_text) + + return Path(path) + + # ---------------------------------------------------------------------- + + file_hashes: set[str] = set() + + for index, (directory, directory_working_dir) in enumerate( + zip(directories, directory_working_dirs) + ): + assert isinstance(directory_working_dir, Path), directory_working_dir + + these_instructions: list[Instruction] = [] + + with stage_dm.Nested( + f"Processing '{directory}' ({index + 1} of {len(directories)})...", + lambda: "{} added".format( + inflect.no("instruction", len(these_instructions)) + ), + ): + # link the content + for root_str, _, filenames in os.walk( + directory_working_dir, + followlinks=True, + ): + root = Path(root_str) + + if root == directory_working_dir: + continue + + for filename in filenames: + fullpath = root / filename + + dest_filename = staging_directory / fullpath.relative_to( + directory_working_dir + ) + + dest_filename.parent.mkdir(parents=True, exist_ok=True) + + os.symlink(fullpath, dest_filename) + + # Read the instructions + with (directory_working_dir / INDEX_FILENAME).open() as f: + json_content = json.load(f) + + # TODO: Validate json against a schema + + for item_index, item in enumerate(json_content): + try: + assert "operation" in item, item + + if item["operation"] == "add": + hash_value = item.get("this_hash", None) + + if hash_value is None: + # We need to create a directory + hash_filename = None + else: + hash_filename = HashToFilename(hash_value) + file_hashes.add(hash_value) + + these_instructions.append( + Instruction( + Common.DiffOperation.add, + hash_filename, + item["path"], + PathToFilename(item["path"]), + ), + ) + + elif item["operation"] 
== "modify": + if item["other_hash"] not in file_hashes: + raise Exception( + "The original file does not exist in the staged content." + ) + + new_hash_filename = HashToFilename(item["this_hash"]) + file_hashes.add(item["this_hash"]) + + these_instructions.append( + Instruction( + Common.DiffOperation.modify, + new_hash_filename, + item["path"], + PathToFilename(item["path"]), + ), + ) + + elif item["operation"] == "remove": + hash_value = item.get("other_hash", None) + + if hash_value is not None: + if item["other_hash"] not in file_hashes: + raise Exception( + "The referenced file does not exist in the staged content." + ) + + these_instructions.append( + Instruction( + Common.DiffOperation.remove, + None, + item["path"], + PathToFilename(item["path"]), + ), + ) + + else: + assert False, item["operation"] # pragma: no cover + + except Exception as ex: + raise Exception( + textwrap.dedent( + """\ + An error was encountered while processing '{}' [Index: {}]. + + Original Filename: {} + Error: {} + + """, + ).format( + directory, + item_index, + item["path"], + str(ex), + ), + ) from ex + + assert these_instructions + instructions[directory] = these_instructions + + with dm.Nested("\nProcessing instructions...") as all_instructions_dm: + all_instructions_dm.WriteLine("") + + temp_directory = PathEx.CreateTempDirectory() + + with ExitStack(lambda: shutil.rmtree(temp_directory)): + commit_actions: list[Callable[[], None]] = [] + + # ---------------------------------------------------------------------- + def WriteImpl( + local_filename: Path, + content_filename: Path | None, + ) -> None: + if content_filename is None: + # ---------------------------------------------------------------------- + def CommitDir() -> None: + if local_filename.is_dir(): + shutil.rmtree(local_filename) + else: + local_filename.unlink(missing_ok=True) + + local_filename.mkdir(parents=True) + + # ---------------------------------------------------------------------- + + 
commit_actions.append(CommitDir) + return + + temp_filename = temp_directory / str(uuid.uuid4()) + + with content_filename.resolve().open("rb") as source: + with temp_filename.open("wb") as dest: + dest.write(source.read()) + + # ---------------------------------------------------------------------- + def CommitFile() -> None: + if local_filename.is_dir(): + shutil.rmtree(local_filename) + elif local_filename.is_file(): + local_filename.unlink() + + local_filename.parent.mkdir(parents=True, exist_ok=True) + shutil.move(temp_filename, local_filename) + + # ---------------------------------------------------------------------- + + commit_actions.append(CommitFile) + + # ---------------------------------------------------------------------- + def OnAddInstruction( + dm: DoneManager, + instruction: Instruction, + ) -> None: + if instruction.local_filename.exists() and not overwrite: + dm.WriteError( + f"The local item '{instruction.local_filename}' exists and will not be overwritten.\n", + ) + return + + WriteImpl(instruction.local_filename, instruction.file_content_path) + + # ---------------------------------------------------------------------- + def OnModifyInstruction( + dm: DoneManager, # pylint: disable=unused-argument + instruction: Instruction, + ) -> None: + assert instruction.file_content_path is not None + WriteImpl(instruction.local_filename, instruction.file_content_path) + + # ---------------------------------------------------------------------- + def OnRemoveInstruction( + dm: DoneManager, # pylint: disable=unused-argument + instruction: Instruction, + ) -> None: + # ---------------------------------------------------------------------- + def RemoveItem(): + if instruction.local_filename.is_file(): + instruction.local_filename.unlink() + elif instruction.local_filename.is_dir(): + shutil.rmtree(instruction.local_filename) + + # ---------------------------------------------------------------------- + + commit_actions.append(RemoveItem) + + # 
---------------------------------------------------------------------- + + operation_map: dict[ + Common.DiffOperation, + tuple[ + str, # Heading prefix + Callable[[DoneManager, Instruction], None], + ], + ] = { + Common.DiffOperation.add: ("Restoring", OnAddInstruction), + Common.DiffOperation.modify: ("Updating", OnModifyInstruction), + Common.DiffOperation.remove: ("Removing", OnRemoveInstruction), + } + + for directory_index, (directory, these_instructions) in enumerate( + instructions.items() + ): + with all_instructions_dm.Nested( + f"Processing '{directory}' ({directory_index + 1} of {len(instructions)})...", + suffix="\n", + ) as instructions_dm: + with instructions_dm.YieldStream() as stream: + stream.write( + textwrap.dedent( + """\ + + {} + """, + ).format( + TextwrapEx.CreateTable( + [ + "Operation", + "Local Location", + "Original Location", + ], + [ + [ + f"[{instruction.operation.name.upper()}]", + str(instruction.local_filename), + instruction.original_filename, + ] + for instruction in these_instructions + ], + [ + TextwrapEx.Justify.Center, + TextwrapEx.Justify.Left, + TextwrapEx.Justify.Left, + ], + ), + ), + ) + + if not dry_run: + for instruction_index, instruction in enumerate(these_instructions): + prefix, on_instruction_func = operation_map[ + instruction.operation + ] + + with instructions_dm.Nested( + f"{prefix} the {'file' if instruction.file_content_path is not None else 'directory'} '{instruction.local_filename}' ({instruction_index + 1} of {len(these_instructions)})...", + ) as execute_dm: + on_instruction_func(execute_dm, instruction) + + if execute_dm.result != 0: + break + + instructions_dm.WriteLine("") + + if instructions_dm.result != 0: + break + + # Commit + with all_instructions_dm.Nested("Committing content..."): + for commit_action in commit_actions: + commit_action() + + +# ---------------------------------------------------------------------- +# | +# | Private Functions +# | +# 
# The lookup result is cached by hand rather than with functools.cache: a failed lookup must
# raise an exception every time the function is invoked, while the probing itself should only
# happen once.
_get_zip_binary_result: str | Exception | None = None
_get_zip_binary_result_lock = threading.Lock()


def _GetZipBinary() -> str:
    """Returns the name of the 7zip binary that can be invoked on this machine.

    The candidates "7z" and "7zz" are probed (in that order) by running them without
    arguments; the first one that exits with a return code of 0 wins. The outcome
    (a name or an exception) is stored in a module-level variable guarded by a lock,
    so the probe runs at most once per process.

    Raises:
        Exception: when neither candidate could be run successfully; the same
            exception instance is raised on every subsequent call.
    """

    global _get_zip_binary_result  # pylint: disable=global-statement

    with _get_zip_binary_result_lock:
        if _get_zip_binary_result is None:
            for binary_name in ["7z", "7zz"]:
                if SubprocessEx.Run(binary_name).returncode == 0:
                    _get_zip_binary_result = binary_name
                    break
            else:
                # No candidate worked; remember the failure so that we don't probe again
                _get_zip_binary_result = Exception(
                    "7zip is not available for compression and/or encryption; please add it to the path before invoking this script."
                )

        if isinstance(_get_zip_binary_result, Exception):
            raise _get_zip_binary_result

        return _get_zip_binary_result


# ----------------------------------------------------------------------
def _ScrubZipCommandLine(
    command_line: str,
) -> str:
    """Produces a string suitable for display within a log file.

    Every quoted password argument of the form `"-p<password>"` is replaced with
    `"-p*****"`. Escaped quotes (backslash followed by a double quote) inside the
    password are treated as part of the password.

    Args:
        command_line: The 7zip command line that may contain a password argument.

    Returns:
        The command line with password arguments masked; unchanged when no
        password argument is present.
    """

    # The group is intentionally non-capturing: only the overall match is replaced.
    return re.sub(
        r'"-p(?:\\"|[^"])+"',
        '"-p*****"',
        command_line,
    )


# ----------------------------------------------------------------------
def _CommitBulkStorageDataStore(
    dm: DoneManager,
    file_content_data_store: FileSystemDataStore,
    destination_data_store: BulkStorageDataStore,
) -> None:
    """Uploads the prepared file content to a bulk storage data store.

    Args:
        dm: Receives status information produced by the upload.
        file_content_data_store: Data store whose working directory holds the prepared content.
        destination_data_store: Bulk storage destination that receives the upload.
    """

    # We want to include the date-based directory in the upload, so upload the file
    # content root parent rather than the file content root itself.
    # NOTE(review): the original comment read "data-based"; presumably "date-based" - confirm.
    destination_data_store.Upload(dm, file_content_data_store.GetWorkingDir().parent)


# ----------------------------------------------------------------------
def _CommitFileBasedDataStore(
    dm: DoneManager,
    snapshot_filenames: SnapshotFilenames,
    file_content_data_store: FileSystemDataStore,
    destination_data_store: FileBasedDataStore,
    *,
    quiet: bool,
    ssd: bool,
) -> None:
    """Copies the prepared file content to a file-based data store and commits it.

    Content is first copied under temporary names (produced by `Common.CopyLocalContent`)
    and then renamed on the destination by removing the final suffix of each
    transferred item. Processing stops early (without raising) when validation or
    the transfer reports an error through the corresponding `DoneManager`.

    Args:
        dm: Receives status and error information.
        snapshot_filenames: Provides `backup_name`, which becomes the destination working dir.
        file_content_data_store: Data store whose working directory holds the prepared content.
        destination_data_store: Destination that receives the content.
        quiet: Forwarded to the copy and commit steps.
        ssd: Forwarded to the copy step.
    """

    destination_data_store.SetWorkingDir(Path(snapshot_filenames.backup_name))

    # Every file in the prepared content is transferred as an "add" operation
    transfer_diffs: list[Common.DiffResult] = []

    for root, _, filenames in file_content_data_store.Walk():
        for filename in filenames:
            fullpath = root / filename

            transfer_diffs.append(
                Common.DiffResult(
                    Common.DiffOperation.add,
                    fullpath,
                    "ignore",  # placeholder value; presumably not consulted during a copy - confirm
                    fullpath.stat().st_size,
                    None,
                    None,
                ),
            )

    Common.ValidateSizeRequirements(
        dm,
        file_content_data_store,
        destination_data_store,
        transfer_diffs,
        header="Validating destination size requirements...",
    )

    if dm.result != 0:
        return

    dm.WriteLine("")

    with dm.Nested(
        "Transferring content to the destination...",
        suffix="\n",
    ) as transfer_dm:
        file_content_root = file_content_data_store.GetWorkingDir()

        # ----------------------------------------------------------------------
        def StripPath(
            path: Path,
            extension: str,
        ) -> Path:
            # Destination path: <content root name>/<path relative to the root><extension>
            return (
                Path(file_content_root.name)
                / path.parent.relative_to(file_content_root)
                / (path.name + extension)
            )

        # ----------------------------------------------------------------------

        pending_items = Common.CopyLocalContent(
            transfer_dm,
            destination_data_store,
            transfer_diffs,
            StripPath,
            quiet=quiet,
            ssd=ssd,
        )

        if transfer_dm.result != 0:
            return

        if not any(pending_items):
            transfer_dm.WriteError("No content was transferred.\n")
            return

    with dm.Nested(
        "Committing content on the destination...",
        suffix="\n",
    ) as commit_dm:
        # ----------------------------------------------------------------------
        def CommitContext(
            context: Any,
            status: ExecuteTasks.Status,  # pylint: disable=unused-argument
        ) -> None:
            fullpath = cast(Path, context)
            del context

            # Removing the final suffix turns the temporary name into the final name
            destination_data_store.Rename(fullpath, fullpath.with_suffix(""))

        # ----------------------------------------------------------------------

        ExecuteTasks.TransformTasks(
            commit_dm,
            "Processing",
            [
                ExecuteTasks.TaskData(str(pending_item), pending_item)
                for pending_item in pending_items
                if pending_item
            ],
            CommitContext,
            quiet=quiet,
            max_num_threads=None if destination_data_store.ExecuteInParallel() else 1,
            refresh_per_second=Common.EXECUTE_TASKS_REFRESH_PER_SECOND,
        )


# ----------------------------------------------------------------------
@contextmanager
def _YieldTempDirectory(
    desc: str,
) -> Iterator[Path]:
    """Yields a new temporary directory that is removed when the block exits cleanly.

    When the block raises, the directory is preserved (to help with troubleshooting),
    a note is written to stderr, and the exception is re-raised.

    Args:
        desc: Describes the activity using the directory; used in the stderr note.
    """

    temp_directory = PathEx.CreateTempDirectory()

    try:
        yield temp_directory
    except BaseException:
        # Explicit equivalent of a bare `except:`; everything is re-raised.
        sys.stderr.write(
            f"**** The temporary directory '{temp_directory}' was preserved due to errors while {desc}.\n",
        )
        raise

    shutil.rmtree(temp_directory)


# ----------------------------------------------------------------------
@contextmanager
def _YieldRestoredArchive(
    data_store: FileBasedDataStore,
    directory: str,
    status_func: Callable[[str], None],
) -> Iterator[
    tuple[
        Path,
        bool,  # is temporary directory
    ],
]:
    """Yields a local directory containing the content of `directory` in the data store.

    For data stores on the local file system, the directory is used in place and the
    second tuple item is False. Otherwise every file below `directory` is transferred
    to a temporary directory and the second tuple item is True.

    Args:
        data_store: Data store that contains `directory` (relative to its working dir).
        directory: Name of the directory to make available locally.
        status_func: Receives progress messages.

    Raises:
        Exception: when the remote directory does not contain any files.
    """

    if data_store.is_local_filesystem:
        working_dir = data_store.GetWorkingDir() / directory
        assert working_dir.is_dir(), working_dir

        yield working_dir, False
        return

    status_func("Calculating files to transfer...")

    with _YieldTempDirectory("transferring archive files") as temp_directory:
        # Map the remote filenames to local filenames
        filename_map: dict[Path, Path] = {}

        # Don't change the data store's working dir, as multiple threads might be accessing it at
        # the same time. That does make this code a bit more complicated.
        data_store_dir = data_store.GetWorkingDir() / directory

        for root, _, filenames in data_store.Walk(Path(directory)):
            relative_root = root.relative_to(data_store_dir)

            for filename in filenames:
                filename_map[root / filename] = temp_directory / relative_root / filename

        if not filename_map:
            raise Exception(f"The directory '{directory}' does not contain any files.")

        # Transfer the files
        num_files = len(filename_map)

        for filename_index, (source_filename, dest_filename) in enumerate(filename_map.items()):
            # Avoid a division by zero for empty files
            file_size = data_store.GetFileSize(source_filename) or 1

            # The prefix is combined with the percentage via an f-string (rather than
            # `str.format`) so that filenames containing braces do not break formatting.
            status_prefix = f"Transferring '{source_filename}' ({filename_index + 1} of {num_files}) [{PathEx.GetSizeDisplay(file_size)}]"

            Common.WriteFile(
                data_store,
                source_filename,
                dest_filename,
                # Loop variables are bound as defaults to avoid late-binding surprises
                lambda bytes_transferred, status_prefix=status_prefix, file_size=file_size: status_func(
                    f"{status_prefix} {(bytes_transferred / file_size) * 100:.02f}%..."
                ),
            )

        yield temp_directory, True


# ----------------------------------------------------------------------
@contextmanager
def _YieldRestoredFiles(
    directory_name: str,
    archive_dir: Path,
    encryption_password: str | None,
    status_func: Callable[[str], None],
) -> Iterator[
    tuple[
        Path,
        bool,  # is temporary directory
    ],
]:
    """Yields a directory containing the (extracted) backup content.

    When `archive_dir` already contains the index file, the content was not archived
    and `archive_dir` itself is yielded with False. Otherwise the multi-volume archive
    is validated and extracted into a temporary directory, which is yielded with True.

    Args:
        directory_name: Name of the backup directory; only used in error messages.
        archive_dir: Local directory containing either raw content or archive volumes.
        encryption_password: Password used to create the archive, or None.
        status_func: Receives progress messages.

    Raises:
        Exception: when the first archive volume is missing, or when 7zip reports a
            failure during validation or extraction.
    """

    if (archive_dir / INDEX_FILENAME).is_file():
        yield archive_dir, False
        return

    # By default, 7zip will prompt for a password with archives that were created
    # with a password but no password was provided. This is not what we want, as
    # it will block indefinitely. Instead, employ this workaround suggested at
    # https://sourceforge.net/p/sevenzip/discussion/45798/thread/2b98fd92/.
    #
    #   1) Attempt to extract with a bogus password; this will work for archives
    #      created without a password.
    #
    #   2) If extraction fails, issue an error.
    #
    password = encryption_password or str(uuid.uuid4())

    # NOTE(review): the password is interpolated into the command line without escaping; a
    # password containing a double quote will produce a malformed command. Confirm how
    # SubprocessEx.Run parses command strings before changing this.

    # Validate
    status_func("Validating archive...")

    archive_filename = archive_dir / (ARCHIVE_FILENAME + ".001")

    if not archive_filename.is_file():
        raise Exception(f"The archive file '{archive_filename.name}' was not found.")

    result = SubprocessEx.Run(f'{_GetZipBinary()} t "{archive_filename}" "-p{password}"')
    if result.returncode != 0:
        raise Exception(
            textwrap.dedent(
                """\
                Archive validation failed for the directory '{}' ({}).


                    {}

                """,
            ).format(
                directory_name,
                result.returncode,
                TextwrapEx.Indent(
                    result.output.strip(),
                    4,
                    skip_first_line=True,
                ),
            ),
        )

    # Extract
    status_func("Extracting archive...")

    with _YieldTempDirectory("extracting the archive") as temp_directory:
        result = SubprocessEx.Run(
            f'{_GetZipBinary()} x "{archive_filename}" "-p{password}"',
            cwd=temp_directory,
        )

        if result.returncode != 0:
            raise Exception(
                textwrap.dedent(
                    """\
                    Archive extraction failed for the directory '{}' ({}).

                        {}

                    """,
                ).format(
                    directory_name,
                    result.returncode,
                    TextwrapEx.Indent(
                        result.output.strip(),
                        4,
                        skip_first_line=True,
                    ),
                ),
            )

        yield temp_directory, True


# ----------------------------------------------------------------------
def _VerifyRestoredFiles(
    directory_name: str,
    contents_dir: Path,
    status_func: Callable[[str], None],
) -> None:
    """Verifies that the hash of each restored file matches its expected value.

    The expected hash of the index file is read from the index hash file; the expected
    hash of every other file is the file's own name. The index hash file itself is
    not verified.

    Args:
        directory_name: Name of the backup directory; only used in error messages.
        contents_dir: Directory containing the restored content.
        status_func: Receives progress messages.

    Raises:
        Exception: when an index file is missing or when any hash does not match.
    """

    # Ensure that the index is present
    for index_filename in [INDEX_FILENAME, INDEX_HASH_FILENAME]:
        if not (contents_dir / index_filename).is_file():
            raise Exception(f"The index file '{index_filename}' does not exist.")

    # Ensure that the content is valid
    all_filenames: list[Path] = []

    for root_str, _, filenames in os.walk(contents_dir):
        root = Path(root_str)

        all_filenames += [
            root / filename for filename in filenames if filename != INDEX_HASH_FILENAME
        ]

    data_store = FileSystemDataStore()

    errors: list[str] = []
    num_files = len(all_filenames)

    for filename_index, filename in enumerate(all_filenames):
        if filename.name == INDEX_FILENAME:
            expected_hash_value = (contents_dir / INDEX_HASH_FILENAME).read_text().strip()
        else:
            expected_hash_value = filename.name

        # Avoid a division by zero for empty files
        file_size = filename.stat().st_size or 1

        status_prefix = f"Validating file {filename_index + 1} of {num_files} [{PathEx.GetSizeDisplay(file_size)}]"

        actual_hash_value = Common.CalculateHash(
            data_store,
            filename,
            # Loop variables are bound as defaults to avoid late-binding surprises
            lambda bytes_transferred, status_prefix=status_prefix, file_size=file_size: status_func(
                f"{status_prefix} {(bytes_transferred / file_size) * 100:.02f}%..."
            ),
        )

        if actual_hash_value != expected_hash_value:
            errors.append(
                textwrap.dedent(
                    f"""\
                    Filename: {filename.relative_to(contents_dir)}
                    Expected: {expected_hash_value}
                    Actual:   {actual_hash_value}
                    """,
                ),
            )

    if errors:
        raise Exception(
            textwrap.dedent(
                """\
                Corrupt files were encountered in the directory '{}'.

                    {}

                """,
            ).format(
                directory_name,
                TextwrapEx.Indent(
                    "\n".join(errors),
                    4,
                    skip_first_line=True,
                ),
            ),
        )
class TestMirror:
    """Command line tests for the `mirror` command group.

    The functions in `FileBackup.Mirror` are patched, so these tests only verify
    how command line arguments are translated into function arguments.
    """

    # ----------------------------------------------------------------------
    class TestExecute:
        """Tests for `mirror execute`."""

        # ----------------------------------------------------------------------
        def test_Standard(self, tmp_path):
            # Only the required arguments are provided; every flag keeps its default
            command_line = ["mirror", "execute", str(tmp_path), str(_this_file.parent)]

            with patch("FileBackup.Mirror.Backup") as mocked_backup:
                outcome = CliRunner().invoke(app, command_line)

            assert outcome.exit_code == 0

            first_call = mocked_backup.call_args_list[0]
            positional = first_call.args

            assert isinstance(positional[0], DoneManager), positional[0]
            assert positional[1] == str(tmp_path), positional[1]
            assert positional[2] == [_this_file.parent], positional[2]

            assert first_call.kwargs == {
                "ssd": False,
                "force": False,
                "quiet": False,
                "file_includes": [],
                "file_excludes": [],
            }

        # ----------------------------------------------------------------------
        def test_WithFlags(self, tmp_path):
            # Every flag is provided; include/exclude expressions may be repeated
            command_line = ["mirror", "execute", str(tmp_path), str(_this_file.parent)]
            command_line += ["--ssd", "--force", "--quiet"]

            for include_expression in ["one", "two"]:
                command_line += ["--file-include", include_expression]

            for exclude_expression in ["three", "four", "five"]:
                command_line += ["--file-exclude", exclude_expression]

            with patch("FileBackup.Mirror.Backup") as mocked_backup:
                outcome = CliRunner().invoke(app, command_line)

            assert outcome.exit_code == 0

            first_call = mocked_backup.call_args_list[0]
            positional = first_call.args

            assert isinstance(positional[0], DoneManager), positional[0]
            assert positional[1] == str(tmp_path), positional[1]
            assert positional[2] == [_this_file.parent], positional[2]

            assert first_call.kwargs == {
                "ssd": True,
                "force": True,
                "quiet": True,
                "file_includes": [re.compile("^one$"), re.compile("^two$")],
                "file_excludes": [
                    re.compile("^three$"),
                    re.compile("^four$"),
                    re.compile("^five$"),
                ],
            }

        # ----------------------------------------------------------------------
        def test_ErrorBadRegex(self, tmp_path):
            # An unbalanced group is not a valid regular expression
            expression = "(?:not_valid"

            command_line = [
                "mirror",
                "execute",
                str(tmp_path),
                str(_this_file.parent),
                "--file-include",
                expression,
            ]

            outcome = CliRunner().invoke(app, command_line)

            assert outcome.exit_code != 0
            assert f"The regular expression '{expression}' is not valid" in outcome.output

        # ----------------------------------------------------------------------
        def test_Help(self):
            outcome = CliRunner().invoke(app, ["mirror", "execute", "--help"])

            assert outcome.exit_code == 0

            help_text = outcome.output

            assert "Mirrors content to a backup data store." in help_text
            assert "Data Store Destinations" in help_text

    # ----------------------------------------------------------------------
    class TestValidate:
        """Tests for `mirror validate`."""

        # ----------------------------------------------------------------------
        def test_Standard(self, tmp_path):
            with patch("FileBackup.Mirror.Validate") as mocked_validate:
                outcome = CliRunner().invoke(app, ["mirror", "validate", str(tmp_path)])

            assert outcome.exit_code == 0

            first_call = mocked_validate.call_args_list[0]
            positional = first_call.args

            assert isinstance(positional[0], DoneManager), positional[0]
            assert positional[1] == str(tmp_path), positional[1]
            assert positional[2] == ValidateType.standard, positional[2]

            assert first_call.kwargs == {
                "ssd": False,
                "quiet": False,
            }

        # ----------------------------------------------------------------------
        def test_WithFlags(self, tmp_path):
            command_line = [
                "mirror",
                "validate",
                str(tmp_path),
                ValidateType.complete.name,
                "--ssd",
                "--quiet",
            ]

            with patch("FileBackup.Mirror.Validate") as mocked_validate:
                outcome = CliRunner().invoke(app, command_line)

            assert outcome.exit_code == 0

            first_call = mocked_validate.call_args_list[0]
            positional = first_call.args

            assert isinstance(positional[0], DoneManager), positional[0]
            assert positional[1] == str(tmp_path), positional[1]
            assert positional[2] == ValidateType.complete, positional[2]

            assert first_call.kwargs == {
                "ssd": True,
                "quiet": True,
            }

        # ----------------------------------------------------------------------
        def test_Help(self):
            outcome = CliRunner().invoke(app, ["mirror", "validate", "--help"])

            assert outcome.exit_code == 0

            help_text = outcome.output

            assert (
                "Validates previously mirrored content in the backup data store." in help_text
            )
            assert "Data Store Destinations" in help_text

    # ----------------------------------------------------------------------
    class TestCleanup:
        """Tests for `mirror cleanup`."""

        # ----------------------------------------------------------------------
        def test_Standard(self, tmp_path):
            with patch("FileBackup.Mirror.Cleanup") as mocked_cleanup:
                outcome = CliRunner().invoke(app, ["mirror", "cleanup", str(tmp_path)])

            assert outcome.exit_code == 0

            first_call = mocked_cleanup.call_args_list[0]
            positional = first_call.args

            assert isinstance(positional[0], DoneManager), positional[0]
            assert positional[1] == str(tmp_path), positional[1]

            # No keyword arguments are expected for cleanup
            assert first_call.kwargs == {}

        # ----------------------------------------------------------------------
        def test_Help(self):
            outcome = CliRunner().invoke(app, ["mirror", "cleanup", "--help"])

            assert outcome.exit_code == 0

            help_text = outcome.output

            assert (
                "Cleans a backup data store after a mirror execution that was interrupted or failed."
                in help_text
            )
            assert "Data Store Destinations" in help_text
"three", + "--file-exclude", + "four", + "--file-exclude", + "five", + ], + ) + + assert result.exit_code == 0 + + args = backup.call_args_list[0].args + + assert isinstance(args[0], DoneManager), args[0] + assert args[1] == backup_name, args[1] + assert args[2] == str(tmp_path), args[2] + assert args[3] == [_this_file.parent], args[3] + assert args[4] == encryption_password, args[4] + assert args[5] == working_dir, args[5] + + kwargs = backup.call_args_list[0].kwargs + + assert kwargs == { + "compress": True, + "ssd": True, + "force": True, + "quiet": True, + "file_includes": [re.compile("^one$"), re.compile("^two$")], + "file_excludes": [ + re.compile("^three$"), + re.compile("^four$"), + re.compile("^five$"), + ], + "archive_volume_size": archive_volume_size, + "ignore_pending_snapshot": True, + } + + # ---------------------------------------------------------------------- + def test_ErrorBadRegex(self, tmp_path): + expression = "(?:not_valid" + + result = CliRunner().invoke( + app, + [ + "offsite", + "execute", + "BackupName", + str(tmp_path), + str(_this_file.parent), + "--file-include", + expression, + ], + ) + + assert result.exit_code != 0 + assert f"The regular expression '{expression}' is not valid" in result.output + + # ---------------------------------------------------------------------- + def test_Help(self): + result = CliRunner().invoke(app, ["offsite", "execute", "--help"]) + + assert result.exit_code == 0 + + assert "Prepares local changes for offsite backup." 
in result.output + assert "Data Store Destinations" in result.output + + # ---------------------------------------------------------------------- + class TestCommit: + # ---------------------------------------------------------------------- + def test_Standard(self, tmp_path): + with patch("FileBackup.Offsite.Commit") as commit: + result = CliRunner().invoke(app, ["offsite", "commit", "BackupName"]) + + assert result.exit_code == 0 + + args = commit.call_args_list[0].args + + assert isinstance(args[0], DoneManager), args[0] + assert args[1] == "BackupName", args[1] + + kwargs = commit.call_args_list[0].kwargs + + assert kwargs == {} + + # ---------------------------------------------------------------------- + def test_Help(self): + result = CliRunner().invoke(app, ["offsite", "commit", "--help"]) + + assert result.exit_code == 0 + + assert ( + "Commits a pending snapshot after the changes have been transferred to an offsite data store." + in result.output + ) + + # ---------------------------------------------------------------------- + class TestRestore: + # ---------------------------------------------------------------------- + def test_Standard(self, tmp_path): + with patch("FileBackup.Offsite.Restore") as restore: + result = CliRunner().invoke( + app, ["offsite", "restore", "BackupName", str(tmp_path)] + ) + + assert result.exit_code == 0 + + args = restore.call_args_list[0].args + + assert isinstance(args[0], DoneManager), args[0] + assert args[1] == "BackupName", args[1] + assert args[2] == str(tmp_path), args[2] + assert args[3] is None, args[3] # encryption password + assert isinstance(args[4], Path), args[4] # working dir + assert args[5] == {} + + kwargs = restore.call_args_list[0].kwargs + + assert kwargs == { + "ssd": False, + "quiet": False, + "dry_run": False, + "overwrite": False, + } + + # ---------------------------------------------------------------------- + def test_WithFlags(self, tmp_path): + with patch("FileBackup.Offsite.Restore") as 
restore: + backup_name = str(uuid.uuid4()) + encryption_password = str(uuid.uuid4()) + working_dir = tmp_path / "working_dir" + dir_subs = { + "one": "two", + "three": "four", + } + + args = [ + "offsite", + "restore", + backup_name, + str(tmp_path), + "--working-dir", + working_dir, + "--encryption-password", + encryption_password, + ] + + for k, v in dir_subs.items(): + args += ["--dir-substitution", f"{k}:{v}"] + + args += [ + "--dry-run", + "--overwrite", + "--ssd", + "--quiet", + ] + + result = CliRunner().invoke(app, args) + + assert result.exit_code == 0 + + args = restore.call_args_list[0].args + + assert isinstance(args[0], DoneManager), args[0] + assert args[1] == backup_name, args[1] + assert args[2] == str(tmp_path), args[2] + assert args[3] == encryption_password, args[3] + assert args[4] == working_dir, args[4] + assert args[5] == dir_subs + + kwargs = restore.call_args_list[0].kwargs + + assert kwargs == { + "ssd": True, + "quiet": True, + "dry_run": True, + "overwrite": True, + } + + # ---------------------------------------------------------------------- + def test_Help(self): + result = CliRunner().invoke(app, ["offsite", "restore", "--help"]) + + assert result.exit_code == 0 + + assert "Restores content from an offsite data store." 
in result.output + assert "Data Store Destinations" in result.output diff --git a/tests/Mirror_UnitTest.py b/tests/Mirror_UnitTest.py index 20c8472..bb747b5 100644 --- a/tests/Mirror_UnitTest.py +++ b/tests/Mirror_UnitTest.py @@ -23,11 +23,10 @@ import pytest -from dbrownell_Common.Streams.DoneManager import DoneManager, Flags as DoneManagerFlags +from dbrownell_Common.Streams.DoneManager import DoneManager from dbrownell_Common.TestHelpers.StreamTestHelpers import ( GenerateDoneManagerAndContent, InitializeStreamCapabilities, - ScrubDuration, ) from FileBackup.Mirror import * diff --git a/tests/Offsite_UnitTest.py b/tests/Offsite_UnitTest.py new file mode 100644 index 0000000..915e374 --- /dev/null +++ b/tests/Offsite_UnitTest.py @@ -0,0 +1,1684 @@ +# ---------------------------------------------------------------------- +# | +# | Offsite_UnitTest.py +# | +# | David Brownell +# | 2024-07-07 13:25:03 +# | +# ---------------------------------------------------------------------- +# | +# | Copyright David Brownell 2024 +# | Distributed under the MIT License. 
+# | +# ---------------------------------------------------------------------- +"""Unit tests for Offsite.py""" + +import os +import re + +from contextlib import contextmanager +from dataclasses import dataclass +from pathlib import Path +from typing import cast, Iterator +from unittest import mock +from unittest.mock import Mock + +import pytest + +from dbrownell_Common.TestHelpers.StreamTestHelpers import GenerateDoneManagerAndContent + +from FileBackup.Offsite import * + +import TestHelpers + + +# ---------------------------------------------------------------------- +def test_SnapshotFilenames(): + sfs = SnapshotFilenames.Create("the_name") + + assert sfs.backup_name == "the_name" + assert sfs.standard == PathEx.GetUserDirectory() / "OffsiteFileBackup.the_name.json" + assert sfs.pending == PathEx.GetUserDirectory() / "OffsiteFileBackup.the_name.__pending__.json" + + +# ---------------------------------------------------------------------- +class TestBackup: + # ---------------------------------------------------------------------- + def test_InvalidInput(self): + with pytest.raises( + Exception, + match="'foo' is not a valid filename or directory.", + ): + Backup( + Mock(), + "Backup", + None, + [Path("foo")], + None, + Path.cwd(), + ssd=False, + force=False, + quiet=False, + file_includes=None, + file_excludes=None, + compress=False, + ) + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_Standard(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, 
str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_NoChanges(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + # No changes + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_AddSingleFile(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + with (_working_dir / "New File").open("w") as f: + f.write("New File") + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + assert len(backup_item_info.filenames) == 3 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_AddMultipleFiles(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + with 
(_working_dir / "New File 1").open("w") as f: + f.write("New File 1") + + with (_working_dir / "New File 2").open("w") as f: + f.write("New File 2") + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + assert len(backup_item_info.filenames) == 4 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_AddMultipleFilesSameContent( + self, _working_dir, tmp_path_factory, compress, encryption_password + ): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + with (_working_dir / "New File 1").open("w") as f: + f.write("New File") + + with (_working_dir / "New File 2").open("w") as f: + f.write("New File") + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + assert len(backup_item_info.filenames) == 3 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def 
test_AddDir(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + (_working_dir / "New Directory 1").mkdir() + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_AddMultipleDirs(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + (_working_dir / "New Directory 1").mkdir() + (_working_dir / "New Directory 2").mkdir() + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + 
@pytest.mark.parametrize("compress", [False, True]) + def test_RemoveFile(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + (_working_dir / "one" / "A").unlink() + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_RemoveMultipleFile( + self, _working_dir, tmp_path_factory, compress, encryption_password + ): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + (_working_dir / "one" / "A").unlink() + (_working_dir / "two" / "Dir1" / "File3").unlink() + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + 
@pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_RemoveDir(self, _working_dir, tmp_path_factory, compress, encryption_password): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + shutil.rmtree(_working_dir / "one") + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_RemoveMultipleDirs( + self, _working_dir, tmp_path_factory, compress, encryption_password + ): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as helper: + shutil.rmtree(_working_dir / "one") + shutil.rmtree(_working_dir / "two" / "Dir2") + + helper.ExecuteBackup(_working_dir, compress, encryption_password) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + if not compress and encryption_password is None: + # index and index hash + assert len(backup_item_info.filenames) == 2 + else: + assert len(backup_item_info.filenames) == 1 + + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # 
---------------------------------------------------------------------- + def test_FileToDir(self, _working_dir, tmp_path_factory): + with _YieldInitializedBackupHelper(tmp_path_factory, _working_dir, False, None) as helper: + (_working_dir / "one" / "A").unlink() + (_working_dir / "one" / "A").mkdir() + + helper.ExecuteBackup(_working_dir, False, None) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + # index and index hash + assert len(backup_item_info.filenames) == 2 + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + def test_DirToFile(self, _working_dir, tmp_path_factory): + with _YieldInitializedBackupHelper(tmp_path_factory, _working_dir, False, None) as helper: + with (_working_dir / "one" / "Dir1").open("w") as f: + f.write("This is a change") + + helper.ExecuteBackup(_working_dir, False, None) + + result = helper.GetBackupInfo() + + assert len(result.primary_dirs) == 1 + assert len(result.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(result.delta_dirs[0]) + + # index and index hash and file content + assert len(backup_item_info.filenames) == 3 + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.skipif( + os.name != "nt", + reason="This test is running into what I believe to be timing issues associated with the quick turnaround time on Linux and MacOS", + ) + def test_MultipleChanges(self, _working_dir, tmp_path_factory): + with _YieldInitializedBackupHelper(tmp_path_factory, _working_dir, False, None) as helper: + num_deltas = 3 + num_new_files = 0 + + for backup_ctr in range(num_deltas): + for file_ctr in range(backup_ctr + 
1): + with ( + _working_dir + / "NewFile-MultipleChanges-{}-{}.txt".format(backup_ctr, file_ctr) + ).open("w") as f: + f.write("{}-{}\n{}\n".format(backup_ctr, file_ctr, uuid.uuid4())) + + num_new_files += backup_ctr + 1 + + helper.ExecuteBackup(_working_dir, False, None) + + backup_info = helper.GetBackupInfo() + + assert len(backup_info.primary_dirs) == 1 + assert len(backup_info.delta_dirs) == backup_ctr + 1 + + backup_item_info = _PathInfo.Create(backup_info.delta_dirs[-1]) + + # Index and index hash + number of files written + assert len(backup_item_info.filenames) == 2 + backup_ctr + 1 + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # Force a backup + helper.ExecuteBackup(_working_dir, False, None, force=True) + + backup_info = helper.GetBackupInfo() + + assert len(backup_info.primary_dirs) == 2 + assert len(backup_info.delta_dirs) == num_deltas + + # Original backup + backup_item_info = _PathInfo.Create(backup_info.primary_dirs[0]) + + assert len(backup_item_info.filenames) == 11 + assert len(backup_item_info.empty_dirs) == 0 + + # Latests backup + backup_item_info = _PathInfo.Create(backup_info.primary_dirs[1]) + + assert len(backup_item_info.filenames) == 11 + num_new_files + assert len(backup_item_info.empty_dirs) == 0 + + assert len(_PathInfo.Create(helper.snapshot_dir).filenames) == 1 + + # ---------------------------------------------------------------------- + @pytest.mark.skipif( + os.name != "nt", + reason="This test is running into what I believe to be timing issues associated with the quick turnaround time on Linux and MacOS", + ) + def test_NoDestination(self, _working_dir, tmp_path_factory): + with _YieldBackupHelper(tmp_path_factory) as helper: + output = helper.ExecuteBackup(_working_dir, False, None, provide_destination=False) + output = _ScrubDynamicContent(output) + + assert ( + output + == textwrap.dedent( + """\ + Heading... + Creating the local snapshot... 
+ Discovering files... + Processing (1 item)...DONE! (0, , 1 item succeeded, no items with errors, no items with warnings) + DONE! (0, , 9 files found, 1 empty directory found) + + Calculating hashes... + Processing (9 items)...DONE! (0, , 9 items succeeded, no items with errors, no items with warnings) + DONE! (0, ) + + Organizing results...DONE! (0, ) + DONE! (0, ) + + Calculating differences...DONE! (0, , 10 differences found) + + Preparing file content... + Validating size requirements...DONE! (0, , , ) + + Preserving files... + Processing (9 items)...DONE! (0, , 9 items succeeded, no items with errors, no items with warnings) + DONE! (0, ) + + Preserving index...DONE! (0, ) + + DONE! (0, ) + + Preserving the pending snapshot... + Writing '{snapshot_dir}{sep}OffsiteFileBackup.BackupTest.__pending__.json'...DONE! (0, ) + DONE! (0, ) + + + + INFO: Content has been written to '{backup_working_dir}{sep}', + however the changes have not been committed yet. + + After the generated content is transferred to an offsite location, run this script + again with the 'commit' command using the backup name 'BackupTest' to ensure that + these changes are not processed when this offsite backup is run again. + + + DONE! (0, ) + """, + ).format( + snapshot_dir=helper.snapshot_dir, + backup_working_dir=helper.backup_working_dir, + sep=os.path.sep, + ) + ) + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 1 + assert snapshot_filenames[0].stem.endswith("__pending__") + + # Backup w/pending + output = helper.ExecuteBackup(_working_dir, False, None) + + assert ( + output + == textwrap.dedent( + """\ + Heading... + + ERROR: A pending snapshot exists for the backup '{}'; this snapshot should be committed before creating updates + to the backup. + + To commit the pending snapshot, run this script with the 'commit' command. 
+ + To ignore this error and delete the pending snapshot, run this script with the '--ignore-pending-snapshot' + argument. + + + DONE! (-1, ) + """, + ).format(helper.backup_name) + ) + + # With ignore pending snapshot + helper.ExecuteBackup(_working_dir, False, None, ignore_pending_snapshot=True) + + backup_info = helper.GetBackupInfo() + + assert len(backup_info.primary_dirs) == 1 + assert len(backup_info.delta_dirs) == 0 + + backup_item_info = _PathInfo.Create(backup_info.primary_dirs[0]) + + assert len(backup_item_info.filenames) == 11 + assert len(backup_item_info.empty_dirs) == 0 + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 1 + assert not snapshot_filenames[0].stem.endswith("__pending__") + + # Delta + (_working_dir / "New Dir").mkdir() + + output = helper.ExecuteBackup(_working_dir, False, None, provide_destination=False) + output = _ScrubDynamicContent(output) + + assert ( + output + == textwrap.dedent( + """\ + Heading... + Creating the local snapshot... + Discovering files... + Processing (1 item)...DONE! (0, , 1 item succeeded, no items with errors, no items with warnings) + DONE! (0, , 9 files found, 2 empty directories found) + + Calculating hashes... + Processing (9 items)...DONE! (0, , 9 items succeeded, no items with errors, no items with warnings) + DONE! (0, ) + + Organizing results...DONE! (0, ) + DONE! (0, ) + + Reading the most recent offsite snapshot... + Reading '{snapshot_dir}{sep}OffsiteFileBackup.BackupTest.json'... + + + DONE! (0, ) + DONE! (0, ) + + Calculating differences...DONE! (0, , 1 difference found) + + Preparing file content... + + Preserving index...DONE! (0, ) + + DONE! (0, ) + + Preserving the pending snapshot... + Writing '{snapshot_dir}{sep}OffsiteFileBackup.BackupTest.__pending__.json'...DONE! (0, ) + DONE! 
(0, ) + + + + INFO: Content has been written to '{backup_working_dir}{sep}', + however the changes have not been committed yet. + + After the generated content is transferred to an offsite location, run this script + again with the 'commit' command using the backup name 'BackupTest' to ensure that + these changes are not processed when this offsite backup is run again. + + + DONE! (0, ) + """, + ).format( + backup_working_dir=helper.backup_working_dir, + snapshot_dir=helper.snapshot_dir, + sep=os.path.sep, + ) + ) + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 2 + assert not snapshot_filenames[0].stem.endswith("__pending__") + assert snapshot_filenames[1].stem.endswith("__pending__") + + # Backup w/pending + output = helper.ExecuteBackup(_working_dir, False, None) + + assert output == textwrap.dedent( + """\ + Heading... + + ERROR: A pending snapshot exists for the backup 'BackupTest'; this snapshot should be committed before creating updates + to the backup. + + To commit the pending snapshot, run this script with the 'commit' command. + + To ignore this error and delete the pending snapshot, run this script with the '--ignore-pending-snapshot' + argument. + + + DONE! 
(-1, ) + """, + ) + + # With ignore pending snapshot + helper.ExecuteBackup(_working_dir, False, None, ignore_pending_snapshot=True) + + backup_info = helper.GetBackupInfo() + + assert len(backup_info.primary_dirs) == 1 + assert len(backup_info.delta_dirs) == 1 + + backup_item_info = _PathInfo.Create(backup_info.primary_dirs[0]) + + assert len(backup_item_info.filenames) == 11 + assert len(backup_item_info.empty_dirs) == 0 + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 1 + assert not snapshot_filenames[0].stem.endswith("__pending__") + + # ---------------------------------------------------------------------- + def test_InvalidIgnorePending(self, _working_dir, tmp_path_factory): + with _YieldBackupHelper(tmp_path_factory) as helper: + output = helper.ExecuteBackup(_working_dir, False, None, ignore_pending_snapshot=True) + + assert output == textwrap.dedent( + """\ + Heading... + ERROR: A pending snapshot for 'BackupTest' was not found. + DONE! 
(-1, ) + """, + ) + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("result", [-1, 1]) + def test_UncleanExit(self, _working_dir, tmp_path_factory, result): + with _YieldBackupHelper(tmp_path_factory) as helper: + dm_and_content = iter(GenerateDoneManagerAndContent()) + + dm = cast(DoneManager, next(dm_and_content)) + + # ---------------------------------------------------------------------- + def NewValidateSizeRequirements( + dm: DoneManager, + *args, + **kwargs, + ): + dm.result = result + + # ---------------------------------------------------------------------- + + with mock.patch( + "FileBackup.Impl.Common.ValidateSizeRequirements", + side_effect=NewValidateSizeRequirements, + ): + Backup( + dm, + helper.backup_name, + str(helper.output_dir), + [_working_dir], + encryption_password=None, + working_dir=helper.backup_working_dir, + compress=False, + ssd=False, + force=False, + quiet=False, + file_includes=None, + file_excludes=None, + ) + + sink = cast(str, next(dm_and_content)) + sink = _ScrubDynamicContent(sink) + + if result == -1: + desc = "errors" + elif result == 1: + desc = "warnings" + else: + assert False, result # pragma: no cover + + assert ( + sink + == textwrap.dedent( + """\ + Heading... + Creating the local snapshot... + Discovering files... + Processing (1 item)...DONE! (0, , 1 item succeeded, no items with errors, no items with warnings) + DONE! (0, , 9 files found, 1 empty directory found) + + Calculating hashes... + Processing (9 items)...DONE! (0, , 9 items succeeded, no items with errors, no items with warnings) + DONE! (0, ) + + Organizing results...DONE! (0, ) + DONE! (0, ) + + Calculating differences...DONE! (0, , 10 differences found) + + Preparing file content...DONE! ({result}, ) + + + INFO: The temporary directory '{backup_working_dir}{sep}' was preserved due to {desc}. + DONE! 
({result}, ) + """, + ).format( + backup_working_dir=helper.backup_working_dir, + result=result, + desc=desc, + sep=os.path.sep, + ) + ) + + +# ---------------------------------------------------------------------- +class TestCommit(object): + # ---------------------------------------------------------------------- + def test_CommitNothingAvailable(self, tmp_path_factory): + with _YieldBackupHelper(tmp_path_factory) as helper: + dm_and_content = iter(GenerateDoneManagerAndContent()) + + Commit( + cast(DoneManager, next(dm_and_content)), + helper.backup_name, + ) + + output = cast(str, next(dm_and_content)) + + assert output == textwrap.dedent( + """\ + Heading... + ERROR: A pending snapshot for the backup 'BackupTest' was not found. + DONE! (-1, ) + """, + ) + + # ---------------------------------------------------------------------- + def test_Standard(self, _working_dir, tmp_path_factory): + with _YieldBackupHelper(tmp_path_factory) as helper: + helper.ExecuteBackup(_working_dir, False, None, provide_destination=False) + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 1 + assert snapshot_filenames[0].stem.endswith("__pending__") + + # Commit + dm_and_content = iter(GenerateDoneManagerAndContent()) + + Commit(cast(DoneManager, next(dm_and_content)), helper.backup_name) + + snapshot_filenames: list[Path] = [ + item for item in helper.snapshot_dir.iterdir() if item.is_file() + ] + + assert len(snapshot_filenames) == 1 + assert not snapshot_filenames[0].stem.endswith("__pending__") + + +# ---------------------------------------------------------------------- +class TestRestore(object): + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("is_local_filesystem", [True, False]) + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def 
test_RestoreSingleBackup( + self, _working_dir, tmp_path_factory, compress, encryption_password, is_local_filesystem + ): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as backup_helper: + restore_helper = _RestoreHelper.Create( + _working_dir, + tmp_path_factory, + encryption_password, + is_local_filesystem, + backup_helper.backup_name, + backup_helper.output_dir, + ) + + restore_helper.ExecuteRestore( + 10, + overwrite=False, + ) + + # ---------------------------------------------------------------------- + @pytest.mark.parametrize("is_local_filesystem", [True, False]) + @pytest.mark.parametrize("encryption_password", [None, str(uuid.uuid4())]) + @pytest.mark.parametrize("compress", [False, True]) + def test_RestoreMultipleBackups( + self, _working_dir, tmp_path_factory, compress, encryption_password, is_local_filesystem + ): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, compress, encryption_password + ) as backup_helper: + restore_helper = _RestoreHelper.Create( + _working_dir, + tmp_path_factory, + encryption_password, + is_local_filesystem, + backup_helper.backup_name, + backup_helper.output_dir, + ) + + # Add file and dir + new_file1 = _working_dir / "New File 1.txt" + new_dir1 = _working_dir / "New Dir" + + with new_file1.open("w") as f: + f.write("This is a new file") + + new_dir1.mkdir() + + backup_helper.ExecuteBackup(_working_dir, compress, encryption_password) + restore_helper.ExecuteRestore(12) + + # Modify file (1 of N) + with new_file1.open("w") as f: + f.write("This is change 1") + + backup_helper.ExecuteBackup(_working_dir, compress, encryption_password) + restore_helper.ExecuteRestore( + 12, + overwrite=True, + ) + + # Add new files + new_file2 = _working_dir / "New file 2.txt" + + with new_file2.open("w") as f: + f.write("This is a new file 2") + + backup_helper.ExecuteBackup(_working_dir, compress, encryption_password) + restore_helper.ExecuteRestore( + 13, + 
overwrite=True, + ) + + # Modify, Remove file and dir (2 of N) + with new_file1.open("w") as f: + f.write("This is change 2") + + (_working_dir / "one" / "A").unlink() + shutil.rmtree(_working_dir / "two" / "Dir1") + + backup_helper.ExecuteBackup(_working_dir, compress, encryption_password) + restore_helper.ExecuteRestore( + 10, + overwrite=True, + ) + + # Change dir to file and file to dir (3 of 3) + empty_dir = _working_dir / "EmptyDirTest" / "EmptyDir" + + shutil.rmtree(empty_dir) + + with empty_dir.open("w") as f: + f.write("This was a directory") + + file_to_dir = _working_dir / "one" / "BC" + + file_to_dir.unlink() + file_to_dir.mkdir() + + file_to_dir_with_files = _working_dir / "two" / "Dir2" / "Dir3" / "File5" + + (file_to_dir_with_files).unlink() + file_to_dir_with_files.mkdir() + + with (file_to_dir_with_files / "Another New File 1").open("w") as f: + f.write("Content1") + + with (file_to_dir_with_files / "Another New File 2").open("w") as f: + f.write("Content2") + + backup_helper.ExecuteBackup(_working_dir, compress, encryption_password) + restore_helper.ExecuteRestore( + 11, + overwrite=True, + ) + + # ---------------------------------------------------------------------- + def test_OverwriteError(self, _working_dir, tmp_path_factory): + with _YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, False, None + ) as backup_helper: + restore_helper = _RestoreHelper.Create( + _working_dir, + tmp_path_factory, + None, + None, + backup_helper.backup_name, + backup_helper.output_dir, + ) + + output = restore_helper.ExecuteRestore( + None, + expected_result=-1, + decorate_restored_files=False, + ) + + output = _ScrubDynamicContent(output) + + assert ( + output + == textwrap.dedent( + """\ + Heading... + Processing file content... + Processing (1 item)...DONE! (0, , 1 item succeeded, no items with errors, no items with warnings) + Staging working content... + Processing '' (1 of 1)...DONE! (0, , 10 instructions added) + DONE! (0, ) + DONE! 
(0, , 10 instructions found) + + Processing instructions... + + Processing '' (1 of 1)... + + Operation Local Location{working_dir_whitespace_delta} Original Location + --------- {working_dir_sep_delta}------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ {restore_dir_sep_delta}------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + [ADD] {working_dir}{sep}EmptyDirTest{sep}EmptyDir {restore_dir}/EmptyDirTest/EmptyDir + [ADD] {working_dir}{sep}VeryLongPaths{sep}11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 {restore_dir}/VeryLongPaths/11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 + [ADD] {working_dir}{sep}VeryLongPaths{sep}222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222 {restore_dir}/VeryLongPaths/222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222 + [ADD] {working_dir}{sep}one{sep}A {restore_dir}/one/A + [ADD] {working_dir}{sep}one{sep}BC {restore_dir}/one/BC + [ADD] {working_dir}{sep}two{sep}Dir1{sep}File3 {restore_dir}/two/Dir1/File3 + [ADD] {working_dir}{sep}two{sep}Dir1{sep}File4 {restore_dir}/two/Dir1/File4 + [ADD] 
# ----------------------------------------------------------------------
def test_Overwrite(self, _working_dir, tmp_path_factory):
    """Restoring with `overwrite=True` brings back content deleted from the source."""

    # ----------------------------------------------------------------------
    def AssertCounts(expected_files: int, expected_empty_dirs: int) -> None:
        # Re-scan the working directory each time so the counts reflect the current state
        current = _PathInfo.Create(_working_dir)

        assert len(current.filenames) == expected_files
        assert len(current.empty_dirs) == expected_empty_dirs

    # ----------------------------------------------------------------------

    with _YieldInitializedBackupHelper(
        tmp_path_factory,
        _working_dir,
        False,
        None,
    ) as backup_helper:
        # Remove a file to show that things have been restored as expected
        AssertCounts(9, 1)

        file_to_remove = _working_dir / "one" / "A"
        dir_to_remove = _working_dir / "EmptyDirTest"

        file_to_remove.unlink()
        shutil.rmtree(dir_to_remove)

        AssertCounts(8, 0)

        # Restore w/overwrite
        restorer = _RestoreHelper.Create(
            _working_dir,
            tmp_path_factory,
            None,
            None,
            backup_helper.backup_name,
            backup_helper.output_dir,
        )

        restorer.ExecuteRestore(
            None,
            decorate_restored_files=False,
            overwrite=True,
        )

        # The removed file and the empty directory are back
        AssertCounts(9, 1)
_YieldInitializedBackupHelper( + tmp_path_factory, _working_dir, False, None + ) as backup_helper: + # Remove a file to show that things are not restored + path_info = _PathInfo.Create(_working_dir) + + assert len(path_info.filenames) == 9 + assert len(path_info.empty_dirs) == 1 + + (_working_dir / "one" / "A").unlink() + shutil.rmtree(_working_dir / "EmptyDirTest") + + path_info = _PathInfo.Create(_working_dir) + + assert len(path_info.filenames) == 8 + assert len(path_info.empty_dirs) == 0 + + # Restore as dry run + restore_helper = _RestoreHelper.Create( + _working_dir, + tmp_path_factory, + None, + None, + backup_helper.backup_name, + backup_helper.output_dir, + ) + + output = restore_helper.ExecuteRestore( + None, + dry_run=True, + overwrite=True, + decorate_restored_files=False, + ) + + output = _ScrubDynamicContent(output) + + # Nothing changed + path_info = _PathInfo.Create(_working_dir) + + assert len(path_info.filenames) == 8 + assert len(path_info.empty_dirs) == 0 + + assert ( + output + == textwrap.dedent( + """\ + Heading... + Processing file content... + Processing (1 item)...DONE! (0, , 1 item succeeded, no items with errors, no items with warnings) + Staging working content... + Processing '' (1 of 1)...DONE! (0, , 10 instructions added) + DONE! (0, ) + DONE! (0, , 10 instructions found) + + Processing instructions... + + Processing '' (1 of 1)... 
+ + Operation Local Location{working_dir_whitespace_delta} Original Location + --------- {working_dir_sep_delta}------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ {restore_dir_sep_delta}------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + [ADD] {working_dir}{sep}EmptyDirTest{sep}EmptyDir {restore_dir}/EmptyDirTest/EmptyDir + [ADD] {working_dir}{sep}VeryLongPaths{sep}11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 {restore_dir}/VeryLongPaths/11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111 + [ADD] {working_dir}{sep}VeryLongPaths{sep}222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222 {restore_dir}/VeryLongPaths/222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222 + [ADD] {working_dir}{sep}one{sep}A {restore_dir}/one/A + [ADD] {working_dir}{sep}one{sep}BC {restore_dir}/one/BC + [ADD] {working_dir}{sep}two{sep}Dir1{sep}File3 {restore_dir}/two/Dir1/File3 + [ADD] {working_dir}{sep}two{sep}Dir1{sep}File4 {restore_dir}/two/Dir1/File4 + [ADD] {working_dir}{sep}two{sep}Dir2{sep}Dir3{sep}File5 {restore_dir}/two/Dir2/Dir3/File5 + [ADD] 
{working_dir}{sep}two{sep}File1 {restore_dir}/two/File1 + [ADD] {working_dir}{sep}two{sep}File2 {restore_dir}/two/File2 + + DONE! (0, ) + + Committing content...DONE! (0, ) + DONE! (0, ) + DONE! (0, ) + """, + ).format( + working_dir=_working_dir, + working_dir_sep_delta="-" * len(str(_working_dir)), + working_dir_whitespace_delta=" " * len(str(_working_dir)), + restore_dir=_working_dir.as_posix(), + restore_dir_sep_delta="-" * len(_working_dir.as_posix()), + sep=os.path.sep, + ) + ) + + +# ---------------------------------------------------------------------- +class TestRestoreErrors: + # ---------------------------------------------------------------------- + def test_InvalidStore(self): + dm_and_content = iter(GenerateDoneManagerAndContent()) + + Restore( + cast(DoneManager, next(dm_and_content)), + "Test", + "fast_glacier://account@region", + None, + Path(), + {}, + ssd=False, + quiet=False, + dry_run=False, + overwrite=False, + ) + + output = cast(str, next(dm_and_content)) + + assert output == textwrap.dedent( + """\ + Heading... + ERROR: 'fast_glacier://account@region' does not resolve to a file-based data store, which is required when restoring content. + + Most often, this error is encountered when attempting to restore an offsite backup that was + originally transferred to a cloud-based data store. + + To restore these types of offsite backups, copy the content from the original data store + to your local file system and run this script again while pointing to that + location on your file system. This local directory should contain the primary directory + created during the initial backup and all directories created as a part of subsequent backups. + + DONE! 
# ----------------------------------------------------------------------
def test_FilesInBackupContent(self, tmp_path_factory):
    """A loose file inside the backup directory makes `Restore` report an error."""

    backup_dir = tmp_path_factory.mktemp("backup_with_invalid_files") / "Backup"
    backup_dir.mkdir()

    # Only directories are expected here, so this file should be flagged
    (backup_dir / "Invalid File").write_text("This will cause an error")

    stream = iter(GenerateDoneManagerAndContent())
    done_manager = cast(DoneManager, next(stream))

    Restore(
        done_manager,
        backup_dir.name,
        str(backup_dir.parent),
        None,
        Path(),
        {},
        ssd=False,
        quiet=False,
        dry_run=False,
        overwrite=False,
    )

    output = cast(str, next(stream))

    # NOTE(review): the expected text is reproduced as it appears in this view;
    # confirm the relative indentation of the nested lines against real output.
    expected = textwrap.dedent(
        """\
        Heading...
        Processing file content...
        ERROR: Files were not expected:

        - Invalid File

        DONE! (-1, , no instructions found)
        DONE! (-1, )
        """,
    )

    assert output == expected
# ----------------------------------------------------------------------
def test_NoDirectories(self, tmp_path_factory):
    """An empty backup directory makes `Restore` report that no directories were found."""

    backup_dir = tmp_path_factory.mktemp("backup_with_invalid_dir") / "Backup"
    backup_dir.mkdir()

    stream = iter(GenerateDoneManagerAndContent())
    done_manager = cast(DoneManager, next(stream))

    Restore(
        done_manager,
        backup_dir.name,
        str(backup_dir.parent),
        None,
        Path(),
        {},
        ssd=False,
        quiet=False,
        dry_run=False,
        overwrite=False,
    )

    output = cast(str, next(stream))

    # NOTE(review): the expected text is reproduced as it appears in this view;
    # confirm the relative indentation of the nested lines against real output.
    expected = textwrap.dedent(
        """\
        Heading...
        Processing file content...
        ERROR: No directories were found.
        DONE! (-1, , no instructions found)
        DONE! (-1, )
        """,
    )

    assert output == expected
(-1, ) + """, + ) + + # ---------------------------------------------------------------------- + def test_MultiplePrimaryDirectories(self, tmp_path_factory): + temp_dir = tmp_path_factory.mktemp("backup_with_invalid_dir") / "Backup" + + temp_dir.mkdir() + + (temp_dir / "2022.12.07.17.10.00-000000").mkdir() + (temp_dir / "2022.12.07.17.10.00-000001").mkdir() + + dm_and_content = iter(GenerateDoneManagerAndContent()) + + Restore( + cast(DoneManager, next(dm_and_content)), + temp_dir.name, + str(temp_dir.parent), + None, + Path(), + {}, + ssd=False, + quiet=False, + dry_run=False, + overwrite=False, + ) + + output = cast(str, next(dm_and_content)) + + assert output == textwrap.dedent( + """\ + Heading... + Processing file content... + ERROR: Multiple primary directories were found. + + - 2022.12.07.17.10.00-000000 + - 2022.12.07.17.10.00-000001 + + DONE! (-1, , no instructions found) + DONE! (-1, ) + """, + ) + + +# ---------------------------------------------------------------------- +# | +# | Private Types +# | +# ---------------------------------------------------------------------- +@dataclass(frozen=True) +class _BackupHelper(object): + # ---------------------------------------------------------------------- + # | Public Types + @dataclass(frozen=True) + class BackupInfo(object): + # ---------------------------------------------------------------------- + primary_dirs: list[Path] + delta_dirs: list[Path] + + # ---------------------------------------------------------------------- + # | Public Data + backup_name: str + output_dir: Path + snapshot_dir: Path + backup_working_dir: Path + + # ---------------------------------------------------------------------- + # | Public Methods + def ExecuteBackup( + self, + _working_dir, + compress: bool, + encryption_password: str | None, + *, + provide_destination: bool = True, + force: bool = False, + ignore_pending_snapshot: bool = False, + ) -> str: + dm_and_content = iter(GenerateDoneManagerAndContent()) + + Backup( + 
cast(DoneManager, next(dm_and_content)), + self.backup_name, + str(self.output_dir) if provide_destination else None, + [_working_dir], + encryption_password=encryption_password, + working_dir=self.backup_working_dir, + compress=compress, + ssd=False, + force=force, + quiet=False, + file_includes=None, + file_excludes=None, + ignore_pending_snapshot=ignore_pending_snapshot, + ) + + return cast(str, next(dm_and_content)) + + # ---------------------------------------------------------------------- + def GetBackupInfo(self) -> "_BackupHelper.BackupInfo": + backup_dir = self.output_dir / self.backup_name + assert backup_dir.is_dir(), backup_dir + + primary_dirs: list[Path] = [] + delta_dirs: list[Path] = [] + + for item in backup_dir.iterdir(): + assert item.is_dir(), item + + if item.name.endswith(".delta"): + delta_dirs.append(item) + else: + primary_dirs.append(item) + + return _BackupHelper.BackupInfo(primary_dirs, delta_dirs) + + +# ---------------------------------------------------------------------- +@dataclass(frozen=True) +class _RestoreHelper(object): + # ---------------------------------------------------------------------- + # | Public Data + original_dir: Path + encryption_password: str | None + is_local_filesystem: bool | None + + backup_name: str + backup_dir: Path + + output_dir: Path + restore_working_dir: Path + + # ---------------------------------------------------------------------- + # | Public Methods + @classmethod + def Create( + cls, + original_dir: Path, + tmp_path_factory, + encryption_password: str | None, + is_local_filesystem: bool | None, + backup_name: str, + backup_dir: Path, + ) -> "_RestoreHelper": + return cls( + original_dir, + encryption_password, + is_local_filesystem, + backup_name, + backup_dir, + tmp_path_factory.mktemp("restore_destination"), + tmp_path_factory.mktemp("restore_working"), + ) + + # ---------------------------------------------------------------------- + def ExecuteRestore( + self, + expected_num_files: int | 
# ----------------------------------------------------------------------
@dataclass(frozen=True)
class _PathInfo(object):
    """Files and empty directories found by walking a directory tree."""

    # ----------------------------------------------------------------------
    filenames: list[Path]  # every file found, expressed as `root / filename`
    empty_dirs: list[Path]  # directories with neither files nor subdirectories

    # ----------------------------------------------------------------------
    @classmethod
    def Create(
        cls,
        path: Path,
    ) -> "_PathInfo":
        """Walk `path` with `os.walk` and collect its files and empty directories.

        Args:
            path: Directory to walk. If it contains nothing at all, it is itself
                reported as an empty directory.

        Returns:
            An instance of `cls` (so subclasses get their own type back).
        """

        all_files: list[Path] = []
        empty_dirs: list[Path] = []

        for root_str, directories, filenames in os.walk(path):
            # Keep the loop variable untouched; convert into a separate, typed name
            root = Path(root_str)

            if not directories and not filenames:
                empty_dirs.append(root)

            all_files.extend(root / filename for filename in filenames)

        # Construct through `cls` rather than the hard-coded class name
        return cls(all_files, empty_dirs)
# ----------------------------------------------------------------------
@pytest.fixture()
def _working_dir(tmp_path_factory) -> Path:
    """Create a fresh "source" temp directory with 9 files and 1 empty directory.

    The files are written through `_MakeFile`; two of them have names that are
    200 and 201 characters long.
    """

    source_root = tmp_path_factory.mktemp("source")

    # Listed in creation order
    relative_files = (
        Path("one") / "A",
        Path("one") / "BC",
        Path("two") / "File1",
        Path("two") / "File2",
        Path("two") / "Dir1" / "File3",
        Path("two") / "Dir1" / "File4",
        Path("two") / "Dir2" / "Dir3" / "File5",
        Path("VeryLongPaths") / ("1" * 200),
        Path("VeryLongPaths") / ("2" * 201),
    )

    for relative_file in relative_files:
        _MakeFile(source_root, source_root / relative_file)

    empty_dir = source_root / "EmptyDirTest" / "EmptyDir"
    empty_dir.mkdir(parents=True)

    return source_root
# ----------------------------------------------------------------------
def _ScrubDynamicContent(
    content: str,
) -> str:
    """Remove run-specific text from captured output so it can be compared to fixed strings.

    Two kinds of text are replaced:

      - timestamped directory names of the form `YYYY.MM.DD.HH.MM.SS-<index>`,
        with an optional `.delta` suffix
      - size statements of the form `<number> <unit>B required|available`

    Args:
        content: The captured output.

    Returns:
        `content` with every match replaced by the empty string.
    """

    # NOTE(review): both replacements are the empty string; confirm that no
    # placeholder token was intended in their place.

    # Dynamic directory names
    #
    # Every `(?P<name>...)` group must carry a name; `(?P` followed directly by
    # anything else is rejected by `re` as an unknown extension.
    content = re.sub(
        r"""(?#
        Capture [begin]         )(?P<value>(?#
        Year                    )\d{4}\.(?#
        Month                   )\d{2}\.(?#
        Day                     )\d{2}\.(?#
        Hour                    )\d{2}\.(?#
        Minute                  )\d{2}\.(?#
        Second                  )\d{2}(?#
        Index                   )-\d+(?#
        Suffix                  )(?:\.delta)?(?#
        Capture [end]           ))(?#
        )""",
        "",
        content,
    )

    # Dynamic file sizes
    content = re.sub(
        r"""(?#
        Capture [begin]         )(?P<value>(?#
        Value                   )\d+(?:\.\d+)?\s+(?#
        Units                   )\S?B\s+(?#
        available or required   )(?P<type>required|available)(?#
        Capture [end]           ))(?#
        )""",
        "",
        content,
    )

    return content