From 2ef27271be1b0b681a21ec740f7e0d12087cba03 Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Wed, 2 Nov 2022 16:29:42 -0400 Subject: [PATCH] fix existing tests --- pyproject.toml | 1 + tests/unit_test.py | 62 ++++++++++++++++++++++++++++++++++++++++------ tokendito/aws.py | 4 +-- tokendito/okta.py | 11 ++++---- tokendito/user.py | 2 +- 5 files changed, 63 insertions(+), 17 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 37c13bde..cc46ecb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,5 +7,6 @@ exclude_lines = [ "except KeyboardInterrupt:", "if __name__ == .__main__.:", "if __package__ is None:", + "logger.debug", "pragma: no cover", ] diff --git a/tests/unit_test.py b/tests/unit_test.py index 56274fe5..7686214c 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -147,6 +147,26 @@ def test_get_interactive_config(mocker): ret = user.get_interactive_config(app_url=None, org_url=None, username="pytest") assert ret["okta_username"] == "pytest" and ret["okta_org"] == "https://pytest" + # test that a username is collected + mocker.patch("tokendito.user.get_username", return_value="pytests") + ret = user.get_interactive_config( + app_url="https://pytest/pytest", org_url="https://pytest/", username="" + ) + assert ret["okta_username"] == "pytests" + + +def test_collect_integer(mocker): + """Test whether integers from the user are retrieved.""" + from tokendito import user + + mocker.patch("tokendito.user.IntPrompt.ask", return_value=[]) + ret = user.collect_integer() + assert ret == [] + + mocker.patch("tokendito.user.IntPrompt.ask", return_value=0) + ret = user.collect_integer() + assert ret == 0 + @pytest.mark.parametrize( "url,expected", @@ -206,7 +226,7 @@ def test_assert_credentials(): with pytest.raises(SystemExit) as err: aws.assert_credentials({}) - assert err.value.code == 2 + assert err.value.code == 1 saml_response = { "Credentials": { @@ -227,7 +247,7 @@ def test_set_local_credentials(tmpdir): # evaluate that we exit on a bad role with pytest.raises(SystemExit) as err: user.set_local_credentials(response={}, role="pytest", region="pytest", output="pytest") - assert err.value.code == 1 + assert err.value.code == 1 # evaluate that we succeed on a working role response = { @@ -293,7 +313,7 @@ def test_display_selected_role(): with pytest.raises(SystemExit) as err: ret = user.display_selected_role("pytest", {"pytest": {}}) - assert err.value.code == 1 + assert err.value.code == 1 assert ret is not None and "pytest" in ret @@ -359,7 +379,7 @@ def test_utc_to_local(): with pytest.raises(SystemExit) as err: user.utc_to_local("pytest") - assert err.value.code == 1 + assert err.value.code == 1 def test_set_passcode(mocker): @@ -435,7 +455,7 @@ def test_process_ini_file(tmpdir): # Ensure we fail if the section is not found with pytest.raises(SystemExit) as err: user.process_ini_file(path, "pytest_expected_failure") - assert err.value.code == 2 + assert err.value.code == 2 ret = user.process_ini_file(path, "pytest") assert ret.okta["username"] == "pytest" @@ -443,7 +463,7 @@ def test_process_ini_file(tmpdir): with pytest.raises(SystemExit) as err: user.process_ini_file(path, "pytest_expected_element_failure") - assert err.value.code == 2 + assert err.value.code == 1 @pytest.mark.parametrize( @@ -717,6 +737,34 @@ def test_user_mfa_challenge_with_no_mfa_methods(sample_headers, sample_json_resp assert user_mfa_challenge(sample_headers, primary_auth) == error +@pytest.mark.parametrize( + "wrapper_return,expected", + [ + ({"factorResult": "REJECTED", "status": "FAILED"}, 2), + ({"factorResult": "TIMEOUT", "status": "FAILED"}, 2), + ({"status": "FAILED"}, 1), + ({}, 1), + ({"factorResult": "SUCCESS", "status": "SUCCESS"}, 0), + ], +) +def test_push_approval(mocker, wrapper_return, expected): + """Test push approval.""" + from tokendito import okta + + headers = {"content-type": "application/json", "accept": "application/json"} + challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify" + + mocker.patch("tokendito.okta.api_wrapper", return_value=wrapper_return) + + if "status" in wrapper_return and wrapper_return["status"] == "SUCCESS": + ret = okta.push_approval(headers, challenge_url, None) + assert ret["status"] == "SUCCESS" and ret["factorResult"] == "SUCCESS" + else: + with pytest.raises(SystemExit) as err: + okta.push_approval(headers, challenge_url, None) + assert err.value.code == expected + + @pytest.mark.parametrize( "selected_role", [ @@ -914,7 +962,7 @@ def test_config_object(): assert pytest_config.get_defaults()["aws"]["region"] == pytest_config.aws["region"] -def test_loglevel_collected_from_env(monkeypatch, tmpdir): +def test_loglevel_collected_from_env(monkeypatch): """Ensure that the loglevel collected from env vars.""" from argparse import Namespace import logging diff --git a/tokendito/aws.py b/tokendito/aws.py index 21f530b2..ff57b1dc 100644 --- a/tokendito/aws.py +++ b/tokendito/aws.py @@ -86,9 +86,7 @@ def assume_role(role_arn, provider_arn, saml): :return: AssumeRoleWithSaml API response """ - default_error = ( - "\nUnable to assume role with the following details:\n- Role ARN: {}\n- Error: {}\n" - ) + default_error = "Unable to assume role {}: {}" encoded_xml = codecs.encode(saml.encode("utf-8"), "base64") assume_role_response = None diff --git a/tokendito/okta.py b/tokendito/okta.py index 9717dffb..0ec6c80c 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -283,7 +283,9 @@ def push_approval(headers, mfa_challenge_url, payload): :return: Session Token if succeeded or terminates if user wait goes 5 min """ - logger.debug(f"Handle push approval from the user [{headers}] [{mfa_challenge_url}]") + logger.debug( + f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}" + ) user.print("Waiting for an approval from the device...") mfa_status = "WAITING" @@ -295,11 +297,11 @@ def push_approval(headers, mfa_challenge_url, payload): if "factorResult" in mfa_verify: mfa_status = mfa_verify["factorResult"] - elif mfa_verify["status"] == "SUCCESS": + elif "status" in mfa_verify and mfa_verify["status"] == "SUCCESS": break else: logger.error("There was an error getting your MFA status.") - logger.debug(mfa_verify) + logger.debug(f"{mfa_verify}") if "status" in mfa_verify: logger.error(f"Exiting due to error: {mfa_verify['status']}") sys.exit(1) @@ -313,7 +315,4 @@ def push_approval(headers, mfa_challenge_url, payload): time.sleep(1) - if mfa_verify is None: - logger.error("Unknown error in MFA approval.") - sys.exit(2) return mfa_verify diff --git a/tokendito/user.py b/tokendito/user.py index 1fabe86e..12fff29d 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -1022,7 +1022,7 @@ def get_input(prompt=""): return user_input -def collect_integer(valid_range=None): +def collect_integer(valid_range=0): """Collect input from user. Prompt the user for input. Validate it and cast to integer.