From da6897a7c8c35a737a8c1b50dfd924eca8ed049d Mon Sep 17 00:00:00 2001 From: Nathalie Casati Date: Wed, 20 Apr 2022 15:37:49 +0000 Subject: [PATCH] Refactoring for PR --- xpra/client/auth/keycloak_handler.py | 23 ---- xpra/client/client_base.py | 2 +- xpra/server/auth/keycloak_auth.py | 154 ++++++++++++++++++++------- 3 files changed, 114 insertions(+), 65 deletions(-) delete mode 100644 xpra/client/auth/keycloak_handler.py diff --git a/xpra/client/auth/keycloak_handler.py b/xpra/client/auth/keycloak_handler.py deleted file mode 100644 index 3b7c139e0c..0000000000 --- a/xpra/client/auth/keycloak_handler.py +++ /dev/null @@ -1,23 +0,0 @@ -# This file is part of Xpra. -# Copyright (C) 2019 Antoine Martin -# Copyright (C) 2022 Nathalie Casati -# Xpra is released under the terms of the GNU GPL v2, or, at your option, any -# later version. See the file COPYING for details. - - -class Handler: - - def __init__(self, client, **_kwargs): - self.client = client - - def __repr__(self): - return "keycloak" - - def get_digest(self) -> str: - return "keycloak" - - def handle(self, packet) -> bool: - if not self.client.password: - return False - self.client.send_challenge_reply(packet, self.client.password) - return True diff --git a/xpra/client/client_base.py b/xpra/client/client_base.py index af3f41cd92..07ace8ab40 100644 --- a/xpra/client/client_base.py +++ b/xpra/client/client_base.py @@ -204,7 +204,7 @@ def init_challenge_handlers(self, challenge_handlers): items = ( "uri", "file", "env", "kerberos", "gss", - "keycloak", "u2f", + "u2f", "prompt", "prompt", "prompt", "prompt", ) ierror = authlog diff --git a/xpra/server/auth/keycloak_auth.py b/xpra/server/auth/keycloak_auth.py index 9211c08325..7e27288f6b 100644 --- a/xpra/server/auth/keycloak_auth.py +++ b/xpra/server/auth/keycloak_auth.py @@ -5,12 +5,11 @@ # later version. See the file COPYING for details. import os +import sys import json from xpra.util import typedict from xpra.server.auth.sys_auth_base import SysAuthenticator, log -from keycloak import KeycloakOpenID -from oauthlib.oauth2 import WebApplicationClient KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/") KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm") @@ -31,12 +30,19 @@ def __init__(self, **kwargs): self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI) self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE) self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE) + + # use keycloak as default prompt kwargs["prompt"] = kwargs.pop("prompt", "keycloak") - if KEYCLOAK_GRANT_TYPE == "authorization_code": - super().__init__(**kwargs) - log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s", - self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type) + if KEYCLOAK_GRANT_TYPE != "authorization_code": + raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")) + + super().__init__(**kwargs) + log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s", + self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type) + + try: + from oauthlib.oauth2 import WebApplicationClient # Get authorization code client = WebApplicationClient(KEYCLOAK_CLIENT_ID) @@ -46,8 +52,12 @@ def __init__(self, **kwargs): redirect_uri = KEYCLOAK_REDIRECT_URI, scope = [KEYCLOAK_SCOPE], ) - else: - raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported.")) + except ImportError as e: + log("check(..)", exc_info=True) + log.warn("Warning: cannot use keycloak authentication:") + log.warn(" %s", e) + # unsure how to fail the auth at this point so we raise the exception + raise(e) def __repr__(self): return "keycloak" @@ -62,52 +72,114 @@ def get_challenge(self, digests): def check(self, response_json) -> bool: assert self.challenge_sent + #log("response_json: %r", response_json) - if response_json is None or response_json == "": - log.error("keycloak authentication failed: invalid response received from authorization endpoint.") + if not response_json: + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") return False - #log("response_json: %s", repr(response_json)) - response = json.loads(response_json) - - if type(response) != dict or ("code" not in response and "error" not in response): - log.error("keycloak authentication failed: invalid response received from authorization endpoint.") + try: + response = json.loads(response_json) + except json.JSONDecodeError: + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + log("failed to parse json: %r", response_json, exc_info=True) + return False + + if not isinstance(response, dict): + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + log("response is of type %r but dict type is required", type(response), exc_info=True) + log("failed to load response %r", response, exc_info=True) return False - log("check(%s)", repr(response)) + log("check(%r)", response, exc_info=True) + + auth_code = response.get("code") + error = response.get("error") - if "error" in response: - log.error("keycloak authentication failed with error %s: %s", response["error"], response["error_description"]) + if error: + log.error("Error: keycloak authentication failed") + log.error("%s: %s", error, response.get("error_description")) return False - if "code" in response: + if not auth_code: + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + + try: + from keycloak import KeycloakOpenID + from keycloak.exceptions import KeycloakError + # Configure client keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL, client_id=KEYCLOAK_CLIENT_ID, realm_name=KEYCLOAK_REALM_NAME, client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY) - try: - # Get well_known - #config_well_know = keycloak_openid.well_know() - #log("well_known: %s", repr(config_well_know)) + # Get well_known + config_well_know = keycloak_openid.well_know() + log("well_known: %r", config_well_know, exc_info=True) - # Get token - token = keycloak_openid.token(code=response["code"], grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI) - - # Verify token - token_info = keycloak_openid.introspect(token['access_token']) - #log("token_info: %s", repr(token_info)) - - if token_info["active"]: - # Get userinfo - #user_info = keycloak_openid.userinfo(token['access_token']) - #log("userinfo_info: %s", repr(user_info)) - - log("keycloak authentication succeeded: token is active") - else: - log.error("keycloak authentication failed: token is not active") - return token_info["active"] - except Exception as e: - log.error("keycloak authentication failed with error code %s: %s", e.response_code, e.error_message) + # Get token + token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI) + + # Verify token + access_token = token.get("access_token") + if not access_token: + log.error("Error: keycloak authentication failed as access token is missing") + return False + + token_info = keycloak_openid.introspect(access_token) + log("token_info: %r", token_info, exc_info=True) + + token_state = token_info.get("active") + if token_state is None: + log.error("Error: keycloak authentication failed as token state is missing") return False + + if token_state is False: + log.error("Error: keycloak authentication failed as token state not active") + return False + + if token_state is True: + # Get userinfo + user_info = keycloak_openid.userinfo(access_token) + log("userinfo_info: %r", user_info, exc_info=True) + + log("keycloak authentication succeeded: token is active") + return True + + log.error("Error: keycloak authentication failed as token state is invalid") + return False + except KeycloakError as e: + log.error("Error: keycloak authentication failed") + log.error("Error code %s: %s", e.response_code, e.error_message) + return False + except ImportError as e: + log("check(..)", exc_info=True) + log.warn("Warning: cannot use keycloak authentication:") + log.warn(" %s", e) + return False + +def main(args): + if len(args)!=2: + print("invalid number of arguments") + print("usage:") + print("%s response_json" % (args[0],)) + return 1 + response_json = args[1] + + a = Authenticator() + a.get_challenge("keycloak") + + if a.check(response_json): + print("success") + return 0 + else: + print("failed") + return -1 + +if __name__ == "__main__": + sys.exit(main(sys.argv))