Skip to content

Commit

Permalink
fix existing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmxgti committed Nov 2, 2022
1 parent dba0630 commit 2ef2727
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 17 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ exclude_lines = [
"except KeyboardInterrupt:",
"if __name__ == .__main__.:",
"if __package__ is None:",
"logger.debug",
"pragma: no cover",
]
62 changes: 55 additions & 7 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,26 @@ def test_get_interactive_config(mocker):
ret = user.get_interactive_config(app_url=None, org_url=None, username="pytest")
assert ret["okta_username"] == "pytest" and ret["okta_org"] == "https://pytest"

# test that a username is collected
mocker.patch("tokendito.user.get_username", return_value="pytests")
ret = user.get_interactive_config(
app_url="https://pytest/pytest", org_url="https://pytest/", username=""
)
assert ret["okta_username"] == "pytests"


def test_collect_integer(mocker):
"""Test whether integers from the user are retrieved."""
from tokendito import user

mocker.patch("tokendito.user.IntPrompt.ask", return_value=[])
ret = user.collect_integer()
assert ret == []

mocker.patch("tokendito.user.IntPrompt.ask", return_value=0)
ret = user.collect_integer()
assert ret == 0


@pytest.mark.parametrize(
"url,expected",
Expand Down Expand Up @@ -206,7 +226,7 @@ def test_assert_credentials():

with pytest.raises(SystemExit) as err:
aws.assert_credentials({})
assert err.value.code == 2
assert err.value.code == 1

saml_response = {
"Credentials": {
Expand All @@ -227,7 +247,7 @@ def test_set_local_credentials(tmpdir):
# evaluate that we exit on a bad role
with pytest.raises(SystemExit) as err:
user.set_local_credentials(response={}, role="pytest", region="pytest", output="pytest")
assert err.value.code == 1
assert err.value.code == 1

# evaluate that we succeed on a working role
response = {
Expand Down Expand Up @@ -293,7 +313,7 @@ def test_display_selected_role():

with pytest.raises(SystemExit) as err:
ret = user.display_selected_role("pytest", {"pytest": {}})
assert err.value.code == 1
assert err.value.code == 1

assert ret is not None and "pytest" in ret

Expand Down Expand Up @@ -359,7 +379,7 @@ def test_utc_to_local():

with pytest.raises(SystemExit) as err:
user.utc_to_local("pytest")
assert err.value.code == 1
assert err.value.code == 1


def test_set_passcode(mocker):
Expand Down Expand Up @@ -435,15 +455,15 @@ def test_process_ini_file(tmpdir):
# Ensure we fail if the section is not found
with pytest.raises(SystemExit) as err:
user.process_ini_file(path, "pytest_expected_failure")
assert err.value.code == 2
assert err.value.code == 2

ret = user.process_ini_file(path, "pytest")
assert ret.okta["username"] == "pytest"
assert ret.okta["password"] == "pytest_password"

with pytest.raises(SystemExit) as err:
user.process_ini_file(path, "pytest_expected_element_failure")
assert err.value.code == 2
assert err.value.code == 1


@pytest.mark.parametrize(
Expand Down Expand Up @@ -717,6 +737,34 @@ def test_user_mfa_challenge_with_no_mfa_methods(sample_headers, sample_json_resp
assert user_mfa_challenge(sample_headers, primary_auth) == error


@pytest.mark.parametrize(
"wrapper_return,expected",
[
({"factorResult": "REJECTED", "status": "FAILED"}, 2),
({"factorResult": "TIMEOUT", "status": "FAILED"}, 2),
({"status": "FAILED"}, 1),
({}, 1),
({"factorResult": "SUCCESS", "status": "SUCCESS"}, 0),
],
)
def test_push_approval(mocker, wrapper_return, expected):
"""Test push approval."""
from tokendito import okta

headers = {"content-type": "application/json", "accept": "application/json"}
challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify"

mocker.patch("tokendito.okta.api_wrapper", return_value=wrapper_return)

if "status" in wrapper_return and wrapper_return["status"] == "SUCCESS":
ret = okta.push_approval(headers, challenge_url, None)
assert ret["status"] == "SUCCESS" and ret["factorResult"] == "SUCCESS"
else:
with pytest.raises(SystemExit) as err:
okta.push_approval(headers, challenge_url, None)
assert err.value.code == expected


@pytest.mark.parametrize(
"selected_role",
[
Expand Down Expand Up @@ -914,7 +962,7 @@ def test_config_object():
assert pytest_config.get_defaults()["aws"]["region"] == pytest_config.aws["region"]


def test_loglevel_collected_from_env(monkeypatch, tmpdir):
def test_loglevel_collected_from_env(monkeypatch):
"""Ensure that the loglevel collected from env vars."""
from argparse import Namespace
import logging
Expand Down
4 changes: 1 addition & 3 deletions tokendito/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ def assume_role(role_arn, provider_arn, saml):
:return: AssumeRoleWithSaml API response
"""
default_error = (
"\nUnable to assume role with the following details:\n- Role ARN: {}\n- Error: {}\n"
)
default_error = "Unable to assume role {}: {}"

encoded_xml = codecs.encode(saml.encode("utf-8"), "base64")
assume_role_response = None
Expand Down
11 changes: 5 additions & 6 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ def push_approval(headers, mfa_challenge_url, payload):
:return: Session Token if succeeded or terminates if user wait goes 5 min
"""
logger.debug(f"Handle push approval from the user [{headers}] [{mfa_challenge_url}]")
logger.debug(
f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}"
)

user.print("Waiting for an approval from the device...")
mfa_status = "WAITING"
Expand All @@ -295,11 +297,11 @@ def push_approval(headers, mfa_challenge_url, payload):

if "factorResult" in mfa_verify:
mfa_status = mfa_verify["factorResult"]
elif mfa_verify["status"] == "SUCCESS":
elif "status" in mfa_verify and mfa_verify["status"] == "SUCCESS":
break
else:
logger.error("There was an error getting your MFA status.")
logger.debug(mfa_verify)
logger.debug(f"{mfa_verify}")
if "status" in mfa_verify:
logger.error(f"Exiting due to error: {mfa_verify['status']}")
sys.exit(1)
Expand All @@ -313,7 +315,4 @@ def push_approval(headers, mfa_challenge_url, payload):

time.sleep(1)

if mfa_verify is None:
logger.error("Unknown error in MFA approval.")
sys.exit(2)
return mfa_verify
2 changes: 1 addition & 1 deletion tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ def get_input(prompt=""):
return user_input


def collect_integer(valid_range=None):
def collect_integer(valid_range=0):
"""Collect input from user.
Prompt the user for input. Validate it and cast to integer.
Expand Down

0 comments on commit 2ef2727

Please sign in to comment.