diff --git a/ci/apiv2/test_apitoken.py b/ci/apiv2/test_apitoken.py index 6e61e585e..23b6f3fad 100644 --- a/ci/apiv2/test_apitoken.py +++ b/ci/apiv2/test_apitoken.py @@ -1,5 +1,39 @@ -from hashtopolis import ApiToken, HashtopolisError -from utils import BaseTest +import base64 +import json +import time + +import requests + +from hashtopolis import ApiToken + +from utils import BaseTest, create_restricted_user + + +def _decode_jwt_scope(token): + """Decode the JWT payload (without signature verification) and return the parsed scope dict.""" + payload_b64 = token.split('.')[1] + payload_b64 += '=' * (-len(payload_b64) % 4) + payload = json.loads(base64.urlsafe_b64decode(payload_b64)) + return json.loads(payload['scope']) + + +def _create_apitoken_raw(test, auth, scopes): + """POST /ui/apiTokens as the given user and register the resulting token for cleanup.""" + connector = ApiToken.objects.get_conn() + connector.authenticate(auth=auth) + uri = connector._api_endpoint + '/ui/apiTokens' + headers = {**connector._headers, 'Content-Type': 'application/json'} + now = int(time.time()) + payload = { + 'scopes': scopes, + 'startValid': now, + 'endValid': now + 3600, + } + r = requests.post(uri, headers=headers, data=json.dumps(payload)) + assert r.status_code == 201, f"Failed to create apitoken: status={r.status_code} body={r.text}" + obj = ApiToken(**r.json()['data']) + test.delete_after_test(obj) + return obj class ApiTokenTest(BaseTest): @@ -9,48 +43,45 @@ def create_test_object(self, *nargs, **kwargs): return self.create_apitoken(*nargs, **kwargs) def test_create(self): - model_obj = self.create_test_object(delete=False) + model_obj = self.create_test_object() self._test_create(model_obj) - def test_token_returned_on_create(self): + def test_delete(self): model_obj = self.create_test_object(delete=False) - # The JWT token string is only present in the POST response - self.assertTrue(hasattr(model_obj, 'token')) - self.assertIsNotNone(model_obj.token) - self.assertIsInstance(model_obj.token, str) - self.assertGreater(len(model_obj.token), 0) + self._test_delete(model_obj) - def test_token_not_in_get(self): - model_obj = self.create_test_object(delete=False) - # Retrieve the object via GET and verify the token field is absent - obj = self.model_class.objects.get(pk=model_obj.id) - self.assertFalse(hasattr(obj, 'token') and obj.token is not None) + def test_expandables(self): + model_obj = self.create_test_object() + expandables = ['user'] + self._test_expandables(model_obj, expandables) - def test_revoke(self): - model_obj = self.create_test_object(delete=False) - self._test_patch(model_obj, 'isRevoked', True) + def test_acl(self): + model_obj = self.create_test_object() + self._test_acl_list(model_obj, {'permJwtApiKeyRead': True}) - def test_expand_user(self): - model_obj = self.create_test_object(delete=False) - self._test_expandables(model_obj, ['user']) + def test_token_scope_admin_grants_requested(self): + """Admin holds every legacy permission, so any requested scope must be granted in the JWT.""" + model_obj = self.create_test_object() + scope = _decode_jwt_scope(model_obj.token) + self.assertTrue(scope.get('permHashlistRead')) - def test_patch_readonly_startValid(self): - model_obj = self.create_test_object(delete=False) - model_obj.startValid = 0 - with self.assertRaises(HashtopolisError) as e: - model_obj.save() - self.assertEqual(e.exception.status_code, 403) - self.assertIn('startValid', e.exception.title) + def test_token_scope_intersection_grants_permitted(self): + """A restricted user is granted a requested scope they hold via the legacy permission mapping.""" + auth = create_restricted_user(self, { + 'permHashlistRead': True, + 'permJwtApiKeyCreate': True, + }) + model_obj = _create_apitoken_raw(self, auth, ['permHashlistRead']) + scope = _decode_jwt_scope(model_obj.token) + self.assertTrue(scope.get('permHashlistRead')) - def test_patch_readonly_endValid(self): - model_obj = self.create_test_object(delete=False) - model_obj.endValid = 9999999999 - with self.assertRaises(HashtopolisError) as e: - model_obj.save() - self.assertEqual(e.exception.status_code, 403) - self.assertIn('endValid', e.exception.title) - - def test_acl(self): - # Admin's token should not be visible to a different user - model_obj = self.create_test_object(delete=False) - self._test_acl_list(model_obj, {'permJwtApiKeyRead': True}) + def test_token_scope_intersection_denies_unpermitted(self): + """A restricted user must NOT receive a scope they do not have, even if they request it.""" + auth = create_restricted_user(self, { + 'permHashlistRead': True, + 'permJwtApiKeyCreate': True, + }) + model_obj = _create_apitoken_raw(self, auth, ['permHashlistRead', 'permFileRead']) + scope = _decode_jwt_scope(model_obj.token) + self.assertTrue(scope.get('permHashlistRead')) + self.assertFalse(scope.get('permFileRead')) diff --git a/src/inc/apiv2/model/ApiTokenAPI.php b/src/inc/apiv2/model/ApiTokenAPI.php index 601eaea13..9c2a244da 100644 --- a/src/inc/apiv2/model/ApiTokenAPI.php +++ b/src/inc/apiv2/model/ApiTokenAPI.php @@ -11,6 +11,7 @@ use Hashtopolis\inc\apiv2\error\HttpError; use Hashtopolis\inc\StartupConfig; +use Hashtopolis\inc\utils\AccessUtils; use Hashtopolis\inc\utils\JwtTokenUtils; class ApiTokenAPI extends AbstractModelAPI { @@ -74,19 +75,15 @@ protected function createObject(array $data): int { //Scopes is an array of permissions in format [permFileTaskUpdate, permAgentDelete] $scopes = explode(",", $data["scopes"]); - $allPermissions = $this->getRightGroup($this->getCurrentUser()->getRightGroupId())->getPermissions(); - if ($allPermissions == 'ALL') { - // Special (legacy) case for administrative access, enable all available permissions - $all_perms = array_keys(self::$acl_mapping); - $rightgroup_perms = array_combine($all_perms, array_fill(0, count($all_perms), true)); - } - else { - $rightgroup_perms = json_decode($allPermissions, true); - } - $NotAllowedPerms = array_filter($rightgroup_perms, fn($v) => $v === false); - $allowedPerms = array_intersect_key($rightgroup_perms, array_flip($scopes)); + $userCrudPerms = AccessUtils::getPermissionArrayConverted( + $this->getRightGroup($this->getCurrentUser()->getRightGroupId())->getPermissions() + ); - $requestedScopes = $allowedPerms + $NotAllowedPerms; + // Modern CRUD scope dict: true iff the perm was requested AND the user has it. + $requestedScopes = []; + foreach ($userCrudPerms as $perm => $granted) { + $requestedScopes[$perm] = $granted && in_array($perm, $scopes, true); + } $secret = StartupConfig::getInstance()->getPepper(0); $iat = $data[JwtApiKey::START_VALID];