Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring for PR #2

Merged
merged 2 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion xpra/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def init_challenge_handlers(self, challenge_handlers):
items = (
"uri", "file", "env",
"kerberos", "gss",
"keycloak", "u2f",
"u2f",
"prompt", "prompt", "prompt", "prompt",
)
ierror = authlog
Expand Down
146 changes: 103 additions & 43 deletions xpra/server/auth/keycloak_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
# later version. See the file COPYING for details.

import os
import sys
import json

from xpra.util import typedict
from xpra.server.auth.sys_auth_base import SysAuthenticator, log
from keycloak import KeycloakOpenID
from oauthlib.oauth2 import WebApplicationClient

KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/")
KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm")
Expand All @@ -31,12 +30,19 @@ def __init__(self, **kwargs):
self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI)
self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE)
self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE)

# use keycloak as default prompt
kwargs["prompt"] = kwargs.pop("prompt", "keycloak")

if KEYCLOAK_GRANT_TYPE == "authorization_code":
super().__init__(**kwargs)
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)
if KEYCLOAK_GRANT_TYPE != "authorization_code":
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported"))

super().__init__(**kwargs)
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)

try:
from oauthlib.oauth2 import WebApplicationClient

# Get authorization code
client = WebApplicationClient(KEYCLOAK_CLIENT_ID)
Expand All @@ -46,8 +52,12 @@ def __init__(self, **kwargs):
redirect_uri = KEYCLOAK_REDIRECT_URI,
scope = [KEYCLOAK_SCOPE],
)
else:
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported."))
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
# unsure how to fail the auth at this point so we raise the exception
raise(e)

def __repr__(self):
return "keycloak"
Expand All @@ -62,64 +72,114 @@ def get_challenge(self, digests):

def check(self, response_json) -> bool:
assert self.challenge_sent
#log("response_json: %r", repr(response_json))
#log("response_json: %r", response_json)

if not response_json:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False

try:
response = json.loads(response_json)
except Exception:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
except json.JSONDecodeError:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("failed to parse json: %r", response_json, exc_info=True)
return False

if not isinstance(response, dict):
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("response is of type %r but dict type is required", type(response), exc_info=True)
log("failed to load response %r", response, exc_info=True)
return False

log("check(%r)", repr(response))
log("check(%r)", response, exc_info=True)

auth_code = response.get("code")
error = response.get("error")

if not error and not auth_code:
log.error("Error: keycloak authentication failed as invalid response was received from authorization endpoint.")
return False

if error:
log.error("Error: keycloak authentication failed with %s: %s", error, response.get("error_description"))
log.error("Error: keycloak authentication failed")
log.error("%s: %s", error, response.get("error_description"))
return False

if auth_code:
if not auth_code:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")

try:
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError

# Configure client
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM_NAME,
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY)

try:
# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", repr(config_well_know))
# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", config_well_know, exc_info=True)

# Get token
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
token_info = keycloak_openid.introspect(token['access_token'])
log("token_info: %r", repr(token_info))

if token_info["active"]:
# Get userinfo
user_info = keycloak_openid.userinfo(token['access_token'])
log("userinfo_info: %r", repr(user_info))

log("keycloak authentication succeeded: token is active")
return True
else:
log.error("Error: keycloak authentication failed as token is not active")
return False
except Exception as e:
log.error("Error: keycloak authentication failed with code %s: %s", e.response_code, e.error_message)
# Get token
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
access_token = token.get("access_token")
if not access_token:
log.error("Error: keycloak authentication failed as access token is missing")
return False

token_info = keycloak_openid.introspect(access_token)
log("token_info: %r", token_info, exc_info=True)

token_state = token_info.get("active")
if token_state is None:
log.error("Error: keycloak authentication failed as token state is missing")
return False

if token_state is False:
log.error("Error: keycloak authentication failed as token state not active")
return False

if token_state is True:
# Get userinfo
user_info = keycloak_openid.userinfo(access_token)
log("userinfo_info: %r", user_info, exc_info=True)

log("keycloak authentication succeeded: token is active")
return True

log.error("Error: keycloak authentication failed as token state is invalid")
return False
except KeycloakError as e:
log.error("Error: keycloak authentication failed")
log.error("Error code %s: %s", e.response_code, e.error_message)
return False
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
return False

def main(args):
if len(args)!=2:
print("invalid number of arguments")
print("usage:")
print("%s response_json" % (args[0],))
return 1
response_json = args[1]

a = Authenticator()
a.get_challenge("keycloak")

if a.check(response_json):
print("success")
return 0
else:
print("failed")
return -1

if __name__ == "__main__":
sys.exit(main(sys.argv))