Skip to content

Commit

Permalink
Merge bd48305 into 69202ce
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmxgti committed Oct 19, 2022
2 parents 69202ce + bd48305 commit 5b7743a
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 120 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[tool.black]
line-length = 100
target-version = ['py36', 'py37', 'py38']
target-version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311']
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ def pytest_addoption(parser):
default="/dev/null",
help="Sets an optional config file to read from",
)
parser.addoption("--aws-profile", default="pytest", help="Sets the AWS profile name")
7 changes: 5 additions & 2 deletions tests/functional_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def custom_args(request):
"--okta-mfa-response",
"--aws-role-arn",
"--config-file",
"--aws-profile",
]
arg_list = []
# pytest does not have a method for listing options, so we have look them up.
Expand Down Expand Up @@ -211,6 +212,8 @@ def test_generate_credentials(custom_args):
args = [
"--aws-role-arn",
f"{config.aws['role_arn']}",
"--aws-profile",
f"{config.aws['profile']}",
"--okta-app-url",
f"{config.okta['app_url']}",
"--okta-mfa-method",
Expand Down Expand Up @@ -247,8 +250,8 @@ def test_aws_credentials(custom_args):

if not config.aws["role_arn"]:
pytest.skip("No AWS profile defined, test will be skipped.")
profile = config.aws["role_arn"].split("/")[-1]
runnable = ["aws", "--profile", profile, "sts", "get-caller-identity"]

runnable = ["aws", "--profile", config.aws["profile"], "sts", "get-caller-identity"]
proc = run_process(runnable)
assert not proc["stderr"]
assert proc["exit_status"] == 0
203 changes: 196 additions & 7 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ def test_setup_logging():
ret = user.setup_logging(args)
assert ret == logging.INFO

# test that a default level is set on a bad level
args = {"loglevel": "pytest"}
ret = user.setup_logging(args)
assert ret == logging.INFO


def test_setup_early_logging(monkeypatch, tmpdir):
"""Test early logging."""
Expand Down Expand Up @@ -127,14 +132,14 @@ def test_get_interactive_config(mocker):
"""Test if interactive configuration is collected correctly."""
from tokendito import user

# test that all values return
# test that all values return correctly
ret = user.get_interactive_config(
app_url="https://pytest", org_url="https://pytest", username="pytest"
app_url="https://pytest/pytest", org_url="https://pytest", username="pytest"
)
assert (
ret["okta_username"] == "pytest"
and ret["okta_org_url"] == "https://pytest"
and ret["okta_app_url"] == "https://pytest"
and ret["okta_app_url"] == "https://pytest/pytest"
)

# test that interactive values are handled correctly
Expand Down Expand Up @@ -252,6 +257,12 @@ def test_display_selected_role():
ret = user.display_selected_role("pytest", {"Credentials": {"Expiration": utcnow}})
assert ret is not None and "pytest" in ret

with pytest.raises(SystemExit) as err:
ret = user.display_selected_role("pytest", {"pytest": {}})
assert err.value.code == 1

assert ret is not None and "pytest" in ret


@pytest.mark.parametrize(
"url,expected",
Expand Down Expand Up @@ -364,6 +375,10 @@ def test_utc_to_local():

assert user.utc_to_local(utc) == local_time

with pytest.raises(SystemExit) as err:
user.utc_to_local("pytest")
assert err.value.code == 1


def test_set_passcode(mocker):
"""Check if numerical passcode can handle leading zero values."""
Expand Down Expand Up @@ -922,7 +937,8 @@ def test_default_loglevel():
def test_loglevel_collected_from_env(monkeypatch, tmpdir):
"""Ensure that the loglevel collected from env vars."""
from argparse import Namespace
from tokendito import user, config, Config
import logging
from tokendito import user

args = {
"okta_username": "pytest_arg",
Expand All @@ -935,8 +951,181 @@ def test_loglevel_collected_from_env(monkeypatch, tmpdir):

monkeypatch.setenv("TOKENDITO_USER_LOGLEVEL", "DEBUG")
monkeypatch.setattr(user, "parse_cli_args", lambda *x: Namespace(**args))
monkeypatch.setattr(user, "process_ini_file", lambda *x: Config())
ret = user.setup_early_logging(args)["loglevel"]
val = logging.getLevelName(ret)

assert val == logging.DEBUG


def test_create_directory(tmpdir):
"""Test dir creation."""
from tokendito import user

path = tmpdir.mkdir("pytest")
testdir = f"{path}/pytest"

ret = user.create_directory(testdir)
assert ret is None

with pytest.raises(SystemExit) as err:
user.create_directory(__file__)
assert err.value.code == 1


def test_get_submodules_names():
"""Test whether submodules are retrieves correctly."""
from tokendito import user

ret = user.get_submodule_names()
assert "__main__" in ret

ret = user.get_submodule_names("")
assert ret == []


def test_process_interactive_input(mocker):
"""Test interactive input processor."""
from tokendito import user, Config

# Check that a good object retrieves an interactive password
mocker.patch("getpass.getpass", return_value="pytest_password")
config = dict(okta=dict())
pytest_config = Config(**config)
pytest_config.okta["app_url"] = "https://pytest/appurl"
pytest_config.okta["org"] = "https://pytest/"
pytest_config.okta["username"] = "pytest"
ret = user.process_interactive_input(pytest_config)
assert ret.okta["password"] == "pytest_password"

# Check that a bad object raises an exception
with pytest.raises(SystemExit) as error:
assert user.process_interactive_input({"pytest": "pytest"}) == error


user.process_options(None)
@pytest.mark.parametrize(
"value,submit,expected",
[
("pytest", None, "pytest"),
("pytest", "deadbeef", "pytest"),
("pytest", 0xDEADBEEF, "pytest"),
(None, None, "default"),
(None, "", "default"),
(None, 0xDEADBEEF, str(0xDEADBEEF)),
],
)
def test_set_role_name(value, submit, expected):
"""Test setting the AWS Role (profile) name."""
from tokendito import user, Config

pytest_config = Config(aws=dict(profile=value))

ret = user.set_role_name(pytest_config, submit)
assert ret.aws["profile"] == expected


@pytest.mark.parametrize(
"config,expected",
[
(
{"okta": {"username": "", "password": "", "org": None, "app_url": None}},
[
"Username not set.",
"Password not set.",
"Either Okta Org or App URL must be defined.",
],
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": None,
}
},
[],
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": "https://badurl_pytest.org",
}
},
[
"Tile URL https://badurl_pytest.org is not valid.",
"Org URL https://acme.okta.org and Tile URL "
"https://badurl_pytest.org must be in the same domain.",
],
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": "https://acme.okta.org/home/amazon_aws/"
"0123456789abcdef0123/456?fromHome=true",
}
},
[],
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.com/",
"app_url": "https://acme.okta.org/home/amazon_aws/"
"0123456789abcdef0123/456?fromHome=true",
}
},
[
"Org URL https://acme.okta.com/ and Tile URL "
"https://acme.okta.org/home/amazon_aws/"
"0123456789abcdef0123/456?fromHome=true must be in the same domain."
],
),
],
)
def test_validate_configuration(config, expected):
"""Test configuration validator."""
from tokendito import user, Config

pytest_cfg = Config(**config)
print(pytest_cfg)
assert user.validate_configuration(pytest_cfg) == expected


def test_sanitize_config_values():
"""Test configuration sanitizer method."""
from tokendito import user, Config

pytest_config = Config(
aws=dict(output="pytest", region="pytest"),
okta=dict(app_url="https://pytest_org", org="https://pytest_bar/"),
)
ret = user.sanitize_config_values(pytest_config)
assert ret.aws["region"] == pytest_config.get_defaults()["aws"]["region"]
assert ret.aws["output"] == pytest_config.get_defaults()["aws"]["output"]
assert ret.okta["app_url"].startswith(ret.okta["org"])


def test_get_regions():
"""Test retrieval of available AWS regions."""
from tokendito import aws

ret = aws.get_regions(profile="pytest")
assert ret == []
ret = aws.get_regions()
assert "us-east-1" in ret


def test_get_output_types():
"""Test getting AWS output types."""
from tokendito import aws

assert config.user["loglevel"] == "DEBUG"
ret = aws.get_output_types()
assert "json" in ret
58 changes: 32 additions & 26 deletions tokendito/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@


logger = logging.getLogger(__name__)
DEFAULT_OUTPUT_TYPES = ["json", "text", "csv", "yaml", "yaml-stream"]


def get_regions(profile=None):
"""Get avaliable regions from botocore.
:return: List of available regions.
"""
regions = []
try:
session = botocore.session.get_session(env_vars=profile)
regions = session.get_available_regions("sts")
logger.debug(f"Found AWS regions: {regions}")
except Exception:
pass
return regions


def get_output_types():
"""Provide available output types.
Currently, this cannot be done dynamically.
:return: List of available output types.
"""
return DEFAULT_OUTPUT_TYPES


def authenticate_to_roles(secret_session_token, urls, cookies=None):
Expand All @@ -41,29 +66,10 @@ def authenticate_to_roles(secret_session_token, urls, cookies=None):
logger.debug(f"Authenticate AWS user with SAML URL [{url}]")

response = requests.get(url, params=payload, cookies=cookies)
response.raise_for_status()
saml_response_string = response.text
if response.status_code == 400 or response.status_code == 401:
errmsg = "Invalid Credentials."
logger.error(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code == 404:
errmsg = "Invalid Okta application URL. Please verify your configuration."
logger.error(f"{errmsg}")
sys.exit(2)
elif response.status_code >= 500 and response.status_code < 504:
errmsg = (
"Unable to establish connection with Okta. Verify Okta Org URL and try again."
)
logger.error(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code != 200:
logger.error(f"Exiting with code:{response.status_code}")
logger.debug(saml_response_string)
sys.exit(2)

except Exception as error:
errmsg = f"Okta auth failed:\n{error}"
logger.error(errmsg)
except Exception as err:
logger.error(f"There was an error with the call to {url}: {err}")
sys.exit(1)

saml_xml = user.validate_saml_response(saml_response_string)
Expand Down Expand Up @@ -163,7 +169,7 @@ def assert_credentials(role_response={}):
aws_access_key = role_response["Credentials"]["AccessKeyId"]
aws_secret_key = role_response["Credentials"]["SecretAccessKey"]
aws_session_token = role_response["Credentials"]["SessionToken"]
except KeyError:
except (KeyError, TypeError):
logger.error("SAML Response did not contain credentials")
sys.exit(1)

Expand All @@ -188,8 +194,8 @@ def assert_credentials(role_response={}):
def select_assumeable_role(apps):
"""Select the role to perform the AssumeRoleWithSaml.
# :param apps: apps metadata, list of tuples
# :return: AWS AssumeRoleWithSaml response, role name, tuple
:param apps: apps metadata, list of tuples
:return: tuple with AWS AssumeRoleWithSaml response and role name
"""
authenticated_aps = {}
for url, saml_response, saml, label in apps:
Expand All @@ -211,4 +217,4 @@ def select_assumeable_role(apps):
authenticated_aps[_id]["saml"],
)

return assume_role_response, role_name
return (assume_role_response, role_name)
4 changes: 2 additions & 2 deletions tokendito/duo.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_duo_devices(duo_auth):
"""
soup = BeautifulSoup(duo_auth.content, "html.parser")

device_soup = soup.find("select", {"name": "device"}).findAll("option")
device_soup = soup.find("select", {"name": "device"}).findAll("option") # type: ignore
devices = [f"{d['value']} - {d.text}" for d in device_soup]
if not devices:
logger.error("Please configure devices for your Duo MFA and retry.")
Expand All @@ -123,7 +123,7 @@ def get_duo_devices(duo_auth):
factor_options = []
for device in devices:
options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]})
factors = options.findAll("input", {"name": "factor"})
factors = options.findAll("input", {"name": "factor"}) # type: ignore (PEP 561)
for factor in factors:
factor_option = {"device": device, "factor": factor["value"]}
factor_options.append(factor_option)
Expand Down
Loading

0 comments on commit 5b7743a

Please sign in to comment.