# Copyright 2019 Adobe. All rights reserved.
# This file is licensed to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
# OF ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
import logging
from .simplessm import SimpleSSM
from .simples3 import SimpleS3
from .simplevault import SimpleVault
class SecretResolver:
    """Base interface for resolving a secret from an external backend.

    Subclasses override ``supports`` to claim a ``secret_type`` and
    ``resolve`` to fetch the value. The base implementation claims nothing
    and resolves nothing, so an unregistered type never matches by accident.
    """

    def supports(self, secret_type):
        """Return True when this resolver handles ``secret_type`` (base: never)."""
        return False

    def resolve(self, secret_type, secret_params):
        """Fetch the secret described by ``secret_params`` (base: nothing, returns None)."""
        return None

    def get_param_or_exception(self, key, params):
        """Return ``params[key]``, raising ``Exception`` when ``key`` is absent.

        NOTE: the raised message echoes the whole ``params`` mapping, so it
        ends up in tracebacks and logs. Keep ``params`` to locator fields
        (paths, bucket names, profile names) rather than secret material.
        """
        try:
            return params[key]
        except KeyError:
            raise Exception(
                "Could not find required key '{}' in the secret params: {}".format(key, params)) from None
class SSMSecretResolver(SecretResolver):
    """Resolve ``ssm`` secrets by reading a parameter from AWS SSM via ``SimpleSSM``."""

    def __init__(self, default_aws_profile=None):
        # Profile used whenever the secret params do not name one themselves.
        self.default_aws_profile = default_aws_profile

    def supports(self, secret_type):
        """Only the literal type ``"ssm"`` is handled."""
        return secret_type == "ssm"

    def resolve(self, secret_type, secret_params):
        """Read the SSM parameter at ``secret_params["path"]``.

        ``aws_profile`` falls back to the resolver default and ``region_name``
        to ``us-east-1``. Raises ``Exception`` when no profile can be found or
        ``path`` is missing; the message echoes ``secret_params``, so keep that
        mapping free of secret values.
        """
        profile = secret_params.get("aws_profile", self.default_aws_profile)
        if not profile:
            raise Exception(
                "Could not find the aws_profile in the secret params for SSM secret: {}".format(secret_params))
        parameter_path = self.get_param_or_exception("path", secret_params)
        region = secret_params.get("region_name", "us-east-1")
        client = SimpleSSM(profile, region)
        return client.get(parameter_path)
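class S3SecretResolver(SecretResolver):
    """Resolve ``s3`` secrets by fetching an object from a bucket via ``SimpleS3``."""

    def __init__(self, default_aws_profile=None):
        # Profile used whenever the secret params do not name one themselves.
        self.default_aws_profile = default_aws_profile

    def supports(self, secret_type):
        """Only the literal type ``"s3"`` is handled."""
        return secret_type == "s3"

    @staticmethod
    def _wants_base64(value):
        """Interpret the ``base64encode`` secret param.

        Accepts a real boolean (what a YAML/JSON ``true`` parses to) as well as
        the string ``"true"`` in any case. Previously only the exact string
        ``'true'`` counted, so a boolean ``True`` silently disabled encoding.
        Anything else, including the default ``False``, means no encoding.
        """
        if isinstance(value, bool):
            return value
        return isinstance(value, str) and value.strip().lower() == "true"

    def resolve(self, secret_type, secret_params):
        """Fetch ``s3://<bucket>/<path>`` and return it as ``SimpleS3.get`` does.

        ``aws_profile`` falls back to the resolver default, ``region_name`` to
        ``us-east-1``, and ``base64encode`` to off. Raises ``Exception`` when no
        profile can be found or ``bucket``/``path`` is missing; the message
        echoes ``secret_params``, so keep that mapping free of secret values.
        """
        aws_profile = secret_params.get("aws_profile", self.default_aws_profile)
        if not aws_profile:
            raise Exception(
                "Could not find the aws_profile in the secret params for S3 secret: {}".format(secret_params))
        bucket = self.get_param_or_exception("bucket", secret_params)
        path = self.get_param_or_exception("path", secret_params)
        region_name = secret_params.get("region_name", "us-east-1")
        base64_encode = self._wants_base64(secret_params.get("base64encode", False))
        s3 = SimpleS3(aws_profile, region_name)
        return s3.get(bucket, path, base64_encode)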
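class VaultSecretResolver(SecretResolver):
    """Resolve ``vault`` secrets: mint a token for a policy, or read a secret path."""

    def supports(self, secret_type):
        """Only the literal type ``"vault"`` is handled."""
        return secret_type == "vault"

    def resolve(self, secret_type, secret_params):
        """Return a Vault token for ``token_policy`` or the value at ``path``.

        ``token_policy`` takes precedence when both keys are present. A fresh
        ``SimpleVault`` is created per call, as before. Raises ``Exception``
        when neither key is present instead of silently returning ``None``,
        so a misconfigured secret cannot be mistaken for a resolved one; the
        message echoes ``secret_params``, so keep that mapping free of secret
        values. The value returned for ``token_policy`` is presumably a live
        Vault token — handle it as a credential, not as configuration.
        """
        # Generate a token for a policy
        if "token_policy" in secret_params:
            return SimpleVault().get_token(secret_params["token_policy"])
        # Retrieve secret from vault path
        if "path" in secret_params:
            return SimpleVault().get_path(secret_params["path"])
        raise Exception(
            "Could not find 'token_policy' or 'path' in the secret params for Vault secret: {}".format(
                secret_params))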
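class AggregatedSecretResolver(SecretResolver):
    """Dispatch to the first registered resolver that supports a secret type."""

    def __init__(self, default_aws_profile=None):
        # Order matters: the first resolver that claims a type wins in resolve().
        self.secret_resolvers = (
            SSMSecretResolver(default_aws_profile),
            S3SecretResolver(default_aws_profile),
            VaultSecretResolver(),
        )

    def supports(self, secret_type):
        """True when any registered resolver handles ``secret_type``."""
        # Generator rather than a list so the scan stops at the first match.
        return any(resolver.supports(secret_type) for resolver in self.secret_resolvers)

    def resolve(self, secret_type, secret_params):
        """Resolve via the first matching resolver; raise if none claims the type.

        The failure message echoes ``secret_params``, so keep that mapping to
        locator fields rather than secret material.
        """
        for resolver in self.secret_resolvers:
            if resolver.supports(secret_type):
                return resolver.resolve(secret_type, secret_params)
        raise Exception("Could not resolve secret type '{}' with params {}".format(secret_type, secret_params))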