New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added keycloak authentication #3486
Changes from all commits
d6957db
d91f9b7
bcc5fd1
da6897a
270ca22
36a22db
3a3e74d
92ff3c0
5e9b53f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,186 @@ | ||||||||||
# This file is part of Xpra. | ||||||||||
# Copyright (C) 2016-2021 Antoine Martin <antoine@xpra.org> | ||||||||||
# Copyright (C) 2022 Nathalie Casati <nat@yuka.ch> | ||||||||||
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any | ||||||||||
# later version. See the file COPYING for details. | ||||||||||
|
||||||||||
import os | ||||||||||
import sys | ||||||||||
import json | ||||||||||
|
||||||||||
from xpra.util import typedict | ||||||||||
from xpra.server.auth.sys_auth_base import SysAuthenticator, log | ||||||||||
|
||||||||||
KEYCLOAK_SERVER_URL = os.environ.get("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/") | ||||||||||
KEYCLOAK_REALM_NAME = os.environ.get("XPRA_KEYCLOAK_REALM_NAME", "example_realm") | ||||||||||
KEYCLOAK_CLIENT_ID = os.environ.get("XPRA_KEYCLOAK_CLIENT_ID", "example_client") | ||||||||||
KEYCLOAK_CLIENT_SECRET_KEY = os.environ.get("XPRA_KEYCLOAK_CLIENT_SECRET_KEY", "secret") | ||||||||||
KEYCLOAK_REDIRECT_URI = os.environ.get("XPRA_KEYCLOAK_REDIRECT_URI", "http://localhost/login/") | ||||||||||
KEYCLOAK_SCOPE = os.environ.get("XPRA_KEYCLOAK_SCOPE", "openid") | ||||||||||
KEYCLOAK_GRANT_TYPE = os.environ.get("XPRA_KEYCLOAK_GRANT_TYPE", "authorization_code") | ||||||||||
|
||||||||||
|
||||||||||
class Authenticator(SysAuthenticator): | ||||||||||
|
||||||||||
def __init__(self, **kwargs): | ||||||||||
self.server_url = kwargs.pop("server_url", KEYCLOAK_SERVER_URL) | ||||||||||
self.realm_name = kwargs.pop("realm_name", KEYCLOAK_REALM_NAME) | ||||||||||
self.client_id = kwargs.pop("client_id", KEYCLOAK_CLIENT_ID) | ||||||||||
self.client_secret_key = kwargs.pop("client_secret_key", KEYCLOAK_CLIENT_SECRET_KEY) | ||||||||||
self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI) | ||||||||||
self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE) | ||||||||||
self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE) | ||||||||||
|
||||||||||
# use keycloak as default prompt | ||||||||||
kwargs["prompt"] = kwargs.pop("prompt", "keycloak") | ||||||||||
|
||||||||||
if KEYCLOAK_GRANT_TYPE != "authorization_code": | ||||||||||
raise(NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")) | ||||||||||
|
||||||||||
super().__init__(**kwargs) | ||||||||||
log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s", | ||||||||||
self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type) | ||||||||||
|
||||||||||
try: | ||||||||||
from oauthlib.oauth2 import WebApplicationClient | ||||||||||
|
||||||||||
# Get authorization code | ||||||||||
client = WebApplicationClient(KEYCLOAK_CLIENT_ID) | ||||||||||
authorization_url = KEYCLOAK_SERVER_URL + 'realms/' + KEYCLOAK_REALM_NAME + '/protocol/openid-connect/auth' | ||||||||||
self.salt = client.prepare_request_uri( | ||||||||||
authorization_url, | ||||||||||
redirect_uri = KEYCLOAK_REDIRECT_URI, | ||||||||||
scope = [KEYCLOAK_SCOPE], | ||||||||||
) | ||||||||||
except ImportError as e: | ||||||||||
log("check(..)", exc_info=True) | ||||||||||
log.warn("Warning: cannot use keycloak authentication:") | ||||||||||
log.warn(" %s", e) | ||||||||||
# unsure how to fail the auth at this point so we raise the exception | ||||||||||
raise(e) | ||||||||||
|
||||||||||
def __repr__(self): | ||||||||||
return "keycloak" | ||||||||||
|
||||||||||
def get_challenge(self, digests): | ||||||||||
assert not self.challenge_sent | ||||||||||
if "keycloak" not in digests: | ||||||||||
log.error("Error: client does not support keycloak authentication") | ||||||||||
return None | ||||||||||
self.challenge_sent = True | ||||||||||
return self.salt, "keycloak" | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this is correct. We are using this in the client. There is no password. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just thinking out loud here now that I have figured out what I meant here. The What bothers me here is not new to this authentication module so this is not a showstopper: the client will be prompting the user for a "keycloak" (and I think it should say The xpra/xpra/client/auth/kerberos_handler.py Line 76 in 137a222
xpra/xpra/client/auth/gss_handler.py Line 54 in 137a222
And I was hoping that a keycloak client authentication module could do the same thing.But the problem with all those modules is that when they fail, we end up also calling the prompt client auth handler as fallback..
The xpra/xpra/client/auth/prompt_handler.py Lines 24 to 25 in 137a222
I am just thinking that it should be more obvious to the user that what they type in the prompt handler can be intercepted in this case. |
||||||||||
|
||||||||||
def check(self, response_json) -> bool: | ||||||||||
assert self.challenge_sent | ||||||||||
#log("response_json: %r", response_json) | ||||||||||
|
||||||||||
if not response_json: | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("Invalid response received from authorization endpoint") | ||||||||||
return False | ||||||||||
|
||||||||||
try: | ||||||||||
response = json.loads(response_json) | ||||||||||
except json.JSONDecodeError: | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("Invalid response received from authorization endpoint") | ||||||||||
log("failed to parse json: %r", response_json, exc_info=True) | ||||||||||
return False | ||||||||||
|
||||||||||
if not isinstance(response, dict): | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("Invalid response received from authorization endpoint") | ||||||||||
log("response is of type %r but dict type is required", type(response), exc_info=True) | ||||||||||
log("failed to load response %r", response, exc_info=True) | ||||||||||
return False | ||||||||||
|
||||||||||
log("check(%r)", response) | ||||||||||
|
||||||||||
auth_code = response.get("code") | ||||||||||
error = response.get("error") | ||||||||||
|
||||||||||
if error: | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("%s: %s", error, response.get("error_description")) | ||||||||||
return False | ||||||||||
|
||||||||||
if not auth_code: | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("Invalid response received from authorization endpoint") | ||||||||||
iDmple marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
return False | ||||||||||
iDmple marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
try: | ||||||||||
from keycloak import KeycloakOpenID | ||||||||||
from keycloak.exceptions import KeycloakError | ||||||||||
|
||||||||||
# Configure client | ||||||||||
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_SERVER_URL, | ||||||||||
client_id=KEYCLOAK_CLIENT_ID, | ||||||||||
realm_name=KEYCLOAK_REALM_NAME, | ||||||||||
client_secret_key=KEYCLOAK_CLIENT_SECRET_KEY) | ||||||||||
|
||||||||||
# Get well_known | ||||||||||
config_well_know = keycloak_openid.well_know() | ||||||||||
log("well_known: %r", config_well_know, exc_info=True) | ||||||||||
|
||||||||||
# Get token | ||||||||||
token = keycloak_openid.token(code=auth_code, grant_type=[KEYCLOAK_GRANT_TYPE], redirect_uri=KEYCLOAK_REDIRECT_URI) | ||||||||||
|
||||||||||
# Verify token | ||||||||||
access_token = token.get("access_token") | ||||||||||
if not access_token: | ||||||||||
log.error("Error: keycloak authentication failed as access token is missing") | ||||||||||
return False | ||||||||||
|
||||||||||
token_info = keycloak_openid.introspect(access_token) | ||||||||||
log("token_info: %r", token_info, exc_info=True) | ||||||||||
|
||||||||||
token_state = token_info.get("active") | ||||||||||
if token_state is None: | ||||||||||
log.error("Error: keycloak authentication failed as token state is missing") | ||||||||||
return False | ||||||||||
|
||||||||||
if token_state is False: | ||||||||||
log.error("Error: keycloak authentication failed as token state not active") | ||||||||||
return False | ||||||||||
|
||||||||||
if token_state is True: | ||||||||||
# Get userinfo | ||||||||||
user_info = keycloak_openid.userinfo(access_token) | ||||||||||
log("userinfo_info: %r", user_info, exc_info=True) | ||||||||||
|
||||||||||
log("keycloak authentication succeeded: token is active") | ||||||||||
return True | ||||||||||
|
||||||||||
log.error("Error: keycloak authentication failed as token state is invalid") | ||||||||||
return False | ||||||||||
except KeycloakError as e: | ||||||||||
log.error("Error: keycloak authentication failed") | ||||||||||
log.error("Error code %s: %s", e.response_code, e.error_message) | ||||||||||
return False | ||||||||||
except ImportError as e: | ||||||||||
log("check(..)", exc_info=True) | ||||||||||
log.warn("Warning: cannot use keycloak authentication:") | ||||||||||
log.warn(" %s", e) | ||||||||||
return False | ||||||||||
|
||||||||||
def main(args): | ||||||||||
if len(args)!=2: | ||||||||||
print("invalid number of arguments") | ||||||||||
print("usage:") | ||||||||||
print("%s response_json" % (args[0],)) | ||||||||||
return 1 | ||||||||||
response_json = args[1] | ||||||||||
|
||||||||||
a = Authenticator() | ||||||||||
a.get_challenge("keycloak") | ||||||||||
|
||||||||||
if a.check(response_json): | ||||||||||
print("success") | ||||||||||
return 0 | ||||||||||
else: | ||||||||||
print("failed") | ||||||||||
return -1 | ||||||||||
|
||||||||||
if __name__ == "__main__": | ||||||||||
sys.exit(main(sys.argv)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wouldn't hurt to add a comment here, like "use
keycloak
as default prompt".There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor:
keycloak token
as default prompt would be clearer IMO.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we add more grant_types we could have
But never token. We could also implement them in different files if too different / complex.
We can add what you prefer here but in this case it will never be shown, or I don't see how. And it will never ask for a token.