Skip to content

Commit

Permalink
feat: Add support for optional profile expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
ruhulio committed Dec 13, 2023
1 parent f39ce92 commit fc7b053
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 20 deletions.
5 changes: 4 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ tokendito --profile engineer
usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--password OKTA_PASSWORD] [--profile USER_CONFIG_PROFILE] [--config-file USER_CONFIG_FILE]
[--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT]
[--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE]
[--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--quiet]
[--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--use-profile-expiration] [--quiet]
Gets an STS token to use with the AWS CLI and SDK.
Expand Down Expand Up @@ -115,6 +115,8 @@ options:
--okta-mfa-response OKTA_MFA_RESPONSE
Sets the MFA response to a challenge
--use-device-token Use device token across sessions
--use-profile-expiration
Use profile expiration to bypass re-authenticating
--quiet Suppress output
```

Expand Down Expand Up @@ -157,6 +159,7 @@ The following table lists the environment variable and user configuration entry
| `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` |
| `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` |
| `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` |
| `--use-profile-expiration` | `TOKENDITO_USER_USE_PROFILE_EXPIRATION` | `user_use_profile_expiration` |
| `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` |

# Configuration file location
Expand Down
53 changes: 50 additions & 3 deletions tests/unit/test_user.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# vim: set filetype=python ts=4 sw=4
# -*- coding: utf-8 -*-
"""Unit tests, and local fixtures for the user module."""
from datetime import datetime
from datetime import datetime, timedelta, timezone
import os
import sys

Expand Down Expand Up @@ -466,7 +466,7 @@ def test_update_configuration(tmpdir):
assert ret.okta["mfa"] == "pytest"


def test_update_device_token(tmpdir):
def test_update_profile_device_token(tmpdir):
"""Test writing and reading device token to a configuration file."""
from tokendito import user
from tokendito.config import Config
Expand All @@ -481,11 +481,58 @@ def test_update_device_token(tmpdir):
)

# Write out a config file via configure() and ensure it's functional
user.update_device_token(pytest_config)
user.update_profile_device_token(pytest_config)
ret = user.process_ini_file(path, "pytest")
assert ret.okta["device_token"] == device_token


def test_check_profile_expiration():
"""Test checking profile expiration."""
from tokendito import user
from tokendito.config import Config

now = datetime.now(timezone.utc)
future = now + timedelta(days=1)
past = now + timedelta(days=-1)

pytest_config = Config(
aws={"profile": "test-profile"},
okta={"profile_expiration": str(future)},
user={"use_profile_expiration": True},
)

# Expiration in the future should exit
with pytest.raises(SystemExit):
user.check_profile_expiration(pytest_config)

# Expiration in the past should not exit
pytest_config.okta["profile_expiration"] = str(past)
try:
user.check_profile_expiration(pytest_config)
except SystemExit:
pytest.fail("Profile expiration was invalid and should not have exited")


def test_update_profile_expiration(tmpdir):
"""Test writing and reading profile expiration to a configuration file."""
from tokendito import user
from tokendito.config import Config

path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini")

expiration = datetime.now(timezone.utc)

pytest_config = Config(
okta={"profile_expiration": expiration},
user={"config_file": path, "config_profile": "pytest"},
)

# Write out a config file via configure() and ensure it's functional
user.update_profile_expiration(pytest_config)
ret = user.process_ini_file(path, "pytest")
assert datetime.fromisoformat(ret.okta["profile_expiration"]) == expiration


def test_process_ini_file(tmpdir):
"""Test whether ini config elements are set correctly.
Expand Down
2 changes: 2 additions & 0 deletions tokendito/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Config(object):
loglevel="INFO",
log_output_file="",
use_device_token=False,
use_profile_expiration=False,
mask_items=[],
quiet=False,
),
Expand All @@ -50,6 +51,7 @@ class Config(object):
tile=None,
org=None,
device_token=None,
profile_expiration=None,
),
)

Expand Down
112 changes: 96 additions & 16 deletions tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import builtins
import codecs
import configparser
from datetime import timezone
from datetime import datetime, timezone
from getpass import getpass
import json
import logging
Expand Down Expand Up @@ -64,6 +64,8 @@ def cmd_interface(args):
)
sys.exit(1)

check_profile_expiration(config)

if config.user["use_device_token"]:
device_token = config.okta["device_token"]
if device_token:
Expand Down Expand Up @@ -109,11 +111,7 @@ def cmd_interface(args):
output=config.aws["output"],
)

device_token = HTTP_client.get_device_token()
if config.user["use_device_token"] and device_token:
logger.info(f"Saving device token to config profile {args.user_config_profile}")
config.okta["device_token"] = device_token
update_device_token(config)
update_profile(config, role_response)

display_selected_role(profile_name=config.aws["profile"], role_response=role_response)

Expand Down Expand Up @@ -235,6 +233,13 @@ def parse_cli_args(args):
default=False,
help="Use device token across sessions",
)
parser.add_argument(
"--use-profile-expiration",
dest="user_use_profile_expiration",
action="store_true",
default=False,
help="Use profile expiration to bypass re-authenticating",
)
parser.add_argument(
"--quiet",
dest="user_quiet",
Expand Down Expand Up @@ -517,6 +522,65 @@ def prompt_role_choices(aut_tiles):
return selected_role


def get_role_expiration(role_response={}):
"""Get the expiration from the role response.
:param role_response: Assume Role response dict
:return expiration
"""
try:
return role_response["Credentials"]["Expiration"]
except (KeyError, TypeError) as err:
logger.error(f"Could not retrieve expiration: {err}")
sys.exit(1)


def check_profile_expiration(config):
"""Check profile expiration and exit if still valid.
:param config: Config object
"""
if not config.user["use_profile_expiration"]:
return

profile = config.aws["profile"]

profile_expiration_str = config.okta["profile_expiration"]
if not profile_expiration_str:
logger.warning(f"Expiration unavailable for config profile {profile}. ")
return

profile_expiration = datetime.fromisoformat(profile_expiration_str)
now = datetime.now(timezone.utc)

if now < profile_expiration:
logger.info(f"Expiration for config profile {profile} is still valid: {profile_expiration}")
sys.exit(0)
else:
logger.warning(
f"Expiration for config profile {profile} is no longer valid: {profile_expiration}"
)


def update_profile(config, role_response={}):
"""Update profile with device token and expiration if needed.
:param config: Config object
:param role_response: Assume Role response dict
"""
device_token = HTTP_client.get_device_token()
if config.user["use_device_token"] and device_token:
logger.info(f"Saving device token to config profile {config.aws['profile']}")
config.okta["device_token"] = device_token
update_profile_device_token(config)

role_expiration = get_role_expiration(role_response)
if config.user["use_profile_expiration"] and role_expiration:
logger.info(f"Saving expiration to config profile {config.aws['profile']}")
config.okta["profile_expiration"] = role_expiration
update_profile_expiration(config)


def display_selected_role(profile_name="", role_response={}):
"""Print details about how to assume role.
Expand All @@ -525,21 +589,16 @@ def display_selected_role(profile_name="", role_response={}):
:return: message displayed.
"""
try:
expiration_time = role_response["Credentials"]["Expiration"]
except (KeyError, TypeError) as err:
logger.error(f"Could not retrieve expiration time: {err}")
sys.exit(1)

expiration_time_local = utc_to_local(expiration_time)
role_expiration = get_role_expiration(role_response)
role_expiration_local = utc_to_local(role_expiration)
msg = (
f"\nGenerated profile '{profile_name}' in "
f"{config.aws['shared_credentials_file']}.\n"
"\nUse profile to authenticate to AWS:\n\t"
f"aws --profile '{profile_name}' sts get-caller-identity"
"\nOR\n\t"
f"export AWS_PROFILE='{profile_name}'\n\n"
f"Credentials are valid until {expiration_time} ({expiration_time_local})."
f"Credentials are valid until {role_expiration} ({role_expiration_local})."
)

print(msg)
Expand Down Expand Up @@ -775,7 +834,7 @@ def process_environment(prefix="tokendito"):

def process_interactive_input(config, skip_password=False):
"""
Request input interactively interactively for elements that are not proesent.
Request input interactively interactively for elements that are not present.
:param config: Config object with some values set.
:param skip_password: Whether or not ask the user for a password.
Expand Down Expand Up @@ -1002,7 +1061,7 @@ def update_configuration(config):
logger.info(f"Updated {ini_file} with profile {profile}")


def update_device_token(config):
def update_profile_device_token(config):
"""Update configuration file on local system with device token.
:param config: the current configuration
Expand All @@ -1023,6 +1082,27 @@ def update_device_token(config):
logger.info(f"Updated {ini_file} with profile {profile}")


def update_profile_expiration(config):
"""Update configuration file on local system with profile expiration.
:param config: the current configuration
:return: None
"""
logger.debug("Update configuration file on local system with profile expiration.")
ini_file = config.user["config_file"]
profile = config.user["config_profile"]

contents = {}
# Copy relevant parts of the configuration into an dictionary that
# will be written out to disk
if "profile_expiration" in config.okta and config.okta["profile_expiration"] is not None:
contents["okta_profile_expiration"] = config.okta["profile_expiration"]

logger.debug(f"Adding {contents} to config file.")
update_ini(profile=profile, ini_file=ini_file, **contents)
logger.info(f"Updated {ini_file} with profile {profile}")


def set_local_credentials(response={}, role="default", region="us-east-1", output="json"):
"""Write to local files to insert credentials.
Expand Down

0 comments on commit fc7b053

Please sign in to comment.