-
Notifications
You must be signed in to change notification settings - Fork 25
/
okta.py
318 lines (252 loc) · 10.1 KB
/
okta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# vim: set filetype=python ts=4 sw=4
# -*- coding: utf-8 -*-
"""
This module handles all Okta operations.
1. Okta authentication
2. Update Okta Config File
"""
import json
import logging
import sys
import time
import requests
from tokendito import config
from tokendito import duo
from tokendito import user
logger = logging.getLogger(__name__)

# Okta error codes and authentication statuses mapped to the explanation shown to the user.
_status_dict = {
    "E0000004": "Authentication failed",
    "E0000047": "API call exceeded rate limit due to too many requests",
    "PASSWORD_EXPIRED": "Your password has expired",
    "LOCKED_OUT": "Your account is locked out",
}
def _okta_error_code(response):
    """Return the ``errorCode`` carried by a response body, if there is one.

    :param response: object returned by ``requests.request``, or None when the call
        never produced a response
    :return: the ``errorCode`` value, or None when there is no response, the body is
        not valid JSON, or the body is not a JSON object
    """
    if response is None:
        return None
    try:
        body = response.json()
    except ValueError:
        return None
    if isinstance(body, dict):
        return body.get("errorCode")
    return None


def api_wrapper(url, payload, headers=None):
    """Send a JSON POST request and return the decoded reply.

    Every failure is logged and ends the program with exit status 1: an error raised
    while sending the request or by ``raise_for_status()``, a body that is not valid
    JSON, or a reply that carries an ``errorCode``.

    :param url: url to call
    :param payload: JSON Payload
    :param headers: Headers of the request
    :return: Dictionary with authentication response
    """
    logger.debug(f"url is {url}")
    response = None
    try:
        response = requests.request("POST", url, data=json.dumps(payload), headers=headers)
        response.raise_for_status()
    except Exception as err:
        logger.error(f"There was an error with the call to {url}: {err}")
        # raise_for_status() fires on every 4xx/5xx reply, so an errorCode sent with
        # such a reply must be read here or its explanation is never reported.
        error_code = _okta_error_code(response)
        if error_code:
            api_error_code_parser(error_code)
        sys.exit(1)

    logger.debug(f"Response is {response}")

    try:
        ret = response.json()
    except ValueError as e:
        logger.error(
            f"{type(e).__name__} - Failed to parse response\n"
            f"URL: {url}\n"
            f"Status: {response.status_code}\n"
            f"Content: {response.content}\n"
        )
        sys.exit(1)

    if "errorCode" in ret:
        api_error_code_parser(ret["errorCode"])
        sys.exit(1)

    return ret
def api_error_code_parser(status=None):
    """Build, log and return the failure message for a status or error code.

    A code found in ``_status_dict`` yields that table's explanation; anything else
    (including no code at all) yields a generic message that echoes the code.

    :param status: Response status
    :return: status message
    """
    if not status or status not in _status_dict:
        message = f"Okta auth failed: {status}. Please verify your settings and try again."
    else:
        message = f"Okta auth failed: {_status_dict[status]}"

    logger.error(message)
    logger.debug(f"Parsing error [{message}] ")
    return message
def user_session_token(primary_auth, headers):
    """Obtain the session token that follows primary authentication.

    A ``SUCCESS`` response that carries ``sessionToken`` supplies the token directly;
    an ``MFA_REQUIRED`` response is handed to ``user_mfa_challenge``. Any other
    response is logged and ends the program with exit status 1.

    :param primary_auth: Primary authentication
    :param headers: Headers of the request
    :return: Session Token from JSON response
    """
    try:
        status = primary_auth.get("status", None)
    except AttributeError:
        # The response offers no .get(); handle it like a response without a status.
        status = None

    if status == "MFA_REQUIRED":
        session_token = user_mfa_challenge(headers, primary_auth)
    elif status == "SUCCESS" and "sessionToken" in primary_auth:
        session_token = primary_auth.get("sessionToken")
    else:
        logger.debug(f"Error parsing response: {json.dumps(primary_auth)}")
        logger.error("Okta auth failed: unknown status.")
        sys.exit(1)

    # Register the token as sensitive before handing it back to the caller.
    user.add_sensitive_value_to_be_masked(session_token)
    return session_token
def authenticate_user(config):
    """Authenticate user with okta credential.

    Posts the username and password to ``<org>/api/v1/authn`` and turns the reply
    into a session token through ``user_session_token``.

    :param config: Config object; its ``okta`` mapping is read for ``username``,
        ``password`` and ``org``
    :return: Session token returned by ``user_session_token``
    """
    headers = {"content-type": "application/json", "accept": "application/json"}
    payload = {"username": config.okta["username"], "password": config.okta["password"]}

    # The password must never be written to the log, so a redacted copy is logged.
    loggable_payload = dict(payload, password="********")
    logger.debug("Authenticate user to Okta")
    logger.debug(f"Sending {headers}, {loggable_payload} to {config.okta['org']}")

    primary_auth = api_wrapper(f"{config.okta['org']}/api/v1/authn", payload, headers)
    session_token = user_session_token(primary_auth, headers)
    logger.info("User has been succesfully authenticated.")
    return session_token
def mfa_provider_type(
    mfa_provider,
    selected_factor,
    mfa_challenge_url,
    primary_auth,
    selected_mfa_option,
    headers,
    payload,
):
    """Receive session key.

    Runs the verification flow that matches the MFA provider and returns the
    ``sessionToken`` entry of the verification response. An unsupported provider is
    logged and ends the program with exit status 1.

    :param mfa_provider: MFA provider
    :param selected_factor: Selected MFA factor
    :param mfa_challenge_url: MFA challenge url
    :param primary_auth: Primary authentication
    :param selected_mfa_option: Selected MFA option
    :param headers: Headers of the request; replaced by the Duo ones for "duo"
    :param payload: Payload of the request; replaced by the Duo one for "duo"
    :return: session_key
    """
    mfa_verify = dict()
    if mfa_provider == "duo":
        # The Duo flow supplies its own payload and headers for the Okta verification.
        payload, headers, callback_url = duo.authenticate_duo(selected_factor)
        duo.duo_api_post(callback_url, payload=payload)
        mfa_verify = api_wrapper(mfa_challenge_url, payload, headers)
    elif mfa_provider == "okta" or mfa_provider == "google":
        mfa_verify = user_mfa_options(
            selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth
        )
    else:
        logger.error(
            f"Sorry, the MFA provider '{mfa_provider}' is not yet supported."
            " Please retry with another option."
        )
        # sys.exit, not the builtin exit(): the latter is injected by the site module
        # and is not guaranteed to exist in every interpreter setup.
        sys.exit(1)
    return mfa_verify["sessionToken"]
def user_mfa_index(preset_mfa, available_mfas, mfa_options):
    """Work out which entry of the MFA list should be used.

    A preset that matches exactly one available id selects that entry. With no
    preset, or no match, the user is asked to choose. A preset that matches several
    ids is logged as an error and ends the program with exit status 1.

    :param preset_mfa: preset mfa method from settings
    :param available_mfas: available mfa method ids
    :param mfa_options: available mfa methods
    :return: index of the chosen mfa method
    """
    matches = []
    if preset_mfa:
        # Collect the position of every available id that contains the preset text.
        logger.debug(f"Get mfa method from {available_mfas}.")
        matches = [pos for pos, mfa_id in enumerate(available_mfas) if preset_mfa in mfa_id]

    if len(matches) > 1:
        logger.error(
            f"{preset_mfa} is not unique in {available_mfas}. Please check your configuration."
        )
        sys.exit(1)

    if matches:
        logger.debug(f"One match: {preset_mfa} in {matches}")
        return matches[0]

    logger.debug(f"No matches with {preset_mfa}, going to get user input")
    return user.select_preferred_mfa_index(mfa_options)
def user_mfa_challenge(headers, primary_auth):
    """Pick an MFA factor, start its challenge and return the resulting token.

    The factors are read from ``primary_auth["_embedded"]["factors"]``; if that path
    is missing the error is logged and the program ends with exit status 1.

    :param headers: headers what needs to be sent to api
    :param primary_auth: primary authentication
    :return: Okta MFA Session token after the successful entry of the code
    """
    logger.debug("Handle user MFA challenges")
    try:
        factors = primary_auth["_embedded"]["factors"]
    except KeyError as error:
        logger.error(f"There was a wrong response structure: \n{error}")
        sys.exit(1)

    preset_mfa = config.okta["mfa_method"]

    # Build one identifier per factor, shaped like provider_factorType_id,
    # e.g. OKTA_push_9yi4bKJNH2WEWQ0x8 or GOOGLE_token:software:totp_9yi4bKJNH2WEWQ.
    factor_ids = []
    for factor in factors:
        factor_ids.append(f"{factor['provider']}_{factor['factorType']}_{factor['id']}")

    chosen_factor = factors[user_mfa_index(preset_mfa, factor_ids, factors)]
    logger.debug(f"Selected MFA is [{chosen_factor}]")

    # Issue the challenge for the chosen factor.
    verify_url = chosen_factor["_links"]["verify"]["href"]
    challenge_payload = {
        "stateToken": primary_auth["stateToken"],
        "factorType": chosen_factor["factorType"],
        "provider": chosen_factor["provider"],
        "profile": chosen_factor["profile"],
    }
    challenge = api_wrapper(verify_url, challenge_payload, headers)

    provider_name = challenge["_embedded"]["factor"]["provider"].lower()
    logger.debug(f"MFA Challenge URL: [{verify_url}] headers: {headers}")

    return mfa_provider_type(
        provider_name,
        challenge,
        verify_url,
        primary_auth,
        chosen_factor,
        headers,
        challenge_payload,
    )
def user_mfa_options(selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth):
    """Verify the selected MFA factor.

    A "push" factor is delegated to ``push_approval``. Any other factor is verified
    with a pass code: the one stored in ``config.okta["mfa_response"]`` or, when that
    is None, one requested from the user and stored there.

    :param selected_mfa_option: Selected MFA option (SMS, push, etc)
    :param headers: headers
    :param mfa_challenge_url: MFA challenge URL
    :param payload: payload; only forwarded for the "push" factor
    :param primary_auth: Primary authentication method
    :return: response of the verification call
    """
    logger.debug("Handle user MFA options")

    factor_type = selected_mfa_option["factorType"]
    logger.debug(f"User MFA options selected: [{factor_type}]")
    if factor_type == "push":
        return push_approval(headers, mfa_challenge_url, payload)

    if config.okta["mfa_response"] is None:
        logger.debug("Getting verification code from user.")
        config.okta["mfa_response"] = user.get_input("Enter your verification code:")
        user.add_sensitive_value_to_be_masked(config.okta["mfa_response"])

    # Submit the pass code together with the state token of the primary authentication.
    verify_request = {
        "stateToken": primary_auth["stateToken"],
        "passCode": config.okta["mfa_response"],
    }
    verify_response = api_wrapper(mfa_challenge_url, verify_request, headers)

    # Register the token as sensitive before the response is written to the debug log.
    if "sessionToken" in verify_response:
        user.add_sensitive_value_to_be_masked(verify_response["sessionToken"])
    logger.debug(f"mfa_verify [{json.dumps(verify_response)}]")

    return verify_response
def push_approval(headers, mfa_challenge_url, payload):
    """Poll the challenge URL until the push request is resolved.

    The URL is called once per second while the reply reports ``factorResult`` as
    "WAITING". A "REJECTED" or "TIMEOUT" result ends the program with exit status 2;
    a reply with neither ``factorResult`` nor a "SUCCESS" status ends it with exit
    status 1.

    :param headers: HTTP headers sent to API call
    :param mfa_challenge_url: MFA challenge url
    :param payload: payload which needs to be sent
    :return: last response received from the challenge URL
    """
    logger.debug(
        f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}"
    )
    user.print("Waiting for an approval from the device...")

    # Results that abort the wait, with the message logged for each.
    fatal_results = (
        ("REJECTED", "The Okta Verify push has been denied. Please retry later."),
        ("TIMEOUT", "Device approval window has expired."),
    )

    poll_response = {}
    factor_result = "WAITING"
    while factor_result == "WAITING":
        poll_response = api_wrapper(mfa_challenge_url, payload, headers)
        logger.debug(f"MFA Response:\n{json.dumps(poll_response)}")

        if "factorResult" in poll_response:
            factor_result = poll_response["factorResult"]
        elif "status" in poll_response and poll_response["status"] == "SUCCESS":
            break
        else:
            logger.error("There was an error getting your MFA status.")
            logger.debug(f"{poll_response}")
            if "status" in poll_response:
                logger.error(f"Exiting due to error: {poll_response['status']}")
            sys.exit(1)

        for result, reason in fatal_results:
            if factor_result == result:
                logger.error(reason)
                sys.exit(2)

        time.sleep(1)

    return poll_response