Skip to content

Commit

Permalink
Refactor logging to use an object rather than the logging module
Browse files Browse the repository at this point in the history
Changes the way we log from logging.level() to logger.leve(). This
is a more standardized and accepted way of logging in python.

We also add timining information to the logs, together with the correct
module name and common practice logging format. E.g.

From:
```
DEBUG [helpers.py:validate_okta_aws_app_url():422]: ParseResultBytes(scheme=b'', netloc=b'', path=b'', params=b'', query=b'', fragment=b'') does not look like a valid match.
ERROR [helpers.py:process_okta_aws_app_url():523]: Okta Application URL not found, or invalid. Please check your configuration and try again.
```

to:
```
2021-09-02 11:51:35,898 tokendito.helpers [validate_okta_aws_app_url():414] - DEBUG - ParseResultBytes(scheme=b'', netloc=b'', path=b'', params=b'', query=b'', fragment=b'') does not look like a valid match.
2021-09-02 11:51:35,898 tokendito.helpers [process_okta_aws_app_url():515] - ERROR - Okta Application URL not found, or invalid. Please check your configuration and try again.
```
  • Loading branch information
pcmxgti committed Sep 2, 2021
1 parent fad9514 commit c79c715
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 135 deletions.
33 changes: 18 additions & 15 deletions tokendito/aws_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from tokendito import helpers


logger = logging.getLogger(__name__)


def authenticate_to_roles(secret_session_token, url):
"""Authenticate AWS user with saml.
Expand All @@ -29,30 +32,30 @@ def authenticate_to_roles(secret_session_token, url):
"""
payload = {"onetimetoken": secret_session_token}
logging.debug(f"Authenticate AWS user with SAML URL [{url}]")
logger.debug(f"Authenticate AWS user with SAML URL [{url}]")
try:
response = requests.get(url, params=payload)
saml_response_string = response.text
if response.status_code == 400 or response.status_code == 401:
errmsg = "Invalid Credentials."
logging.critical(f"{errmsg}\nExiting with code:{response.status_code}")
logger.critical(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code == 404:
errmsg = "Invalid Okta application URL. Please verify your configuration."
logging.critical(f"{errmsg}")
logger.critical(f"{errmsg}")
sys.exit(2)
elif response.status_code >= 500 and response.status_code < 504:
errmsg = "Unable to establish connection with Okta. Verify Okta Org URL and try again."
logging.critical(f"{errmsg}\nExiting with code:{response.status_code}")
logger.critical(f"{errmsg}\nExiting with code:{response.status_code}")
sys.exit(2)
elif response.status_code != 200:
logging.critical(f"Exiting with code:{response.status_code}")
logging.debug(saml_response_string)
logger.critical(f"Exiting with code:{response.status_code}")
logger.debug(saml_response_string)
sys.exit(2)

except Exception as error:
errmsg = f"Okta auth failed:\n{error}"
logging.critical(errmsg)
logger.critical(errmsg)
sys.exit(1)

saml_xml = helpers.validate_saml_response(saml_response_string)
Expand Down Expand Up @@ -82,7 +85,7 @@ def assume_role(role_arn, provider_arn, saml):
session_times = [43200, 28800, 21600, 14400, 7200, 3600, 1800, 900, "exit"]
for duration in session_times:
if duration == "exit":
logging.critical(
logger.critical(
default_error.format(
role_arn,
f"IAM role session time is not within set: {session_times[:-1]}",
Expand All @@ -107,7 +110,7 @@ def handle_assume_role(role_arn, provider_arn, encoded_xml, duration, default_er
:param saml: decoded saml response from okta
:return: AssumeRoleWithSaml API responses
"""
logging.debug(f"Attempting session time [{duration}]")
logger.debug(f"Attempting session time [{duration}]")
try:
session = botocore.session.get_session()
client = session.create_client("sts", config=Config(signature_version=UNSIGNED))
Expand All @@ -120,21 +123,21 @@ def handle_assume_role(role_arn, provider_arn, encoded_xml, duration, default_er
# Client Exceptions
except ClientError as error:
if error.response["Error"]["Code"] == "ValidationError":
logging.info(
logger.info(
f"AssumeRoleWithSaml failed with {error.response['Error']['Code']} "
f"for duration {duration}"
)
assume_role_response = "continue"
elif error.response["Error"]["Code"] == "AccessDenied":
errmsg = f"Error assuming intermediate {provider_arn} SAML role"
logging.critical(errmsg)
logger.critical(errmsg)
sys.exit(2)
else:
logging.critical(default_error.format(role_arn, str(error)))
logger.critical(default_error.format(role_arn, str(error)))
sys.exit(1)
# Service Exceptions
except Exception as error:
logging.critical(default_error.format(role_arn, str(error)))
logger.critical(default_error.format(role_arn, str(error)))
sys.exit(1)

return assume_role_response
Expand All @@ -149,7 +152,7 @@ def ensure_keys_work(assume_role_response):
:return:
"""
logging.debug("Validate the temporary AWS credentials")
logger.debug("Validate the temporary AWS credentials")

aws_access_key = assume_role_response["Credentials"]["AccessKeyId"]
aws_secret_key = assume_role_response["Credentials"]["SecretAccessKey"]
Expand All @@ -166,7 +169,7 @@ def ensure_keys_work(assume_role_response):
)
client.get_caller_identity()
except Exception as auth_error:
logging.critical(
logger.critical(
f"There was an error authenticating your keys with AWS: {auth_error}"
)
sys.exit(1)
Expand Down
51 changes: 26 additions & 25 deletions tokendito/duo_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from tokendito import settings


logger = logging.getLogger(__name__)


def prepare_duo_info(selected_okta_factor):
"""Aggregate most of the parameters needed throughout the Duo authentication process.
Expand Down Expand Up @@ -54,25 +57,23 @@ def duo_api_post(url, params={}, headers={}, payload={}):
"POST", url, params=params, headers=headers, data=payload
)
except Exception as request_issue:
logging.error(
f"There was an error connecting to the Duo API: \n{request_issue}"
)
logger.error(f"There was an error connecting to the Duo API: \n{request_issue}")
sys.exit(1)

json_message = None
try:
json_message = response.json()
except ValueError:
logging.debug(f"Non-json response from Duo API: \n{response}")
logger.debug(f"Non-json response from Duo API: \n{response}")

if response.status_code != 200:
logging.critical(
logger.critical(
f"Your Duo authentication has failed with status {response.status_code}."
)
if json_message and json_message["stat"].lower() != "ok":
logging.critical(f"\n{response.status_code}, {json_message['message']}")
logger.critical(f"\n{response.status_code}, {json_message['message']}")
else:
logging.critical(
logger.critical(
"Please re-run the program with parameter"
' "--loglevel debug" to see more information.'
)
Expand All @@ -95,14 +96,14 @@ def get_duo_sid(duo_info):
)

url = f"https://{duo_info['host']}/frame/web/v1/auth"
logging.info(f"Calling Duo {urlparse(url).path} with params {params.keys()}")
logger.info(f"Calling Duo {urlparse(url).path} with params {params.keys()}")
duo_auth_response = duo_api_post(url, params=params)

try:
duo_auth_redirect = urlparse(f"{unquote(duo_auth_response.url)}").query
duo_info["sid"] = duo_auth_redirect.strip("sid=")
except Exception as sid_error:
logging.error(
logger.error(
f"There was an error getting your SID. Please try again. \n{sid_error}"
)

Expand All @@ -125,7 +126,7 @@ def get_duo_devices(duo_auth):
device_soup = soup.find("select", {"name": "device"}).findAll("option")
devices = [f"{d['value']} - {d.text}" for d in device_soup]
if not devices:
logging.error("Please configure devices for your Duo MFA and retry.")
logger.error("Please configure devices for your Duo MFA and retry.")
sys.exit(2)

factor_options = []
Expand All @@ -151,17 +152,17 @@ def parse_duo_mfa_challenge(mfa_challenge):
mfa_status = mfa_challenge["stat"]
txid = mfa_challenge["response"]["txid"]
except ValueError as value_error:
logging.error(f"The Duo API returned a non-json response: \n{value_error}")
logger.error(f"The Duo API returned a non-json response: \n{value_error}")
sys.exit(1)
except KeyError as key_error:
logging.error(
logger.error(
f"The Duo API response is missing a required parameter: \n{key_error}"
)
print(json.dumps(mfa_challenge))
sys.exit(1)

if mfa_status == "fail":
logging.error(
logger.error(
f"Your Duo authentication has failed: \n{mfa_challenge['message']}"
)
sys.exit(1)
Expand Down Expand Up @@ -195,7 +196,7 @@ def duo_mfa_challenge(duo_info, mfa_option, passcode):
mfa_challenge = duo_api_post(url, payload=mfa_data)
txid = parse_duo_mfa_challenge(mfa_challenge)

logging.debug("Sent MFA Challenge and obtained Duo transaction ID.")
logger.debug("Sent MFA Challenge and obtained Duo transaction ID.")
return txid


Expand All @@ -208,7 +209,7 @@ def get_mfa_response(mfa_result):
try:
verify_mfa = mfa_result.json()["response"]
except Exception as parse_error:
logging.error(
logger.error(
f"There was an error parsing the mfa challenge result: \n{parse_error}"
)
sys.exit(1)
Expand All @@ -231,10 +232,10 @@ def parse_challenge(verify_mfa, challenge_result):
challenge_reason = verify_mfa["reason"]

if "result" in verify_mfa:
logging.info(f"Result received: {verify_mfa['result']}")
logger.info(f"Result received: {verify_mfa['result']}")
challenge_result = verify_mfa["result"].lower()

logging.debug(f"Challenge result is {challenge_result}")
logger.debug(f"Challenge result is {challenge_result}")
return challenge_result, challenge_reason


Expand All @@ -253,7 +254,7 @@ def duo_mfa_verify(duo_info, txid):
challenge_result = None

while True:
logging.debug("Waiting for MFA challenge response")
logger.debug("Waiting for MFA challenge response")
mfa_result = duo_api_post(url, payload=challenged_mfa)
verify_mfa = get_mfa_response(mfa_result)
challenge_result, challenge_reason = parse_challenge(
Expand All @@ -263,15 +264,15 @@ def duo_mfa_verify(duo_info, txid):
if challenge_result is None:
continue
elif challenge_result == "success":
logging.debug("Successful MFA challenge received")
logger.debug("Successful MFA challenge received")
break
elif challenge_result == "failure":
logging.critical(
logger.critical(
"MFA challenge has failed:" f" {challenge_reason}. Please try again."
)
sys.exit(2)
else:
logging.debug(
logger.debug(
f"MFA challenge result: {challenge_result}"
f"Reason: {challenge_reason}\n\n"
)
Expand Down Expand Up @@ -300,12 +301,12 @@ def duo_factor_callback(duo_info, verify_mfa):
f"{factor_callback.json()['response']['cookie']}:{duo_info['app_sig']}"
)
except Exception as sig_error:
logging.error(
logger.error(
"There was an error getting your application signature "
f"from Duo: \n{json.dumps(sig_error)}"
)

logging.debug("Completed factor callback.")
logger.debug("Completed factor callback.")
return sig_response


Expand Down Expand Up @@ -338,7 +339,7 @@ def authenticate_duo(selected_okta_factor):
try:
duo_info = prepare_duo_info(selected_okta_factor)
except KeyError as missing_key:
logging.error(
logger.error(
"There was an issue parsing the Okta factor."
f" Please try again. \n{missing_key}"
)
Expand All @@ -351,7 +352,7 @@ def authenticate_duo(selected_okta_factor):
)

mfa_option = factor_options[mfa_index]
logging.debug(f"Selected MFA is [{mfa_option}]")
logger.debug(f"Selected MFA is [{mfa_option}]")
passcode = set_passcode(mfa_option)

txid = duo_mfa_challenge(duo_info, mfa_option, passcode)
Expand Down
Loading

0 comments on commit c79c715

Please sign in to comment.