Skip to content

Commit

Permalink
#3486 fix keycloak auth module thanks to unit tests
Browse files Browse the repository at this point in the history
the kwargs were being parsed but not actually used
  • Loading branch information
totaam committed Apr 30, 2022
1 parent d908846 commit 918a880
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 15 deletions.
38 changes: 38 additions & 0 deletions tests/unittests/unit/server/auth_test.py
Expand Up @@ -384,6 +384,44 @@ def exec_cmd(cmd, success=True):
exec_cmd("/bin/true", True)
exec_cmd("/bin/false", False)

def test_keycloak(self):
try:
self._init_auth("keycloak")
except ImportError as e:
print("Warning: keycloak auth test skipped")
print(" %s", e)
return
def t(digests=None, response=None, **kwargs):
a = self._init_auth("keycloak", **kwargs)
assert a.requires_challenge(), "%s should require a challenge" % a
if digests is not None:
assert a.get_challenge(digests), "cannot get challenge for digests %s" % (digests,)
if response is not None:
assert a.check(response), "check failed for response %s" % (response,)
return a
def f(digests=None, response=None, **kwargs):
try:
t(digests, response, **kwargs)
except Exception:
pass
else:
raise Exception("keycloak auth should have failed with arguments: %s" % (kwargs,))
t()
#only 'authorization_code' is supported:
f(grant_type="foo")
t(grant_type="authorization_code")
#only 'keycloak' digest is supported:
f(digests=("xor", ))
t(digests=("xor", "keycloak", ))
t(digests=("keycloak", ))
#we can't provide a valid response:
f(digests=("keycloak", ), response="")
f(digests=("keycloak", ), response="foo")
f(digests=("keycloak", ), response="{\"foo\":\"bar\"}")
f(digests=("keycloak", ), response="{\"code\":\"authorization_code\"}")
#non-https URL should fail:
f(server_url="http://localhost:8080/")


def main():
import logging
Expand Down
29 changes: 14 additions & 15 deletions xpra/server/auth/keycloak_auth.py
Expand Up @@ -10,7 +10,7 @@

from xpra.server.auth.sys_auth_base import SysAuthenticator, log

KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/")
KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "https://localhost:8080/auth/")
KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm")
KEYCLOAK_CLIENT_ID = os.environ.get("XPRA_KEYCLOAK_CLIENT_ID", "example_client")
KEYCLOAK_CLIENT_SECRET_KEY = os.environ.get("XPRA_KEYCLOAK_CLIENT_SECRET_KEY", "secret")
Expand All @@ -33,7 +33,7 @@ def __init__(self, **kwargs):
# use keycloak as default prompt
kwargs["prompt"] = kwargs.pop("prompt", "keycloak")

if KEYCLOAK_GRANT_TYPE != "authorization_code":
if self.grant_type != "authorization_code":
raise NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")

super().__init__(**kwargs)
Expand All @@ -42,16 +42,15 @@ def __init__(self, **kwargs):

try:
from oauthlib.oauth2 import WebApplicationClient

# Get authorization code
client = WebApplicationClient(KEYCLOAK_CLIENT_ID)
authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth'
client = WebApplicationClient(self.client_id)
authorization_url = self.server_url + 'realms/' + self.realm_name + '/protocol/openid-connect/auth'
self.salt = client.prepare_request_uri(
authorization_url,
redirect_uri = KEYCLOAK_REDIRECT_URI,
scope = [KEYCLOAK_SCOPE],
redirect_uri = self.redirect_uri,
scope = [self.redirect_uri],
)
except ImportError as e:
except ImportError as e: # pragma: no cover
log("check(..)", exc_info=True)
log.warn("Warning: cannot use keycloak authentication:")
log.warn(" %s", e)
Expand Down Expand Up @@ -110,19 +109,19 @@ def check(self, response_json) -> bool:
from keycloak.exceptions import KeycloakError

# Configure client
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM_NAME,
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY)
keycloak_openid = KeycloakOpenID(server_url=self.server_url,
client_id=self.client_id,
realm_name=self.realm_name,
client_secret_key=self.client_id)

# Get well_known
config_well_know = keycloak_openid.well_know()
log("well_known: %r", config_well_know, exc_info=True)

# Get token
token = keycloak_openid.token(code=auth_code,
grant_type=[KEYCLOAK_GRANT_TYPE],
redirect_uri=KEYCLOAK_REDIRECT_URI)
grant_type=[self.grant_type],
redirect_uri=self.redirect_uri)

# Verify token
access_token = token.get("access_token")
Expand Down Expand Up @@ -161,7 +160,7 @@ def check(self, response_json) -> bool:
return False


def main(args):
def main(args): # pragma: no cover
if len(args)!=2:
print("invalid number of arguments")
print("usage:")
Expand Down

0 comments on commit 918a880

Please sign in to comment.