/
plugin.py
156 lines (136 loc) · 5.11 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
.. module: lemur.plugins.lemur_vault_dest.plugin
:platform: Unix
:copyright: (c) 2019
:license: Apache, see LICENCE for more details.
Plugin for uploading certificates and private key as secret to hashi vault
that can be pulled down by end point nodes.
.. moduleauthor:: Christopher Jolley <chris@alwaysjolley.com>
"""
import hvac
from flask import current_app
from lemur.common.defaults import common_name
from lemur.common.utils import parse_certificate
from lemur.plugins.bases import DestinationPlugin
from cryptography import x509
from cryptography.hazmat.backends import default_backend
class VaultDestinationPlugin(DestinationPlugin):
"""Hashicorp Vault Destination plugin for Lemur"""
title = 'Vault'
slug = 'hashi-vault-destination'
description = 'Allow the uploading of certificates to Hashi Vault as secret'
author = 'Christopher Jolley'
author_url = 'https://github.com/alwaysjolley/lemur'
options = [
{
'name': 'vaultUrl',
'type': 'str',
'required': True,
'validation': '^https?://[a-zA-Z0-9.:-]+$',
'helpMessage': 'Valid URL to Hashi Vault instance'
},
{
'name': 'vaultAuthTokenFile',
'type': 'str',
'required': True,
'validation': '(/[^/]+)+',
'helpMessage': 'Must be a valid file path!'
},
{
'name': 'vaultMount',
'type': 'str',
'required': True,
'validation': '^\S+$',
'helpMessage': 'Must be a valid Vault secrets mount name!'
},
{
'name': 'vaultPath',
'type': 'str',
'required': True,
'validation': '^([a-zA-Z0-9_-]+/?)+$',
'helpMessage': 'Must be a valid Vault secrets path'
},
{
'name': 'objectName',
'type': 'str',
'required': False,
'validation': '[0-9a-zA-Z:_-]+',
'helpMessage': 'Name to bundle certs under, if blank use cn'
},
{
'name': 'bundleChain',
'type': 'select',
'value': 'cert only',
'available': [
'Nginx',
'Apache',
'no chain'
],
'required': True,
'helpMessage': 'Bundle the chain into the certificate'
}
]
def __init__(self, *args, **kwargs):
super(VaultDestinationPlugin, self).__init__(*args, **kwargs)
def upload(self, name, body, private_key, cert_chain, options, **kwargs):
"""
Upload certificate and private key
:param private_key:
:param cert_chain:
:return:
"""
cname = common_name(parse_certificate(body))
url = self.get_option('vaultUrl', options)
token_file = self.get_option('vaultAuthTokenFile', options)
mount = self.get_option('vaultMount', options)
path = self.get_option('vaultPath', options)
bundle = self.get_option('bundleChain', options)
obj_name = self.get_option('objectName', options)
with open(token_file, 'r') as file:
token = file.readline().rstrip('\n')
client = hvac.Client(url=url, token=token)
if obj_name:
path = '{0}/{1}'.format(path, obj_name)
else:
path = '{0}/{1}'.format(path, cname)
secret = get_secret(url, token, mount, path)
secret['data'][cname] = {}
if bundle == 'Nginx' and cert_chain:
secret['data'][cname]['crt'] = '{0}\n{1}'.format(body, cert_chain)
elif bundle == 'Apache' and cert_chain:
secret['data'][cname]['crt'] = body
secret['data'][cname]['chain'] = cert_chain
else:
secret['data'][cname]['crt'] = body
secret['data'][cname]['key'] = private_key
san_list = get_san_list(body)
if isinstance(san_list, list):
secret['data'][cname]['san'] = san_list
try:
client.secrets.kv.v1.create_or_update_secret(
path=path, mount_point=mount, secret=secret['data'])
except ConnectionError as err:
current_app.logger.exception(
"Exception uploading secret to vault: {0}".format(err), exc_info=True)
def get_san_list(body):
""" parse certificate for SAN names and return list, return empty list on error """
san_list = []
try:
byte_body = body.encode('utf-8')
cert = x509.load_pem_x509_certificate(byte_body, default_backend())
ext = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
san_list = ext.value.get_values_for_type(x509.DNSName)
except x509.extensions.ExtensionNotFound:
pass
finally:
return san_list
def get_secret(url, token, mount, path):
""" retreiive existing data from mount path and return dictionary """
result = {'data': {}}
try:
client = hvac.Client(url=url, token=token)
result = client.secrets.kv.v1.read_secret(path=path, mount_point=mount)
except ConnectionError:
pass
finally:
return result