Skip to content

Commit

Permalink
#3486 style fixups
Browse files Browse the repository at this point in the history
* whitespace was wrong (4 spaces)
* empty lines
* error message format
* consistency: check for failures and shortcut out, success at the end
* unused import
* 'raise' format
  • Loading branch information
totaam committed Apr 21, 2022
1 parent 3a5c73b commit 34bd7bc
Showing 1 changed file with 88 additions and 92 deletions.
180 changes: 88 additions & 92 deletions xpra/server/auth/keycloak_auth.py
@@ -1,5 +1,5 @@
# This file is part of Xpra.
# Copyright (C) 2016-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2016-2022 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2022 Nathalie Casati <nat@yuka.ch>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
Expand All @@ -8,7 +8,6 @@
import sys
import json

from xpra.util import typedict
from xpra.server.auth.sys_auth_base import SysAuthenticator, log

KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/")
Expand All @@ -35,29 +34,29 @@ def __init__(self, **kwargs):
kwargs["prompt"] = kwargs.pop("prompt", "keycloak")

if KEYCLOAK_GRANT_TYPE != "authorization_code":
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported"))
raise NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")

super().__init__(**kwargs)
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)

try:
from oauthlib.oauth2 import WebApplicationClient

# Get authorization code
client = WebApplicationClient(KEYCLOAK_CLIENT_ID)
authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth'
self.salt = client.prepare_request_uri(
authorization_url,
redirect_uri = KEYCLOAK_REDIRECT_URI,
scope = [KEYCLOAK_SCOPE],
)
from oauthlib.oauth2 import WebApplicationClient

# Get authorization code
client = WebApplicationClient(KEYCLOAK_CLIENT_ID)
authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth'
self.salt = client.prepare_request_uri(
authorization_url,
redirect_uri = KEYCLOAK_REDIRECT_URI,
scope = [KEYCLOAK_SCOPE],
)
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
# unsure how to fail the auth at this point so we raise the exception
raise(e)
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
# unsure how to fail the auth at this point so we raise the exception
raise

def __repr__(self):
return "keycloak"
Expand All @@ -72,97 +71,95 @@ def get_challenge(self, digests):

def check(self, response_json) -> bool:
assert self.challenge_sent
#log("response_json: %r", response_json)

if not response_json:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False

try:
response = json.loads(response_json)
response = json.loads(response_json)
except json.JSONDecodeError:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("failed to parse json: %r", response_json, exc_info=True)
return False
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("failed to parse json: %r", response_json, exc_info=True)
return False

if not isinstance(response, dict):
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("response is of type %r but dict type is required", type(response), exc_info=True)
log("failed to load response %r", response, exc_info=True)
return False
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
log("response is of type %r but dict type is required", type(response), exc_info=True)
log("failed to load response %r", response, exc_info=True)
return False

log("check(%r)", response)

auth_code = response.get("code")
error = response.get("error")

if error:
log.error("Error: keycloak authentication failed")
log.error("%s: %s", error, response.get("error_description"))
return False

if not auth_code:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False

try:
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError

# Configure client
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM_NAME,
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY)

# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", config_well_know, exc_info=True)

# Get token
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
access_token = token.get("access_token")
if not access_token:
log.error("Error: keycloak authentication failed as access token is missing")
log.error("Error: keycloak authentication failed")
log.error("%s: %s", error, response.get("error_description"))
return False

token_info = keycloak_openid.introspect(access_token)
log("token_info: %r", token_info, exc_info=True)

token_state = token_info.get("active")
if token_state is None:
log.error("Error: keycloak authentication failed as token state is missing")
if not auth_code:
log.error("Error: keycloak authentication failed")
log.error("Invalid response received from authorization endpoint")
return False

if token_state is False:
log.error("Error: keycloak authentication failed as token state not active")
return False
try:
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError

# Configure client
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM_NAME,
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY)

# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", config_well_know, exc_info=True)

# Get token
token = keycloak_openid.token(code=auth_code,
grant_type=[KEYCLOAK_GRANT_TYPE],
redirect_uri=KEYCLOAK_REDIRECT_URI)

# Verify token
access_token = token.get("access_token")
if not access_token:
log.error("Error: keycloak authentication failed as access token is missing")
return False

token_info = keycloak_openid.introspect(access_token)
log("token_info: %r", token_info, exc_info=True)

token_state = token_info.get("active")
if token_state is None:
log.error("Error: keycloak authentication failed as token state is missing")
return False

if token_state is False:
log.error("Error: keycloak authentication failed as token state not active")
return False

if token_state is not True:
log.error("Error: keycloak authentication failed as token state is invalid")
return False

if token_state is True:
# Get userinfo
user_info = keycloak_openid.userinfo(access_token)
log("userinfo_info: %r", user_info, exc_info=True)

log("keycloak authentication succeeded: token is active")
return True

log.error("Error: keycloak authentication failed as token state is invalid")
return False
except KeycloakError as e:
log.error("Error: keycloak authentication failed")
log.error("Error code %s: %s", e.response_code, e.error_message)
return False
log.error("Error: keycloak authentication failed")
log.error(" error code %s: %s", e.response_code, e.error_message)
return False
except ImportError as e:
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
return False
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
return False


def main(args):
if len(args)!=2:
Expand All @@ -175,12 +172,11 @@ def main(args):
a = Authenticator()
a.get_challenge("keycloak")

if a.check(response_json):
print("success")
return 0
else:
if not a.check(response_json):
print("failed")
return -1
print("success")
return 0

if __name__ == "__main__":
sys.exit(main(sys.argv))

0 comments on commit 34bd7bc

Please sign in to comment.