forked from Xpra-org/xpra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
keycloak_auth.py
186 lines (150 loc) · 7.22 KB
/
keycloak_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# This file is part of Xpra.
# Copyright (C) 2016-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2022 Nathalie Casati <nat@yuka.ch>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
import os
import sys
import json
from xpra.util import typedict
from xpra.server.auth.sys_auth_base import SysAuthenticator, log
# Module-level defaults for the keycloak authenticator.
# Each value is read once, at import time, from the "XPRA_KEYCLOAK_*"
# environment variable named in the call, with the literal as fallback.

# base URL the "realms/<realm>/..." endpoint paths are appended to:
KEYCLOAK_SERVER_URL = os.getenv("XPRA_KEYCLOAK_SERVER_URL", "http://localhost:8080/auth/")
# realm name, used to build the authorization endpoint URL:
KEYCLOAK_REALM_NAME = os.getenv("XPRA_KEYCLOAK_REALM_NAME", "example_realm")
# client id and client secret key handed to the keycloak client:
KEYCLOAK_CLIENT_ID = os.getenv("XPRA_KEYCLOAK_CLIENT_ID", "example_client")
KEYCLOAK_CLIENT_SECRET_KEY = os.getenv("XPRA_KEYCLOAK_CLIENT_SECRET_KEY", "secret")
# redirect URI sent with both the authorization and the token requests:
KEYCLOAK_REDIRECT_URI = os.getenv("XPRA_KEYCLOAK_REDIRECT_URI", "http://localhost/login/")
# scope requested in the authorization request:
KEYCLOAK_SCOPE = os.getenv("XPRA_KEYCLOAK_SCOPE", "openid")
# grant type; the Authenticator rejects anything but "authorization_code":
KEYCLOAK_GRANT_TYPE = os.getenv("XPRA_KEYCLOAK_GRANT_TYPE", "authorization_code")
class Authenticator(SysAuthenticator):
    """Keycloak authenticator based on the OAuth2 authorization code grant.

    The challenge sent to the client is the authorization request URI.
    The client is expected to reply with a JSON object holding either the
    authorization ``code`` or an ``error``; ``check`` exchanges the code
    for an access token and accepts the login only if token introspection
    reports the token as active.
    """

    def __init__(self, **kwargs):
        """Read the keycloak settings and prepare the authorization request URI.

        Recognized keyword arguments, each falling back to the matching
        ``KEYCLOAK_*`` module default: ``server_url``, ``realm_name``,
        ``client_id``, ``client_secret_key``, ``redirect_uri``, ``scope``
        and ``grant_type``. ``prompt`` defaults to ``"keycloak"``.
        Everything left in ``kwargs`` is forwarded to the parent class.

        Raises:
            NotImplementedError: the grant type is not "authorization_code".
            ImportError: ``oauthlib`` cannot be imported.
        """
        self.server_url = kwargs.pop("server_url", KEYCLOAK_SERVER_URL)
        self.realm_name = kwargs.pop("realm_name", KEYCLOAK_REALM_NAME)
        self.client_id = kwargs.pop("client_id", KEYCLOAK_CLIENT_ID)
        self.client_secret_key = kwargs.pop("client_secret_key", KEYCLOAK_CLIENT_SECRET_KEY)
        self.redirect_uri = kwargs.pop("redirect_uri", KEYCLOAK_REDIRECT_URI)
        self.scope = kwargs.pop("scope", KEYCLOAK_SCOPE)
        self.grant_type = kwargs.pop("grant_type", KEYCLOAK_GRANT_TYPE)

        # the endpoint path is appended to the base URL below,
        # so make sure the two are separated by a slash:
        if not self.server_url.endswith("/"):
            self.server_url += "/"

        # use keycloak as default prompt
        kwargs["prompt"] = kwargs.pop("prompt", "keycloak")

        # validate the value this instance will actually use,
        # which may differ from the module default:
        if self.grant_type != "authorization_code":
            raise NotImplementedError("Warning: only grant type \"authorization_code\" is currently supported")

        super().__init__(**kwargs)

        log("keycloak auth: server_url=%s, client_id=%s, realm_name=%s, redirect_uri=%s, scope=%s, grant_type=%s",
            self.server_url, self.client_id, self.realm_name, self.redirect_uri, self.scope, self.grant_type)

        try:
            from oauthlib.oauth2 import WebApplicationClient
        except ImportError as e:
            log("check(..)", exc_info=True)
            log.warn("Warning: cannot use keycloak authentication:")
            log.warn(" %s", e)
            # unsure how to fail the auth at this point so we raise the exception
            raise

        # Get authorization code:
        # the request URI is stored as the "salt" and sent as the challenge
        client = WebApplicationClient(self.client_id)
        authorization_url = self.server_url + "realms/" + self.realm_name + "/protocol/openid-connect/auth"
        self.salt = client.prepare_request_uri(
            authorization_url,
            redirect_uri=self.redirect_uri,
            scope=[self.scope],
        )

    def __repr__(self):
        return "keycloak"

    def get_challenge(self, digests):
        """Return the challenge as ``(authorization request URI, "keycloak")``.

        Returns ``None``, without marking the challenge as sent, when
        "keycloak" is not found in ``digests``.
        """
        assert not self.challenge_sent
        if "keycloak" not in digests:
            log.error("Error: client does not support keycloak authentication")
            return None
        self.challenge_sent = True
        return self.salt, "keycloak"

    def check(self, response_json) -> bool:
        """Validate the client's reply to the keycloak challenge.

        ``response_json`` must be a JSON object containing the
        authorization ``code``; a reply carrying an ``error`` is rejected.
        The code is exchanged for an access token, which is then verified
        with a token introspection request.

        Returns:
            True only if the access token is reported as active; False on
            any failure, including a missing ``keycloak`` python module.
        """
        assert self.challenge_sent
        #log("response_json: %r", response_json)

        if not response_json:
            log.error("Error: keycloak authentication failed")
            log.error("Invalid response received from authorization endpoint")
            return False

        try:
            response = json.loads(response_json)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError as well as undecodable bytes,
            # TypeError is raised for payloads that are not str / bytes
            log.error("Error: keycloak authentication failed")
            log.error("Invalid response received from authorization endpoint")
            log("failed to parse json: %r", response_json, exc_info=True)
            return False

        if not isinstance(response, dict):
            log.error("Error: keycloak authentication failed")
            log.error("Invalid response received from authorization endpoint")
            log("response is of type %r but dict type is required", type(response))
            log("failed to load response %r", response)
            return False

        log("check(%r)", response)

        auth_code = response.get("code")
        error = response.get("error")

        if error:
            log.error("Error: keycloak authentication failed")
            log.error("%s: %s", error, response.get("error_description"))
            return False

        if not auth_code:
            log.error("Error: keycloak authentication failed")
            log.error("Invalid response received from authorization endpoint")
            return False

        # The imports get their own try block: the "except KeycloakError"
        # clause further down can only be evaluated once KeycloakError
        # has been successfully imported.
        try:
            from keycloak import KeycloakOpenID
            from keycloak.exceptions import KeycloakError
        except ImportError as e:
            log("check(..)", exc_info=True)
            log.warn("Warning: cannot use keycloak authentication:")
            log.warn(" %s", e)
            return False

        try:
            # Configure client
            keycloak_openid = KeycloakOpenID(server_url=self.server_url,
                                             client_id=self.client_id,
                                             realm_name=self.realm_name,
                                             client_secret_key=self.client_secret_key)

            # Get well_known
            config_well_know = keycloak_openid.well_know()
            log("well_known: %r", config_well_know)

            # Get token
            token = keycloak_openid.token(code=auth_code,
                                          grant_type=[self.grant_type],
                                          redirect_uri=self.redirect_uri)

            # Verify token
            access_token = token.get("access_token")
            if not access_token:
                log.error("Error: keycloak authentication failed as access token is missing")
                return False

            token_info = keycloak_openid.introspect(access_token)
            log("token_info: %r", token_info)

            token_state = token_info.get("active")
            if token_state is None:
                log.error("Error: keycloak authentication failed as token state is missing")
                return False

            if token_state is False:
                log.error("Error: keycloak authentication failed as token state not active")
                return False

            if token_state is True:
                # Get userinfo
                user_info = keycloak_openid.userinfo(access_token)
                log("userinfo_info: %r", user_info)
                log("keycloak authentication succeeded: token is active")
                return True

            # "active" was present but is not a boolean
            log.error("Error: keycloak authentication failed as token state is invalid")
            return False
        except KeycloakError as e:
            log.error("Error: keycloak authentication failed")
            log.error("Error code %s: %s", e.response_code, e.error_message)
            return False
def main(args):
    """Command line test tool: verify one JSON response against keycloak.

    ``args`` is an argv-style sequence holding the program name followed
    by the JSON response string.

    Returns 1 on a usage error, 0 if the response is accepted
    and -1 if it is rejected.
    """
    argc = len(args)
    if argc != 2:
        print("invalid number of arguments")
        print("usage:")
        print("%s response_json" % (args[0],))
        return 1

    authenticator = Authenticator()
    # the challenge must be requested first: check() asserts that it was sent
    authenticator.get_challenge("keycloak")
    verified = authenticator.check(args[1])
    print("success" if verified else "failed")
    return 0 if verified else -1
if __name__ == "__main__":
    # run as a script: main()'s return value becomes the process exit status
    exit_code = main(sys.argv)
    sys.exit(exit_code)