Skip to content

Commit

Permalink
Support Push + Number Challenge
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmxgti committed Jan 24, 2023
1 parent b529c91 commit 89e5d35
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 45 deletions.
4 changes: 4 additions & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ Tokendito is written and maintained by Cloud Security and Engineering at
Dow Jones, and various contributors:

## Dow Jones Cloud Security and Engineering

* Sydney Sweeney
* Jean-Pierre Sevigny
* Nico Halpern

## Patches and more

* Kuber Kaul
* Steve Stevenson
* Roman Sluzhynskyy
Expand Down
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ The following changes are part of this release:
- Automatically discover AWS URLs.
- Fix authentication with DUO.
- Add support for setting the logging level via both the INI file and ENV vars.
- Add support for Python 3.9 and 3.10.
- Add support for Python 3.9, 3.10, and 3.11.
- And many fixes.

Consult [additional notes](docs/README.md) for how to use tokendito.
Consult [additional notes](https://github.com/dowjones/tokendito/blob/main/docs/README.md) for how to use tokendito.

## Requirements

Expand All @@ -53,9 +53,10 @@ pip or pip3.
3. Run `tokendito`.

**NOTE**: Advanced users may shorten the `tokendito` interaction to a [single
command](docs/README.md#single-command-usage).
command](https://github.com/dowjones/tokendito/blob/main/docs/README.md#single-command-usage).

Have multiple Okta tiles to switch between? View our [multi-tile
guide](docs/README.md#multi-tile-guide).
guide](https://github.com/dowjones/tokendito/blob/main/docs/README.md#multi-tile-guide).

### Tips, tricks, troubleshooting, examples, and more docs are [here](docs/README.md)! Also, [contributions are welcome](docs/CONTRIBUTING.md)!
### Tips, tricks, troubleshooting, examples, and more docs are [here]()https://github.com/dowjones/tokendito/blob/main/docs/README.md!
[Contributions are welcome](https://github.com/dowjones/tokendito/blob/main/docs/CONTRIBUTING.md)!
58 changes: 48 additions & 10 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,26 +839,64 @@ def test_user_mfa_challenge_with_no_mfas(sample_headers, sample_json_response):


@pytest.mark.parametrize(
"wrapper_return,expected",
"return_value,side_effect,expected",
[
({"factorResult": "REJECTED", "status": "FAILED"}, 2),
({"factorResult": "TIMEOUT", "status": "FAILED"}, 2),
({"status": "FAILED"}, 1),
({}, 1),
({"factorResult": "SUCCESS", "status": "SUCCESS"}, 0),
({"status": "SUCCESS", "sessionToken": "pytest"}, None, 0),
({"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"}, None, 0),
({"status": "MFA_CHALLENGE", "factorResult": "REJECTED"}, None, 2),
({"status": "MFA_CHALLENGE", "factorResult": "TIMEOUT"}, None, 2),
({"status": "UNKNOWN", "factorResult": "UNKNOWN"}, None, 2),
(
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_links": {"next": {"href": None}},
},
[
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_links": {"next": {"href": None}},
},
{"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"},
],
0,
),
(
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}},
"_links": {"next": {"href": None}},
},
[
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}},
"_links": {"next": {"href": None}},
},
{"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"},
],
0,
),
],
)
def test_push_approval(mocker, sample_headers, wrapper_return, expected):
def test_push_approval(mocker, sample_headers, return_value, side_effect, expected):
"""Test push approval."""
from tokendito import okta

challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify"

mocker.patch("tokendito.okta.api_wrapper", return_value=wrapper_return)
mocker.patch("tokendito.okta.api_wrapper", return_value=return_value, side_effect=side_effect)
mocker.patch("time.sleep", return_value=0)

if "status" in wrapper_return and wrapper_return["status"] == "SUCCESS":
if "status" in return_value and return_value["status"] == "SUCCESS":
ret = okta.push_approval(sample_headers, challenge_url, None)
assert ret["status"] == "SUCCESS"
elif "factorResult" in return_value and return_value["factorResult"] == "WAITING":
ret = okta.push_approval(sample_headers, challenge_url, None)
assert ret["status"] == "SUCCESS" and ret["factorResult"] == "SUCCESS"
assert ret["status"] == "SUCCESS"
else:
with pytest.raises(SystemExit) as err:
okta.push_approval(sample_headers, challenge_url, None)
Expand Down
2 changes: 1 addition & 1 deletion tokendito/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from platformdirs import user_config_dir

__version__ = "2.0.0"
__version__ = "2.1.0"
__title__ = "tokendito"
__description__ = "Get AWS STS tokens from Okta SSO"
__long_description_content_type__ = "text/markdown"
Expand Down
78 changes: 49 additions & 29 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,36 +283,56 @@ def push_approval(headers, mfa_challenge_url, payload):
:return: Session Token if succeeded or terminates if user wait goes 5 min
"""
logger.debug(
f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}"
)
logger.debug(f"Push approval with headers:{headers} challenge_url:{mfa_challenge_url}")

user.print("Waiting for an approval from the device...")
mfa_status = "WAITING"
mfa_verify = {}
while mfa_status == "WAITING":
mfa_verify = api_wrapper(mfa_challenge_url, payload, headers)

logger.debug(f"MFA Response:\n{json.dumps(mfa_verify)}")

if "factorResult" in mfa_verify:
mfa_status = mfa_verify["factorResult"]
elif "status" in mfa_verify and mfa_verify["status"] == "SUCCESS":
break
else:
logger.error("There was an error getting your MFA status.")
logger.debug(f"{mfa_verify}")
if "status" in mfa_verify:
logger.error(f"Exiting due to error: {mfa_verify['status']}")
sys.exit(1)

if mfa_status == "REJECTED":
logger.error("The Okta Verify push has been denied. Please retry later.")
sys.exit(2)
elif mfa_status == "TIMEOUT":
logger.error("Device approval window has expired.")
sys.exit(2)

status = "MFA_CHALLENGE"
result = "WAITING"
response = {}
challenge_displayed = False

while status == "MFA_CHALLENGE" and result == "WAITING":
response = api_wrapper(mfa_challenge_url, payload, headers)
if "sessionToken" in response:
user.add_sensitive_value_to_be_masked(response["sessionToken"])

logger.debug(f"MFA Response:\n{json.dumps(response)}")
# Retrieve these values from the object, and set a sensible default if they do not
# exist.
status = response.get("status", "UNKNOWN")
result = response.get("factorResult", "UNKNOWN")

# The docs at https://developer.okta.com/docs/reference/api/authn/#verify-push-factor
# state that the call will return a factorResult in [ SUCCESS, REJECTED, TIMEOUT,
# WAITING]. However, on success, SUCCESS is not set and we have to rely on the
# response["status"] instead
answer = (
response.get("_embedded", {})
.get("factor", {})
.get("_embedded", {})
.get("challenge", {})
.get("correctAnswer", None)
)
if answer and not challenge_displayed:
# If a Number Challenge response exists, retrieve it from this deeply nested path,
# otherwise set to None.
user.print(f"Number Challenge response is {answer}")
challenge_displayed = True
time.sleep(1)

return mfa_verify
if status == "SUCCESS" and "sessionToken" in response:
# noop, we will return the variable later
pass
# Everything else should have a status of "MFA_CHALLENGE", and the result provides a
# hint on why the challenge failed.
elif result == "REJECTED":
logger.error("The Okta Verify push has been denied.")
sys.exit(2)
elif result == "TIMEOUT":
logger.error("Device approval window has expired.")
sys.exit(2)
else:
logger.error(f"Push response type {result} for {status} not implemented.")
sys.exit(2)

return response

0 comments on commit 89e5d35

Please sign in to comment.