import boto3
import json
from botocore.exceptions import ClientError
import logging
# Module logger: everything this script records (caller identity, policy ARN,
# attach failures) goes to a log file in the current working directory, never
# to stdout. The file inherits the process umask; it holds no credentials,
# only user names, ARNs and error text.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)
formatter = logging.Formatter(fmt='%(asctime)s %(name)s %(levelname)s: %(message)s')
file_handler = logging.FileHandler(filename='createAttachPolicyToDestAcct.log')
file_handler.setFormatter(fmt=formatter)
logger.addHandler(hdlr=file_handler)
# One IAM client shared by the helpers below; credentials and region come
# from boto3's default resolution chain, so importing this module already
# touches the local AWS configuration.
iam = boto3.client(service_name="iam")
# Name of the customer managed policy this script creates and then attaches
# to the calling IAM user.
policyName = 'S3InvDestAccountPolicy'
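def user_name_from_arn(arn):
    """Return the IAM user name embedded in a caller-identity ARN.

    Only IAM user ARNs (``arn:aws:iam::<account>:user/<path...>/<name>``)
    are accepted. The name is the *last* path segment, so users created
    with a path (``user/division_abc/Bob``) resolve to ``Bob`` rather than
    to the first path element. Root credentials (``...:root``) and
    assumed-role sessions (``...:assumed-role/<role>/<session>``) are
    rejected because attach_user_policy() needs a real IAM user; silently
    returning the role name would only fail later at attach time.

    Args:
        arn: the ``Arn`` field of ``sts:GetCallerIdentity``.

    Raises:
        ValueError: the ARN is malformed or is not an IAM user ARN.
    """
    parts = arn.split(":", 5)
    if len(parts) < 6:
        raise ValueError(f"not an ARN: {arn}")
    # The resource part looks like "user/<path...>/<name>".
    kind, _, rest = parts[5].partition("/")
    if kind != "user" or not rest:
        raise ValueError(f"caller is not an IAM user: {arn}")
    return rest.rsplit("/", 1)[-1]


def getUserName():
    """Return the IAM user name behind the current credentials, via STS.

    The account that owns these credentials is treated as the S3 inventory
    destination account by the rest of the script.

    Raises:
        ValueError: the credentials are not an IAM user (root or an assumed
            role), so there is no user to attach the policy to.
    """
    sts = boto3.client('sts')
    caller_arn = sts.get_caller_identity()['Arn']
    user = user_name_from_arn(caller_arn)
    logger.info(f"IAM user name is {user}")
    return user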
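def dest_account_policy_document():
    """Return the policy document created by create_iam_policy(), as a dict.

    Kept separate from the API call so the grants can be reviewed and tested
    without an AWS client. Review notes on the statements below:

    * ``iam:GetUser`` is granted with ``Resource: arn:aws:s3:::*``. An S3
      ARN never matches an IAM user, so that grant is a no-op (this script
      identifies the caller through STS, not GetUser). If GetUser is really
      needed it wants an IAM resource such as ``arn:aws:iam::*:user/*``.
    * ``s3:CreateBucket``, ``s3:PutBucketPolicy`` and
      ``s3:PutBucketPublicAccessBlock`` apply to every bucket in the account
      (``arn:aws:s3:::*``). Scope them to the inventory destination bucket
      once its name is known.
    * ``sts:AssumeRole`` is allowed into ``OrgS3role`` in *any* account
      (``arn:aws:iam::*:role/OrgS3role``); only the roles' trust policies
      limit where this user can actually land.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:ListAllMyBuckets"
                ],
                "Effect": "Allow",
                "Resource": [
                    "arn:aws:s3:::*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "organizations:Describe*",
                    "organizations:List*"
                ],
                "Resource": "*"
            },
            {
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Effect": "Allow",
                "Resource": "arn:aws:s3:::*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutBucketPublicAccessBlock",
                    "s3:GetInventoryConfiguration",
                    "s3:PutBucketPolicy",
                    "s3:CreateBucket",
                    "s3:ListBucket",
                    "iam:GetUser",  # no-op against an S3 resource, see docstring
                    "s3:GetBucketLocation",
                    "s3:GetBucketPolicy",
                    "s3:PutInventoryConfiguration"
                ],
                "Resource": "arn:aws:s3:::*"
            },
            {
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::*:role/OrgS3role"
            }
        ]
    }


def create_iam_policy(policyName):
    """Create the customer managed policy *policyName* in the caller's account.

    Uses the module-level ``iam`` client (the same configuration the other
    helpers use) and the document from dest_account_policy_document().

    Args:
        policyName: name of the managed policy to create.

    Returns:
        The ARN of the newly created policy, or None when a policy with that
        name already exists. The existing policy is left untouched; its ARN
        can be looked up with list_policies().

    Raises:
        Any botocore error other than EntityAlreadyExists is propagated
        (e.g. missing ``iam:CreatePolicy`` permission).
    """
    try:
        response = iam.create_policy(
            PolicyName=policyName,
            PolicyDocument=json.dumps(dest_account_policy_document()),
        )
    except iam.exceptions.EntityAlreadyExistsException:
        # Re-running the script is expected; the existing policy is reused.
        logger.info("Policy already exists")
        return None
    policy_arn = response['Policy']['Arn']
    logger.info(f"Created policy {policyName} with ARN {policy_arn}")
    return policy_arn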
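def list_policies(policyName):
    """Return the ARN of the customer managed policy named *policyName*.

    Only customer managed policies are searched (``Scope="Local"``), so an
    AWS managed policy with the same name is never returned. Customer
    managed policy names are unique within an account, so the search stops
    at the first match instead of paging through every policy.

    Args:
        policyName: exact policy name to look for.

    Returns:
        The policy ARN, or '' when no policy with that name exists; callers
        must check for the empty string before using the result.
    """
    paginator = iam.get_paginator('list_policies')
    for page in paginator.paginate(Scope="Local"):
        for policy in page["Policies"]:
            if policy['PolicyName'] == policyName:
                policyArn = policy['Arn']
                logger.info(f"Policy Name: {policy['PolicyName']} ARN: {policyArn}")
                return policyArn
    logger.warning(f"No customer managed policy named {policyName} was found")
    return ''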
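def attach_user_policy(policy_arn, userName):
    """Attach the managed policy *policy_arn* to the IAM user *userName*.

    Args:
        policy_arn: ARN of a managed policy; '' (what list_policies()
            returns for a missing policy) is rejected up front instead of
            being sent to IAM.
        userName: IAM user name, normally the caller from getUserName().

    Returns:
        True when the attach call succeeded, False when *policy_arn* is
        empty or the call failed. Failures are logged with their traceback
        rather than raised, keeping the original best-effort behaviour.
    """
    if not policy_arn:
        logger.error(f"No policy ARN given; nothing attached to the user {userName}")
        return False
    try:
        iam.attach_user_policy(
            UserName=userName,
            PolicyArn=policy_arn
        )
    except ClientError as e:
        logger.exception(f"Error {e} while attaching {policy_arn} to the user {userName}")
        return False
    logger.info(f"Attached {policy_arn} to the user {userName}")
    return True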
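def main():
    """Create the destination-account policy and attach it to the caller.

    Steps: resolve the calling IAM user, create the managed policy (a no-op
    if it already exists), look up its ARN, then attach it. If the ARN
    lookup comes back empty the attach step is skipped and the problem is
    logged, instead of sending an empty ARN to IAM.
    """
    userName = getUserName()
    create_iam_policy(policyName)
    policy_arn = list_policies(policyName)
    if not policy_arn:
        logger.error(f"Policy {policyName} was not found after creation; nothing attached")
        return
    attach_user_policy(policy_arn, userName)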
# Run only when executed as a script, so importing the module (for example
# to reuse the helpers) does not create or attach anything.
if __name__ == '__main__':
    main()