<a href="https://colab.research.google.com/github/Marco3010/dir-sync/blob/main/dir_sync.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Logger names shared by the logging setup and by the code that emits records.
DIR_SYNC = "dir-sync"  # main logger: Synchronizer and the default for start_end_log
ANALYZER = "analyzer"  # logger used inside analyze_items
COLLECTOR = "collector"  # logger used by the collector functions

# Logging

In [None]:
import gzip
import logging
import os
import shutil
import sys
from logging.handlers import RotatingFileHandler
from typing import ClassVar

In [None]:
class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in ANSI colour codes.

    Notes:
        - Colors are applied only to the output of the handler this formatter
          is attached to: the record itself is left unchanged after formatting,
          so other handlers processing the same record see the plain level name.

    """

    # ANSI escape sequence used for each standard level name.
    COLORS: ClassVar[dict[str, str]] = {
        "DEBUG": "\033[94m",  # Blue
        "INFO": "\033[92m",  # Green
        "WARNING": "\033[93m",  # Yellow
        "ERROR": "\033[91m",  # Red
        "CRITICAL": "\033[91m\033[1m",  # Bold Red
    }
    # Sequence that restores the terminal's default rendering.
    RESET = "\033[0m"

    def format(self, record: logging.LogRecord) -> str:
        """Build the colored log line.

        Args:
            record: The LogRecord produced by the logging system.

        Returns:
            The formatted line with a colored level name, or the uncolored
            line when the level name has no entry in ``COLORS``.

        """
        color = self.COLORS.get(record.levelname)
        if color is None:
            return super().format(record)

        plain_levelname = record.levelname
        record.levelname = f"{color}{plain_levelname}{self.RESET}"
        try:
            return super().format(record)
        finally:
            # The same record object is handed to every handler of the logger;
            # restore the name so escape codes do not leak into log files.
            record.levelname = plain_levelname

In [None]:
def namer(name) -> str:
    """Build the file name used for a rotated (compressed) log file.

    Args:
        name (str): The default name chosen for the rotated file.

    Returns:
        str: ``name`` followed by the ``.gz`` suffix.

    """
    suffix = ".gz"
    return "".join((name, suffix))


def rotator(source: str, dest: str) -> None:
    """Compress a rotated log file in gzip format and remove the original.

    Relies on ``shutil`` being imported alongside the other logging imports,
    so it works regardless of which later cells have been executed.

    Args:
        source (str): The source file path.
        dest (str): The destination file path of the gzip archive.

    """
    with open(source, "rb") as f_in, gzip.open(dest, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Only reached when the copy succeeded: the uncompressed file is then redundant.
    os.remove(source)

In [None]:
def setup_logging(log_dir: str = "logs", level: int = logging.INFO) -> None:
    """Set logging configuration.

    Each application logger (DIR_SYNC, ANALYZER, COLLECTOR) gets its own
    rotating, gzip-compressed log file inside ``log_dir``. The DIR_SYNC logger
    additionally logs to the console with colors, unless ``level`` is
    ``logging.DEBUG``.

    Handlers installed by a previous call are removed and closed, so the
    function can be called repeatedly without leaking open log files.

    In case of error while building the configuration, it falls back to
    ``logging.basicConfig``; the application loggers are left untouched in
    that case so their records still propagate to the root logger.

    Args:
        log_dir (str): The directory where logs will be stored. Defaults to "logs".
        level (int): The logging level. Defaults to logging.INFO.

    Raises:
        OSError: If ``log_dir`` cannot be created.

    """
    os.makedirs(log_dir, exist_ok=True)
    try:
        # Formatters
        # NOTE(review): the bracket placement differs between the two format
        # strings; kept as-is because changing it alters the emitted log lines.
        base_formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s - [%(processName)s - %(threadName)s | %(name)s - %(message)s]",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        colored_format = ColoredFormatter(
            "%(asctime)s - %(levelname)s - [%(processName)s - %(threadName)s] | %(name)s - %(message)s]",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        # Logger name -> log file name inside log_dir.
        log_files = {
            DIR_SYNC: "dir-sync.log",
            ANALYZER: "dir-sync_analyzer.log",
            COLLECTOR: "dir-sync_collector.log",
        }

        # Build every handler before touching any logger: if something fails
        # here, the existing configuration is still intact for the fallback.
        new_handlers: dict[str, list[logging.Handler]] = {}
        for logger_name, file_name in log_files.items():
            file_handler = RotatingFileHandler(
                os.path.join(log_dir, file_name), backupCount=5, maxBytes=1000000
            )
            file_handler.rotator = rotator
            file_handler.namer = namer
            file_handler.setFormatter(base_formatter)
            new_handlers[logger_name] = [file_handler]

        if level != logging.DEBUG:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setFormatter(colored_format)
            # Console first, then file, as in the original handler order.
            new_handlers[DIR_SYNC].insert(0, console_handler)

        for logger_name, handlers in new_handlers.items():
            logger = logging.getLogger(logger_name)
            logger.setLevel(level)
            logger.propagate = False
            for stale in list(logger.handlers):
                logger.removeHandler(stale)
                stale.close()  # releases the file held by a previous setup
            for handler in handlers:
                logger.addHandler(handler)

    except Exception as e:
        print(f"Error setting up logging: {e}")
        logging.basicConfig(level=level)
        logging.error("Impossible to set up logging configuration. Using default configuration.")

# Decorators

In [None]:
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar

In [None]:
# Parameter specification of the decorated callable (used in Callable[P, T]).
P = ParamSpec("P")
# Return type of the decorated callable.
T = TypeVar("T")


def start_end_log(logger_name: str = DIR_SYNC) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Create and return a decorator that logs the start and end of a call.

    The messages are emitted at DEBUG level. The textual representation of the
    arguments and of the result is built only when DEBUG is enabled for the
    logger, so decorated functions handling large objects pay no formatting
    cost at higher log levels.

    Args:
        logger_name (str): The name of the logger to use. Defaults to DIR_SYNC.

    Returns:
        function: the real decorator

    """
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        """Wrap the target function to apply the start and end log."""
        logger = logging.getLogger(logger_name)

        @wraps(func)  # To hold the original function information
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            """Intercept the call and log the start and end of the execution of a function."""
            if logger.isEnabledFor(logging.DEBUG):
                in_params: str = ", ".join(
                    [repr(a) for a in args] + [f"{k}={v!r}" for k, v in kwargs.items()]
                )
                logger.debug(f"Starting {func.__name__}({in_params})")

            result = func(*args, **kwargs)

            # Checked again: the level may have changed while func was running.
            if logger.isEnabledFor(logging.DEBUG):
                if result is not None:
                    logger.debug(f"Finished {func.__name__} with result: {result!r}")
                else:
                    logger.debug(f"Finished {func.__name__}")
            return result
        return wrapper
    return decorator

# Models

In [None]:
from enum import Enum

In [None]:
"""Defines the `ActionType` enumeration.

These types are used to represent the different types of actions possible
within the application.
"""


class ActionType(str, Enum):
    """Actions that can appear in a synchronization plan.

    Members are also ``str`` instances, so they compare equal to their value.
    """

    # Assigned by analyze_items to a non-directory item present only in the source.
    COPY = "copy"
    # Assigned by analyze_items to an item present only in the destination.
    DELETE = "delete"
    # Assigned by analyze_items to a directory present only in the source.
    NEW_DIR = "new_dir"
    # Assigned by analyze_items to a file present on both sides with a
    # different modification time.
    UPDATE = "update"

In [None]:
"""Defines the `ItemType` enumeration.

These types are used to represent the different types of items possible within the application.
"""


class ItemType(str, Enum):
    """Kinds of item tracked by the collector.

    Members are also ``str`` instances, so they compare equal to their value.
    """

    # Used by _get_item_data for every item that is not a directory.
    FILE = "file"
    # Used by _get_item_data when Path.is_dir() is true for the item.
    DIRECTORY = "directory"

In [None]:
from pathlib import Path

from pydantic import BaseModel, Field

In [None]:
"""Data model for file information."""


class ItemData(BaseModel):
    """Holds the collected information about the items in the folders to be synchronized.

    All four fields are required; ``size`` and ``last_modification`` are
    validated to be greater than or equal to zero.
    """

    # Whether the item is a file or a directory.
    item_type: ItemType = Field(..., description="Item type: file or directory")
    # The collector stores the path relative to the scanned root here.
    path: Path = Field(..., description="Path of the item")
    # The collector stores st_size for files and 0 for directories.
    size: int = Field(..., ge=0, description="The size cannot be negative")
    # The collector stores st_mtime here.
    last_modification: float = Field(..., ge=0, description="Timestamp of last modification in float format >= 0")

# Collectors

Code for collecting information about the items in a directory.  
This is an I/O-bound type of operation, and we can use multithreading to perform it.

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

In [None]:
@start_end_log(logger_name=COLLECTOR)
def _get_item_data(item: Path, root_path: Path) -> tuple[str, ItemData]:
    """Collect the metadata of a single item.

    Args:
        item (Path): The item to inspect.
        root_path (Path): The scanned root; the item path is stored relative to it.

    Returns:
        tuple[str, ItemData]: The POSIX-style relative path (used as inventory
            key) and the collected data. Directories are reported with size 0.

    """
    metadata = item.stat()
    rel = item.relative_to(root_path)
    is_directory = item.is_dir()
    info = ItemData(
        item_type=ItemType.DIRECTORY if is_directory else ItemType.FILE,
        path=rel,
        size=0 if is_directory else metadata.st_size,
        last_modification=metadata.st_mtime,
    )
    return rel.as_posix(), info

In [None]:
@start_end_log(logger_name=COLLECTOR)
def collect_data(target: Path) -> dict[str, ItemData]:
    """Recursively scan a folder and collect the data of every file and directory in it.

    The per-item work is delegated to ``_get_item_data`` and executed on a
    thread pool. An item whose processing fails is logged as an error on the
    COLLECTOR logger and left out of the result; the scan itself continues.

    Args:
        target (Path): The path to the folder to be analysed for item information.

    Returns:
        dict[str, ItemData]: The collected data, keyed by the POSIX-style path
            of each item relative to ``target``.

    """
    candidates = [entry for entry in target.rglob("*") if entry.is_file() or entry.is_dir()]
    log = logging.getLogger(COLLECTOR)
    pool_size = os.cpu_count()
    inventory: dict[str, ItemData] = {}

    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        log.info(f"Collecting data for {len(candidates)} items with '{pool_size}' threads.")
        pending = {pool.submit(_get_item_data, entry, target): entry for entry in candidates}

        for done in as_completed(pending):
            try:
                key, data = done.result()
            except Exception as ex:
                log.error(
                    f"Error processing the item {pending[done]}: {ex}"
                )
            else:
                inventory[key] = data

    return inventory

# Operations

Code for performing operations on items in a directory synchronization context.

In [None]:
import shutil
from pathlib import Path
from typing import Annotated

from pydantic import AfterValidator, FilePath, validate_call

In [None]:
def _normalize_path(path: Path) -> Path:
    return path.expanduser().resolve()

In [None]:
def _path_exists(exist: bool) -> AfterValidator:
    """Build a validator that checks whether a path exists.

    Args:
        exist (bool): True to require that the path exists, False to require
            that it does not.

    Returns:
        AfterValidator: A validator raising ValueError when the requirement is
            not met and returning the path unchanged otherwise.

    """
    def _check(path: Path) -> Path:
        present = path.exists()
        if exist and not present:
            raise ValueError(f"Path does not exist: {path}")
        if present and not exist:
            raise ValueError(f"Path already exist: {path}")
        return path

    validator = AfterValidator(_check)
    return validator

In [None]:
@start_end_log()
@validate_call
def do_copy_update(src: FilePath, dest: Path) -> None:
    """Copy a file to the destination, creating or overwriting it.

    Missing parent folders of the destination are created first; the copy is
    done with ``shutil.copy2``.

    Args:
        src(Path): source path of the element to be copied or updated
        dest(Path): destination path where the item is copied or updated

    """
    parent_dir = dest.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dest)

In [None]:
@start_end_log()
@validate_call
def do_delete(
    item: Annotated[Path, AfterValidator(_normalize_path), _path_exists(exist=True)],
) -> None:
    """Delete a file or a whole folder tree.

    The path is normalized and must exist (enforced by the validators).
    Folders are removed recursively with ``ignore_errors=True``.

    Args:
        item(Path): file or folder to be deleted

    """
    if item.is_file():
        item.unlink()
        return
    if item.is_dir():
        shutil.rmtree(item, ignore_errors=True)

In [None]:
@start_end_log()
@validate_call
def do_create_dir(
    item: Annotated[Path, AfterValidator(_normalize_path)],
) -> None:
    """Create a folder, including any missing parent folders.

    The operation is idempotent: an already existing folder is accepted. This
    matters when actions run in parallel, because copying a file into the
    folder may create it (as a parent) before this call is executed.

    Args:
        item(Path): folder to be created

    Raises:
        FileExistsError: If the path exists and is not a directory.

    """
    item.mkdir(parents=True, exist_ok=True)

# Analyzer

In [None]:
import multiprocessing
import time

from pydantic import SkipValidation

In [None]:
@validate_call
def analyze_items(
    path: str,
    src: dict[str, ItemData],
    dest: dict[str, ItemData],
    queue: Annotated[multiprocessing.Queue, SkipValidation],
    log_level: int = logging.ERROR,
) -> tuple[ActionType, str] | None:
    """Analyzes one item to understand the type of action to take.

    Possible actions are:
        - NEW_DIR: the item is a directory present only in the source;
        - COPY: the item is not a directory and is present only in the source;
        - DELETE: the item is present only in the destination;
        - UPDATE: the item is a file on the source side, is present on both
          sides and the two modification times differ (in either direction).

    Log records are sent to ``queue`` through a QueueHandler, replacing any
    handler already attached to the ANALYZER logger.

    Errors raised during the analysis are logged and not propagated; the
    function then returns whatever was decided so far (normally None).

    Args:
        path (str): The relative path of the item to analyze.
        src (dict[str, ItemData]): The source items data.
        dest (dict[str, ItemData]): The destination items data.
        queue (multiprocessing.Queue): The queue to use for logging from multiple processes.
        log_level (int, optional): The logging level. Defaults to logging.ERROR.

    Returns:
        tuple[ActionType, str] | None: The action to take and the path of the
            item, or None when no action is needed.

    """
    # NOTE(review): @validate_call presumably re-validates the full src/dest
    # dictionaries on every call; if this shows up in profiles, consider
    # SkipValidation on those parameters - confirm with the pydantic docs.
    h = logging.handlers.QueueHandler(queue)
    logger = logging.getLogger(ANALYZER)
    logger.setLevel(log_level)
    if logger.hasHandlers():
        logger.handlers.clear()
    logger.addHandler(h)

    item_plan: tuple[ActionType, str] | None = None
    pid = os.getpid()
    logger.debug(f"Worker {pid}: start analysis for the item '{path}'.")
    start_time = time.perf_counter()
    try:
        src_item = src.get(path)
        dest_item = dest.get(path)
        if src_item is not None and dest_item is None:
            if src_item.item_type == ItemType.DIRECTORY:
                item_plan = ActionType.NEW_DIR, path
            else:
                item_plan = ActionType.COPY, path
        elif dest_item is not None and src_item is None:
            item_plan = ActionType.DELETE, path
        elif src_item is not None and dest_item is not None:
            # Any difference in modification time triggers an update, whether
            # the source is newer or older than the destination.
            # NOTE(review): the destination item type is not checked here, so a
            # source file matching a destination directory is also an UPDATE.
            if (
                src_item.item_type == ItemType.FILE
                and src_item.last_modification != dest_item.last_modification
            ):
                item_plan = ActionType.UPDATE, path
    except Exception as e:
        logger.error(f"Worker {pid}: error analyzing item '{path}': {e}")
    end_time = time.perf_counter()
    duration = end_time - start_time

    logger.debug(f"Worker {pid}: end analysis for the item '{path}'. Time: {duration:.2f} seconds.")
    return item_plan

# Synchronizer

The operations required to synchronise the items in the source and destination folders are CPU-bound, so we use multiprocessing.

For log management, a queue and a `QueueHandler` can be used so that multiple processes write to a single log file.

In [None]:
import threading
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
from typing import Any

from pydantic import DirectoryPath

In [None]:
class Synchronizer:
    """Class that performs synchronization between two folders in a parallel way."""

    @validate_call
    def __init__(self, source: DirectoryPath, destination: DirectoryPath) -> None:
        """Initialize the synchronizer.

        Args:
            source(DirectoryPath): The source directory from which to synchronize.
            destination(DirectoryPath): The source directory to be synchronized.

        """
        self.logger = logging.getLogger(DIR_SYNC)
        self._source: DirectoryPath = source
        self._destination: DirectoryPath = destination

    @start_end_log()
    def create_action_plan(self) -> list[tuple[ActionType, str]]:
        """Compare the two folders and prepare an execution plan for parallel synchronisation.

        Returns:
            list[tuple[ActionType, str]]: The action plan.

        """
        items_src: dict[str, ItemData] = collect_data(self._source)
        items_dst: dict[str, ItemData] = collect_data(self._destination)
        self.logger.info(f"src size: {len(items_src)} items - dst size: {len(items_dst)} items")
        all_items: set[str] = items_src.keys() | items_dst.keys()
        self.logger.info(f"Total items to analyze: {len(all_items)}")

        start_time = time.perf_counter()
        self.logger.info(f"Manager e Listener started at {start_time:.2f} seconds. Preparing tasks ...")
        action_plan: list[tuple[ActionType, str]] = []

        with multiprocessing.Manager() as manager:
            queue = manager.Queue(-1)
            lp = threading.Thread(target=self._logger_thread, args=(queue,))
            lp.start()
            with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
                size = round(len(all_items) / executor._max_workers)
                self.logger.debug(f"Worker number: {executor._max_workers} and chunck size: {size} ...")
                for result in executor.map(
                    analyze_items,
                    all_items,
                    repeat(items_src),
                    repeat(items_dst),
                    repeat(queue),
                    repeat(logging.DEBUG),
                    chunksize=size,
                ):
                    if result is not None:
                        action_plan.append(result)
                        self.logger.debug(f"Action plan updated with: {result}")
            queue.put_nowait(None)
            lp.join()

        end_time = time.perf_counter()
        duration = end_time - start_time
        self.logger.info(f"Action plan created in {duration:.2f} seconds.")
        return action_plan

    @start_end_log()
    def execute(self, plan: list[tuple[ActionType, str]], *, dry_run: bool = False) -> None:
        """Perform the actions in the plan in parallel.

        Args:
            plan(list[tuple[ActionType, str]]): The action plan to be executed.
                The possible action to be executed are COPY, UPDATE and DELETE
            dry_run (bool): If True, perform a trial run without making any changes;
                actions are logged only. Default False

        """
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            future_to_item = [
                executor.submit(self._execute, action, path, dry_run=dry_run)
                for action, path in plan
            ]
            for future in as_completed(future_to_item):
                future.result()

    def _execute(self, action: ActionType, path: str, *, dry_run: bool = False) -> None:
        start = time.perf_counter()
        match action:
            case ActionType.COPY | ActionType.UPDATE:
                if dry_run:
                    self.logger.info(
                        f"[DRY RUN] copy from {self._source / path} to "
                        f"{self._destination / path}"
                    )
                else:
                    do_copy_update(self._source / path, self._destination / path)

            case ActionType.NEW_DIR:
                if dry_run:
                    self.logger.info(f"[DRY RUN] create new folder {self._destination / path}")
                else:
                    do_create_dir(self._destination / path)

            case ActionType.DELETE:
                if dry_run:
                    self.logger.info(f"[DRY RUN] delete item {self._destination / path}")
                else:
                    do_delete(self._destination / path)

            case _:
                pass
        end = time.perf_counter()
        elapsed = end - start
        self.logger.debug(f"Action {action} for {path} end in: {elapsed:.3f} seconds")

    def _logger_thread(self, queue: "multiprocessing.Queue[Any]") -> None:
        while True:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)

# Tests

In [None]:
import tempfile
import unittest

from pydantic import ValidationError

## Collectors

In [None]:
class TestFileInfoCollector(unittest.TestCase):
    """Tests for the collector function ``collect_data``."""

    def setUp(self) -> None:
        """Create the temporary folder scanned by each test."""
        self.test_dir = Path(tempfile.mkdtemp())

    def tearDown(self) -> None:
        """Remove the temporary folder and its content."""
        shutil.rmtree(self.test_dir)

    def test_collects_file_information(self) -> None:
        """Files and folders at every depth are reported with their metadata."""
        top_file = self.test_dir / "file1.txt"
        top_file.write_text("Test 1")

        nested_dir = self.test_dir / "subdir"
        nested_dir.mkdir()
        nested_file = nested_dir / "file2.txt"
        nested_file.write_text("Test 2")

        collected = collect_data(self.test_dir)

        def describe(entry: Path, kind: ItemType) -> ItemData:
            # Expected data, read from the filesystem after the collection.
            stats = entry.stat()
            return ItemData(
                item_type=kind,
                path=entry.relative_to(self.test_dir),
                size=0 if kind is ItemType.DIRECTORY else stats.st_size,
                last_modification=stats.st_mtime,
            )

        wanted = {
            "file1.txt": describe(top_file, ItemType.FILE),
            "subdir": describe(nested_dir, ItemType.DIRECTORY),
            "subdir/file2.txt": describe(nested_file, ItemType.FILE),
        }

        self.assertEqual(collected, wanted)

    def test_returns_empty_dict_for_empty_folder(self) -> None:
        """An empty folder produces an empty inventory."""
        collected = collect_data(self.test_dir)
        self.assertEqual(collected, {})

## Models

In [None]:
class TestFileData(unittest.TestCase):
    """Tests for the ``ItemData`` model."""

    def test_size(self) -> None:
        """A size that is not an integer is rejected by validation."""
        invalid_fields = {
            "item_type": ItemType.FILE,
            "path": Path("test.txt"),
            "size": "1a3",
            "last_modification": 1625309472.357246,
        }
        self.assertRaises(ValidationError, ItemData, **invalid_fields)

    def test_data(self) -> None:
        """Valid data builds a model exposing the given values."""
        try:
            info = ItemData(
                item_type=ItemType.FILE,
                path=Path("test.txt"),
                size=1024,
                last_modification=1625309472.357246,
            )
        except ValidationError as e:
            self.fail(f"The creation of FileInfo with valid data has failed: {e}")
        else:
            self.assertEqual(info.size, 1024)

## Synchronizer

In [None]:
import random
from math import ceil

In [None]:
class TestSynchronizer(unittest.TestCase):
    """Test the Synchronizer class.

    Every test fills the source folder, mirrors it into the destination,
    alters the destination at random and then checks either the computed
    action plan or the result of executing the expected plan.
    """

    def setUp(self) -> None:
        """Prepare the temporary source and destination folders."""
        self._test_src: Path = Path(tempfile.mkdtemp())
        self._test_dst: Path = Path(tempfile.mkdtemp())

    def tearDown(self) -> None:
        """Remove the temporary folders."""
        shutil.rmtree(self._test_src)
        shutil.rmtree(self._test_dst)

    def test_action_plan_small(self) -> None:
        """Test the action_plan method with a small number of items."""
        self._check_action_plan(20)

    def test_execute_small(self) -> None:
        """Test the synchronize execution with a small number of items."""
        self._check_execute(20)

    def test_action_plan_large(self) -> None:
        """Test the action_plan method with a large number of items."""
        self._check_action_plan(200)

    def test_execute_large(self) -> None:
        """Test the synchronize execution with a large number of items."""
        self._check_execute(200)

    def test_action_plan_with_dir(self) -> None:
        """Test the action_plan method with dir."""
        self._check_action_plan(300, 6, 3)

    def test_execute_with_dir(self) -> None:
        """Test the synchronize execution with dir."""
        self._check_execute(300, 6, 3)

    def _prepare_folders(
        self, n_files: int, n_dirs: int = 0, n_sub_dirs: int = 0
    ) -> list[tuple[ActionType, str]]:
        """Build source and destination and return the plan expected to re-align them."""
        self._create_items(self._test_src, n_files, n_dirs, n_sub_dirs)
        self._copy_items(self._test_src, self._test_dst)
        return self._modify_random_items(self._test_dst)

    def _check_action_plan(self, n_files: int, n_dirs: int = 0, n_sub_dirs: int = 0) -> None:
        """Assert that the computed plan matches the expected one (in any order)."""
        plan = self._prepare_folders(n_files, n_dirs, n_sub_dirs)
        sync: Synchronizer = Synchronizer(source=self._test_src, destination=self._test_dst)
        res = sync.create_action_plan()
        self.assertCountEqual(res, plan)

    def _check_execute(self, n_files: int, n_dirs: int = 0, n_sub_dirs: int = 0) -> None:
        """Assert that executing the expected plan leaves the same items on both sides."""
        plan = self._prepare_folders(n_files, n_dirs, n_sub_dirs)
        sync: Synchronizer = Synchronizer(source=self._test_src, destination=self._test_dst)
        sync.execute(plan)
        src_collect_data = collect_data(self._test_src)
        dest_collect_data = collect_data(self._test_dst)
        self.assertEqual(set(src_collect_data), set(dest_collect_data))

    def _copy_items(self, src: Path, dest: Path) -> None:
        """Mirror ``src`` into the (already existing) folder ``dest``."""
        shutil.copytree(src, dest, dirs_exist_ok=True)

    def _create_items(
        self, folder: Path, n_files: int = 30, n_dirs: int = 0, n_sub_dirs: int = 0
    ) -> None:
        """Populate ``folder`` with text files, optionally spread over sub-folders.

        ``n_dirs + n_sub_dirs + 1`` top-level folders named ``dir_<i>`` are
        created (``folder`` itself is used when that number is 1). When
        ``n_sub_dirs`` is positive each of them gets ``n_sub_dirs`` folders
        named ``sub_dir_<j>`` and the files go there. Every folder receiving
        files gets ``n_files`` divided (floor) by the number of top-level folders.

        NOTE(review): with sub-folders the total number of files is therefore
        larger than ``n_files``; kept as-is to preserve the size of the fixtures.
        """
        folder = Path(folder)
        folder.mkdir(parents=True, exist_ok=True)
        tot_dirs: int = n_dirs + n_sub_dirs + 1
        files_dirs: int = n_files // tot_dirs

        for i in range(tot_dirs):
            dir_path: Path = folder.resolve() if tot_dirs == 1 else folder / f"dir_{i}"
            dir_path.mkdir(parents=True, exist_ok=True)
            if n_sub_dirs > 0:
                for j in range(n_sub_dirs):
                    sub_dir_path: Path = dir_path / f"sub_dir_{j}"
                    sub_dir_path.mkdir(parents=True, exist_ok=True)
                    self._create_files(sub_dir_path, files_dirs)
            else:
                self._create_files(dir_path, files_dirs)

    def _create_files(self, folder: Path, n_files: int = 30) -> None:
        """Create ``n_files`` small text files named ``file_<i>.txt`` in ``folder``."""
        folder = Path(folder)
        for i in range(n_files):
            file_name: str = f"file_{i}.txt"
            with open(folder / file_name, "w") as f:
                f.write(f"Text {i}: {file_name}")

    def _modify_random_items(self, folder: Path) -> list[tuple[ActionType, str]]:
        """Randomly remove and modify files in ``folder``.

        About a quarter of the files are removed and about a fifth of the
        remaining ones are rewritten.

        Returns:
            list[tuple[ActionType, str]]: The plan a synchronizer is expected
                to compute: COPY for removed files and UPDATE for modified
                ones, with POSIX-style relative paths like the collector keys.

        """
        plan: list[tuple[ActionType, str]] = []
        files: list[Path] = [
            Path(root) / name for root, _dirs, names in os.walk(folder) for name in names
        ]

        if files:
            files_to_remove = random.sample(files, min(ceil(len(files) / 4), len(files)))
            for file_path in files_to_remove:
                os.remove(file_path)
                plan.append((ActionType.COPY, file_path.relative_to(folder).as_posix()))
            removed = set(files_to_remove)
            files = [f for f in files if f not in removed]

        if files:
            files_to_modify = random.sample(files, min(ceil(len(files) / 5), len(files)))
            for file_path in files_to_modify:
                with open(file_path, "w") as open_file:
                    open_file.write(f"Modified {file_path}")
                # Push the modification time forward explicitly: on filesystems
                # with coarse timestamps the rewrite alone may leave it equal
                # to the source one, and no UPDATE would be detected.
                stats = file_path.stat()
                os.utime(file_path, (stats.st_atime, stats.st_mtime + 10))
                plan.append((ActionType.UPDATE, file_path.relative_to(folder).as_posix()))

        return plan





## Start Test

In [None]:
# Run the whole test suite with DEBUG logging enabled.
setup_logging(level=logging.DEBUG)

# One loader is enough to build the three per-class suites.
loader = unittest.TestLoader()
test_file_data = loader.loadTestsFromTestCase(TestFileData)
test_file_info_collector = loader.loadTestsFromTestCase(TestFileInfoCollector)
test_synchronizer = loader.loadTestsFromTestCase(TestSynchronizer)

# Same execution order as the list: models, collectors, synchronizer.
all_tests = unittest.TestSuite()
all_tests.addTests([test_file_data, test_file_info_collector, test_synchronizer])

runner = unittest.TextTestRunner(verbosity=2, stream=sys.stdout)
runner.run(all_tests)

test_data (__main__.TestFileData.test_data)
Test that the data are consistent. ... ok
test_size (__main__.TestFileData.test_size)
Test verifying the correctness of the dimension property. ... ok
test_collects_file_information (__main__.TestFileInfoCollector.test_collects_file_information)
Check that the function correctly collects file info. ... ok
test_returns_empty_dict_for_empty_folder (__main__.TestFileInfoCollector.test_returns_empty_dict_for_empty_folder)
Check that the function returns an empty dictionary for empty folders. ... ok
test_action_plan_large (__main__.TestSynchronizer.test_action_plan_large)
Test the action_plan method with a large number of items. ... ok
test_action_plan_small (__main__.TestSynchronizer.test_action_plan_small)
Test the action_plan method with a small number of items. ... ok
test_action_plan_with_dir (__main__.TestSynchronizer.test_action_plan_with_dir)
Test the action_plan method with dir. ... ok
test_execute_large (__main__.TestSynchronizer.test_ex

<unittest.runner.TextTestResult run=10 errors=0 failures=0>