Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions arango/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from arango.connection import Connection
from arango.errno import HTTP_NOT_FOUND
from arango.exceptions import (
AccessTokenCreateError,
AccessTokenDeleteError,
AccessTokenListError,
AnalyzerCreateError,
AnalyzerDeleteError,
AnalyzerGetError,
Expand Down Expand Up @@ -1158,6 +1161,89 @@ def response_handler(resp: Response) -> Json:

return self._execute(request, response_handler)

def create_access_token(
self,
user: str,
name: str,
valid_until: int,
) -> Result[Json]:
"""Create an access token for the given user.

:param user: The name of the user.
:type user: str
:param name: A name for the access token to make identification easier,
like a short description.
:type name: str
:param valid_until: A Unix timestamp in seconds to set the expiration
date and time.
:type valid_until: int

:return: Information about the created access token, including the token itself.
:rtype: dict

:raise arango.exceptions.AccessTokenCreateError: If the operations fails.
"""
data: Json = {
"name": name,
"valid_until": valid_until,
}

request = Request(
method="post",
endpoint=f"/_api/token/{user}",
data=data,
)

def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AccessTokenCreateError(resp, request)
result: Json = resp.body
return result

return self._executor.execute(request, response_handler)

def delete_access_token(self, user: str, token_id: int) -> Result[None]:
"""Delete an access token for the given user.

:param user: The name of the user.
:type user: str
:param token_id: The ID of the access token to delete.
:type token_id: int

:raise arango.exceptions.AccessTokenDeleteError: If the operation fails.
"""
request = Request(
method="delete",
endpoint=f"/_api/token/{user}/{token_id}",
)

def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AccessTokenDeleteError(resp, request)

return self._executor.execute(request, response_handler)

def list_access_tokens(self, user: str) -> Result[Jsons]:
"""List all access tokens for the given user.

:param user: The name of the user.
:type user: str

:return: List of access tokens for the user.
:rtype: list

:raise arango.exceptions.AccessTokenListError: If the operation fails.
"""
request = Request(method="get", endpoint=f"/_api/token/{user}")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AccessTokenListError(resp, request)
result: Jsons = resp.body["tokens"]
return result

return self._executor.execute(request, response_handler)

def tls(self) -> Result[Json]:
"""Return TLS data (server key, client-auth CA).

Expand Down
17 changes: 17 additions & 0 deletions arango/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,23 @@ class AQLQueryRulesGetError(ArangoServerError):
"""Failed to retrieve AQL query rules."""


#######################
# Access Token Errors #
#######################


class AccessTokenCreateError(ArangoServerError):
"""Failed to create an access token."""


class AccessTokenDeleteError(ArangoServerError):
"""Failed to delete an access token."""


class AccessTokenListError(ArangoServerError):
"""Failed to retrieve access tokens."""


##############################
# Async Execution Exceptions #
##############################
Expand Down
2 changes: 1 addition & 1 deletion arango/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def normalize_headers(
if driver_flags is not None:
for flag in driver_flags:
flags = flags + flag + ";"
driver_version = "8.2.4"
driver_version = "8.2.5"
driver_header = "python-arango/" + driver_version + " (" + flags + ")"
normalized_headers: Headers = {
"charset": "utf-8",
Expand Down
9 changes: 9 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ def generate_service_mount():
return f"/test_{uuid4().hex}"


def generate_token_name():
"""Generate and return a random token name.
:return: Random token name.
:rtype: str
"""
return f"test_token_{uuid4().hex}"


def generate_jwt(secret, exp=3600):
"""Generate and return a JWT.
Expand Down
56 changes: 55 additions & 1 deletion tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import time

from arango.connection import BasicConnection, JwtConnection, JwtSuperuserConnection
from arango.errno import FORBIDDEN, HTTP_UNAUTHORIZED
from arango.exceptions import (
AccessTokenCreateError,
AccessTokenDeleteError,
AccessTokenListError,
JWTAuthError,
JWTExpiredError,
JWTSecretListError,
Expand All @@ -11,7 +16,12 @@
ServerTLSReloadError,
ServerVersionError,
)
from tests.helpers import assert_raises, generate_jwt, generate_string
from tests.helpers import (
assert_raises,
generate_jwt,
generate_string,
generate_token_name,
)


def test_auth_invalid_method(client, db_name, username, password):
Expand Down Expand Up @@ -155,3 +165,47 @@ def test_auth_jwt_expiry(client, db_name, root_password, secret):
db = client.db("_system", user_token=valid_token)
with assert_raises(JWTExpiredError) as err:
db.conn.set_token(expired_token)


def test_auth_access_token(client, db_name, username, password, bad_db):
# Login with basic auth
db_auth_basic = client.db(
name=db_name,
username=username,
password=password,
verify=True,
auth_method="basic",
)

# Create an access token
token_name = generate_token_name()
token = db_auth_basic.create_access_token(
user=username, name=token_name, valid_until=int(time.time() + 3600)
)
assert token["active"] is True

# Cannot create a token with the same name
with assert_raises(AccessTokenCreateError):
db_auth_basic.create_access_token(
user=username, name=token_name, valid_until=int(time.time() + 3600)
)

# Authenticate with the created token
access_token_db = client.db(
name=db_name,
username=username,
password=token["token"],
verify=True,
auth_method="basic",
)

# List access tokens
tokens = access_token_db.list_access_tokens(username)
assert isinstance(tokens, list)
with assert_raises(AccessTokenListError):
bad_db.list_access_tokens(username)

# Clean up
access_token_db.delete_access_token(username, token["id"])
with assert_raises(AccessTokenDeleteError):
access_token_db.delete_access_token(username, token["id"])
Loading