/
17_rotate_secrets.py
69 lines (46 loc) · 2.3 KB
/
17_rotate_secrets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import boto3
def lambda_handler(event, context):
    """Dispatch one step of a Secrets Manager rotation.

    Args:
        event: Mapping with the keys ``SecretId`` (the secret identifier
            passed to every Secrets Manager call), ``ClientRequestToken``
            (the version id being rotated in) and ``Step`` (one of
            ``createSecret``, ``setSecret``, ``testSecret``,
            ``finishSecret``).
        context: Lambda context object; not used.

    Raises:
        KeyError: If ``event`` lacks one of the three required keys.
        ValueError: If rotation is not enabled for the secret, if the
            token is not a known version of the secret, if the token
            version is not staged as AWSPENDING, or if ``Step`` is not one
            of the four supported values.
    """
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']

    # Setup the client
    service_client = boto3.client('secretsmanager')

    # Make sure the version is staged correctly
    metadata = service_client.describe_secret(SecretId=arn)
    # .get(): a missing flag is reported as "not enabled" rather than as a
    # bare KeyError.
    if not metadata.get('RotationEnabled'):
        raise ValueError("Secret %s is not enabled for rotation" % arn)

    versions = metadata.get('VersionIdsToStages', {})
    if token not in versions:
        raise ValueError(
            "Secret version %s has no stage for rotation of secret %s." % (token, arn)
        )
    if "AWSCURRENT" in versions[token]:
        # This version is already the live one; nothing left to rotate.
        return
    if "AWSPENDING" not in versions[token]:
        raise ValueError(
            "Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)
        )

    if step == "createSecret":
        create_secret(service_client, arn, token)
    elif step == "setSecret":
        set_secret(service_client, arn, token)
    elif step == "testSecret":
        test_secret(service_client, arn, token)
    elif step == "finishSecret":
        finish_secret(service_client, arn, token)
    else:
        raise ValueError("Invalid step parameter")
def create_secret(service_client, arn, token):
    """Store a freshly generated password as the AWSPENDING version.

    First reads the AWSCURRENT value (the result is discarded; any error
    from that call propagates). Then looks up the version ``token`` under
    the AWSPENDING stage: if that lookup succeeds nothing more is done,
    otherwise a random password is requested from the client and written
    as a new version labelled AWSPENDING.

    Args:
        service_client: Secrets Manager client used for every call.
        arn: Identifier of the secret, passed as ``SecretId``.
        token: Version id, passed as ``VersionId`` / ``ClientRequestToken``.
    """
    # The return value is unused; the call is made for its error behavior.
    service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")

    try:
        service_client.get_secret_value(
            SecretId=arn,
            VersionId=token,
            VersionStage="AWSPENDING",
        )
    except service_client.exceptions.ResourceNotFoundException:
        pending_exists = False
    else:
        pending_exists = True

    if pending_exists:
        return

    # No pending value yet: generate one and stage it.
    generated = service_client.get_random_password(ExcludeCharacters='/@"\'\\')
    service_client.put_secret_value(
        SecretId=arn,
        ClientRequestToken=token,
        SecretString=generated['RandomPassword'],
        VersionStages=['AWSPENDING'],
    )
def set_secret(service_client, arn, token):
    """Handle the ``setSecret`` step by printing a notice; nothing else.

    The three parameters mirror the other step handlers' signatures and
    are not used.
    """
    notice = "No database user credentials to update..."
    print(notice)
def test_secret(service_client, arn, token):
print("No need to testing against any service...")
def finish_secret(service_client, arn, token):
    """Move the AWSCURRENT staging label onto the version ``token``.

    Describes the secret, finds which version currently carries
    AWSCURRENT, and asks the client to move that label to ``token``.
    Returns without calling the update when ``token`` already carries
    AWSCURRENT.

    Args:
        service_client: Secrets Manager client used for every call.
        arn: Identifier of the secret, passed as ``SecretId``.
        token: Version id that should become AWSCURRENT.
    """
    # First describe the secret to get the current version
    metadata = service_client.describe_secret(SecretId=arn)

    current_version = None
    for version, stages in metadata["VersionIdsToStages"].items():
        if "AWSCURRENT" in stages:
            if version == token:
                # The correct version is already marked as current, return
                return
            current_version = version
            break

    # Finalize by staging the secret version current. The promotion is done
    # after the search (not inside it) so that a secret with no AWSCURRENT
    # holder is still promoted instead of being silently skipped.
    stage_move = {
        "SecretId": arn,
        "VersionStage": "AWSCURRENT",
        "MoveToVersionId": token,
    }
    if current_version is not None:
        # Only name a version to strip the label from when one holds it.
        stage_move["RemoveFromVersionId"] = current_version
    service_client.update_secret_version_stage(**stage_move)