Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 70 additions & 39 deletions ci/apiv2/test_apitoken.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
from hashtopolis import ApiToken, HashtopolisError
from utils import BaseTest
import base64
import json
import time

import requests

from hashtopolis import ApiToken

from utils import BaseTest, create_restricted_user


def _decode_jwt_scope(token):
"""Decode the JWT payload (without signature verification) and return the parsed scope dict."""
payload_b64 = token.split('.')[1]
payload_b64 += '=' * (-len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_b64))
return json.loads(payload['scope'])


def _create_apitoken_raw(test, auth, scopes):
"""POST /ui/apiTokens as the given user and register the resulting token for cleanup."""
connector = ApiToken.objects.get_conn()
connector.authenticate(auth=auth)
uri = connector._api_endpoint + '/ui/apiTokens'
headers = {**connector._headers, 'Content-Type': 'application/json'}
now = int(time.time())
payload = {
'scopes': scopes,
'startValid': now,
'endValid': now + 3600,
}
r = requests.post(uri, headers=headers, data=json.dumps(payload))
assert r.status_code == 201, f"Failed to create apitoken: status={r.status_code} body={r.text}"
obj = ApiToken(**r.json()['data'])
test.delete_after_test(obj)
return obj


class ApiTokenTest(BaseTest):
Expand All @@ -9,48 +43,45 @@ def create_test_object(self, *nargs, **kwargs):
return self.create_apitoken(*nargs, **kwargs)

def test_create(self):
model_obj = self.create_test_object(delete=False)
model_obj = self.create_test_object()
self._test_create(model_obj)

def test_token_returned_on_create(self):
def test_delete(self):
model_obj = self.create_test_object(delete=False)
# The JWT token string is only present in the POST response
self.assertTrue(hasattr(model_obj, 'token'))
self.assertIsNotNone(model_obj.token)
self.assertIsInstance(model_obj.token, str)
self.assertGreater(len(model_obj.token), 0)
self._test_delete(model_obj)

def test_token_not_in_get(self):
model_obj = self.create_test_object(delete=False)
# Retrieve the object via GET and verify the token field is absent
obj = self.model_class.objects.get(pk=model_obj.id)
self.assertFalse(hasattr(obj, 'token') and obj.token is not None)
def test_expandables(self):
model_obj = self.create_test_object()
expandables = ['user']
self._test_expandables(model_obj, expandables)

def test_revoke(self):
model_obj = self.create_test_object(delete=False)
self._test_patch(model_obj, 'isRevoked', True)
def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permJwtApiKeyRead': True})

def test_expand_user(self):
model_obj = self.create_test_object(delete=False)
self._test_expandables(model_obj, ['user'])
def test_token_scope_admin_grants_requested(self):
"""Admin holds every legacy permission, so any requested scope must be granted in the JWT."""
model_obj = self.create_test_object()
scope = _decode_jwt_scope(model_obj.token)
self.assertTrue(scope.get('permHashlistRead'))

def test_patch_readonly_startValid(self):
model_obj = self.create_test_object(delete=False)
model_obj.startValid = 0
with self.assertRaises(HashtopolisError) as e:
model_obj.save()
self.assertEqual(e.exception.status_code, 403)
self.assertIn('startValid', e.exception.title)
def test_token_scope_intersection_grants_permitted(self):
"""A restricted user is granted a requested scope they hold via the legacy permission mapping."""
auth = create_restricted_user(self, {
'permHashlistRead': True,
'permJwtApiKeyCreate': True,
})
model_obj = _create_apitoken_raw(self, auth, ['permHashlistRead'])
scope = _decode_jwt_scope(model_obj.token)
self.assertTrue(scope.get('permHashlistRead'))

def test_patch_readonly_endValid(self):
model_obj = self.create_test_object(delete=False)
model_obj.endValid = 9999999999
with self.assertRaises(HashtopolisError) as e:
model_obj.save()
self.assertEqual(e.exception.status_code, 403)
self.assertIn('endValid', e.exception.title)

def test_acl(self):
# Admin's token should not be visible to a different user
model_obj = self.create_test_object(delete=False)
self._test_acl_list(model_obj, {'permJwtApiKeyRead': True})
def test_token_scope_intersection_denies_unpermitted(self):
"""A restricted user must NOT receive a scope they do not have, even if they request it."""
auth = create_restricted_user(self, {
'permHashlistRead': True,
'permJwtApiKeyCreate': True,
})
model_obj = _create_apitoken_raw(self, auth, ['permHashlistRead', 'permFileRead'])
scope = _decode_jwt_scope(model_obj.token)
self.assertTrue(scope.get('permHashlistRead'))
self.assertFalse(scope.get('permFileRead'))
21 changes: 9 additions & 12 deletions src/inc/apiv2/model/ApiTokenAPI.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Hashtopolis\inc\apiv2\error\HttpError;
use Hashtopolis\inc\StartupConfig;

use Hashtopolis\inc\utils\AccessUtils;
use Hashtopolis\inc\utils\JwtTokenUtils;

class ApiTokenAPI extends AbstractModelAPI {
Expand Down Expand Up @@ -74,19 +75,15 @@ protected function createObject(array $data): int {
//Scopes is an array of permissions in format [permFileTaskUpdate, permAgentDelete]
$scopes = explode(",", $data["scopes"]);

$allPermissions = $this->getRightGroup($this->getCurrentUser()->getRightGroupId())->getPermissions();
if ($allPermissions == 'ALL') {
// Special (legacy) case for administrative access, enable all available permissions
$all_perms = array_keys(self::$acl_mapping);
$rightgroup_perms = array_combine($all_perms, array_fill(0, count($all_perms), true));
}
else {
$rightgroup_perms = json_decode($allPermissions, true);
}
$NotAllowedPerms = array_filter($rightgroup_perms, fn($v) => $v === false);
$allowedPerms = array_intersect_key($rightgroup_perms, array_flip($scopes));
$userCrudPerms = AccessUtils::getPermissionArrayConverted(
$this->getRightGroup($this->getCurrentUser()->getRightGroupId())->getPermissions()
);

$requestedScopes = $allowedPerms + $NotAllowedPerms;
// Modern CRUD scope dict: true iff the perm was requested AND the user has it.
$requestedScopes = [];
foreach ($userCrudPerms as $perm => $granted) {
$requestedScopes[$perm] = $granted && in_array($perm, $scopes, true);
}

$secret = StartupConfig::getInstance()->getPepper(0);
$iat = $data[JwtApiKey::START_VALID];
Expand Down
Loading