-
Notifications
You must be signed in to change notification settings - Fork 16
/
AwsHelpers.py
132 lines (119 loc) · 4.54 KB
/
AwsHelpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import boto3
import botocore
from botocore.exceptions import (
ClientError,
EndpointConnectionError,
NoCredentialsError,
ProfileNotFound,
)
from lib.config.configuration import assume_role_duration
def assume_role(logger, aws_account_number, role_name, duration=assume_role_duration):
    """
    Assume an IAM role in the target account and return a session for it.

    The partition segment of the role ARN is copied from the caller's own
    identity ARN, so the role is looked up in the same partition the
    current credentials belong to.

    :param logger: Logger used for progress and error messages
    :param aws_account_number: AWS Account Number that owns the role
    :param role_name: Role to assume in target account
    :param duration: Value passed as ``DurationSeconds`` to STS ``assume_role``
    :return: boto3 Session built from the assumed role credentials
    :raises SystemExit: With status 1 if assuming the role or building the
        session fails with one of the handled botocore errors
    """
    logger.info("Assuming IAM Role: %s (%s)", role_name, aws_account_number)
    sts_session = boto3.session.Session()
    sts_client = sts_session.client(
        "sts", config=botocore.config.Config(max_pool_connections=100)
    )
    try:
        # Get the current partition (second field of the caller's ARN)
        partition = sts_client.get_caller_identity()["Arn"].split(":")[1]
        response = sts_client.assume_role(
            RoleArn="arn:{}:iam::{}:role/{}".format(
                partition,
                aws_account_number,
                role_name,
            ),
            RoleSessionName="MetaHub",
            DurationSeconds=duration,
        )
    except (ClientError, NoCredentialsError) as e:
        logger.error("Error assuming IAM role: {}".format(e))
        # Raise SystemExit directly: the `exit` builtin is injected by the
        # `site` module for interactive use and may be missing (python -S,
        # frozen builds).
        raise SystemExit(1)

    # Session
    logger.info(
        "Getting session for assumed IAM Role: %s (%s)", role_name, aws_account_number
    )
    credentials = response["Credentials"]
    try:
        boto3_session = boto3.session.Session(
            aws_access_key_id=credentials["AccessKeyId"],
            aws_secret_access_key=credentials["SecretAccessKey"],
            aws_session_token=credentials["SessionToken"],
        )
    except ClientError as e:
        logger.error("Error getting session for assumed IAM role: {}".format(e))
        raise SystemExit(1)
    return boto3_session
def get_account_id(logger, sess=None, profile=None):
    """
    Look up the account ID of the credentials in use via STS.

    :param logger: Logger used for error messages
    :param sess: Optional session forwarded to ``get_boto3_client``
    :param profile: Optional profile name forwarded to ``get_boto3_client``
    :return: The ``Account`` field of the caller identity, or None when the
        STS call fails with one of the handled botocore errors
    """
    sts_client = get_boto3_client(logger, "sts", "us-east-1", sess, profile)
    try:
        identity = sts_client.get_caller_identity()
        return identity.get("Account")
    except (NoCredentialsError, ClientError, EndpointConnectionError) as err:
        # Best effort: report the failure and fall through to None.
        logger.error("Error getting account ID: {}".format(err))
    return None
def get_region(logger):
    """
    Return the region name configured on a fresh default boto3 session.

    :param logger: Logger used for error messages
    :return: ``region_name`` of a new default session, or None when creating
        or reading the session raises one of the handled botocore errors
    """
    try:
        return boto3.session.Session().region_name
    except (NoCredentialsError, ClientError) as err:
        logger.error("Error getting region: {}".format(err))
    return None
def get_available_regions(logger, aws_service):
    """
    Return the regions boto3 reports as available for a service.

    :param logger: Logger used for error messages
    :param aws_service: Service name passed to
        ``Session.get_available_regions``
    :return: Whatever ``Session.get_available_regions`` returns for the service
    :raises SystemExit: With status 1 if the lookup fails with one of the
        handled botocore errors
    """
    try:
        my_session = boto3.session.Session()
        available_regions = my_session.get_available_regions(aws_service)
    except (NoCredentialsError, ClientError) as e:
        logger.error(
            "Error getting available regions for Service {}: {}".format(aws_service, e)
        )
        # Raise SystemExit directly: the `exit` builtin is injected by the
        # `site` module for interactive use and may be missing (python -S,
        # frozen builds).
        raise SystemExit(1)
    return available_regions
def get_account_alias(logger, aws_account_number, role_name=None, profile=None):
    """
    Return the first IAM account alias of the given account.

    When the requested account differs from the account of the current
    credentials and no role name is supplied, a warning is logged and an
    empty string is returned without querying IAM.

    :param logger: Logger used for progress, warning and error messages
    :param aws_account_number: AWS Account Number to get the alias for
    :param role_name: Optional role to assume in the target account
    :param profile: Optional profile name forwarded to the client helpers
    :return: The first account alias, or "" when there is none or the IAM
        call fails with one of the handled botocore errors
    """
    logger.info("Getting account alias for account {}".format(aws_account_number))
    current_account = get_account_id(logger, sess=None, profile=profile)
    is_other_account = aws_account_number != current_account
    if is_other_account and not role_name:
        logger.warning(
            "Can't get account alias for account {}, not --mh-assume-role provided".format(
                aws_account_number
            )
        )
        return ""

    # Only assume a role when both the role and the account are known.
    session = (
        assume_role(logger, aws_account_number, role_name)
        if role_name and aws_account_number
        else None
    )
    iam = get_boto3_client(logger, "iam", "us-east-1", session, profile)
    try:
        alias_list = iam.list_account_aliases()["AccountAliases"]
        return alias_list[0]
    except (NoCredentialsError, ClientError, EndpointConnectionError) as err:
        logger.error("Error getting account alias: {}".format(err))
    except IndexError:
        # An empty alias list just means the account has no alias.
        logger.info("No account alias found")
    return ""
def get_boto3_client(logger, service, region, sess, profile=None):
    """
    Build a client for a service and region.

    The source of the client is chosen in this order: the given session if
    it is truthy, otherwise a new session for the given profile if one is
    named, otherwise the module-level ``boto3.client``.

    :param logger: Logger used for error messages
    :param service: Service name for the client
    :param region: Region name for the client
    :param sess: Session object whose ``client`` method is used, or a falsy
        value to skip it
    :param profile: Optional profile name used when no session is given
    :return: The client created by the selected source
    :raises SystemExit: With status 1 if the named profile is not found
    """
    if sess:
        return sess.client(service_name=service, region_name=region)
    if profile:
        try:
            return boto3.Session(profile_name=profile).client(
                service_name=service, region_name=region
            )
        except ProfileNotFound as e:
            logger.error(
                "Error getting boto3 client using AWS profile (check --sh-profile): {}".format(
                    e
                )
            )
            # Raise SystemExit directly: the `exit` builtin is injected by
            # the `site` module for interactive use and may be missing
            # (python -S, frozen builds).
            raise SystemExit(1)
    return boto3.client(service, region_name=region)