From f67f25e7b9bdbfae540dd263f93c4226191e9935 Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Fri, 8 Aug 2025 05:05:15 +0000 Subject: [PATCH 1/2] Adding support for /_api/tasks --- arangoasync/database.py | 146 ++++++++++++++++++++++++++++++++++++++ arangoasync/exceptions.py | 16 +++++ tests/conftest.py | 13 ++++ tests/helpers.py | 18 +++++ tests/test_task.py | 79 +++++++++++++++++++++ 5 files changed, 272 insertions(+) create mode 100644 tests/test_task.py diff --git a/arangoasync/database.py b/arangoasync/database.py index b048b4f..16c0d0c 100644 --- a/arangoasync/database.py +++ b/arangoasync/database.py @@ -40,6 +40,10 @@ PermissionUpdateError, ServerStatusError, ServerVersionError, + TaskCreateError, + TaskDeleteError, + TaskGetError, + TaskListError, TransactionAbortError, TransactionCommitError, TransactionExecuteError, @@ -2193,6 +2197,148 @@ def response_handler(resp: Response) -> Json: return await self._executor.execute(request, response_handler) + async def tasks(self) -> Result[Jsons]: + """Fetches all existing tasks from the server. + + Returns: + list: List of currently active server tasks. + + Raises: + TaskListError: If the list cannot be retrieved. + + References: + - `list-all-tasks `__ + """ # noqa: E501 + request = Request(method=Method.GET, endpoint="/_api/tasks") + + def response_handler(resp: Response) -> Jsons: + if not resp.is_success: + raise TaskListError(resp, request) + result: Jsons = self.deserializer.loads_many(resp.raw_body) + return result + + return await self._executor.execute(request, response_handler) + + async def task(self, task_id: str) -> Result[Json]: + """Return the details of an active server task. + + Args: + task_id (str) -> Server task ID. + + Returns: + dict: Details of the server task. + + Raises: + TaskGetError: If the task details cannot be retrieved. + + References: + - `get-a-task `__ + """ # noqa: E501 + request = Request(method=Method.GET, endpoint=f"/_api/tasks/{task_id}") + + def response_handler(resp: Response) -> Json: + if not resp.is_success: + raise TaskGetError(resp, request) + result: Json = self.deserializer.loads(resp.raw_body) + return result + + return await self._executor.execute(request, response_handler) + + async def create_task( + self, + command: str, + task_id: Optional[str] = None, + name: Optional[str] = None, + offset: Optional[int] = None, + params: Optional[str] = None, + period: Optional[int] = None, + ) -> Result[Json]: + """Create a new task. + + Args: + command (str): The JavaScript code to be executed. + task_id (str | None): Optional task ID. If not provided, the server will + generate a unique ID. + name (str | None): The name of the task. + offset (int | None): The offset in seconds after which the task should + start executing. + params (str | None): Parameters to be passed to the command. + period (int | None): The number of seconds between the executions. + + Returns: + dict: Details of the created task. + + Raises: + TaskCreateError: If the task cannot be created. + + References: + - `create-a-task `__ + - `create-a-task-with-id `__ + """ # noqa: E501 + data: Json = {"command": command} + if name is not None: + data["name"] = name + if offset is not None: + data["offset"] = offset + if params is not None: + data["params"] = params + if period is not None: + data["period"] = period + + if task_id is None: + request = Request( + method=Method.POST, + endpoint="/_api/tasks", + data=self.serializer.dumps(data), + ) + else: + request = Request( + method=Method.PUT, + endpoint=f"/_api/tasks/{task_id}", + data=self.serializer.dumps(data), + ) + + def response_handler(resp: Response) -> Json: + if not resp.is_success: + raise TaskCreateError(resp, request) + result: Json = self.deserializer.loads(resp.raw_body) + return result + + return await self._executor.execute(request, response_handler) + + async def delete_task( + self, + task_id: str, + ignore_missing: bool = False, + ) -> Result[bool]: + """Delete a server task. + + Args: + task_id (str): Task ID. + ignore_missing (bool): If `True`, do not raise an exception if the + task does not exist. + + Returns: + bool: `True` if the task was deleted successfully, `False` if the + task was not found and **ignore_missing** was set to `True`. + + Raises: + TaskDeleteError: If the operation fails. + + References: + - `delete-a-task `__ + """ # noqa: E501 + request = Request(method=Method.DELETE, endpoint=f"/_api/tasks/{task_id}") + + def response_handler(resp: Response) -> bool: + if resp.is_success: + return True + if resp.status_code == HTTP_NOT_FOUND and ignore_missing: + return False + raise TaskDeleteError(resp, request) + + return await self._executor.execute(request, response_handler) + class StandardDatabase(Database): """Standard database API wrapper. diff --git a/arangoasync/exceptions.py b/arangoasync/exceptions.py index 41644de..5ca333a 100644 --- a/arangoasync/exceptions.py +++ b/arangoasync/exceptions.py @@ -451,6 +451,22 @@ class SortValidationError(ArangoClientError): """Invalid sort parameters.""" +class TaskCreateError(ArangoServerError): + """Failed to create server task.""" + + +class TaskDeleteError(ArangoServerError): + """Failed to delete server task.""" + + +class TaskGetError(ArangoServerError): + """Failed to retrieve server task details.""" + + +class TaskListError(ArangoServerError): + """Failed to retrieve server tasks.""" + + class TransactionAbortError(ArangoServerError): """Failed to abort transaction.""" diff --git a/tests/conftest.py b/tests/conftest.py index 98d75de..66e5a9d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -256,6 +256,19 @@ async def teardown(): verify=False, ) + # Remove all tasks + test_tasks = [ + task + for task in await sys_db.tasks() + if task["name"].startswith("test_task") + ] + await asyncio.gather( + *( + sys_db.delete_task(task["id"], ignore_missing=True) + for task in test_tasks + ) + ) + # Remove all test users. tst_users = [ user["user"] diff --git a/tests/helpers.py b/tests/helpers.py index f2f63f7..dfaae4d 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -62,3 +62,21 @@ def generate_analyzer_name(): str: Random analyzer name. """ return f"test_analyzer_{uuid4().hex}" + + +def generate_task_name(): + """Generate and return a random task name. + + Returns: + str: Random task name. + """ + return f"test_task_{uuid4().hex}" + + +def generate_task_id(): + """Generate and return a random task ID. + + Returns: + str: Random task ID + """ + return f"test_task_id_{uuid4().hex}" diff --git a/tests/test_task.py b/tests/test_task.py new file mode 100644 index 0000000..4e1aee6 --- /dev/null +++ b/tests/test_task.py @@ -0,0 +1,79 @@ +import pytest + +from arangoasync.exceptions import ( + TaskCreateError, + TaskDeleteError, + TaskGetError, + TaskListError, +) +from tests.helpers import generate_task_id, generate_task_name + + +@pytest.mark.asyncio +async def test_task_management(sys_db, bad_db): + # This test intentionally uses the system database because cleaning up tasks is + # easier there. + + test_command = 'require("@arangodb").print(params);' + + # Test errors + with pytest.raises(TaskCreateError): + await bad_db.create_task(command=test_command) + with pytest.raises(TaskGetError): + await bad_db.task("non_existent_task_id") + with pytest.raises(TaskListError): + await bad_db.tasks() + with pytest.raises(TaskDeleteError): + await bad_db.delete_task("non_existent_task_id") + + # Create a task with a random ID + task_name = generate_task_name() + new_task = await sys_db.create_task( + name=task_name, + command=test_command, + params={"foo": 1, "bar": 2}, + offset=1, + ) + assert new_task["name"] == task_name + task_id = new_task["id"] + assert await sys_db.task(task_id) == new_task + + # Delete task + assert await sys_db.delete_task(task_id) is True + + # Create a task with a specific ID + task_name = generate_task_name() + task_id = generate_task_id() + new_task = await sys_db.create_task( + name=task_name, + command=test_command, + params={"foo": 1, "bar": 2}, + offset=1, + period=10, + task_id=task_id, + ) + assert new_task["name"] == task_name + assert new_task["id"] == task_id + + # Try to create a duplicate task + with pytest.raises(TaskCreateError): + await sys_db.create_task( + name=task_name, + command=test_command, + params={"foo": 1, "bar": 2}, + task_id=task_id, + ) + + # Test get missing task + with pytest.raises(TaskGetError): + await sys_db.task(generate_task_id()) + + # Test list tasks + tasks = await sys_db.tasks() + assert len(tasks) == 1 + + # Delete tasks + assert await sys_db.delete_task(task_id) is True + assert await sys_db.delete_task(task_id, ignore_missing=True) is False + with pytest.raises(TaskDeleteError): + await sys_db.delete_task(task_id) From 83bce62fe0571875d1dacd928666bbd8f14226b7 Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Fri, 8 Aug 2025 05:22:30 +0000 Subject: [PATCH 2/2] Adding docs for /_api/tasks --- arangoasync/database.py | 4 ++-- docs/index.rst | 1 + docs/task.rst | 51 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 docs/task.rst diff --git a/arangoasync/database.py b/arangoasync/database.py index 16c0d0c..f2b03ee 100644 --- a/arangoasync/database.py +++ b/arangoasync/database.py @@ -2250,7 +2250,7 @@ async def create_task( task_id: Optional[str] = None, name: Optional[str] = None, offset: Optional[int] = None, - params: Optional[str] = None, + params: Optional[Json] = None, period: Optional[int] = None, ) -> Result[Json]: """Create a new task. @@ -2262,7 +2262,7 @@ async def create_task( name (str | None): The name of the task. offset (int | None): The offset in seconds after which the task should start executing. - params (str | None): Parameters to be passed to the command. + params (dict | None): Parameters to be passed to the command. period (int | None): The number of seconds between the executions. Returns: diff --git a/docs/index.rst b/docs/index.rst index 1b361fd..41eaeee 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -73,6 +73,7 @@ Contents compression serialization backup + task errors errno logging diff --git a/docs/task.rst b/docs/task.rst new file mode 100644 index 0000000..2490507 --- /dev/null +++ b/docs/task.rst @@ -0,0 +1,51 @@ +Tasks +----- + +ArangoDB can schedule user-defined Javascript snippets as one-time or periodic +(re-scheduled after each execution) tasks. Tasks are executed in the context of +the database they are defined in. + +**Example:** + +.. code-block:: python + + from arangoasync import ArangoClient + from arangoasync.auth import Auth + + # Initialize the client for ArangoDB. + async with ArangoClient(hosts="http://localhost:8529") as client: + auth = Auth(username="root", password="passwd") + + # Connect to "test" database as root user. + db = await client.db("test", auth=auth) + + # Create a new task which simply prints parameters. + await db.create_task( + name="test_task", + command=""" + var task = function(params){ + var db = require('@arangodb'); + db.print(params); + } + task(params); + """, + params={"foo": "bar"}, + offset=300, + period=10, + task_id="001" + ) + + # List all active tasks + tasks = await db.tasks() + + # Retrieve details of a task by ID. + details = await db.task("001") + + # Delete an existing task by ID. + await db.delete_task('001', ignore_missing=True) + + +.. note:: + When deleting a database, any tasks that were initialized under its context + remain active. It is therefore advisable to delete any running tasks before + deleting the database.