From d585f7039164875c9b50f6554710d7366d580d5f Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Sun, 21 Dec 2025 12:56:30 +0800 Subject: [PATCH 1/2] Access tokens --- arango/database.py | 86 ++++++++++++++++++++++++++++++++++++++++++++ arango/exceptions.py | 17 +++++++++ tests/helpers.py | 9 +++++ tests/test_auth.py | 56 ++++++++++++++++++++++++++++- 4 files changed, 167 insertions(+), 1 deletion(-) diff --git a/arango/database.py b/arango/database.py index 130b7b5a..48a032d7 100644 --- a/arango/database.py +++ b/arango/database.py @@ -19,6 +19,9 @@ from arango.connection import Connection from arango.errno import HTTP_NOT_FOUND from arango.exceptions import ( + AccessTokenCreateError, + AccessTokenDeleteError, + AccessTokenListError, AnalyzerCreateError, AnalyzerDeleteError, AnalyzerGetError, @@ -1158,6 +1161,89 @@ def response_handler(resp: Response) -> Json: return self._execute(request, response_handler) + def create_access_token( + self, + user: str, + name: str, + valid_until: int, + ) -> Result[Json]: + """Create an access token for the given user. + + :param user: The name of the user. + :type user: str + :param name: A name for the access token to make identification easier, + like a short description. + :type name: str + :param valid_until: A Unix timestamp in seconds to set the expiration + date and time. + :type valid_until: int + + :return: Information about the created access token, including the token itself. + :rtype: dict + + :raise arango.exceptions.AccessTokenCreateError: If the operations fails. + """ + data: Json = { + "name": name, + "valid_until": valid_until, + } + + request = Request( + method="post", + endpoint=f"/_api/token/{user}", + data=data, + ) + + def response_handler(resp: Response) -> Json: + if not resp.is_success: + raise AccessTokenCreateError(resp, request) + result: Json = resp.body + return result + + return self._executor.execute(request, response_handler) + + def delete_access_token(self, user: str, token_id: int) -> Result[None]: + """Delete an access token for the given user. + + :param user: The name of the user. + :type user: str + :param token_id: The ID of the access token to delete. + :type token_id: int + + :raise arango.exceptions.AccessTokenDeleteError: If the operation fails. + """ + request = Request( + method="delete", + endpoint=f"/_api/token/{user}/{token_id}", + ) + + def response_handler(resp: Response) -> None: + if not resp.is_success: + raise AccessTokenDeleteError(resp, request) + + return self._executor.execute(request, response_handler) + + def list_access_tokens(self, user: str) -> Result[Jsons]: + """List all access tokens for the given user. + + :param user: The name of the user. + :type user: str + + :return: List of access tokens for the user. + :rtype: list + + :raise arango.exceptions.AccessTokenListError: If the operation fails. + """ + request = Request(method="get", endpoint=f"/_api/token/{user}") + + def response_handler(resp: Response) -> Jsons: + if not resp.is_success: + raise AccessTokenListError(resp, request) + result: Jsons = resp.body["tokens"] + return result + + return self._executor.execute(request, response_handler) + def tls(self) -> Result[Json]: """Return TLS data (server key, client-auth CA). diff --git a/arango/exceptions.py b/arango/exceptions.py index 7fc62983..77e00d39 100644 --- a/arango/exceptions.py +++ b/arango/exceptions.py @@ -161,6 +161,23 @@ class AQLQueryRulesGetError(ArangoServerError): """Failed to retrieve AQL query rules.""" +####################### +# Access Token Errors # +####################### + + +class AccessTokenCreateError(ArangoServerError): + """Failed to create an access token.""" + + +class AccessTokenDeleteError(ArangoServerError): + """Failed to delete an access token.""" + + +class AccessTokenListError(ArangoServerError): + """Failed to retrieve access tokens.""" + + ############################## # Async Execution Exceptions # ############################## diff --git a/tests/helpers.py b/tests/helpers.py index ef25a786..b6fa76ce 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -108,6 +108,15 @@ def generate_service_mount(): return f"/test_{uuid4().hex}" +def generate_token_name(): + """Generate and return a random token name. + + :return: Random token name. + :rtype: str + """ + return f"test_token_{uuid4().hex}" + + def generate_jwt(secret, exp=3600): """Generate and return a JWT. diff --git a/tests/test_auth.py b/tests/test_auth.py index 0f747563..9a869512 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,6 +1,11 @@ +import time + from arango.connection import BasicConnection, JwtConnection, JwtSuperuserConnection from arango.errno import FORBIDDEN, HTTP_UNAUTHORIZED from arango.exceptions import ( + AccessTokenCreateError, + AccessTokenDeleteError, + AccessTokenListError, JWTAuthError, JWTExpiredError, JWTSecretListError, @@ -11,7 +16,12 @@ ServerTLSReloadError, ServerVersionError, ) -from tests.helpers import assert_raises, generate_jwt, generate_string +from tests.helpers import ( + assert_raises, + generate_jwt, + generate_string, + generate_token_name, +) def test_auth_invalid_method(client, db_name, username, password): @@ -155,3 +165,47 @@ def test_auth_jwt_expiry(client, db_name, root_password, secret): db = client.db("_system", user_token=valid_token) with assert_raises(JWTExpiredError) as err: db.conn.set_token(expired_token) + + +def test_auth_access_token(client, db_name, username, password, bad_db): + # Login with basic auth + db_auth_basic = client.db( + name=db_name, + username=username, + password=password, + verify=True, + auth_method="basic", + ) + + # Create an access token + token_name = generate_token_name() + token = db_auth_basic.create_access_token( + user=username, name=token_name, valid_until=int(time.time() + 3600) + ) + assert token["active"] is True + + # Cannot create a token with the same name + with assert_raises(AccessTokenCreateError): + db_auth_basic.create_access_token( + user=username, name=token_name, valid_until=int(time.time() + 3600) + ) + + # Authenticate with the created token + access_token_db = client.db( + name=db_name, + username=username, + password=token["token"], + verify=True, + auth_method="basic", + ) + + # List access tokens + tokens = access_token_db.list_access_tokens(username) + assert isinstance(tokens, list) + with assert_raises(AccessTokenListError): + bad_db.list_access_tokens(username) + + # Clean up + access_token_db.delete_access_token(username, token["id"]) + with assert_raises(AccessTokenDeleteError): + access_token_db.delete_access_token(username, token["id"]) From 182d6ad45014fb367e50e96add12648ebef4eb02 Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Sun, 21 Dec 2025 13:10:08 +0800 Subject: [PATCH 2/2] Bump driver version --- arango/request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arango/request.py b/arango/request.py index 5a213f68..66fb26ad 100644 --- a/arango/request.py +++ b/arango/request.py @@ -12,7 +12,7 @@ def normalize_headers( if driver_flags is not None: for flag in driver_flags: flags = flags + flag + ";" - driver_version = "8.2.4" + driver_version = "8.2.5" driver_header = "python-arango/" + driver_version + " (" + flags + ")" normalized_headers: Headers = { "charset": "utf-8",