From 37e502cf423a0138d8e0297add7224ca83ce47f3 Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Tue, 21 Apr 2026 20:10:42 +0800 Subject: [PATCH 01/38] Add file watcher triggers with FA_watch_* actions and UI tab New automation_file.trigger package wraps watchdog observers behind a TriggerManager singleton. FA_watch_start / FA_watch_stop / FA_watch_stop_all / FA_watch_list are auto-registered in build_default_registry so triggered action lists dispatch through the shared executor. Triggers tab in the main window drives the same lifecycle from the GUI. --- automation_file/__init__.py | 19 ++ automation_file/core/action_registry.py | 7 + automation_file/trigger/__init__.py | 31 ++++ automation_file/trigger/manager.py | 236 ++++++++++++++++++++++++ automation_file/ui/main_window.py | 4 + automation_file/ui/tabs/__init__.py | 2 + automation_file/ui/tabs/trigger_tab.py | 171 +++++++++++++++++ dev.toml | 3 +- requirements.txt | 3 +- stable.toml | 3 +- tests/test_trigger.py | 116 ++++++++++++ tests/test_ui_smoke.py | 1 + 12 files changed, 593 insertions(+), 3 deletions(-) create mode 100644 automation_file/trigger/__init__.py create mode 100644 automation_file/trigger/manager.py create mode 100644 automation_file/ui/tabs/trigger_tab.py create mode 100644 tests/test_trigger.py diff --git a/automation_file/__init__.py b/automation_file/__init__.py index 7419549..6c877d4 100644 --- a/automation_file/__init__.py +++ b/automation_file/__init__.py @@ -87,6 +87,16 @@ TCPActionServer, start_autocontrol_socket_server, ) +from automation_file.trigger import ( + FileWatcher, + TriggerManager, + register_trigger_ops, + trigger_manager, + watch_list, + watch_start, + watch_stop, + watch_stop_all, +) from automation_file.utils.file_discovery import get_dir_files_as_list if TYPE_CHECKING: @@ -186,6 +196,15 @@ def __getattr__(name: str) -> Any: "ProjectBuilder", "create_project_dir", "get_dir_files_as_list", + # Triggers + "FileWatcher", + "TriggerManager", + "register_trigger_ops", + "trigger_manager", + "watch_start", + "watch_stop", + "watch_stop_all", + "watch_list", # UI (lazy-loaded) "launch_ui", ] diff --git a/automation_file/core/action_registry.py b/automation_file/core/action_registry.py index 26d882a..29c2228 100644 --- a/automation_file/core/action_registry.py +++ b/automation_file/core/action_registry.py @@ -141,6 +141,12 @@ def _register_cloud_backends(registry: ActionRegistry) -> None: register_sftp_ops(registry) +def _register_trigger_ops(registry: ActionRegistry) -> None: + from automation_file.trigger import register_trigger_ops + + register_trigger_ops(registry) + + def build_default_registry() -> ActionRegistry: """Return a registry pre-populated with every built-in ``FA_*`` action.""" registry = ActionRegistry() @@ -148,6 +154,7 @@ def build_default_registry() -> ActionRegistry: registry.register_many(_http_commands()) registry.register_many(_drive_commands()) _register_cloud_backends(registry) + _register_trigger_ops(registry) file_automation_logger.info( "action_registry: built default registry with %d commands", len(registry) ) diff --git a/automation_file/trigger/__init__.py b/automation_file/trigger/__init__.py new file mode 100644 index 0000000..ac86d36 --- /dev/null +++ b/automation_file/trigger/__init__.py @@ -0,0 +1,31 @@ +"""Event-driven triggers — watch local paths and run action lists on change. + +Each :class:`FileWatcher` wraps one ``watchdog.Observer`` plus the JSON action +list to dispatch when a matching event fires. The module-level +:data:`trigger_manager` owns a registry of named watchers so callers can +start / stop them from JSON actions or the GUI. +""" + +from __future__ import annotations + +from automation_file.trigger.manager import ( + FileWatcher, + TriggerManager, + register_trigger_ops, + trigger_manager, + watch_list, + watch_start, + watch_stop, + watch_stop_all, +) + +__all__ = [ + "FileWatcher", + "TriggerManager", + "register_trigger_ops", + "trigger_manager", + "watch_list", + "watch_start", + "watch_stop", + "watch_stop_all", +] diff --git a/automation_file/trigger/manager.py b/automation_file/trigger/manager.py new file mode 100644 index 0000000..b0af75a --- /dev/null +++ b/automation_file/trigger/manager.py @@ -0,0 +1,236 @@ +"""Watchdog-backed trigger manager. + +A :class:`FileWatcher` owns one ``watchdog.observers.Observer`` plus the +action list to run when a matching filesystem event fires. The module-level +:data:`trigger_manager` keeps a name -> watcher mapping so the JSON facade +(``FA_watch_start`` / ``FA_watch_stop`` / ``FA_watch_list``) and the GUI share +the same lifecycle. + +Events are dispatched through the shared :class:`ActionExecutor`, so the +same JSON action-list shape is used everywhere. Dispatch always happens on +watchdog's dispatcher thread — the executor's per-action ``try/except`` +prevents a bad action from killing the observer. +""" + +from __future__ import annotations + +import threading +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +from watchdog.events import FileSystemEvent, FileSystemEventHandler +from watchdog.observers import Observer +from watchdog.observers.api import BaseObserver + +from automation_file.core.action_registry import ActionRegistry +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger + +_SUPPORTED_EVENTS = frozenset({"created", "modified", "deleted", "moved"}) + + +class TriggerException(FileAutomationException): + """Raised by the trigger manager on duplicate / missing / invalid watchers.""" + + +def _parse_events(events: Iterable[str] | str | None) -> frozenset[str]: + if events is None: + return frozenset({"created", "modified"}) + if isinstance(events, str): + events = [events] + chosen = {item.strip().lower() for item in events if item and item.strip()} + unknown = chosen - _SUPPORTED_EVENTS + if unknown: + raise TriggerException(f"unsupported event types: {sorted(unknown)}") + return frozenset(chosen or {"created", "modified"}) + + +class _DispatchingHandler(FileSystemEventHandler): + """Route watchdog events into an action list on the shared executor.""" + + def __init__( + self, + name: str, + events: frozenset[str], + action_list: list[list[Any]], + ) -> None: + super().__init__() + self._name = name + self._events = events + self._action_list = action_list + + def on_any_event(self, event: FileSystemEvent) -> None: + kind = event.event_type + if kind not in self._events: + return + file_automation_logger.info("trigger[%s]: %s %s", self._name, kind, event.src_path) + from automation_file.core.action_executor import executor + + try: + executor.execute_action(self._action_list) + except FileAutomationException as error: + file_automation_logger.warning( + "trigger[%s]: action dispatch failed: %r", self._name, error + ) + + +class FileWatcher: + """One named watchdog observer tied to an action list.""" + + def __init__( + self, + name: str, + path: str, + action_list: list[list[Any]], + *, + events: Iterable[str] | str | None = None, + recursive: bool = True, + ) -> None: + resolved = Path(path).expanduser().resolve() + if not resolved.exists(): + raise TriggerException(f"watch path does not exist: {resolved}") + self.name = name + self.path = resolved + self.recursive = bool(recursive) + self.events = _parse_events(events) + self.action_list: list[list[Any]] = list(action_list) + self._observer: BaseObserver | None = None + + @property + def is_running(self) -> bool: + observer = self._observer + return observer is not None and observer.is_alive() + + def start(self) -> None: + if self.is_running: + return + handler = _DispatchingHandler(self.name, self.events, self.action_list) + observer = Observer() + observer.schedule(handler, str(self.path), recursive=self.recursive) + observer.daemon = True + observer.start() + self._observer = observer + file_automation_logger.info( + "trigger[%s]: watching %s (events=%s, recursive=%s)", + self.name, + self.path, + sorted(self.events), + self.recursive, + ) + + def stop(self, timeout: float = 5.0) -> None: + observer = self._observer + if observer is None: + return + self._observer = None + observer.stop() + observer.join(timeout=timeout) + file_automation_logger.info("trigger[%s]: stopped", self.name) + + def as_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "path": str(self.path), + "events": sorted(self.events), + "recursive": self.recursive, + "running": self.is_running, + "actions": len(self.action_list), + } + + +class TriggerManager: + """Process-wide registry of :class:`FileWatcher` instances.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._watchers: dict[str, FileWatcher] = {} + + def start( + self, + name: str, + path: str, + action_list: list[list[Any]], + *, + events: Iterable[str] | str | None = None, + recursive: bool = True, + ) -> dict[str, Any]: + with self._lock: + if name in self._watchers: + raise TriggerException(f"watcher already registered: {name}") + watcher = FileWatcher( + name=name, + path=path, + action_list=action_list, + events=events, + recursive=recursive, + ) + watcher.start() + self._watchers[name] = watcher + return watcher.as_dict() + + def stop(self, name: str) -> dict[str, Any]: + with self._lock: + watcher = self._watchers.pop(name, None) + if watcher is None: + raise TriggerException(f"no such watcher: {name}") + watcher.stop() + return watcher.as_dict() + + def stop_all(self) -> list[dict[str, Any]]: + with self._lock: + watchers = list(self._watchers.values()) + self._watchers.clear() + snapshots: list[dict[str, Any]] = [] + for watcher in watchers: + watcher.stop() + snapshots.append(watcher.as_dict()) + return snapshots + + def list(self) -> list[dict[str, Any]]: + with self._lock: + return [watcher.as_dict() for watcher in self._watchers.values()] + + def __contains__(self, name: object) -> bool: + return isinstance(name, str) and name in self._watchers + + +trigger_manager: TriggerManager = TriggerManager() + + +def watch_start( + name: str, + path: str, + action_list: list[list[Any]], + events: Iterable[str] | str | None = None, + recursive: bool = True, +) -> dict[str, Any]: + """Start a named watcher on ``path`` that dispatches ``action_list``.""" + return trigger_manager.start(name, path, action_list, events=events, recursive=recursive) + + +def watch_stop(name: str) -> dict[str, Any]: + """Stop and remove the named watcher.""" + return trigger_manager.stop(name) + + +def watch_stop_all() -> list[dict[str, Any]]: + """Stop and remove every active watcher.""" + return trigger_manager.stop_all() + + +def watch_list() -> list[dict[str, Any]]: + """Return a snapshot of every registered watcher.""" + return trigger_manager.list() + + +def register_trigger_ops(registry: ActionRegistry) -> None: + """Wire ``FA_watch_*`` actions into a registry.""" + registry.register_many( + { + "FA_watch_start": watch_start, + "FA_watch_stop": watch_stop, + "FA_watch_stop_all": watch_stop_all, + "FA_watch_list": watch_list, + } + ) diff --git a/automation_file/ui/main_window.py b/automation_file/ui/main_window.py index e54bb4c..1ca5507 100644 --- a/automation_file/ui/main_window.py +++ b/automation_file/ui/main_window.py @@ -14,6 +14,7 @@ LocalOpsTab, ServerTab, TransferTab, + TriggerTab, ) _WINDOW_TITLE = "automation_file" @@ -40,6 +41,8 @@ def __init__(self) -> None: self._tabs.addTab(LocalOpsTab(self._log, self._pool), "Local") self._tabs.addTab(TransferTab(self._log, self._pool), "Transfer") self._tabs.addTab(JSONEditorTab(self._log, self._pool), "JSON actions") + self._trigger_tab = TriggerTab(self._log, self._pool) + self._tabs.addTab(self._trigger_tab, "Triggers") self._server_tab = ServerTab(self._log, self._pool) self._tabs.addTab(self._server_tab, "Servers") @@ -76,4 +79,5 @@ def _on_log_message(self, message: str) -> None: def closeEvent(self, event) -> None: # noqa: N802 # pylint: disable=invalid-name — Qt override self._server_tab.closeEvent(event) + self._trigger_tab.closeEvent(event) super().closeEvent(event) diff --git a/automation_file/ui/tabs/__init__.py b/automation_file/ui/tabs/__init__.py index 57a85a2..2387d86 100644 --- a/automation_file/ui/tabs/__init__.py +++ b/automation_file/ui/tabs/__init__.py @@ -13,6 +13,7 @@ from automation_file.ui.tabs.server_tab import ServerTab from automation_file.ui.tabs.sftp_tab import SFTPTab from automation_file.ui.tabs.transfer_tab import TransferTab +from automation_file.ui.tabs.trigger_tab import TriggerTab __all__ = [ "AzureBlobTab", @@ -26,4 +27,5 @@ "SFTPTab", "ServerTab", "TransferTab", + "TriggerTab", ] diff --git a/automation_file/ui/tabs/trigger_tab.py b/automation_file/ui/tabs/trigger_tab.py new file mode 100644 index 0000000..8c10eab --- /dev/null +++ b/automation_file/ui/tabs/trigger_tab.py @@ -0,0 +1,171 @@ +"""File-watcher trigger management tab. + +Lists active watchers, starts a new one from an inline JSON action list, +and stops watchers individually or as a batch. All dispatch still goes +through the shared :class:`ActionExecutor`; this tab is a thin GUI over +:mod:`automation_file.trigger`. +""" + +from __future__ import annotations + +import json +from typing import Any + +from PySide6.QtWidgets import ( + QCheckBox, + QFormLayout, + QGroupBox, + QHBoxLayout, + QHeaderView, + QLineEdit, + QPlainTextEdit, + QTableWidget, + QTableWidgetItem, + QVBoxLayout, +) + +from automation_file.exceptions import FileAutomationException +from automation_file.trigger import ( + trigger_manager, + watch_start, + watch_stop, + watch_stop_all, +) +from automation_file.ui.tabs.base import BaseTab + +_COLUMNS = ("Name", "Path", "Events", "Recursive", "Actions", "Running") + + +class TriggerTab(BaseTab): + """Start / stop / list file-system watchers.""" + + def __init__(self, log, pool) -> None: + super().__init__(log, pool) + root = QVBoxLayout(self) + root.setContentsMargins(12, 12, 12, 12) + root.setSpacing(12) + root.addWidget(self._start_group()) + root.addWidget(self._list_group(), 1) + self._refresh() + + def _start_group(self) -> QGroupBox: + box = QGroupBox("Start a watcher") + form = QFormLayout(box) + form.setVerticalSpacing(10) + form.setHorizontalSpacing(12) + + self._name = QLineEdit() + self._name.setPlaceholderText("unique watcher name") + self._path = QLineEdit() + self._path.setPlaceholderText("absolute path to watch") + self._events = QLineEdit("created,modified") + self._events.setPlaceholderText("comma-separated: created,modified,deleted,moved") + self._recursive = QCheckBox("Recursive") + self._recursive.setChecked(True) + self._actions = QPlainTextEdit() + self._actions.setPlaceholderText( + '[["FA_create_file", {"file_path": "triggered.txt", "content": "hi"}]]' + ) + self._actions.setMinimumHeight(120) + + form.addRow("Name", self._name) + form.addRow("Path", self._path) + form.addRow("Events", self._events) + form.addRow(self._recursive) + form.addRow("Actions (JSON)", self._actions) + form.addRow(self.make_button("Start watcher", self._on_start)) + return box + + def _list_group(self) -> QGroupBox: + box = QGroupBox("Active watchers") + layout = QVBoxLayout(box) + layout.setSpacing(8) + + self._table = QTableWidget(0, len(_COLUMNS)) + self._table.setHorizontalHeaderLabels(_COLUMNS) + header = self._table.horizontalHeader() + header.setSectionResizeMode(QHeaderView.ResizeMode.Stretch) + self._table.verticalHeader().setVisible(False) + self._table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows) + self._table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers) + layout.addWidget(self._table) + + row = QHBoxLayout() + row.setSpacing(8) + row.addWidget(self.make_button("Refresh", self._refresh)) + row.addWidget(self.make_button("Stop selected", self._on_stop_selected)) + row.addWidget(self.make_button("Stop all", self._on_stop_all)) + row.addStretch() + layout.addLayout(row) + return box + + def _on_start(self) -> None: + name = self._name.text().strip() + path = self._path.text().strip() + raw_events = [event.strip() for event in self._events.text().split(",") if event.strip()] + raw_actions = self._actions.toPlainText().strip() + if not name or not path or not raw_actions: + self._log.append_line("trigger: name, path, and actions are all required") + return + try: + action_list = json.loads(raw_actions) + except json.JSONDecodeError as error: + self._log.append_line(f"trigger: invalid JSON: {error}") + return + if not isinstance(action_list, list): + self._log.append_line("trigger: action JSON must be an array") + return + try: + snapshot = watch_start( + name=name, + path=path, + action_list=action_list, + events=raw_events or None, + recursive=self._recursive.isChecked(), + ) + except FileAutomationException as error: + self._log.append_line(f"trigger: start failed: {error!r}") + return + self._log.append_line(f"trigger: started {snapshot}") + self._refresh() + + def _on_stop_selected(self) -> None: + row = self._table.currentRow() + if row < 0: + self._log.append_line("trigger: no row selected") + return + name_item = self._table.item(row, 0) + if name_item is None: + return + name = name_item.text() + try: + snapshot = watch_stop(name) + except FileAutomationException as error: + self._log.append_line(f"trigger: stop failed: {error!r}") + return + self._log.append_line(f"trigger: stopped {snapshot}") + self._refresh() + + def _on_stop_all(self) -> None: + snapshots = watch_stop_all() + self._log.append_line(f"trigger: stopped {len(snapshots)} watcher(s)") + self._refresh() + + def _refresh(self) -> None: + watchers = trigger_manager.list() + self._table.setRowCount(len(watchers)) + for row, data in enumerate(watchers): + self._set_cell(row, 0, data["name"]) + self._set_cell(row, 1, data["path"]) + self._set_cell(row, 2, ", ".join(data["events"])) + self._set_cell(row, 3, "yes" if data["recursive"] else "no") + self._set_cell(row, 4, str(data["actions"])) + self._set_cell(row, 5, "yes" if data["running"] else "no") + + def _set_cell(self, row: int, col: int, text: str) -> None: + item = QTableWidgetItem(text) + self._table.setItem(row, col, item) + + def closeEvent(self, event: Any) -> None: # noqa: N802 # pylint: disable=invalid-name — Qt override + watch_stop_all() + super().closeEvent(event) diff --git a/dev.toml b/dev.toml index 23230ac..3c15e73 100644 --- a/dev.toml +++ b/dev.toml @@ -23,7 +23,8 @@ dependencies = [ "azure-storage-blob>=12.19.0", "dropbox>=11.36.2", "paramiko>=3.4.0", - "PySide6>=6.6.0" + "PySide6>=6.6.0", + "watchdog>=4.0.0" ] classifiers = [ "Programming Language :: Python :: 3.10", diff --git a/requirements.txt b/requirements.txt index 2cb4f68..d6b9100 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ google-auth-oauthlib PySide6 requests protobuf -tqdm \ No newline at end of file +tqdm +watchdog \ No newline at end of file diff --git a/stable.toml b/stable.toml index 91b1b50..992dcaf 100644 --- a/stable.toml +++ b/stable.toml @@ -23,7 +23,8 @@ dependencies = [ "azure-storage-blob>=12.19.0", "dropbox>=11.36.2", "paramiko>=3.4.0", - "PySide6>=6.6.0" + "PySide6>=6.6.0", + "watchdog>=4.0.0" ] classifiers = [ "Programming Language :: Python :: 3.10", diff --git a/tests/test_trigger.py b/tests/test_trigger.py new file mode 100644 index 0000000..e52dbea --- /dev/null +++ b/tests/test_trigger.py @@ -0,0 +1,116 @@ +"""Tests for the filesystem watcher trigger manager. + +These tests avoid asserting on actual filesystem events — watchdog's +dispatcher thread is inherently racey across platforms. Instead they cover +validation, registry wiring, lifecycle, and dict representation. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from automation_file.exceptions import FileAutomationException +from automation_file.trigger.manager import ( + FileWatcher, + TriggerException, + TriggerManager, + _parse_events, + register_trigger_ops, +) + + +def test_parse_events_defaults_when_none() -> None: + assert _parse_events(None) == frozenset({"created", "modified"}) + + +def test_parse_events_accepts_string() -> None: + assert _parse_events("deleted") == frozenset({"deleted"}) + + +def test_parse_events_normalises_case_and_whitespace() -> None: + assert _parse_events([" Created ", "MOVED"]) == frozenset({"created", "moved"}) + + +def test_parse_events_rejects_unknown() -> None: + with pytest.raises(TriggerException): + _parse_events(["created", "exploded"]) + + +def test_file_watcher_rejects_missing_path(tmp_path: Path) -> None: + with pytest.raises(TriggerException): + FileWatcher("missing", str(tmp_path / "nope"), [["FA_create_file", {}]]) + + +def test_trigger_manager_start_stop_lifecycle(tmp_path: Path) -> None: + manager = TriggerManager() + snapshot = manager.start( + name="t1", + path=str(tmp_path), + action_list=[["FA_watch_list"]], + events=["created"], + recursive=False, + ) + try: + assert snapshot["name"] == "t1" + assert snapshot["running"] is True + assert snapshot["events"] == ["created"] + assert "t1" in manager + listing = manager.list() + assert len(listing) == 1 and listing[0]["name"] == "t1" + finally: + manager.stop("t1") + assert "t1" not in manager + assert manager.list() == [] + + +def test_trigger_manager_rejects_duplicate(tmp_path: Path) -> None: + manager = TriggerManager() + manager.start("dup", str(tmp_path), [["FA_watch_list"]]) + try: + with pytest.raises(TriggerException): + manager.start("dup", str(tmp_path), [["FA_watch_list"]]) + finally: + manager.stop_all() + + +def test_trigger_manager_stop_unknown_raises() -> None: + manager = TriggerManager() + with pytest.raises(TriggerException): + manager.stop("never-registered") + + +def test_trigger_manager_stop_all_clears_everything(tmp_path: Path) -> None: + manager = TriggerManager() + manager.start("a", str(tmp_path), [["FA_watch_list"]]) + manager.start("b", str(tmp_path), [["FA_watch_list"]]) + snapshots = manager.stop_all() + assert len(snapshots) == 2 + assert manager.list() == [] + + +def test_trigger_exception_inherits_from_file_automation() -> None: + assert issubclass(TriggerException, FileAutomationException) + + +def test_register_trigger_ops_populates_registry() -> None: + from automation_file.core.action_registry import ActionRegistry + + registry = ActionRegistry() + register_trigger_ops(registry) + for name in ( + "FA_watch_start", + "FA_watch_stop", + "FA_watch_stop_all", + "FA_watch_list", + ): + assert name in registry + + +def test_default_registry_contains_trigger_ops() -> None: + from automation_file.core.action_registry import build_default_registry + + registry = build_default_registry() + assert "FA_watch_start" in registry + assert "FA_watch_list" in registry diff --git a/tests/test_ui_smoke.py b/tests/test_ui_smoke.py index a94ec39..776096e 100644 --- a/tests/test_ui_smoke.py +++ b/tests/test_ui_smoke.py @@ -54,6 +54,7 @@ def test_main_window_constructs(qt_app) -> None: "JSONEditorTab", "ServerTab", "TransferTab", + "TriggerTab", "HomeTab", ], ) From af955deee29738e16c85be01261937d3be9cc2f2 Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Tue, 21 Apr 2026 20:14:35 +0800 Subject: [PATCH 02/38] Add cron scheduler with FA_schedule_* actions and UI tab New automation_file.scheduler package parses 5-field cron expressions (stdlib-only, supports *, ranges, lists, and step syntax with month / DoW aliases) and drives a background thread that dispatches each job's action list on minute-boundary ticks. FA_schedule_add / FA_schedule_remove / FA_schedule_remove_all / FA_schedule_list are auto-registered alongside the other FA_* actions. Scheduler tab in the main window drives the same lifecycle from the GUI. --- automation_file/__init__.py | 21 +++ automation_file/core/action_registry.py | 7 + automation_file/scheduler/__init__.py | 35 +++++ automation_file/scheduler/cron.py | 140 ++++++++++++++++++ automation_file/scheduler/manager.py | 180 +++++++++++++++++++++++ automation_file/ui/main_window.py | 4 + automation_file/ui/tabs/__init__.py | 2 + automation_file/ui/tabs/scheduler_tab.py | 155 +++++++++++++++++++ tests/test_scheduler.py | 166 +++++++++++++++++++++ tests/test_ui_smoke.py | 1 + 10 files changed, 711 insertions(+) create mode 100644 automation_file/scheduler/__init__.py create mode 100644 automation_file/scheduler/cron.py create mode 100644 automation_file/scheduler/manager.py create mode 100644 automation_file/ui/tabs/scheduler_tab.py create mode 100644 tests/test_scheduler.py diff --git a/automation_file/__init__.py b/automation_file/__init__.py index 6c877d4..5099283 100644 --- a/automation_file/__init__.py +++ b/automation_file/__init__.py @@ -82,6 +82,17 @@ from automation_file.remote.s3 import S3Client, register_s3_ops, s3_instance from automation_file.remote.sftp import SFTPClient, register_sftp_ops, sftp_instance from automation_file.remote.url_validator import validate_http_url +from automation_file.scheduler import ( + CronExpression, + ScheduledJob, + Scheduler, + register_scheduler_ops, + schedule_add, + schedule_list, + schedule_remove, + schedule_remove_all, + scheduler, +) from automation_file.server.http_server import HTTPActionServer, start_http_action_server from automation_file.server.tcp_server import ( TCPActionServer, @@ -205,6 +216,16 @@ def __getattr__(name: str) -> Any: "watch_stop", "watch_stop_all", "watch_list", + # Scheduler + "CronExpression", + "ScheduledJob", + "Scheduler", + "register_scheduler_ops", + "schedule_add", + "schedule_list", + "schedule_remove", + "schedule_remove_all", + "scheduler", # UI (lazy-loaded) "launch_ui", ] diff --git a/automation_file/core/action_registry.py b/automation_file/core/action_registry.py index 29c2228..cbf9f7b 100644 --- a/automation_file/core/action_registry.py +++ b/automation_file/core/action_registry.py @@ -147,6 +147,12 @@ def _register_trigger_ops(registry: ActionRegistry) -> None: register_trigger_ops(registry) +def _register_scheduler_ops(registry: ActionRegistry) -> None: + from automation_file.scheduler import register_scheduler_ops + + register_scheduler_ops(registry) + + def build_default_registry() -> ActionRegistry: """Return a registry pre-populated with every built-in ``FA_*`` action.""" registry = ActionRegistry() @@ -155,6 +161,7 @@ def build_default_registry() -> ActionRegistry: registry.register_many(_drive_commands()) _register_cloud_backends(registry) _register_trigger_ops(registry) + _register_scheduler_ops(registry) file_automation_logger.info( "action_registry: built default registry with %d commands", len(registry) ) diff --git a/automation_file/scheduler/__init__.py b/automation_file/scheduler/__init__.py new file mode 100644 index 0000000..ec2d481 --- /dev/null +++ b/automation_file/scheduler/__init__.py @@ -0,0 +1,35 @@ +"""Cron-style scheduler — run action lists on a recurring schedule. + +A :class:`ScheduledJob` pairs a 5-field cron expression (minute hour dom month +dow) with a JSON action list. The module-level :data:`scheduler` owns a +background thread that wakes every second, checks which jobs are due, and +dispatches their action lists through the shared +:class:`~automation_file.core.action_executor.ActionExecutor`. +""" + +from __future__ import annotations + +from automation_file.scheduler.cron import CronException, CronExpression +from automation_file.scheduler.manager import ( + ScheduledJob, + Scheduler, + register_scheduler_ops, + schedule_add, + schedule_list, + schedule_remove, + schedule_remove_all, + scheduler, +) + +__all__ = [ + "CronException", + "CronExpression", + "ScheduledJob", + "Scheduler", + "register_scheduler_ops", + "schedule_add", + "schedule_list", + "schedule_remove", + "schedule_remove_all", + "scheduler", +] diff --git a/automation_file/scheduler/cron.py b/automation_file/scheduler/cron.py new file mode 100644 index 0000000..0dc6d87 --- /dev/null +++ b/automation_file/scheduler/cron.py @@ -0,0 +1,140 @@ +"""Minimal 5-field cron expression parser (stdlib-only). + +Supports ``*``, exact values, ``a-b`` ranges, ``a,b,c`` lists, and ``*/n`` or +``a-b/n`` step syntax. Fields are, in order: minute (0-59), hour (0-23), +day-of-month (1-31), month (1-12), day-of-week (0-6, Sunday = 0 or 7). Names +(``jan``..``dec`` / ``sun``..``sat``) are accepted case-insensitively. + +Explicitly *not* supported: ``@yearly`` / ``@reboot`` aliases, ``L``/``W`` +modifiers, seconds. Callers needing that should use a dedicated cron library. +""" + +from __future__ import annotations + +import datetime as dt +from dataclasses import dataclass + +from automation_file.exceptions import FileAutomationException + + +class CronException(FileAutomationException): + """Raised when a cron expression cannot be parsed.""" + + +_FIELD_BOUNDS = ( + (0, 59), + (0, 23), + (1, 31), + (1, 12), + (0, 6), +) + +_MONTH_ALIASES = { + "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6, + "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12, +} # fmt: skip + +_DOW_ALIASES = { + "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6, +} # fmt: skip + + +def _resolve_alias(token: str, field_index: int) -> str: + lowered = token.lower() + if field_index == 3 and lowered in _MONTH_ALIASES: + return str(_MONTH_ALIASES[lowered]) + if field_index == 4 and lowered in _DOW_ALIASES: + return str(_DOW_ALIASES[lowered]) + return token + + +def _parse_value(token: str, field_index: int) -> int: + resolved = _resolve_alias(token, field_index) + try: + return int(resolved) + except ValueError as error: + raise CronException(f"cron: non-numeric value {token!r}") from error + + +def _expand_range(start: int, end: int, step: int, low: int, high: int) -> set[int]: + if start < low or end > high or start > end: + raise CronException(f"cron: range {start}-{end} outside [{low},{high}]") + if step <= 0: + raise CronException(f"cron: step must be positive, got {step}") + return set(range(start, end + 1, step)) + + +def _parse_field(raw: str, field_index: int) -> frozenset[int]: + low, high = _FIELD_BOUNDS[field_index] + # DoW accepts 7 as an alias for Sunday (0) before range validation. + effective_high = 7 if field_index == 4 else high + result: set[int] = set() + for part in raw.split(","): + chunk = part.strip() + if not chunk: + raise CronException(f"cron: empty chunk in field {field_index}") + step = 1 + if "/" in chunk: + base, step_text = chunk.split("/", 1) + try: + step = int(step_text) + except ValueError as error: + raise CronException(f"cron: bad step {step_text!r}") from error + chunk = base + if chunk == "*": + result |= _expand_range(low, high, step, low, high) + continue + if "-" in chunk: + start_text, end_text = chunk.split("-", 1) + start = _parse_value(start_text, field_index) + end = _parse_value(end_text, field_index) + result |= _expand_range(start, end, step, low, effective_high) + continue + value = _parse_value(chunk, field_index) + if value < low or value > effective_high: + raise CronException(f"cron: value {value} outside [{low},{high}]") + if step == 1: + result.add(value) + else: + result |= _expand_range(value, effective_high, step, low, effective_high) + if field_index == 4 and 7 in result: + result.discard(7) + result.add(0) + return frozenset(result) + + +@dataclass(frozen=True) +class CronExpression: + """Parsed 5-field cron expression.""" + + minutes: frozenset[int] + hours: frozenset[int] + days: frozenset[int] + months: frozenset[int] + weekdays: frozenset[int] + source: str + + @classmethod + def parse(cls, expression: str) -> CronExpression: + if not expression or not expression.strip(): + raise CronException("cron: expression is empty") + fields = expression.split() + if len(fields) != 5: + raise CronException(f"cron: expected 5 fields, got {len(fields)}: {expression!r}") + minutes = _parse_field(fields[0], 0) + hours = _parse_field(fields[1], 1) + days = _parse_field(fields[2], 2) + months = _parse_field(fields[3], 3) + weekdays = _parse_field(fields[4], 4) + return cls(minutes, hours, days, months, weekdays, expression.strip()) + + def matches(self, moment: dt.datetime) -> bool: + """Return ``True`` when ``moment`` satisfies every field.""" + weekday = moment.isoweekday() % 7 # Monday=1..Sunday=7 -> Monday=1..Sunday=0 + return ( + moment.minute in self.minutes + and moment.hour in self.hours + and moment.day in self.days + and moment.month in self.months + and weekday in self.weekdays + ) diff --git a/automation_file/scheduler/manager.py b/automation_file/scheduler/manager.py new file mode 100644 index 0000000..398ffc6 --- /dev/null +++ b/automation_file/scheduler/manager.py @@ -0,0 +1,180 @@ +"""Background scheduler for cron-scheduled action lists. + +The scheduler thread wakes once a minute (aligned to wall-clock minute +boundaries), iterates registered jobs, and dispatches each matching job's +action list through the shared :class:`ActionExecutor`. Dispatch happens on a +short-lived worker thread so a long-running action cannot block subsequent +jobs — but callers are still responsible for keeping their action lists +reasonable in duration. +""" + +from __future__ import annotations + +import datetime as dt +import threading +from dataclasses import dataclass, field +from typing import Any + +from automation_file.core.action_registry import ActionRegistry +from automation_file.exceptions import FileAutomationException +from automation_file.logging_config import file_automation_logger +from automation_file.scheduler.cron import CronExpression + + +class SchedulerException(FileAutomationException): + """Raised for duplicate / missing / invalid scheduled jobs.""" + + +@dataclass +class ScheduledJob: + """One named cron expression paired with an action list.""" + + name: str + cron: CronExpression + action_list: list[list[Any]] + last_run: dt.datetime | None = field(default=None) + runs: int = field(default=0) + + def as_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "cron": self.cron.source, + "actions": len(self.action_list), + "last_run": self.last_run.isoformat() if self.last_run else None, + "runs": self.runs, + } + + +class Scheduler: + """Process-wide scheduler — one background thread drives every job.""" + + _TICK_SECONDS = 1.0 + + def __init__(self) -> None: + self._lock = threading.Lock() + self._jobs: dict[str, ScheduledJob] = {} + self._thread: threading.Thread | None = None + self._stop = threading.Event() + + def _ensure_running(self) -> None: + if self._thread is not None and self._thread.is_alive(): + return + self._stop.clear() + thread = threading.Thread(target=self._run, name="fa-scheduler", daemon=True) + thread.start() + self._thread = thread + + def _run(self) -> None: + last_minute: tuple[int, int, int, int, int] | None = None + while not self._stop.is_set(): + now = dt.datetime.now().replace(second=0, microsecond=0) + key = (now.year, now.month, now.day, now.hour, now.minute) + if key != last_minute: + last_minute = key + self._fire_due(now) + self._stop.wait(self._TICK_SECONDS) + + def _fire_due(self, moment: dt.datetime) -> None: + with self._lock: + due = [job for job in self._jobs.values() if job.cron.matches(moment)] + for job in due: + self._dispatch(job, moment) + + @staticmethod + def _dispatch(job: ScheduledJob, moment: dt.datetime) -> None: + job.last_run = moment + job.runs += 1 + file_automation_logger.info( + "scheduler[%s]: firing at %s (run #%d)", job.name, moment.isoformat(), job.runs + ) + worker = threading.Thread( + target=_safe_execute, + args=(job.name, job.action_list), + name=f"fa-scheduler-{job.name}", + daemon=True, + ) + worker.start() + + def add(self, name: str, cron_expression: str, action_list: list[list[Any]]) -> dict[str, Any]: + cron = CronExpression.parse(cron_expression) + with self._lock: + if name in self._jobs: + raise SchedulerException(f"job already registered: {name}") + job = ScheduledJob(name=name, cron=cron, action_list=list(action_list)) + self._jobs[name] = job + snapshot = job.as_dict() + self._ensure_running() + file_automation_logger.info("scheduler: added job %r (cron=%r)", name, cron.source) + return snapshot + + def remove(self, name: str) -> dict[str, Any]: + with self._lock: + job = self._jobs.pop(name, None) + if job is None: + raise SchedulerException(f"no such job: {name}") + file_automation_logger.info("scheduler: removed job %r", name) + return job.as_dict() + + def remove_all(self) -> list[dict[str, Any]]: + with self._lock: + snapshots = [job.as_dict() for job in self._jobs.values()] + self._jobs.clear() + return snapshots + + def list(self) -> list[dict[str, Any]]: + with self._lock: + return [job.as_dict() for job in self._jobs.values()] + + def shutdown(self, timeout: float = 5.0) -> None: + self._stop.set() + thread = self._thread + self._thread = None + if thread is not None and thread.is_alive(): + thread.join(timeout=timeout) + + def __contains__(self, name: object) -> bool: + return isinstance(name, str) and name in self._jobs + + +def _safe_execute(job_name: str, action_list: list[list[Any]]) -> None: + from automation_file.core.action_executor import executor + + try: + executor.execute_action(action_list) + except FileAutomationException as error: + file_automation_logger.warning("scheduler[%s]: dispatch failed: %r", job_name, error) + + +scheduler: Scheduler = Scheduler() + + +def schedule_add(name: str, cron_expression: str, action_list: list[list[Any]]) -> dict[str, Any]: + """Register a named job that fires ``action_list`` on ``cron_expression``.""" + return scheduler.add(name, cron_expression, action_list) + + +def schedule_remove(name: str) -> dict[str, Any]: + """Remove the named job.""" + return scheduler.remove(name) + + +def schedule_remove_all() -> list[dict[str, Any]]: + """Remove every registered job and return their final snapshots.""" + return scheduler.remove_all() + + +def schedule_list() -> list[dict[str, Any]]: + """Return a snapshot of every registered job.""" + return scheduler.list() + + +def register_scheduler_ops(registry: ActionRegistry) -> None: + """Wire ``FA_schedule_*`` actions into a registry.""" + registry.register_many( + { + "FA_schedule_add": schedule_add, + "FA_schedule_remove": schedule_remove, + "FA_schedule_remove_all": schedule_remove_all, + "FA_schedule_list": schedule_list, + } + ) diff --git a/automation_file/ui/main_window.py b/automation_file/ui/main_window.py index 1ca5507..f422b4a 100644 --- a/automation_file/ui/main_window.py +++ b/automation_file/ui/main_window.py @@ -12,6 +12,7 @@ HomeTab, JSONEditorTab, LocalOpsTab, + SchedulerTab, ServerTab, TransferTab, TriggerTab, @@ -43,6 +44,8 @@ def __init__(self) -> None: self._tabs.addTab(JSONEditorTab(self._log, self._pool), "JSON actions") self._trigger_tab = TriggerTab(self._log, self._pool) self._tabs.addTab(self._trigger_tab, "Triggers") + self._scheduler_tab = SchedulerTab(self._log, self._pool) + self._tabs.addTab(self._scheduler_tab, "Scheduler") self._server_tab = ServerTab(self._log, self._pool) self._tabs.addTab(self._server_tab, "Servers") @@ -80,4 +83,5 @@ def _on_log_message(self, message: str) -> None: def closeEvent(self, event) -> None: # noqa: N802 # pylint: disable=invalid-name — Qt override self._server_tab.closeEvent(event) self._trigger_tab.closeEvent(event) + self._scheduler_tab.closeEvent(event) super().closeEvent(event) diff --git a/automation_file/ui/tabs/__init__.py b/automation_file/ui/tabs/__init__.py index 2387d86..26fa7e7 100644 --- a/automation_file/ui/tabs/__init__.py +++ b/automation_file/ui/tabs/__init__.py @@ -10,6 +10,7 @@ from automation_file.ui.tabs.json_editor_tab import JSONEditorTab from automation_file.ui.tabs.local_tab import LocalOpsTab from automation_file.ui.tabs.s3_tab import S3Tab +from automation_file.ui.tabs.scheduler_tab import SchedulerTab from automation_file.ui.tabs.server_tab import ServerTab from automation_file.ui.tabs.sftp_tab import SFTPTab from automation_file.ui.tabs.transfer_tab import TransferTab @@ -25,6 +26,7 @@ "LocalOpsTab", "S3Tab", "SFTPTab", + "SchedulerTab", "ServerTab", "TransferTab", "TriggerTab", diff --git a/automation_file/ui/tabs/scheduler_tab.py b/automation_file/ui/tabs/scheduler_tab.py new file mode 100644 index 0000000..26e5f78 --- /dev/null +++ b/automation_file/ui/tabs/scheduler_tab.py @@ -0,0 +1,155 @@ +"""Cron scheduler management tab. + +Lists active scheduled jobs, adds a new job from a cron expression plus an +inline JSON action list, and removes jobs individually or in bulk. All +dispatch goes through the shared :class:`ActionExecutor`. +""" + +from __future__ import annotations + +import json +from typing import Any + +from PySide6.QtWidgets import ( + QFormLayout, + QGroupBox, + QHBoxLayout, + QHeaderView, + QLineEdit, + QPlainTextEdit, + QTableWidget, + QTableWidgetItem, + QVBoxLayout, +) + +from automation_file.exceptions import FileAutomationException +from automation_file.scheduler import ( + schedule_add, + schedule_list, + schedule_remove, + schedule_remove_all, +) +from automation_file.ui.tabs.base import BaseTab + +_COLUMNS = ("Name", "Cron", "Actions", "Runs", "Last run") + + +class SchedulerTab(BaseTab): + """Add / remove / list cron-scheduled jobs.""" + + def __init__(self, log, pool) -> None: + super().__init__(log, pool) + root = QVBoxLayout(self) + root.setContentsMargins(12, 12, 12, 12) + root.setSpacing(12) + root.addWidget(self._add_group()) + root.addWidget(self._list_group(), 1) + self._refresh() + + def _add_group(self) -> QGroupBox: + box = QGroupBox("Schedule a job") + form = QFormLayout(box) + form.setVerticalSpacing(10) + form.setHorizontalSpacing(12) + + self._name = QLineEdit() + self._name.setPlaceholderText("unique job name") + self._cron = QLineEdit("*/5 * * * *") + self._cron.setPlaceholderText("minute hour dom month dow (e.g. */5 * * * *)") + self._actions = QPlainTextEdit() + self._actions.setPlaceholderText( + '[["FA_create_file", {"file_path": "scheduled.txt", "content": "tick"}]]' + ) + self._actions.setMinimumHeight(120) + + form.addRow("Name", self._name) + form.addRow("Cron", self._cron) + form.addRow("Actions (JSON)", self._actions) + form.addRow(self.make_button("Add job", self._on_add)) + return box + + def _list_group(self) -> QGroupBox: + box = QGroupBox("Active jobs") + layout = QVBoxLayout(box) + layout.setSpacing(8) + + self._table = QTableWidget(0, len(_COLUMNS)) + self._table.setHorizontalHeaderLabels(_COLUMNS) + header = self._table.horizontalHeader() + header.setSectionResizeMode(QHeaderView.ResizeMode.Stretch) + self._table.verticalHeader().setVisible(False) + self._table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows) + self._table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers) + layout.addWidget(self._table) + + row = QHBoxLayout() + row.setSpacing(8) + row.addWidget(self.make_button("Refresh", self._refresh)) + row.addWidget(self.make_button("Remove selected", self._on_remove_selected)) + row.addWidget(self.make_button("Remove all", self._on_remove_all)) + row.addStretch() + layout.addLayout(row) + return box + + def _on_add(self) -> None: + name = self._name.text().strip() + cron = self._cron.text().strip() + raw_actions = self._actions.toPlainText().strip() + if not name or not cron or not raw_actions: + self._log.append_line("scheduler: name, cron, and actions are all required") + return + try: + action_list = json.loads(raw_actions) + except json.JSONDecodeError as error: + self._log.append_line(f"scheduler: invalid JSON: {error}") + return + if not isinstance(action_list, list): + self._log.append_line("scheduler: action JSON must be an array") + return + try: + snapshot = schedule_add(name, cron, action_list) + except FileAutomationException as error: + self._log.append_line(f"scheduler: add failed: {error!r}") + return + self._log.append_line(f"scheduler: added {snapshot}") + self._refresh() + + def _on_remove_selected(self) -> None: + row = self._table.currentRow() + if row < 0: + self._log.append_line("scheduler: no row selected") + return + name_item = self._table.item(row, 0) + if name_item is None: + return + name = name_item.text() + try: + snapshot = schedule_remove(name) + except FileAutomationException as error: + self._log.append_line(f"scheduler: remove failed: {error!r}") + return + self._log.append_line(f"scheduler: removed {snapshot}") + self._refresh() + + def _on_remove_all(self) -> None: + snapshots = schedule_remove_all() + self._log.append_line(f"scheduler: removed {len(snapshots)} job(s)") + self._refresh() + + def _refresh(self) -> None: + jobs = schedule_list() + self._table.setRowCount(len(jobs)) + for row, data in enumerate(jobs): + self._set_cell(row, 0, data["name"]) + self._set_cell(row, 1, data["cron"]) + self._set_cell(row, 2, str(data["actions"])) + self._set_cell(row, 3, str(data["runs"])) + self._set_cell(row, 4, data["last_run"] or "—") + + def _set_cell(self, row: int, col: int, text: str) -> None: + item = QTableWidgetItem(text) + self._table.setItem(row, col, item) + + def closeEvent(self, event: Any) -> None: # noqa: N802 # pylint: disable=invalid-name — Qt override + schedule_remove_all() + super().closeEvent(event) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..8a26385 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,166 @@ +"""Tests for the cron parser and scheduler registry wiring. + +The scheduler's live tick thread is intentionally *not* exercised here — we +verify parser correctness, job CRUD, duplicate rejection, and registry +wiring. End-to-end minute-boundary firing is covered manually / in the GUI. +""" + +from __future__ import annotations + +import datetime as dt + +import pytest + +from automation_file.exceptions import FileAutomationException +from automation_file.scheduler.cron import CronException, CronExpression +from automation_file.scheduler.manager import ( + Scheduler, + SchedulerException, + _safe_execute, + register_scheduler_ops, +) + + +def test_cron_parses_star() -> None: + expression = CronExpression.parse("* * * * *") + assert 0 in expression.minutes and 59 in expression.minutes + assert expression.matches(dt.datetime(2026, 4, 21, 12, 30)) + + +def test_cron_parses_exact_minute() -> None: + expression = CronExpression.parse("30 9 * * *") + assert expression.minutes == frozenset({30}) + assert expression.hours == frozenset({9}) + assert expression.matches(dt.datetime(2026, 4, 21, 9, 30)) + assert not expression.matches(dt.datetime(2026, 4, 21, 9, 31)) + + +def test_cron_parses_step() -> None: + expression = CronExpression.parse("*/15 * * * *") + assert expression.minutes == frozenset({0, 15, 30, 45}) + + +def test_cron_parses_range_with_step() -> None: + expression = CronExpression.parse("0 9-17/2 * * *") + assert expression.hours == frozenset({9, 11, 13, 15, 17}) + + +def test_cron_parses_list() -> None: + expression = CronExpression.parse("0,15,30 * * * *") + assert expression.minutes == frozenset({0, 15, 30}) + + +def test_cron_parses_month_and_dow_aliases() -> None: + expression = CronExpression.parse("0 0 * JAN mon") + assert expression.months == frozenset({1}) + assert expression.weekdays == frozenset({1}) + + +def test_cron_dow_seven_aliases_to_sunday() -> None: + expression = CronExpression.parse("0 0 * * 7") + assert expression.weekdays == frozenset({0}) + + +def test_cron_rejects_wrong_field_count() -> None: + with pytest.raises(CronException): + CronExpression.parse("* * * *") + + +def test_cron_rejects_out_of_range() -> None: + with pytest.raises(CronException): + CronExpression.parse("60 * * * *") + + +def test_cron_rejects_empty_expression() -> None: + with pytest.raises(CronException): + CronExpression.parse("") + + +def test_cron_exception_inherits_from_file_automation() -> None: + assert issubclass(CronException, FileAutomationException) + + +def test_scheduler_add_remove_lifecycle() -> None: + engine = Scheduler() + try: + snapshot = engine.add("job-a", "*/5 * * * *", [["FA_schedule_list"]]) + assert snapshot["name"] == "job-a" + assert snapshot["cron"] == "*/5 * * * *" + assert snapshot["runs"] == 0 + assert "job-a" in engine + assert len(engine.list()) == 1 + removed = engine.remove("job-a") + assert removed["name"] == "job-a" + assert engine.list() == [] + finally: + engine.shutdown() + + +def test_scheduler_rejects_duplicate_names() -> None: + engine = Scheduler() + try: + engine.add("dup", "* * * * *", [["FA_schedule_list"]]) + with pytest.raises(SchedulerException): + engine.add("dup", "* * * * *", [["FA_schedule_list"]]) + finally: + engine.shutdown() + + +def test_scheduler_remove_unknown_raises() -> None: + engine = Scheduler() + try: + with pytest.raises(SchedulerException): + engine.remove("nope") + finally: + engine.shutdown() + + +def test_scheduler_remove_all_clears_everything() -> None: + engine = Scheduler() + try: + engine.add("a", "* * * * *", [["FA_schedule_list"]]) + engine.add("b", "* * * * *", [["FA_schedule_list"]]) + snapshots = engine.remove_all() + assert len(snapshots) == 2 + assert engine.list() == [] + finally: + engine.shutdown() + + +def test_scheduler_rejects_bad_cron() -> None: + engine = Scheduler() + try: + with pytest.raises(CronException): + engine.add("bad", "not a cron", [["FA_schedule_list"]]) + finally: + engine.shutdown() + + +def test_scheduler_exception_inherits_from_file_automation() -> None: + assert issubclass(SchedulerException, FileAutomationException) + + +def test_register_scheduler_ops_populates_registry() -> None: + from automation_file.core.action_registry import ActionRegistry + + registry = ActionRegistry() + register_scheduler_ops(registry) + for name in ( + "FA_schedule_add", + "FA_schedule_remove", + "FA_schedule_remove_all", + "FA_schedule_list", + ): + assert name in registry + + +def test_default_registry_contains_scheduler_ops() -> None: + from automation_file.core.action_registry import build_default_registry + + registry = build_default_registry() + assert "FA_schedule_add" in registry + assert "FA_schedule_list" in registry + + +def test_safe_execute_swallows_exceptions_cleanly() -> None: + _safe_execute("unit-test", [["FA_does_not_exist"]]) diff --git a/tests/test_ui_smoke.py b/tests/test_ui_smoke.py index 776096e..e426c96 100644 --- a/tests/test_ui_smoke.py +++ b/tests/test_ui_smoke.py @@ -55,6 +55,7 @@ def test_main_window_constructs(qt_app) -> None: "ServerTab", "TransferTab", "TriggerTab", + "SchedulerTab", "HomeTab", ], ) From c499f8468ee13838d0073b747ed8e8537e4a69da Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Tue, 21 Apr 2026 20:18:15 +0800 Subject: [PATCH 03/38] Add transfer progress + cancellation primitives with UI tab New automation_file.core.progress module exposes CancellationToken, ProgressReporter, and a module-level ProgressRegistry keyed by a user-chosen name. download_file and the S3 upload / download ops accept an optional progress_name kwarg that wires a reporter + token through the transfer loop, so the GUI or a JSON action can poll status or cancel mid-flight. FA_progress_list / FA_progress_cancel / FA_progress_clear are auto-registered alongside the other FA_* actions. Progress tab in the main window drives the same lifecycle from the GUI. --- automation_file/__init__.py | 21 +++ automation_file/core/action_registry.py | 7 + automation_file/core/progress.py | 161 ++++++++++++++++++++++ automation_file/remote/http_download.py | 30 +++- automation_file/remote/s3/download_ops.py | 44 +++++- automation_file/remote/s3/upload_ops.py | 38 ++++- automation_file/ui/main_window.py | 2 + automation_file/ui/tabs/__init__.py | 2 + automation_file/ui/tabs/progress_tab.py | 127 +++++++++++++++++ tests/test_progress.py | 124 +++++++++++++++++ tests/test_ui_smoke.py | 1 + 11 files changed, 550 insertions(+), 7 deletions(-) create mode 100644 automation_file/core/progress.py create mode 100644 automation_file/ui/tabs/progress_tab.py create mode 100644 tests/test_progress.py diff --git a/automation_file/__init__.py b/automation_file/__init__.py index 5099283..240dd13 100644 --- a/automation_file/__init__.py +++ b/automation_file/__init__.py @@ -22,6 +22,17 @@ from automation_file.core.callback_executor import CallbackExecutor from automation_file.core.json_store import read_action_json, write_action_json from automation_file.core.package_loader import PackageLoader +from automation_file.core.progress import ( + CancellationToken, + CancelledException, + ProgressRegistry, + ProgressReporter, + progress_cancel, + progress_clear, + progress_list, + progress_registry, + register_progress_ops, +) from automation_file.core.quota import Quota from automation_file.core.retry import retry_on_transient from automation_file.local.dir_ops import copy_dir, create_dir, remove_dir_tree, rename_dir @@ -226,6 +237,16 @@ def __getattr__(name: str) -> Any: "schedule_remove", "schedule_remove_all", "scheduler", + # Progress / cancellation + "CancellationToken", + "CancelledException", + "ProgressRegistry", + "ProgressReporter", + "progress_cancel", + "progress_clear", + "progress_list", + "progress_registry", + "register_progress_ops", # UI (lazy-loaded) "launch_ui", ] diff --git a/automation_file/core/action_registry.py b/automation_file/core/action_registry.py index cbf9f7b..690ab85 100644 --- a/automation_file/core/action_registry.py +++ b/automation_file/core/action_registry.py @@ -153,6 +153,12 @@ def _register_scheduler_ops(registry: ActionRegistry) -> None: register_scheduler_ops(registry) +def _register_progress_ops(registry: ActionRegistry) -> None: + from automation_file.core.progress import register_progress_ops + + register_progress_ops(registry) + + def build_default_registry() -> ActionRegistry: """Return a registry pre-populated with every built-in ``FA_*`` action.""" registry = ActionRegistry() @@ -162,6 +168,7 @@ def build_default_registry() -> ActionRegistry: _register_cloud_backends(registry) _register_trigger_ops(registry) _register_scheduler_ops(registry) + _register_progress_ops(registry) file_automation_logger.info( "action_registry: built default registry with %d commands", len(registry) ) diff --git a/automation_file/core/progress.py b/automation_file/core/progress.py new file mode 100644 index 0000000..92ea7c4 --- /dev/null +++ b/automation_file/core/progress.py @@ -0,0 +1,161 @@ +"""Transfer progress + cancellation primitives. + +Long-running transfers (HTTP downloads, S3 uploads/downloads, …) accept a +named handle from the shared :data:`progress_registry`. The registry keeps a +:class:`ProgressReporter` (bytes transferred, optional total) and a +:class:`CancellationToken` per name so the GUI or a JSON action can observe +progress or cancel mid-flight. + +Instrumentation is opt-in: callers pass ``progress_name="