diff --git a/docs/README.md b/docs/README.md index b277771..b594179 100644 --- a/docs/README.md +++ b/docs/README.md @@ -74,7 +74,7 @@ tokendito --profile engineer usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--password OKTA_PASSWORD] [--profile USER_CONFIG_PROFILE] [--config-file USER_CONFIG_FILE] [--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT] [--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE] - [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--quiet] + [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--use-profile-expiration] [--quiet] Gets an STS token to use with the AWS CLI and SDK. @@ -115,6 +115,8 @@ options: --okta-mfa-response OKTA_MFA_RESPONSE Sets the MFA response to a challenge --use-device-token Use device token across sessions + --use-profile-expiration + Use profile expiration to bypass re-authenticating --quiet Suppress output ``` @@ -157,6 +159,7 @@ The following table lists the environment variable and user configuration entry | `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` | | `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` | | `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` | +| `--use-profile-expiration` | `TOKENDITO_USER_USE_PROFILE_EXPIRATION` | `user_use_profile_expiration` | | `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` | # Configuration file location diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py index 0cb823a..3439901 100644 --- a/tests/unit/test_user.py +++ b/tests/unit/test_user.py @@ -1,7 +1,7 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """Unit tests, and local fixtures for the user module.""" -from datetime import datetime +from datetime import datetime, timedelta, timezone import os import sys @@ -466,7 +466,7 @@ def test_update_configuration(tmpdir): assert ret.okta["mfa"] == "pytest" -def test_update_device_token(tmpdir): +def test_update_profile_device_token(tmpdir): """Test writing and reading device token to a configuration file.""" from tokendito import user from tokendito.config import Config @@ -481,11 +481,58 @@ def test_update_device_token(tmpdir): ) # Write out a config file via configure() and ensure it's functional - user.update_device_token(pytest_config) + user.update_profile_device_token(pytest_config) ret = user.process_ini_file(path, "pytest") assert ret.okta["device_token"] == device_token +def test_check_profile_expiration(): + """Test checking profile expiration.""" + from tokendito import user + from tokendito.config import Config + + now = datetime.now(timezone.utc) + future = now + timedelta(days=1) + past = now + timedelta(days=-1) + + pytest_config = Config( + aws={"profile": "test-profile"}, + okta={"profile_expiration": str(future)}, + user={"use_profile_expiration": True}, + ) + + # Expiration in the future should exit + with pytest.raises(SystemExit): + user.check_profile_expiration(pytest_config) + + # Expiration in the past should not exit + pytest_config.okta["profile_expiration"] = str(past) + try: + user.check_profile_expiration(pytest_config) + except SystemExit: + pytest.fail("Profile expiration was invalid and should not have exited") + + +def test_update_profile_expiration(tmpdir): + """Test writing and reading profile expiration to a configuration file.""" + from tokendito import user + from tokendito.config import Config + + path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini") + + expiration = datetime.now(timezone.utc) + + pytest_config = Config( + okta={"profile_expiration": expiration}, + user={"config_file": path, "config_profile": "pytest"}, + ) + + # Write out a config file via configure() and ensure it's functional + user.update_profile_expiration(pytest_config) + ret = user.process_ini_file(path, "pytest") + assert datetime.fromisoformat(ret.okta["profile_expiration"]) == expiration + + def test_process_ini_file(tmpdir): """Test whether ini config elements are set correctly. diff --git a/tokendito/config.py b/tokendito/config.py index 8020a38..0e2708a 100644 --- a/tokendito/config.py +++ b/tokendito/config.py @@ -30,6 +30,7 @@ class Config(object): loglevel="INFO", log_output_file="", use_device_token=False, + use_profile_expiration=False, mask_items=[], quiet=False, ), @@ -50,6 +51,7 @@ class Config(object): tile=None, org=None, device_token=None, + profile_expiration=None, ), ) diff --git a/tokendito/user.py b/tokendito/user.py index 8172c38..158f8ea 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -5,7 +5,7 @@ import builtins import codecs import configparser -from datetime import timezone +from datetime import datetime, timezone from getpass import getpass import json import logging @@ -64,6 +64,8 @@ def cmd_interface(args): ) sys.exit(1) + check_profile_expiration(config) + if config.user["use_device_token"]: device_token = config.okta["device_token"] if device_token: @@ -109,11 +111,7 @@ def cmd_interface(args): output=config.aws["output"], ) - device_token = HTTP_client.get_device_token() - if config.user["use_device_token"] and device_token: - logger.info(f"Saving device token to config profile {args.user_config_profile}") - config.okta["device_token"] = device_token - update_device_token(config) + update_profile(config, role_response) display_selected_role(profile_name=config.aws["profile"], role_response=role_response) @@ -235,6 +233,13 @@ def parse_cli_args(args): default=False, help="Use device token across sessions", ) + parser.add_argument( + "--use-profile-expiration", + dest="user_use_profile_expiration", + action="store_true", + default=False, + help="Use profile expiration to bypass re-authenticating", + ) parser.add_argument( "--quiet", dest="user_quiet", @@ -517,6 +522,65 @@ def prompt_role_choices(aut_tiles): return selected_role +def get_role_expiration(role_response={}): + """Get the expiration from the role response. + + :param role_response: Assume Role response dict + :return expiration + """ + try: + return role_response["Credentials"]["Expiration"] + except (KeyError, TypeError) as err: + logger.error(f"Could not retrieve expiration: {err}") + sys.exit(1) + + +def check_profile_expiration(config): + """Check profile expiration and exit if still valid. + + :param config: Config object + """ + if not config.user["use_profile_expiration"]: + return + + profile = config.aws["profile"] + + profile_expiration_str = config.okta["profile_expiration"] + if not profile_expiration_str: + logger.warning(f"Expiration unavailable for config profile {profile}. ") + return + + profile_expiration = datetime.fromisoformat(profile_expiration_str) + now = datetime.now(timezone.utc) + + if now < profile_expiration: + logger.info(f"Expiration for config profile {profile} is still valid: {profile_expiration}") + sys.exit(0) + else: + logger.warning( + f"Expiration for config profile {profile} is no longer valid: {profile_expiration}" + ) + + +def update_profile(config, role_response={}): + """Update profile with device token and expiration if needed. + + :param config: Config object + :param role_response: Assume Role response dict + """ + device_token = HTTP_client.get_device_token() + if config.user["use_device_token"] and device_token: + logger.info(f"Saving device token to config profile {config.aws['profile']}") + config.okta["device_token"] = device_token + update_profile_device_token(config) + + role_expiration = get_role_expiration(role_response) + if config.user["use_profile_expiration"] and role_expiration: + logger.info(f"Saving expiration to config profile {config.aws['profile']}") + config.okta["profile_expiration"] = role_expiration + update_profile_expiration(config) + + def display_selected_role(profile_name="", role_response={}): """Print details about how to assume role. @@ -525,13 +589,8 @@ def display_selected_role(profile_name="", role_response={}): :return: message displayed. """ - try: - expiration_time = role_response["Credentials"]["Expiration"] - except (KeyError, TypeError) as err: - logger.error(f"Could not retrieve expiration time: {err}") - sys.exit(1) - - expiration_time_local = utc_to_local(expiration_time) + role_expiration = get_role_expiration(role_response) + role_expiration_local = utc_to_local(role_expiration) msg = ( f"\nGenerated profile '{profile_name}' in " f"{config.aws['shared_credentials_file']}.\n" @@ -539,7 +598,7 @@ def display_selected_role(profile_name="", role_response={}): f"aws --profile '{profile_name}' sts get-caller-identity" "\nOR\n\t" f"export AWS_PROFILE='{profile_name}'\n\n" - f"Credentials are valid until {expiration_time} ({expiration_time_local})." + f"Credentials are valid until {role_expiration} ({role_expiration_local})." ) print(msg) @@ -775,7 +834,7 @@ def process_environment(prefix="tokendito"): def process_interactive_input(config, skip_password=False): """ - Request input interactively interactively for elements that are not proesent. + Request input interactively interactively for elements that are not present. :param config: Config object with some values set. :param skip_password: Whether or not ask the user for a password. @@ -1002,7 +1061,7 @@ def update_configuration(config): logger.info(f"Updated {ini_file} with profile {profile}") -def update_device_token(config): +def update_profile_device_token(config): """Update configuration file on local system with device token. :param config: the current configuration @@ -1023,6 +1082,27 @@ def update_device_token(config): logger.info(f"Updated {ini_file} with profile {profile}") +def update_profile_expiration(config): + """Update configuration file on local system with profile expiration. + + :param config: the current configuration + :return: None + """ + logger.debug("Update configuration file on local system with profile expiration.") + ini_file = config.user["config_file"] + profile = config.user["config_profile"] + + contents = {} + # Copy relevant parts of the configuration into an dictionary that + # will be written out to disk + if "profile_expiration" in config.okta and config.okta["profile_expiration"] is not None: + contents["okta_profile_expiration"] = config.okta["profile_expiration"] + + logger.debug(f"Adding {contents} to config file.") + update_ini(profile=profile, ini_file=ini_file, **contents) + logger.info(f"Updated {ini_file} with profile {profile}") + + def set_local_credentials(response={}, role="default", region="us-east-1", output="json"): """Write to local files to insert credentials.