diff --git a/xpra/server/auth/keycloak_auth.py b/xpra/server/auth/keycloak_auth.py index d0a3aba488..a3f91bf031 100644 --- a/xpra/server/auth/keycloak_auth.py +++ b/xpra/server/auth/keycloak_auth.py @@ -1,5 +1,5 @@ # This file is part of Xpra. -# Copyright (C) 2016-2021 Antoine Martin +# Copyright (C) 2016-2022 Antoine Martin # Copyright (C) 2022 Nathalie Casati # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. @@ -8,7 +8,6 @@ import sys import json -from xpra.util import typedict from xpra.server.auth.sys_auth_base import SysAuthenticator, log KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/") @@ -35,29 +34,29 @@ def __init__(self, **kwargs): kwargs["prompt"] = kwargs.pop("prompt", "keycloak") if KEYCLOAK_GRANT_TYPE != "authorization_code": - raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")) - + raise NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported") + super().__init__(**kwargs) log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s", self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type) try: - from oauthlib.oauth2 import WebApplicationClient - - # Get authorization code - client = WebApplicationClient(KEYCLOAK_CLIENT_ID) - authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth' - self.salt = client.prepare_request_uri( - authorization_url, - redirect_uri = KEYCLOAK_REDIRECT_URI, - scope = [KEYCLOAK_SCOPE], - ) + from oauthlib.oauth2 import WebApplicationClient + + # Get authorization code + client = WebApplicationClient(KEYCLOAK_CLIENT_ID) + authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth' + self.salt = client.prepare_request_uri( + authorization_url, + redirect_uri = KEYCLOAK_REDIRECT_URI, + scope = [KEYCLOAK_SCOPE], + ) except ImportError as e: - log("check(..)", exc_info=True) - log.warn("Warning: cannot use keycloak authentication:") - log.warn(" %s", e) - # unsure how to fail the auth at this point so we raise the exception - raise(e) + log("check(..)", exc_info=True) + log.warn("Warning: cannot use keycloak authentication:") + log.warn(" %s", e) + # unsure how to fail the auth at this point so we raise the exception + raise def __repr__(self): return "keycloak" @@ -72,97 +71,95 @@ def get_challenge(self, digests): def check(self, response_json) -> bool: assert self.challenge_sent - #log("response_json: %r", response_json) - if not response_json: - log.error("Error: keycloak authentication failed") - log.error("Invalid response received from authorization endpoint") - return False - + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + return False + try: - response = json.loads(response_json) + response = json.loads(response_json) except json.JSONDecodeError: - log.error("Error: keycloak authentication failed") - log.error("Invalid response received from authorization endpoint") - log("failed to parse json: %r", response_json, exc_info=True) - return False + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + log("failed to parse json: %r", response_json, exc_info=True) + return False if not isinstance(response, dict): - log.error("Error: keycloak authentication failed") - log.error("Invalid response received from authorization endpoint") - log("response is of type %r but dict type is required", type(response), exc_info=True) - log("failed to load response %r", response, exc_info=True) - return False + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") + log("response is of type %r but dict type is required", type(response), exc_info=True) + log("failed to load response %r", response, exc_info=True) + return False log("check(%r)", response) - auth_code = response.get("code") error = response.get("error") if error: - log.error("Error: keycloak authentication failed") - log.error("%s: %s", error, response.get("error_description")) - return False - - if not auth_code: - log.error("Error: keycloak authentication failed") - log.error("Invalid response received from authorization endpoint") - return False - - try: - from keycloak import KeycloakOpenID - from keycloak.exceptions import KeycloakError - - # Configure client - keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL, - client_id=KEYCLOAK_CLIENT_ID, - realm_name=KEYCLOAK_REALM_NAME, - client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY) - - # Get well_known - config_well_know = keycloak_openid.well_know() - log("well_known: %r", config_well_know, exc_info=True) - - # Get token - token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI) - - # Verify token - access_token = token.get("access_token") - if not access_token: - log.error("Error: keycloak authentication failed as access token is missing") + log.error("Error: keycloak authentication failed") + log.error("%s: %s", error, response.get("error_description")) return False - token_info = keycloak_openid.introspect(access_token) - log("token_info: %r", token_info, exc_info=True) - - token_state = token_info.get("active") - if token_state is None: - log.error("Error: keycloak authentication failed as token state is missing") + if not auth_code: + log.error("Error: keycloak authentication failed") + log.error("Invalid response received from authorization endpoint") return False - if token_state is False: - log.error("Error: keycloak authentication failed as token state not active") - return False + try: + from keycloak import KeycloakOpenID + from keycloak.exceptions import KeycloakError + + # Configure client + keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL, + client_id=KEYCLOAK_CLIENT_ID, + realm_name=KEYCLOAK_REALM_NAME, + client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY) + + # Get well_known + config_well_know = keycloak_openid.well_know() + log("well_known: %r", config_well_know, exc_info=True) + + # Get token + token = keycloak_openid.token(code=auth_code, + grant_type=[KEYCLOAK_GRANT_TYPE], + redirect_uri=KEYCLOAK_REDIRECT_URI) + + # Verify token + access_token = token.get("access_token") + if not access_token: + log.error("Error: keycloak authentication failed as access token is missing") + return False + + token_info = keycloak_openid.introspect(access_token) + log("token_info: %r", token_info, exc_info=True) + + token_state = token_info.get("active") + if token_state is None: + log.error("Error: keycloak authentication failed as token state is missing") + return False + + if token_state is False: + log.error("Error: keycloak authentication failed as token state not active") + return False + + if token_state is not True: + log.error("Error: keycloak authentication failed as token state is invalid") + return False - if token_state is True: - # Get userinfo user_info = keycloak_openid.userinfo(access_token) log("userinfo_info: %r", user_info, exc_info=True) - log("keycloak authentication succeeded: token is active") return True - - log.error("Error: keycloak authentication failed as token state is invalid") - return False except KeycloakError as e: - log.error("Error: keycloak authentication failed") - log.error("Error code %s: %s", e.response_code, e.error_message) - return False + log.error("Error: keycloak authentication failed") + log.error(" error code %s: %s", e.response_code, e.error_message) + return False except ImportError as e: - log("check(..)", exc_info=True) - log.warn("Warning: cannot use keycloak authentication:") - log.warn(" %s", e) - return False + log("check(..)", exc_info=True) + log.warn("Warning: cannot use keycloak authentication:") + log.warn(" %s", e) + return False + def main(args): if len(args)!=2: @@ -175,12 +172,11 @@ def main(args): a = Authenticator() a.get_challenge("keycloak") - if a.check(response_json): - print("success") - return 0 - else: + if not a.check(response_json): print("failed") return -1 + print("success") + return 0 if __name__ == "__main__": sys.exit(main(sys.argv))