diff --git a/tests/unittests/unit/server/auth_test.py b/tests/unittests/unit/server/auth_test.py index f09a199078..e2649e3d87 100755 --- a/tests/unittests/unit/server/auth_test.py +++ b/tests/unittests/unit/server/auth_test.py @@ -384,6 +384,44 @@ def exec_cmd(cmd, success=True): exec_cmd("/bin/true", True) exec_cmd("/bin/false", False) + def test_keycloak(self): + try: + self._init_auth("keycloak") + except ImportError as e: + print("Warning: keycloak auth test skipped") + print(" %s", e) + return + def t(digests=None, response=None, **kwargs): + a = self._init_auth("keycloak", **kwargs) + assert a.requires_challenge(), "%s should require a challenge" % a + if digests is not None: + assert a.get_challenge(digests), "cannot get challenge for digests %s" % (digests,) + if response is not None: + assert a.check(response), "check failed for response %s" % (response,) + return a + def f(digests=None, response=None, **kwargs): + try: + t(digests, response, **kwargs) + except Exception: + pass + else: + raise Exception("keycloak auth should have failed with arguments: %s" % (kwargs,)) + t() + #only 'authorization_code' is supported: + f(grant_type="foo") + t(grant_type="authorization_code") + #only 'keycloak' digest is supported: + f(digests=("xor", )) + t(digests=("xor", "keycloak", )) + t(digests=("keycloak", )) + #we can't provide a valid response: + f(digests=("keycloak", ), response="") + f(digests=("keycloak", ), response="foo") + f(digests=("keycloak", ), response="{\"foo\":\"bar\"}") + f(digests=("keycloak", ), response="{\"code\":\"authorization_code\"}") + #non-https URL should fail: + f(server_url="http://localhost:8080/") + def main(): import logging diff --git a/xpra/server/auth/keycloak_auth.py b/xpra/server/auth/keycloak_auth.py index a3f91bf031..c91f6d6131 100644 --- a/xpra/server/auth/keycloak_auth.py +++ b/xpra/server/auth/keycloak_auth.py @@ -10,7 +10,7 @@ from xpra.server.auth.sys_auth_base import SysAuthenticator, log -KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/") +KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "https://localhost:8080/auth/") KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm") KEYCLOAK_CLIENT_ID = os.environ.get("XPRA_KEYCLOAK_CLIENT_ID", "example_client") KEYCLOAK_CLIENT_SECRET_KEY = os.environ.get("XPRA_KEYCLOAK_CLIENT_SECRET_KEY", "secret") @@ -33,7 +33,7 @@ def __init__(self, **kwargs): # use keycloak as default prompt kwargs["prompt"] = kwargs.pop("prompt", "keycloak") - if KEYCLOAK_GRANT_TYPE != "authorization_code": + if self.grant_type != "authorization_code": raise NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported") super().__init__(**kwargs) @@ -42,16 +42,15 @@ def __init__(self, **kwargs): try: from oauthlib.oauth2 import WebApplicationClient - # Get authorization code - client = WebApplicationClient(KEYCLOAK_CLIENT_ID) - authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth' + client = WebApplicationClient(self.client_id) + authorization_url = self.server_url + 'realms/' + self.realm_name + '/protocol/openid-connect/auth' self.salt = client.prepare_request_uri( authorization_url, - redirect_uri = KEYCLOAK_REDIRECT_URI, - scope = [KEYCLOAK_SCOPE], + redirect_uri = self.redirect_uri, + scope = [self.redirect_uri], ) - except ImportError as e: + except ImportError as e: # pragma: no cover log("check(..)", exc_info=True) log.warn("Warning: cannot use keycloak authentication:") log.warn(" %s", e) @@ -110,10 +109,10 @@ def check(self, response_json) -> bool: from keycloak.exceptions import KeycloakError # Configure client - keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL, - client_id=KEYCLOAK_CLIENT_ID, - realm_name=KEYCLOAK_REALM_NAME, - client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY) + keycloak_openid = KeycloakOpenID(server_url=self.server_url, + client_id=self.client_id, + realm_name=self.realm_name, + client_secret_key=self.client_id) # Get well_known config_well_know = keycloak_openid.well_know() @@ -121,8 +120,8 @@ def check(self, response_json) -> bool: # Get token token = keycloak_openid.token(code=auth_code, - grant_type=[KEYCLOAK_GRANT_TYPE], - redirect_uri=KEYCLOAK_REDIRECT_URI) + grant_type=[self.grant_type], + redirect_uri=self.redirect_uri) # Verify token access_token = token.get("access_token") @@ -161,7 +160,7 @@ def check(self, response_json) -> bool: return False -def main(args): +def main(args): # pragma: no cover if len(args)!=2: print("invalid number of arguments") print("usage:")