Skip to content

Commit

Permalink
Merge d85cca1 into 69202ce
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmxgti committed Oct 5, 2022
2 parents 69202ce + d85cca1 commit 1b1ba26
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 112 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[tool.black]
line-length = 100
target-version = ['py36', 'py37', 'py38']
target-version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311']
164 changes: 157 additions & 7 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ def test_setup_logging():
ret = user.setup_logging(args)
assert ret == logging.INFO

# test that a default level is set on a bad level
args = {"loglevel": "pytest"}
ret = user.setup_logging(args)
assert ret == logging.INFO


def test_setup_early_logging(monkeypatch, tmpdir):
"""Test early logging."""
Expand Down Expand Up @@ -127,14 +132,14 @@ def test_get_interactive_config(mocker):
"""Test if interactive configuration is collected correctly."""
from tokendito import user

# test that all values return
# test that all values return correctly
ret = user.get_interactive_config(
app_url="https://pytest", org_url="https://pytest", username="pytest"
app_url="https://pytest/pytest", org_url="https://pytest", username="pytest"
)
assert (
ret["okta_username"] == "pytest"
and ret["okta_org_url"] == "https://pytest"
and ret["okta_app_url"] == "https://pytest"
and ret["okta_app_url"] == "https://pytest/pytest"
)

# test that interactive values are handled correctly
Expand Down Expand Up @@ -252,6 +257,12 @@ def test_display_selected_role():
ret = user.display_selected_role("pytest", {"Credentials": {"Expiration": utcnow}})
assert ret is not None and "pytest" in ret

with pytest.raises(SystemExit) as err:
ret = user.display_selected_role("pytest", {"pytest": {}})
assert err.value.code == 1

assert ret is not None and "pytest" in ret


@pytest.mark.parametrize(
"url,expected",
Expand Down Expand Up @@ -364,6 +375,10 @@ def test_utc_to_local():

assert user.utc_to_local(utc) == local_time

with pytest.raises(SystemExit) as err:
user.utc_to_local("pytest")
assert err.value.code == 1


def test_set_passcode(mocker):
"""Check if numerical passcode can handle leading zero values."""
Expand Down Expand Up @@ -922,7 +937,8 @@ def test_default_loglevel():
def test_loglevel_collected_from_env(monkeypatch, tmpdir):
"""Ensure that the loglevel collected from env vars."""
from argparse import Namespace
from tokendito import user, config, Config
import logging
from tokendito import user

args = {
"okta_username": "pytest_arg",
Expand All @@ -935,8 +951,142 @@ def test_loglevel_collected_from_env(monkeypatch, tmpdir):

monkeypatch.setenv("TOKENDITO_USER_LOGLEVEL", "DEBUG")
monkeypatch.setattr(user, "parse_cli_args", lambda *x: Namespace(**args))
monkeypatch.setattr(user, "process_ini_file", lambda *x: Config())
ret = user.setup_early_logging(args)["loglevel"]
val = logging.getLevelName(ret)

assert val == logging.DEBUG


def test_create_directory(tmpdir):
"""Test dir creation."""
from tokendito import user

path = tmpdir.mkdir("pytest")
testdir = f"{path}/pytest"

ret = user.create_directory(testdir)
assert ret is None

with pytest.raises(SystemExit) as err:
user.create_directory(__file__)
assert err.value.code == 1


def test_get_submodules_names():
"""Test whether submodules are retrieves correctly."""
from tokendito import user

ret = user.get_submodule_names()
assert "__main__" in ret

ret = user.get_submodule_names("")
assert ret == []


def test_process_interactive_input(mocker):
"""Test interactive input processor."""
from tokendito import user, Config

user.process_options(None)
# Check that a good object retrieves an interactive password
mocker.patch("getpass.getpass", return_value="pytest_password")
config = dict(okta=dict())
pytest_config = Config(**config)
pytest_config.okta["app_url"] = "https://pytest/appurl"
pytest_config.okta["org"] = "https://pytest/"
pytest_config.okta["username"] = "pytest"
ret = user.process_interactive_input(pytest_config)
assert ret.okta["password"] == "pytest_password"

# Check that a bad object raises an exception
with pytest.raises(SystemExit) as error:
assert user.process_interactive_input({"pytest": "pytest"}) == error


@pytest.mark.parametrize(
"config,expected",
[
({}, False),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": None,
}
},
True,
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": "https://badurl_pytest.org",
}
},
False,
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.org",
"app_url": "https://acme.okta.org/home/amazon_aws/0123456789abcdef0123/456?fromHome=true", # noqa: E501
}
},
True,
),
(
{
"okta": {
"username": "pytest",
"password": "pytest",
"org": "https://acme.okta.com/",
"app_url": "https://acme.okta.org/home/amazon_aws/0123456789abcdef0123/456?fromHome=true", # noqa: E501
}
},
False,
),
],
)
def test_validate_configuration(config, expected):
"""Test configuration validator."""
from tokendito import user, Config

pytest_config = Config(**config)
assert user.validate_configuration(pytest_config) is expected


def test_sanitize_config_values():
"""Test configuration sanitizer method."""
from tokendito import user, Config

pytest_config = Config(
aws=dict(output="pytest", region="pytest"),
okta=dict(app_url="https://pytest_org", org="https://pytest_bar/"),
)
ret = user.sanitize_config_values(pytest_config)
assert ret.aws["region"] == pytest_config.get_defaults()["aws"]["region"]
assert ret.aws["output"] == pytest_config.get_defaults()["aws"]["output"]
assert ret.okta["app_url"].startswith(ret.okta["org"])


def test_get_regions():
"""Test retrieval of available AWS regions."""
from tokendito import aws

ret = aws.get_regions(profile="pytest")
assert ret == []
ret = aws.get_regions()
assert "us-east-1" in ret


def test_get_output_types():
"""Test getting AWS output types."""
from tokendito import aws

assert config.user["loglevel"] == "DEBUG"
ret = aws.get_output_types()
assert "json" in ret
50 changes: 28 additions & 22 deletions tokendito/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@


logger = logging.getLogger(__name__)
DEFAULT_OUTPUT_TYPES = ["json", "text", "csv", "yaml", "yaml-stream"]


def get_regions(profile=None):
"""Get avaliable regions from botocore.
:return: List of available regions.
"""
regions = []
try:
session = botocore.session.get_session(env_vars=profile)
regions = session.get_available_regions("sts")
logger.debug(f"Found AWS regions: {regions}")
except Exception:
pass
return regions


def get_output_types():
"""Provide available output types.
Currently, this cannot be done dynamically.
:return: List of available output types.
"""
return DEFAULT_OUTPUT_TYPES


def authenticate_to_roles(secret_session_token, urls, cookies=None):
Expand All @@ -41,29 +66,10 @@ def authenticate_to_roles(secret_session_token, urls, cookies=None):
logger.debug(f"Authenticate AWS user with SAML URL [{url}]")

response = requests.get(url, params=payload, cookies=cookies)
response.raise_for_status()
saml_response_string = response.text
if response.status_code == 400 or response.status_code == 401:
errmsg = "Invalid Credentials."
logger.error(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code == 404:
errmsg = "Invalid Okta application URL. Please verify your configuration."
logger.error(f"{errmsg}")
sys.exit(2)
elif response.status_code >= 500 and response.status_code < 504:
errmsg = (
"Unable to establish connection with Okta. Verify Okta Org URL and try again."
)
logger.error(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code != 200:
logger.error(f"Exiting with code:{response.status_code}")
logger.debug(saml_response_string)
sys.exit(2)

except Exception as error:
errmsg = f"Okta auth failed:\n{error}"
logger.error(errmsg)
except Exception as err:
logger.error(f"There was an error with the call to {url}: {err}")
sys.exit(1)

saml_xml = user.validate_saml_response(saml_response_string)
Expand Down
4 changes: 2 additions & 2 deletions tokendito/duo.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_duo_devices(duo_auth):
"""
soup = BeautifulSoup(duo_auth.content, "html.parser")

device_soup = soup.find("select", {"name": "device"}).findAll("option")
device_soup = soup.find("select", {"name": "device"}).findAll("option") # type: ignore
devices = [f"{d['value']} - {d.text}" for d in device_soup]
if not devices:
logger.error("Please configure devices for your Duo MFA and retry.")
Expand All @@ -123,7 +123,7 @@ def get_duo_devices(duo_auth):
factor_options = []
for device in devices:
options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]})
factors = options.findAll("input", {"name": "factor"})
factors = options.findAll("input", {"name": "factor"}) # type: ignore (PEP 561)
for factor in factors:
factor_option = {"device": device, "factor": factor["value"]}
factor_options.append(factor_option)
Expand Down
17 changes: 8 additions & 9 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,18 @@ def user_session_token(primary_auth, headers):
return session_token


def authenticate_user(url, username, password):
def authenticate_user(config):
"""Authenticate user with okta credential.
:param url: company specific URL of the okta
:param username: okta username
:param password: okta password
:return: MFA session options
:param config: Config object
:return: MFA session with options
"""
headers = {"content-type": "application/json", "accept": "application/json"}
payload = {"username": username, "password": password}
payload = {"username": config.okta["username"], "password": config.okta["password"]}

logger.debug(f"Authenticate Okta headers [{headers}] ")
primary_auth = api_wrapper(f"{url}/api/v1/authn", payload, headers)
logger.debug("Authenticate user to Okta")
logger.debug(f"Sending {headers}, {payload} to {config.okta['org']}")
primary_auth = api_wrapper(f"{config.okta['org']}/api/v1/authn", payload, headers)

session_token = user_session_token(primary_auth, headers)
logger.info("User has been succesfully authenticated.")
Expand Down Expand Up @@ -306,7 +305,7 @@ def push_approval(headers, mfa_challenge_url, payload):
logger.error("Device approval window has expired.")
sys.exit(2)

time.sleep(0.5)
time.sleep(1)

if mfa_verify is None:
logger.error("Unknown error in MFA approval.")
Expand Down
30 changes: 14 additions & 16 deletions tokendito/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,35 @@

def cli(args):
"""Tokendito retrieves AWS credentials after authenticating with Okta."""
args = user.parse_cli_args(args)
# Early logging, in case the user requests debugging via env/CLI
user.setup_early_logging(args)

# Set some required initial values
user.process_options(args)
logger.debug(f"Final configuration is {config}")

user.process_interactive_input(config)
# Late logging (default)
user.setup_logging(config.user)

# Authenticate okta and AWS also use assumerole to assign the role
logger.debug("Authenticate user with Okta and AWS.")
# Validate configuration
if not user.validate_configuration(config):
logger.error("Could not validate configuration. Please check your settings and try again.")
sys.exit(1)

secret_session_token = okta.authenticate_user(
config.okta["org"], config.okta["username"], config.okta["password"]
)
# Authenticate okta and AWS also use assumerole to assign the role
session_token = okta.authenticate_user(config)

session_cookies = None

if config.okta["app_url"]:
if not user.validate_okta_app_url(config.okta["app_url"]):
logger.error(
"Okta Application URL not found, or invalid. Please check "
"your configuration and try again."
)
sys.exit(2)

app_label = ""
config.okta["app_url"] = (config.okta["app_url"], app_label)
else:
session_cookies = user.request_cookies(config.okta["org"], secret_session_token)
session_cookies = user.request_cookies(config.okta["org"], session_token)
config.okta["app_url"] = user.discover_app_url(config.okta["org"], session_cookies)

auth_apps = aws.authenticate_to_roles(
secret_session_token, config.okta["app_url"], cookies=session_cookies
session_token, config.okta["app_url"], cookies=session_cookies
)

(role_response, role_name) = aws.select_assumeable_role(auth_apps)
Expand Down
Loading

0 comments on commit 1b1ba26

Please sign in to comment.