From e16ea516c9267470e1ba095648b5700b34992d8b Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Tue, 8 Nov 2022 11:25:07 -0500 Subject: [PATCH] Remove Rich support (for now) --- requirements.txt | 1 - tests/unit_test.py | 151 ++++++++++++++++---------- tokendito/__init__.py | 1 - tokendito/__main__.py | 4 +- tokendito/aws.py | 6 +- tokendito/duo.py | 4 +- tokendito/okta.py | 2 +- tokendito/tokendito.py | 4 +- tokendito/tool.py | 2 - tokendito/user.py | 233 +++++++++++++++++++++++------------------ 10 files changed, 233 insertions(+), 175 deletions(-) diff --git a/requirements.txt b/requirements.txt index 2de63822..5119439f 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ botocore>=1.12.36 requests>=2.19.0 beautifulsoup4>=4.6.0 -rich<12.0.0 # Until https://github.com/Textualize/rich/issues/2293 is fixed diff --git a/tests/unit_test.py b/tests/unit_test.py index 85cfe7e5..3abe2861 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -8,7 +8,6 @@ import pytest import requests_mock -import rich import semver @@ -65,7 +64,7 @@ def test_get_username(mocker): """Test whether data sent is the same as data returned.""" from tokendito import user - mocker.patch("rich.prompt.Prompt.ask", return_value="pytest_patched") + mocker.patch("tokendito.user.input", return_value="pytest_patched") val = user.get_username() assert val == "pytest_patched" @@ -75,7 +74,7 @@ def test_get_password(mocker): """Test whether data sent is the same as data returned.""" from tokendito import user - mocker.patch("rich.prompt.Prompt.ask", return_value="pytest_patched") + mocker.patch("getpass.getpass", return_value="pytest_patched") val = user.get_password() assert val == "pytest_patched" @@ -127,22 +126,6 @@ def test_setup_early_logging(monkeypatch, tmpdir): assert "TOKENDITO_USER_PYTEST_EXPECTED_FAILURE" not in ret -def test_setup_console(monkeypatch): - """Test the console.""" - from tokendito import user - from argparse import Namespace - - args = dict(user_no_color=False, user_quiet=False) - - env_keys = dict( - TOKENDITO_USER_QUIET="true", - TOKENDITO_USER_NO_COLOR="true", - ) - monkeypatch.setattr(os, "environ", {**env_keys}) - ret = user.setup_console(Namespace(**args)) - assert ret.quiet is True and ret.no_color is True - - def test_get_interactive_config(mocker): """Test if interactive configuration is collected correctly.""" from tokendito import user @@ -171,17 +154,13 @@ def test_get_interactive_config(mocker): assert ret["okta_username"] == "pytests" -def test_collect_integer(mocker): +@pytest.mark.parametrize("value,expected", [("00", 0), ("01", 1), ("5", 5)]) +def test_collect_integer(mocker, value, expected): """Test whether integers from the user are retrieved.""" from tokendito import user - mocker.patch("tokendito.user.IntPrompt.ask", return_value=[]) - ret = user.collect_integer() - assert ret == [] - - mocker.patch("tokendito.user.IntPrompt.ask", return_value=0) - ret = user.collect_integer() - assert ret == 0 + mocker.patch("tokendito.user.input", return_value=value) + assert user.collect_integer(10) == expected @pytest.mark.parametrize( @@ -196,7 +175,7 @@ def test_get_org_url(mocker, url, expected): """Test Org URL.""" from tokendito import user - mocker.patch("rich.prompt.Prompt.ask", return_value=url) + mocker.patch("tokendito.user.input", return_value=url) assert user.get_org_url() == expected @@ -218,10 +197,62 @@ def test_get_app_url(mocker, url, expected): """Test get App URL.""" from tokendito import user - mocker.patch("rich.prompt.Prompt.ask", return_value=url) + mocker.patch("tokendito.user.input", return_value=url) assert user.get_app_url() == expected +@pytest.mark.parametrize( + "test,limit,expected", + [(0, 10, True), (5, 10, True), (10, 10, False), (-1, 10, False), (1, 0, False)], +) +def test_check_within_range(test, limit, expected): + """Test whether a given number is in the range 0 >= num < limit.""" + from tokendito import user + + assert user.check_within_range(test, limit) is expected + + +@pytest.mark.parametrize( + "value,expected", + [ + ("-1", False), + ("0", True), + ("1", True), + (-1, False), + (0, True), + (1, True), + (3.7, False), + ("3.7", False), + ("seven", False), + ("0xff", False), + (None, False), + ], +) +def test_check_integer(value, expected): + """Test whether the integer testing function works within boundaries.""" + from tokendito import user + + assert user.check_integer(value) is expected + + +@pytest.mark.parametrize( + "test,limit,expected", [(1, 10, True), (-1, 10, False), ("pytest", 10, False)] +) +def test_validate_input(test, limit, expected): + """Check if a given input is within the 0 >= num < limit range.""" + from tokendito import user + + assert user.validate_input(test, limit) is expected + + +def test_get_input(mocker): + """Check if provided input is return unmodified.""" + from tokendito import user + + mocker.patch("tokendito.user.input", return_value="pytest_patched") + assert user.get_input() == "pytest_patched" + + def test_update_ini(tmpdir): """Ensure ini files are updated correctly.""" from tokendito import user @@ -373,15 +404,6 @@ def test_validate_app_url(url, expected): assert user.validate_okta_app_url(input_url=url) is expected -def test_get_input(mocker): - """Check if provided input is return unmodified.""" - from tokendito import user - - # mocker.patch("tokendito.user.input", return_value="pytest_patched") - mocker.patch("rich.prompt.Prompt.ask", return_value="pytest_patched") - assert user.get_input() == "pytest_patched" - - def test_utc_to_local(): """Check if passed utc datestamp becomes local one.""" from tokendito import user @@ -402,7 +424,7 @@ def test_set_passcode(mocker): """Check if numerical passcode can handle leading zero values.""" from tokendito import duo - mocker.patch("rich.prompt.Prompt.ask", return_value="0123456") + mocker.patch("tokendito.user.input", return_value="0123456") assert duo.set_passcode({"factor": "passcode"}) == "0123456" @@ -718,29 +740,35 @@ def test_select_preferred_mfa_index(mocker, sample_json_response): assert select_preferred_mfa_index(mfa_options) == output -def test_select_preferred_mfa_index_output(capsys, mocker, sample_json_response): +@pytest.mark.parametrize( + "email", + [ + ("First.Last@acme.org"), + ], +) +def test_select_preferred_mfa_index_output(email, capsys, mocker, sample_json_response): """Test whether the function gives correct output.""" from tokendito.user import select_preferred_mfa_index + from tokendito import config + # For this test, ensure that quiet is never true + config.user["quiet"] = False primary_auth = sample_json_response mfa_options = primary_auth["okta_response_mfa"]["_embedded"]["factors"] - # We only test a specific porttion of each string since if there is no terminal present - # the line wraps at 80 characters, and new lines are intermixed with the output - correct_output = [ - "Select your preferred MFA method and press Enter", - "Id: opfrar9yi4bKJNH2WEW", - "Id: FfdskljfdsS1ljUT0r8", - "Id: fdsfsd6ewREr8", - ] + correct_output = ( + "\nSelect your preferred MFA method and press Enter:\n" + "[0] OKTA push Redmi 6 Pro Id: opfrar9yi4bKJNH2WEW\n" + f"[1] GOOGLE token:software:totp {email} Id: FfdskljfdsS1ljUT0r8\n" + f"[2] OKTA token:software:totp {email} Id: fdsfsd6ewREr8\n" + f"[3] GOOGLE pytest_dupe Not Presented Id: fdsfsd6ewREr0\n" + f"[4] OKTA pytest_dupe Not Presented Id: fdsfsd6ewREr1\n" + ) - # tox -> pytest/capsys -> rich don't play well together, they add a TTY when there is none. - rich.reconfigure(force_interactive=False, highlight=False) mocker.patch("tokendito.user.collect_integer", return_value=1) select_preferred_mfa_index(mfa_options) captured = capsys.readouterr() - for elem in correct_output: - assert elem in captured.out + assert captured.out == correct_output def test_user_mfa_options(sample_headers, sample_json_response, mocker): @@ -763,7 +791,7 @@ def test_user_mfa_options(sample_headers, sample_json_response, mocker): selected_mfa_option = {"factorType": "token:software:totp"} primary_auth["stateToken"] = "pytest" mfa_verify = {"sessionToken": "pytest"} - mocker.patch("rich.prompt.Prompt.ask", return_value="012345") + mocker.patch("tokendito.user.get_input", return_value="012345") mocker.patch("tokendito.okta.api_wrapper", return_value=mfa_verify) ret = user_mfa_options( selected_mfa_option, sample_headers, mfa_challenge_url, payload, primary_auth @@ -1056,18 +1084,25 @@ def test_process_interactive_input(mocker): from tokendito import user, Config # Check that a good object retrieves an interactive password - mocker.patch("rich.prompt.Prompt.ask", return_value="pytest_password") + mocker.patch("getpass.getpass", return_value="pytest_password") - config = dict(okta=dict()) - pytest_config = Config(**config) + pytest_config = Config() pytest_config.okta["app_url"] = "https://pytest/appurl" pytest_config.okta["org"] = "https://pytest/" pytest_config.okta["username"] = "pytest" ret = user.process_interactive_input(pytest_config) - assert ret.okta["password"] == "pytest_password" + pytest_config.update(ret) + assert pytest_config.okta["password"] == "pytest_password" + + # Check that quiet mode does not retrieve a username + pytest_config.user["quiet"] = True + pytest_config.okta["username"] = "" + ret = user.process_interactive_input(pytest_config) + pytest_config.update(ret) + assert pytest_config.okta["username"] == "" # Check that a bad object raises an exception - with pytest.raises(SystemExit) as error: + with pytest.raises(AttributeError) as error: assert user.process_interactive_input({"pytest": "pytest"}) == error diff --git a/tokendito/__init__.py b/tokendito/__init__.py index 882be542..d962a13a 100644 --- a/tokendito/__init__.py +++ b/tokendito/__init__.py @@ -29,7 +29,6 @@ class Config(object): loglevel="INFO", log_output_file="", mask_items=[], - no_color=False, quiet=False, ), aws=dict( diff --git a/tokendito/__main__.py b/tokendito/__main__.py index 870a5fa6..68fa0fba 100755 --- a/tokendito/__main__.py +++ b/tokendito/__main__.py @@ -4,8 +4,6 @@ """tokendito module entry point.""" import sys -from rich import print - def main(args=None): # needed for console script """Packge entry point.""" @@ -19,7 +17,7 @@ def main(args=None): # needed for console script try: return cli(args) except KeyboardInterrupt: - print("\n[bold red]Interrupted[/bold red]") + print("\nInterrupted") sys.exit(1) diff --git a/tokendito/aws.py b/tokendito/aws.py index b4834d65..838932db 100644 --- a/tokendito/aws.py +++ b/tokendito/aws.py @@ -17,7 +17,6 @@ from botocore.exceptions import ClientError import botocore.session import requests -from rich.progress import track from tokendito import user @@ -65,9 +64,8 @@ def authenticate_to_roles(secret_session_token, urls, cookies=None): if tile_count > 1: plural = "s" - for url, label in track( - url_list, description=f"Discovering roles in {tile_count} tile{plural}:" - ): + logger.info(f"Discovering roles in {tile_count} tile{plural}.") + for url, label in url_list: try: logger.debug(f"Performing role discovery in {url}") diff --git a/tokendito/duo.py b/tokendito/duo.py index 0196c474..4c426995 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -209,7 +209,7 @@ def parse_challenge(verify_mfa, challenge_result): challenge_reason = None if "status" in verify_mfa: - user.print(f"[bold]{verify_mfa['status']}[/bold]") + user.print(f"{verify_mfa['status']}") if "reason" in verify_mfa: challenge_reason = verify_mfa["reason"] @@ -293,7 +293,7 @@ def set_passcode(mfa_option): """ passcode = None if mfa_option["factor"].lower() == "passcode": - user.print("[bold]Type your TOTP and press Enter[/bold]") + user.print("Type your TOTP and press Enter:") passcode = user.get_input() return passcode diff --git a/tokendito/okta.py b/tokendito/okta.py index eca04a11..6051bd33 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -261,7 +261,7 @@ def user_mfa_options(selected_mfa_option, headers, mfa_challenge_url, payload, p if config.okta["mfa_response"] is None: logger.debug("Getting verification code from user.") - config.okta["mfa_response"] = user.get_input("Enter your verification code") + config.okta["mfa_response"] = user.get_input("Enter your verification code:") user.add_sensitive_value_to_be_masked(config.okta["mfa_response"]) # time to verify the mfa method diff --git a/tokendito/tokendito.py b/tokendito/tokendito.py index dba0ab7b..26c04fbb 100755 --- a/tokendito/tokendito.py +++ b/tokendito/tokendito.py @@ -4,8 +4,6 @@ """tokendito cli entry point.""" import sys -from rich import print - def main(args=None): # needed for console script """Packge entry point.""" @@ -23,5 +21,5 @@ def main(args=None): # needed for console script try: sys.exit(main(sys.argv[1:])) except KeyboardInterrupt: - print("\n[bold red]Interrupted[/bold red]") + print("\nInterrupted") sys.exit(1) diff --git a/tokendito/tool.py b/tokendito/tool.py index 9a347378..41759333 100644 --- a/tokendito/tool.py +++ b/tokendito/tool.py @@ -16,8 +16,6 @@ def cli(args): """Tokendito retrieves AWS credentials after authenticating with Okta.""" args = user.parse_cli_args(args) - # Set up colors and command-line editing - user.setup_console(args) # Early logging, in case the user requests debugging via env/CLI user.setup_early_logging(args) diff --git a/tokendito/user.py b/tokendito/user.py index 1b667f99..6fb9d01c 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -2,9 +2,11 @@ # -*- coding: utf-8 -*- """Helper module for AWS and Okta configuration, management and data flow.""" import argparse +import builtins import codecs import configparser from datetime import timezone +import getpass import json import logging import os @@ -19,10 +21,6 @@ from bs4 import __version__ as __bs4_version__ # type: ignore (bs4 does not have PEP 561 support) from bs4 import BeautifulSoup import requests -import rich -from rich.console import Console -from rich.logging import RichHandler -from rich.prompt import IntPrompt, Prompt from tokendito import __version__ from tokendito import aws from tokendito import Config @@ -138,19 +136,12 @@ def parse_cli_args(args): "--okta-mfa-response", help="Sets the MFA response to a challenge", ) - parser.add_argument( - "--no-color", - dest="user_no_color", - action="store_true", - default=False, - help="Supress colored output.", - ) parser.add_argument( "--quiet", dest="user_quiet", action="store_true", default=False, - help="Suppress output (implies --no-color)", + help="Suppress output", ) parsed_args = parser.parse_args(args) @@ -226,26 +217,12 @@ def setup_logging(conf): :return: loglevel name """ root_logger = logging.getLogger() - # We get quiet / no color directly from the stdout console settings - stdout_console = rich.get_console() - # Time and level name come from the Rich handler - formatter = logging.Formatter(fmt="|%(name)s %(funcName)s():%(lineno)i| %(message)s") - handler = RichHandler( - show_time=True, - omit_repeated_times=False, - show_path=False, - markup=True, - console=Console( - stderr=True, - no_color=stdout_console.no_color, - quiet=stdout_console.quiet, - ), + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)s |%(name)s %(funcName)s():%(lineno)i| %(message)s" ) + handler = logging.StreamHandler() if "log_output_file" in conf and conf["log_output_file"]: - formatter = logging.Formatter( - fmt="%(asctime)s %(levelname)s |%(name)s %(funcName)s():%(lineno)i| %(message)s" - ) handler = logging.FileHandler(conf["log_output_file"]) handler.setFormatter(formatter) @@ -273,37 +250,10 @@ def setup_logging(conf): return loglevel -def setup_console(args): - """Set up readline, and screen colors. - - :param args: ConfigParser object - :return: None - """ - # These areconsidered true if present. We do this here as there is no env processing yet. - # The order of operations is backwards from the general flow of the program as having - # an env variable here should override the command-line defaults, which default to not quiet, - # and with color. - console_settings = { - "no_color": args.user_no_color, - "quiet": args.user_quiet, - } - if "TOKENDITO_USER_NO_COLOR" in os.environ: - console_settings["no_color"] = True - if "TOKENDITO_USER_QUIET" in os.environ: - console_settings["quiet"] = True - - rich.reconfigure( - no_color=console_settings["no_color"], - quiet=console_settings["quiet"], - ) - console = rich.get_console() - return console - - def print(args): - """Pass-through to rich, so that it interprets colors.""" - console = rich.get_console() - console.print(args, end=os.linesep, soft_wrap=True, highlight=False) + """Print only if not in quiet mode. Does not affect logging.""" + if config.user["quiet"] is not True: + builtins.print(args) return args @@ -401,7 +351,7 @@ def select_preferred_mfa_index(mfa_options, factor_key="provider", subfactor_key """ logger.debug("Show all the MFA options to the users.") logger.debug(json.dumps(mfa_options)) - print("\n[green]Select your preferred MFA method and press Enter:[/green]") + print("\nSelect your preferred MFA method and press Enter:") longest_index = len(str(len(mfa_options))) longest_factor_name = max([len(d[factor_key]) for d in mfa_options]) @@ -414,11 +364,11 @@ def select_preferred_mfa_index(mfa_options, factor_key="provider", subfactor_key mfa_method = mfa_option.get(subfactor_key, "Not presented") provider = mfa_option.get(factor_key, "Not presented") print( - f"[bold][{i: >{longest_index}}][/bold] " - f"[cyan]{provider: <{longest_factor_name}}[/cyan] " - f"[blue]{mfa_method: <{longest_subfactor_name}}[/blue] " - f"[blue]{factor_info: <{factor_info_indent}}[/blue] " - f"[magenta]Id: {factor_id}[/magenta]" + f"[{i: >{longest_index}}] " + f"{provider: <{longest_factor_name}} " + f"{mfa_method: <{longest_subfactor_name}} " + f"{factor_info: <{factor_info_indent}} " + f"Id: {factor_id}" ) user_input = collect_integer(len(mfa_options)) @@ -446,7 +396,7 @@ def prompt_role_choices(aut_aps): aliases_mapping.append((app["label"], role.split(":")[4], role, url)) logger.debug("Ask user to select role") - print("\n[bold]Please select one of the following[/bold]:") + print("\nPlease select one of the following:") longest_alias = max(len(i[1]) for i in aliases_mapping) longest_index = len(str(len(aliases_mapping))) @@ -458,12 +408,9 @@ def prompt_role_choices(aut_aps): padding_index = longest_index - len(str(i)) if print_label != label: print_label = label - print(f"\n[green]{label}:[/green]") + print(f"\n{label}:") - print( - f"[bold][{i}][/bold] {padding_index * ' '}" - f"[cyan]{alias: <{longest_alias}}[/cyan] [blue]{role}[/blue]" - ) + print(f"[{i}] {padding_index * ' '}" f"{alias: <{longest_alias}} {role}") user_input = collect_integer(len(aliases_mapping)) selected_role = (aliases_mapping[user_input][2], aliases_mapping[user_input][3]) @@ -488,13 +435,13 @@ def display_selected_role(profile_name="", role_response={}): expiration_time_local = utc_to_local(expiration_time) msg = ( - f"\nGenerated profile [bold]'{profile_name}'[/bold] in" - f" [green]{config.aws['shared_credentials_file']}[/green].\n" + f"\nGenerated profile '{profile_name}' in " + f"{config.aws['shared_credentials_file']}.\n" "\nUse profile to authenticate to AWS:\n\t" - f"[bold]aws --profile '{profile_name}' sts get-caller-identity[/bold]" + f"aws --profile '{profile_name}' sts get-caller-identity" "\nOR\n\t" - f"[bold]export AWS_PROFILE='{profile_name}'[/bold]\n\n" - f"Credentials are valid until [bold]{expiration_time}[/bold] ({expiration_time_local})." + f"export AWS_PROFILE='{profile_name}'\n\n" + f"Credentials are valid until {expiration_time} ({expiration_time_local})." ) print(msg) @@ -630,12 +577,12 @@ def display_version(): (system, _, release, _, _, _) = platform.uname() logger.debug(f"Display version: {__version__}") print( - f"[bold]tokendito[/bold]/{__version__} " - f"[bold]Python[/bold]/{python_version} " - f"[bold]{system}[/bold]/{release} " - f"[bold]botocore[/bold]/{__botocore_version__} " - f"[bold]bs4[/bold]/{__bs4_version__} " - f"[bold]requests[/bold]/{requests.__version__}" + f"tokendito/{__version__} " + f"Python/{python_version} " + f"{system}/{release} " + f"botocore/{__botocore_version__} " + f"bs4/{__bs4_version__} " + f"requests/{requests.__version__}" ) @@ -756,7 +703,12 @@ def process_interactive_input(config): :param config: Config object with some values set :returns: Config object with necessary values set. """ - # reuse interactive config. It will only request the portions needed. + # Return quickly if the user attempts to run in quiet (non-interactive) mode. + if config.user["quiet"] is True: + logger.debug(f"Skipping interactive config: quiet mode is {config.user['quiet']}") + return config + + # Reuse interactive config. It will only request the portions needed. try: details = get_interactive_config( app_url=config.okta["app_url"], @@ -784,6 +736,7 @@ def process_interactive_input(config): config_int = Config(**res) logger.debug(f"Interactive configuration is: {config_int}") + config.update(config_int) return config_int @@ -797,9 +750,7 @@ def get_interactive_config(app_url=None, org_url=None, username=""): # We need either one of these two: while org_url is None and app_url is None: - print( - "\n\n[bold]Please enter either your Organization URL, a tile (app) URL, or both.[/bold]" - ) + print("\n\nPlease enter either your Organization URL, a tile (app) URL, or both.") org_url = get_org_url() app_url = get_app_url() @@ -833,7 +784,7 @@ def get_org_url(): :return: string with sanitized value, or the empty string. """ - message = "Okta Org URL. E.g. https://acme.okta.com/" + message = "Okta Org URL. E.g. https://acme.okta.com/: " res = "" while res == "": @@ -846,7 +797,7 @@ def get_org_url(): if validate_okta_org_url(user_data): res = user_data else: - print("[red]Invalid input, try again.[/red]") + print("Invalid input, try again.") logger.debug(f"Org URL is: {res}") return res @@ -856,7 +807,9 @@ def get_app_url(): :return: string with sanitized value, or the empty string. """ - message = "Okta App URL. E.g. https://acme.okta.com/home/" "amazon_aws/b07384d113edec49eaa6/123" + message = ( + "Okta App URL. E.g. https://acme.okta.com/home/" "amazon_aws/b07384d113edec49eaa6/123: " + ) res = "" while res == "": @@ -869,7 +822,7 @@ def get_app_url(): if validate_okta_app_url(user_data): res = user_data else: - print("[red]Invalid input, try again.[/red]") + print("Invalid input, try again.") logger.debug(f"App URL is: {res}") return res @@ -879,7 +832,7 @@ def get_username(): :return: string with sanitized value. """ - message = "Organization username. E.g. jane.doe@acme.com" + message = "Organization username. E.g. jane.doe@acme.com: " res = "" while res == "": user_data = get_input(prompt=message) @@ -887,7 +840,7 @@ def get_username(): if user_data != "": res = user_data else: - print("[red]Invalid input, try again.[/red]") + print("Invalid input, try again.") logger.debug(f"Username is {res}") return res @@ -903,7 +856,7 @@ def get_password(): logger.debug("Set password.") while res == "": - password = Prompt.ask("[bold]Password[/bold]", password=True) + password = getpass.getpass() res = password logger.debug("password set interactively") return res @@ -1011,14 +964,57 @@ def update_ini(profile="", ini_file="", **kwargs): return ini -def get_input(prompt=""): +def check_within_range(user_input, valid_range): + """Validate the user input is within the range of the presented menu. + + :param user_input: integer-validated user input. + :param valid_range: the valid range presented on the user's menu. + :return range_validation: true or false + """ + range_validation = False + if int(user_input) in range(0, valid_range): + range_validation = True + else: + logger.debug(f"Valid range is {valid_range}") + logger.error("Value is not in within the selection range.") + return range_validation + + +def check_integer(value): + """Validate integer. + + :param value: value to be validated. + :return: True when the number is a positive integer, false otherwise. + """ + integer_validation = False + if str(value).isdigit(): + integer_validation = True + else: + logger.error("Please enter a valid integer.") + + return integer_validation + + +def validate_input(value, valid_range): + """Validate user input is an integer and within menu range. + + :param value: user input + :param valid_range: valid range based on how many menu options available to user. + """ + integer_validation = check_integer(value) + if integer_validation and valid_range: + integer_validation = check_within_range(value, valid_range) + return integer_validation + + +def get_input(prompt="-> "): """Collect user input for TOTP. :param prompt: optional string with prompt. :return user_input: raw from user. """ - user_input = Prompt.ask(f"[bold]{prompt}[/bold]") - logger.debug(f"User input [{user_input}]") + user_input = input(f"{prompt}") + logger.debug(f"User input: {user_input}") return user_input @@ -1031,8 +1027,14 @@ def collect_integer(valid_range=0): :param valid_range: number of menu options available to user. :return user_input: validated, casted integer from user. """ - prompt_choices = [f"{x}" for x in range(valid_range)] - user_input = IntPrompt.ask("[bold]Enter your selection[/bold]", choices=prompt_choices) + user_input = None + while True: + user_input = get_input() + valid_input = validate_input(user_input, valid_range) + logger.debug(f"User input validation status is {valid_input}") + if valid_input: + user_input = int(user_input) + break return user_input @@ -1067,8 +1069,8 @@ def process_options(args): logger.debug(f"Final configuration is {config}") -def validate_configuration(config): - """Ensure that minimum configuration values are sane. +def validate_basic_configuration(config): + """Ensure that basic configuration values are sane. :param config: Config element with final configuration. :return: message with validation issues. @@ -1097,6 +1099,37 @@ def validate_configuration(config): return message +def validate_quiet_configuration(config): + """Ensure that minimum configuration settings for running quietly are met. + + This is kept separately from validate_basic_configuration to avoid complexity + and avoid testability. These functions should always be used together. + + :param config: Config element with final configuration. + :return: message with validation issues. + """ + message = [] + if "quiet" in config.user and config.user["quiet"] is not False: + if not config.aws["role_arn"]: + message.append("Role ARN not set.") + if not config.okta["mfa_method"]: + message.append("MFA Method not set.") + if not config.okta["mfa_response"] and config.okta["mfa_method"] != "push": + message.append("MFA Response not set.") + return message + + +def validate_configuration(config): + """ + Ensure that configuration settings are appropriate before contacting the Okta endpoint. + + :param config: Config element with final configuration. + :return: message with validation issues. + """ + messages = validate_basic_configuration(config) + validate_quiet_configuration(config) + return messages + + def sanitize_config_values(config): """Adjust values that may need to be corrected.