Skip to content

Commit

Permalink
Merge 8bcb64b into 69202ce
Browse files Browse the repository at this point in the history
  • Loading branch information
sevignyj committed Oct 26, 2022
2 parents 69202ce + 8bcb64b commit c32363e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 8 deletions.
7 changes: 6 additions & 1 deletion tests/functional_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,18 @@ def test_generate_credentials(custom_args):
f"{config.okta['username']}",
"--password",
f"{config.okta['password']}",
"--loglevel",
"DEBUG",
]
# run as a local module, as we can't guarantee that the binary is installed.
executable = [sys.executable, "-m", "tokendito"]
runnable = executable + args

proc = run_process(runnable)
assert not proc["stderr"]
assert "'password': '*****'" in proc["stderr"]
assert f"{config.okta['password']}" not in proc["stderr"]
assert f"{config.okta['mfa_response']}" not in proc["stderr"]
assert '"sessionToken": "*****"' in proc["stderr"]
assert proc["exit_status"] == 0


Expand Down
45 changes: 40 additions & 5 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,41 @@ def test_set_local_credentials(tmpdir):
assert ret == "default"


def test_add_sensitive_value_to_be_masked():
"""Test adding some values only adds whatas expected."""
from tokendito import user

user.add_sensitive_value_to_be_masked("should be added")
user.add_sensitive_value_to_be_masked("should be added2")
user.add_sensitive_value_to_be_masked("should be added3", "password")
user.add_sensitive_value_to_be_masked("should not be added", "public")
assert (
"should be added" in user.mask_items
and "should be added2" in user.mask_items
and "should be added3" in user.mask_items
and len(user.mask_items) == 3
)


def test_logger_mask(caplog):
"""Test that masking data in loggger works as expected."""
from tokendito import user
import logging

logger = logging.getLogger(__name__)
logger.addFilter(user.MaskLoggerSecret())
user.add_sensitive_value_to_be_masked("supersecret")
user.add_sensitive_value_to_be_masked("another secret", "sessionToken")
with caplog.at_level(logging.DEBUG):
logger.debug("This should be displayed, but not: supersecret")
logger.debug("another secret")
assert (
"supersecret" not in caplog.text
and "another secret" not in caplog.text
and "This should be displayed" in caplog.text
)


def test_display_selected_role():
"""Test that role is printed correctly."""
from datetime import timezone
Expand Down Expand Up @@ -399,14 +434,14 @@ def test_process_arguments():
from tokendito import user
from argparse import Namespace

valid_settings = dict(okta_username="pytest", okta_password="pytest")
valid_settings = dict(okta_username="pytest", okta_password="pytest_password")
invalid_settings = dict(pytest_expected_failure="pytest_failure")
args = {**valid_settings, **invalid_settings}
ret = user.process_arguments(Namespace(**args))

# Make sure that the arguments we passed are interpreted
assert ret.okta["username"] == "pytest"
assert ret.okta["password"] == "pytest"
assert ret.okta["password"] == "pytest_password"
# Make sure that incorrect arguments are not passed down to the Config object.
assert "pytest" not in ret.__dict__

Expand All @@ -416,7 +451,7 @@ def test_process_ini_file(tmpdir):
from tokendito import user

valid_settings = dict(
okta_password="pytest",
okta_password="pytest_password",
okta_username="pytest",
)
invalid_settings = dict(user_pytest_expected_failure="pytest")
Expand All @@ -442,7 +477,7 @@ def test_process_ini_file(tmpdir):

ret = user.process_ini_file(path, "pytest")
assert ret.okta["username"] == "pytest"
assert ret.okta["password"] == "pytest"
assert ret.okta["password"] == "pytest_password"

with pytest.raises(SystemExit) as err:
user.process_ini_file(path, "pytest_expected_element_failure")
Expand Down Expand Up @@ -546,7 +581,7 @@ def test_bad_mfa_provider_type(mocker, sample_headers):
primary_auth = 1
selected_factor = 1

mfa_verify = {"sessionToken": "123"}
mfa_verify = {"sessionToken": "pytest_session_token"}
mfa_bad_provider = "bad_provider"
mocker.patch(
"tokendito.duo.authenticate_duo",
Expand Down
2 changes: 1 addition & 1 deletion tokendito/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from os.path import expanduser
import sys


__version__ = "2.0.0"
__title__ = "tokendito"
__description__ = "Get AWS STS tokens from Okta SSO"
Expand All @@ -28,6 +27,7 @@ class Config(object):
encoding=sys.stdin.encoding,
loglevel="WARNING",
log_output_file="",
mask_items=[],
),
aws=dict(
config_file=f"{expanduser('~')}/.aws/config",
Expand Down
5 changes: 5 additions & 0 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def user_session_token(primary_auth, headers):
logger.error("Okta auth failed: unknown status.")
sys.exit(1)

user.add_sensitive_value_to_be_masked(session_token)

return session_token


Expand Down Expand Up @@ -260,10 +262,13 @@ def user_mfa_options(selected_mfa_option, headers, mfa_challenge_url, payload, p
logger.debug("Getting verification code from user.")
print("Type verification code and press Enter")
config.okta["mfa_response"] = user.get_input()
user.add_sensitive_value_to_be_masked(config.okta["mfa_response"])

# time to verify the mfa method
payload = {"stateToken": primary_auth["stateToken"], "passCode": config.okta["mfa_response"]}
mfa_verify = api_wrapper(mfa_challenge_url, payload, headers)
if "sessionToken" in mfa_verify:
user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"])
logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]")

return mfa_verify
Expand Down
36 changes: 35 additions & 1 deletion tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@
logger = logging.getLogger(__name__)


mask_items = []


class MaskLoggerSecret(logging.Filter):
"""Masks secrets in logger messages."""

def __init__(self):
"""Initialize filter."""
logging.Filter.__init__(self)

def filter(self, record):
"""Apply filter on logger messages."""
for secret in mask_items:
if not isinstance(secret, str):
secret = str(secret)
record.msg = record.msg.replace(secret, "*****")
return True


def parse_cli_args(args):
"""Parse command line arguments.
Expand Down Expand Up @@ -198,6 +217,7 @@ def setup_logging(conf):
# Set a reasonable default logging format.
root_logger.handlers.clear()
root_logger.addHandler(handler)
root_logger.addFilter(MaskLoggerSecret())

# Pre-create a log handler for each submodule
# with the same format and level. Settings are
Expand All @@ -207,6 +227,7 @@ def setup_logging(conf):
conf["loglevel"] = conf["loglevel"].upper()
for submodule in submodules:
submodule_logger = logging.getLogger(submodule)
submodule_logger.addFilter(MaskLoggerSecret())
try:
submodule_logger.setLevel(conf["loglevel"])
except ValueError as err:
Expand Down Expand Up @@ -539,6 +560,14 @@ def display_version():
)


def add_sensitive_value_to_be_masked(value, key=None):
"""Add value to be masked from the logs."""
"""If a key is passed only add it if the key refers to a secret element."""
sensitive_keys = ("password", "mfa_response", "sessionToken")
if key is None or key in sensitive_keys:
mask_items.append(value)


def process_ini_file(file, profile):
"""Process options from a ConfigParser ini file.
Expand All @@ -559,6 +588,7 @@ def process_ini_file(file, profile):
if match.group(1) not in res:
res[match.group(1)] = dict()
res[match.group(1)][match.group(2)] = val
add_sensitive_value_to_be_masked(val, match.group(2))
except configparser.Error as err:
logger.error(f"Could not load profile '{profile}': {str(err)}")
sys.exit(2)
Expand Down Expand Up @@ -594,6 +624,7 @@ def process_arguments(args):
res[match.group(1)] = dict()
if val:
res[match.group(1)][match.group(2)] = val
add_sensitive_value_to_be_masked(val, match.group(2))
logger.debug(f"Found arguments: {res}")

try:
Expand Down Expand Up @@ -622,7 +653,9 @@ def process_environment(prefix="tokendito"):
if match:
if match.group(2) not in res:
res[match.group(2)] = dict()
res[match.group(2)][match.group(3)] = val
if val:
res[match.group(2)][match.group(3)] = val
add_sensitive_value_to_be_masked(val, match.group(3))
logger.debug(f"Found environment variables: {res}")

try:
Expand Down Expand Up @@ -982,6 +1015,7 @@ def process_interactive_input(config_obj):

if config_obj.okta["password"] == "":
config_obj.okta["password"] = get_password()
add_sensitive_value_to_be_masked(config_obj.okta["password"])
logger.debug(f"Runtime configuration is: {config_obj}")


Expand Down

0 comments on commit c32363e

Please sign in to comment.