Skip to content

Commit

Permalink
Merge 57bb30f into fa346f8
Browse files Browse the repository at this point in the history
  • Loading branch information
Kurotrup committed Jun 29, 2022
2 parents fa346f8 + 57bb30f commit c73b56f
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 13 deletions.
6 changes: 3 additions & 3 deletions tokendito/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@
logger = logging.getLogger(__name__)


def authenticate_to_roles(secret_session_token, url):
def authenticate_to_roles(secret_session_token, url, cookies=None):
"""Authenticate AWS user with saml.
:param secret_session_token: secret session token
:param url: url of the AWS account
:param cookies: html cookies
:return: response text
"""
payload = {"onetimetoken": secret_session_token}
logger.debug(f"Authenticate AWS user with SAML URL [{url}]")
try:
response = requests.get(url, params=payload)
response = requests.get(url, params=payload, cookies=cookies)
saml_response_string = response.text
if response.status_code == 400 or response.status_code == 401:
errmsg = "Invalid Credentials."
Expand Down Expand Up @@ -181,7 +182,6 @@ def select_assumeable_role(saml_response_string, saml):
roles_and_providers = user.extract_arns(saml)
role_arn = user.select_role_arn(list(roles_and_providers.keys()), saml, saml_response_string)
role_name = role_arn.split("/")[-1]

assume_role_response = assume_role(role_arn, roles_and_providers[role_arn], saml)

return assume_role_response, role_name
19 changes: 17 additions & 2 deletions tokendito/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
"""This module retrieves AWS credentials after authenticating with Okta."""
import logging
import sys

from tokendito import aws
from tokendito import config
Expand All @@ -19,7 +20,7 @@ def cli(args):
user.setup_logging(config.user)
logger.debug(f"Final configuration is {config}")

user.process_okta_app_url()
user.process_okta_org_url(config)

logger.debug("Set Okta credentials.")
user.set_okta_username()
Expand All @@ -32,9 +33,23 @@ def cli(args):
config.okta["org"], config.okta["username"], config.okta["password"]
)

session_cookies = None

if config.okta["app_url"]:
if not user.validate_okta_app_url(config.okta["app_url"]):
logger.error(
"Okta Application URL not found, or invalid. Please check "
"your configuration and try again."
)
sys.exit(2)
else:
session_cookies = user.request_cookies(config.okta["org"], secret_session_token)
config.okta["app_url"] = user.discover_app_url(config.okta["org"], session_cookies)

saml_response_string, saml_xml = aws.authenticate_to_roles(
secret_session_token, config.okta["app_url"]
secret_session_token, config.okta["app_url"], cookies=session_cookies
)

assume_role_response, role_name = aws.select_assumeable_role(saml_response_string, saml_xml)

aws.ensure_keys_work(assume_role_response)
Expand Down
125 changes: 117 additions & 8 deletions tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,24 +575,25 @@ def process_environment(prefix="tokendito"):
return config_env


def process_okta_app_url():
"""Process Okta app url.
def process_okta_app_url(config_obj):
"""
Validate okta app url, and extract okta org url from it.
:param app_url: string with okta tile URL.
:return: None.
:param config_obj: configuration object
:returns: None
"""
if not validate_okta_app_url(config.okta["app_url"]):
if not validate_okta_app_url(config_obj.okta["app_url"]):
logger.error(
"Okta Application URL not found, or invalid. Please check "
"your configuration and try again."
)
sys.exit(2)

url = urlparse(config.okta["app_url"])
url = urlparse(config_obj.okta["app_url"])
okta_org = f"{url.scheme}://{url.netloc}"
okta_aws_app_url = f"{okta_org}{url.path}"
config.okta["org"] = okta_org
config.okta["app_url"] = okta_aws_app_url
config_obj.okta["org"] = okta_org
config_obj.okta["app_url"] = okta_aws_app_url


def user_configuration_input():
Expand Down Expand Up @@ -841,3 +842,111 @@ def process_options(args):
config.update(config_ini)
config.update(config_env)
config.update(config_args)


def process_okta_org_url(config_obj):
"""
Extract okta org url from app url, or request it from user.
:param config_obj: configuration object
:returns: None
"""
if not config_obj.okta["app_url"] and not config_obj.okta["org"]:
while not config_obj.okta["org"]:
user_input = input("Please set okta org url:")
config_obj.okta["org"] = user_input

elif config_obj.okta["app_url"] and not config_obj.okta["org"]:
process_okta_app_url(config_obj)


def request_cookies(url, session_token):
"""
Request session cookie.
:param url: okta org url, str
:param session_token: session token, str
:returns: cookies object
"""
url = f"{url}/api/v1/sessions"
data = json.dumps({"sessionToken": f"{session_token}"})

response_with_cookie = make_request(method="POST", url=url, data=data)
sesh_id = response_with_cookie.json()["id"]

cookies = response_with_cookie.cookies
cookies.update({"sid": f"{sesh_id}"})

return cookies


def discover_app_url(url, cookies):
"""
Discover aws app url on user`s okta dashboard.
:param url: okta org url
:param cookies: HTML cookies
:returns: aws app url. str
"""
url = f"{url}/api/v1/users/me/home/tabs"
params = {
"type": "all",
"expand": ["items", "items.resource"],
}

response_with_tabs = make_request(method="GET", url=url, cookies=cookies, params=params)
tabs = response_with_tabs.json()

aws_apps = []
for tab in tabs:
for app in tab["_embedded"]["items"]:
if "amazon_aws" in app["_embedded"]["resource"]["linkUrl"]:
aws_apps.append(app["_embedded"]["resource"])

if not aws_apps:
logger.error("AWS app url not found please set url and try again")
sys.exit(2)

app_url = process_urls_pool(aws_apps) if len(aws_apps) > 1 else aws_apps[0]["linkUrl"]

return app_url


def make_request(method, url, headers=None, **kwargs):
"""
Wrap 'requests.request' and perform response checks.
:param method: request method
:param url: request URL
:param headers: request headers
:param kwargs: additional parameters passed to request
:returns: response object
"""
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}

response = requests.request(method=method, url=url, headers=headers, **kwargs)

if response.status_code != 200:
logger.error(
f"Your {method} request failed with status_code {response.status_code}.\n"
f"{response.content}\n"
)

return response


def process_urls_pool(apps_list):
"""
Ask user to choose aws url.
:param apps_list: list of apps objects
:returns: app url, str
"""
print("You have more than one AWS app, please select one:\n")

for i, app in enumerate(apps_list):
print(f"[{i}] {app['label']}: {app['linkUrl']}")
ind = collect_integer(len(apps_list))

return apps_list[ind]["linkUrl"]

0 comments on commit c73b56f

Please sign in to comment.