Skip to content

Commit

Permalink
Merge pull request #2 from HIP-infrastructure/keycloak-fixes
Browse files Browse the repository at this point in the history
Refactoring for PR
  • Loading branch information
iDmple committed Apr 20, 2022
2 parents 270ca22 + 36a22db commit 3a3e74d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 44 deletions.
2 changes: 1 addition & 1 deletion xpra/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def init_challenge_handlers(self, challenge_handlers):
items = (
"uri", "file", "env",
"kerberos", "gss",
"keycloak", "u2f",
"u2f",
"prompt", "prompt", "prompt", "prompt",
)
ierror = authlog
Expand Down
146 changes: 103 additions & 43 deletions xpra/server/auth/keycloak_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
# later version. See the file COPYING for details.

import os
import sys
import json

from xpra.util import typedict
from xpra.server.auth.sys_auth_base import SysAuthenticator, log
from keycloak import KeycloakOpenID
from oauthlib.oauth2 import WebApplicationClient

KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/")
KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm")
Expand All @@ -31,12 +30,19 @@ def __init__(self, **kwargs):
self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI)
self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE)
self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE)

# use keycloak as default prompt
kwargs["prompt"] = kwargs.pop("prompt", "keycloak")

if KEYCLOAK_GRANT_TYPE == "authorization_code":
super().__init__(**kwargs)
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)
if KEYCLOAK_GRANT_TYPE != "authorization_code":
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported"))

super().__init__(**kwargs)
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)

try:
from oauthlib.oauth2 import WebApplicationClient

# Get authorization code
client = WebApplicationClient(KEYCLOAK_CLIENT_ID)
Expand All @@ -46,8 +52,12 @@ def __init__(self, **kwargs):
redirect_uri = KEYCLOAK_REDIRECT_URI,
scope = [KEYCLOAK_SCOPE],
)
else:
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported."))
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
# unsure how to fail the auth at this point so we raise the exception
raise(e)

def __repr__(self):
return "keycloak"
Expand All @@ -62,64 +72,114 @@ def get_challenge(self, digests):

def check(self, response_json) -> bool:
assert self.challenge_sent
#log("response_json: %r", repr(response_json))
#log("response_json: %r", response_json)

if not response_json:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False

try:
response = json.loads(response_json)
except Exception:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
except json.JSONDecodeError:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("failed to parse json: %r", response_json, exc_info=True)
return False

if not isinstance(response, dict):
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("response is of type %r but dict type is required", type(response), exc_info=True)
log("failed to load response %r", response, exc_info=True)
return False

log("check(%r)", repr(response))
log("check(%r)", response, exc_info=True)

auth_code = response.get("code")
error = response.get("error")

if not error and not auth_code:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
return False

if error:
log.error("Error: keycloak authentication failed with %s: %s", error, response.get("error_description"))
log.error("Error: keycloak authentication failed")
log.error("%s: %s", error, response.get("error_description"))
return False

if auth_code:
if not auth_code:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")

try:
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError

# Configure client
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM_NAME,
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY)

try:
# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", repr(config_well_know))
# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", config_well_know, exc_info=True)

# Get token
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
token_info = keycloak_openid.introspect(token['access_token'])
log("token_info: %r", repr(token_info))

if token_info["active"]:
# Get userinfo
user_info = keycloak_openid.userinfo(token['access_token'])
log("userinfo_info: %r", repr(user_info))

log("keycloak authentication succeeded: token is active")
return True
else:
log.error("Error: keycloak authentication failed as token is not active")
return False
except Exception as e:
log.error("Error: keycloak authentication failed with code %s: %s", e.response_code, e.error_message)
# Get token
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
access_token = token.get("access_token")
if not access_token:
log.error("Error: keycloak authentication failed as access token is missing")
return False

token_info = keycloak_openid.introspect(access_token)
log("token_info: %r", token_info, exc_info=True)

token_state = token_info.get("active")
if token_state is None:
log.error("Error: keycloak authentication failed as token state is missing")
return False

if token_state is False:
log.error("Error: keycloak authentication failed as token state not active")
return False

if token_state is True:
# Get userinfo
user_info = keycloak_openid.userinfo(access_token)
log("userinfo_info: %r", user_info, exc_info=True)

log("keycloak authentication succeeded: token is active")
return True

log.error("Error: keycloak authentication failed as token state is invalid")
return False
except KeycloakError as e:
log.error("Error: keycloak authentication failed")
log.error("Error code %s: %s", e.response_code, e.error_message)
return False
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
return False

def main(args):
if len(args)!=2:
print("invalid number of arguments")
print("usage:")
print("%s response_json" % (args[0],))
return 1
response_json = args[1]

a = Authenticator()
a.get_challenge("keycloak")

if a.check(response_json):
print("success")
return 0
else:
print("failed")
return -1

if __name__ == "__main__":
sys.exit(main(sys.argv))

0 comments on commit 3a3e74d

Please sign in to comment.