-
Notifications
You must be signed in to change notification settings - Fork 371
/
kv_v2.py
509 lines (436 loc) · 19.9 KB
/
kv_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
#!/usr/bin/env python
"""KvV2 methods module."""
import warnings
from hvac import exceptions, utils
from hvac.api.vault_api_base import VaultApiBase
DEFAULT_MOUNT_POINT = "secret"
class KvV2(VaultApiBase):
"""KV Secrets Engine - Version 2 (API).
Reference: https://www.vaultproject.io/api/secret/kv/kv-v2.html
"""
def configure(
self,
max_versions=10,
cas_required=None,
delete_version_after="0s",
mount_point=DEFAULT_MOUNT_POINT,
):
"""Configure backend level settings that are applied to every key in the key-value store.
Supported methods:
POST: /{mount_point}/config. Produces: 204 (empty body)
:param max_versions: The number of versions to keep per key. This value applies to all keys, but a key's
metadata setting can overwrite this value. Once a key has more than the configured allowed versions the
oldest version will be permanently deleted. Defaults to 10.
:type max_versions: int
:param cas_required: If true all keys will require the cas parameter to be set on all write requests.
:type cas_required: bool
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:param delete_version_after: Specifies the length of time before a version is deleted. Accepts Go duration format string.
Defaults to "0s" (i.e., disabled).
:type delete_version_after: str
:return: The response of the request.
:rtype: requests.Response
"""
params = {
"max_versions": max_versions,
"delete_version_after": delete_version_after,
}
if cas_required is not None:
params["cas_required"] = cas_required
api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point)
return self._adapter.post(
url=api_path,
json=params,
)
def read_configuration(self, mount_point=DEFAULT_MOUNT_POINT):
"""Read the KV Version 2 configuration.
Supported methods:
GET: /auth/{mount_point}/config. Produces: 200 application/json
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
"""
api_path = utils.format_url(
"/v1/{mount_point}/config",
mount_point=mount_point,
)
return self._adapter.get(url=api_path)
def read_secret(
self, path, mount_point=DEFAULT_MOUNT_POINT, raise_on_deleted_version=None
):
"""Retrieve the secret at the specified location.
Equivalent to calling read_secret_version with version=None.
Supported methods:
GET: /{mount_point}/data/{path}. Produces: 200 application/json
:param path: Specifies the path of the secret to read. This is specified as part of the URL.
:type path: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:param raise_on_deleted_version: Changes the behavior when the requested version is deleted.
If True an exception will be raised.
If False, some metadata about the deleted secret is returned.
If None (pre-v3), a default of True will be used and a warning will be issued.
:type raise_on_deleted_version: bool
:return: The JSON response of the request.
:rtype: dict
"""
return self.read_secret_version(
path,
mount_point=mount_point,
raise_on_deleted_version=raise_on_deleted_version,
)
def read_secret_version(
self,
path,
version=None,
mount_point=DEFAULT_MOUNT_POINT,
raise_on_deleted_version=None,
):
"""Retrieve the secret at the specified location, with the specified version.
Supported methods:
GET: /{mount_point}/data/{path}. Produces: 200 application/json
:param path: Specifies the path of the secret to read. This is specified as part of the URL.
:type path: str | unicode
:param version: Specifies the version to return. If not set the latest version is returned.
:type version: int
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:param raise_on_deleted_version: Changes the behavior when the requested version is deleted.
If True an exception will be raised.
If False, some metadata about the deleted secret is returned.
If None (pre-v3), a default of True will be used and a warning will be issued.
:type raise_on_deleted_version: bool
:return: The JSON response of the request.
:rtype: dict
"""
if raise_on_deleted_version is None:
msg = (
"The raise_on_deleted parameter will change its default value to False in hvac v3.0.0. "
"The current default of True will presere previous behavior. "
"To use the old behavior with no warning, explicitly set this value to True. "
"See https://github.com/hvac/hvac/pull/907"
)
warnings.warn(
message=msg,
category=DeprecationWarning,
stacklevel=2,
)
raise_on_deleted_version = True
params = {}
if version is not None:
params["version"] = version
api_path = utils.format_url(
"/v1/{mount_point}/data/{path}", mount_point=mount_point, path=path
)
try:
return self._adapter.get(
url=api_path,
params=params,
)
except exceptions.InvalidPath as e:
if not raise_on_deleted_version:
try:
if (
e.json is not None
and e.json["data"]["metadata"]["deletion_time"] != ""
):
return e.json
except KeyError:
pass
raise
def create_or_update_secret(
self, path, secret, cas=None, mount_point=DEFAULT_MOUNT_POINT
):
"""Create a new version of a secret at the specified location.
If the value does not yet exist, the calling token must have an ACL policy granting the create capability. If
the value already exists, the calling token must have an ACL policy granting the update capability.
Supported methods:
POST: /{mount_point}/data/{path}. Produces: 200 application/json
:param path: Path
:type path: str | unicode
:param cas: Set the "cas" value to use a Check-And-Set operation. If not set the write will be allowed. If set
to 0 a write will only be allowed if the key doesn't exist. If the index is non-zero the write will only be
allowed if the key's current version matches the version specified in the cas parameter.
:type cas: int
:param secret: The contents of the "secret" dict will be stored and returned on read.
:type secret: dict
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
"""
params = {
"options": {},
"data": secret,
}
if cas is not None:
params["options"]["cas"] = cas
api_path = utils.format_url(
"/v1/{mount_point}/data/{path}", mount_point=mount_point, path=path
)
return self._adapter.post(
url=api_path,
json=params,
)
def patch(self, path, secret, mount_point=DEFAULT_MOUNT_POINT):
"""Set or update data in the KV store without overwriting.
:param path: Path
:type path: str | unicode
:param secret: The contents of the "secret" dict will be stored and returned on read.
:type secret: dict
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the create_or_update_secret request.
:rtype: dict
"""
# First, do a read.
try:
current_secret_version = self.read_secret_version(
path=path,
mount_point=mount_point,
)
except exceptions.InvalidPath:
raise exceptions.InvalidPath(
'No value found at "{path}"; patch only works on existing data.'.format(
path=path
)
)
# Update existing secret dict.
patched_secret = current_secret_version["data"]["data"]
patched_secret.update(secret)
# Write back updated secret.
return self.create_or_update_secret(
path=path,
cas=current_secret_version["data"]["metadata"]["version"],
secret=patched_secret,
mount_point=mount_point,
)
def delete_latest_version_of_secret(self, path, mount_point=DEFAULT_MOUNT_POINT):
"""Issue a soft delete of the secret's latest version at the specified location.
This marks the version as deleted and will stop it from being returned from reads, but the underlying data will
not be removed. A delete can be undone using the undelete path.
Supported methods:
DELETE: /{mount_point}/data/{path}. Produces: 204 (empty body)
:param path: Specifies the path of the secret to delete. This is specified as part of the URL.
:type path: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
api_path = utils.format_url(
"/v1/{mount_point}/data/{path}", mount_point=mount_point, path=path
)
return self._adapter.delete(
url=api_path,
)
def delete_secret_versions(self, path, versions, mount_point=DEFAULT_MOUNT_POINT):
"""Issue a soft delete of the specified versions of the secret.
This marks the versions as deleted and will stop them from being returned from reads,
but the underlying data will not be removed. A delete can be undone using the
undelete path.
Supported methods:
POST: /{mount_point}/delete/{path}. Produces: 204 (empty body)
:param path: Specifies the path of the secret to delete. This is specified as part of the URL.
:type path: str | unicode
:param versions: The versions to be deleted. The versioned data will not be deleted, but it will no longer be
returned in normal get requests.
:type versions: int
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
if not isinstance(versions, list) or len(versions) == 0:
error_msg = 'argument to "versions" must be a list containing one or more integers, "{versions}" provided.'.format(
versions=versions
)
raise exceptions.ParamValidationError(error_msg)
params = {
"versions": versions,
}
api_path = utils.format_url(
"/v1/{mount_point}/delete/{path}", mount_point=mount_point, path=path
)
return self._adapter.post(
url=api_path,
json=params,
)
def undelete_secret_versions(self, path, versions, mount_point=DEFAULT_MOUNT_POINT):
"""Undelete the data for the provided version and path in the key-value store.
This restores the data, allowing it to be returned on get requests.
Supported methods:
POST: /{mount_point}/undelete/{path}. Produces: 204 (empty body)
:param path: Specifies the path of the secret to undelete. This is specified as part of the URL.
:type path: str | unicode
:param versions: The versions to undelete. The versions will be restored and their data will be returned on
normal get requests.
:type versions: list of int
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
if not isinstance(versions, list) or len(versions) == 0:
error_msg = 'argument to "versions" must be a list containing one or more integers, "{versions}" provided.'.format(
versions=versions
)
raise exceptions.ParamValidationError(error_msg)
params = {
"versions": versions,
}
api_path = utils.format_url(
"/v1/{mount_point}/undelete/{path}", mount_point=mount_point, path=path
)
return self._adapter.post(
url=api_path,
json=params,
)
def destroy_secret_versions(self, path, versions, mount_point=DEFAULT_MOUNT_POINT):
"""Permanently remove the specified version data and numbers for the provided path from the key-value store.
Supported methods:
POST: /{mount_point}/destroy/{path}. Produces: 204 (empty body)
:param path: Specifies the path of the secret to destroy.
This is specified as part of the URL.
:type path: str | unicode
:param versions: The versions to destroy. Their data will be
permanently deleted.
:type versions: list of int
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
if not isinstance(versions, list) or len(versions) == 0:
error_msg = 'argument to "versions" must be a list containing one or more integers, "{versions}" provided.'.format(
versions=versions
)
raise exceptions.ParamValidationError(error_msg)
params = {
"versions": versions,
}
api_path = utils.format_url(
"/v1/{mount_point}/destroy/{path}", mount_point=mount_point, path=path
)
return self._adapter.post(
url=api_path,
json=params,
)
def list_secrets(self, path, mount_point=DEFAULT_MOUNT_POINT):
"""Return a list of key names at the specified location.
Folders are suffixed with /. The input must be a folder; list on a file will not return a value. Note that no
policy-based filtering is performed on keys; do not encode sensitive information in key names. The values
themselves are not accessible via this command.
Supported methods:
LIST: /{mount_point}/metadata/{path}. Produces: 200 application/json
:param path: Specifies the path of the secrets to list. This is specified as part of the URL.
:type path: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
"""
api_path = utils.format_url(
"/v1/{mount_point}/metadata/{path}", mount_point=mount_point, path=path
)
return self._adapter.list(
url=api_path,
)
def read_secret_metadata(self, path, mount_point=DEFAULT_MOUNT_POINT):
"""Retrieve the metadata and versions for the secret at the specified path.
Supported methods:
GET: /{mount_point}/metadata/{path}. Produces: 200 application/json
:param path: Specifies the path of the secret to read. This is specified as part of the URL.
:type path: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
"""
api_path = utils.format_url(
"/v1/{mount_point}/metadata/{path}", mount_point=mount_point, path=path
)
return self._adapter.get(
url=api_path,
)
def update_metadata(
self,
path,
max_versions=None,
cas_required=None,
delete_version_after="0s",
mount_point=DEFAULT_MOUNT_POINT,
custom_metadata=None,
):
"""Updates the max_versions of cas_required setting on an existing path.
Supported methods:
POST: /{mount_point}/metadata/{path}. Produces: 204 (empty body)
:param path: Path
:type path: str | unicode
:param max_versions: The number of versions to keep per key. If not set, the backend's configured max version is
used. Once a key has more than the configured allowed versions the oldest version will be permanently
deleted.
:type max_versions: int
:param cas_required: If true the key will require the cas parameter to be set on all write requests. If false,
the backend's configuration will be used.
:type cas_required: bool
:param delete_version_after: Specifies the length of time before a version is deleted. Accepts Go duration format string.
Defaults to "0s" (i.e., disabled).
:type delete_version_after: str
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:param custom_metadata: A dictionary of key/value metadata to describe the secret. Requires Vault 1.9.0 or greater.
:type custom_metadata: dict
:return: The response of the request.
:rtype: requests.Response
"""
params = {
"delete_version_after": delete_version_after,
}
if max_versions is not None:
params["max_versions"] = max_versions
if cas_required is not None:
if not isinstance(cas_required, bool):
error_msg = (
"bool expected for cas_required param, {type} received".format(
type=type(cas_required)
)
)
raise exceptions.ParamValidationError(error_msg)
params["cas_required"] = cas_required
if custom_metadata is not None:
if not isinstance(custom_metadata, dict):
error_msg = (
"dict expected for custom_metadata param, {type} received".format(
type=type(custom_metadata)
)
)
raise exceptions.ParamValidationError(error_msg)
params["custom_metadata"] = custom_metadata
api_path = utils.format_url(
"/v1/{mount_point}/metadata/{path}", mount_point=mount_point, path=path
)
return self._adapter.post(
url=api_path,
json=params,
)
def delete_metadata_and_all_versions(self, path, mount_point=DEFAULT_MOUNT_POINT):
"""Delete (permanently) the key metadata and all version data for the specified key.
All version history will be removed.
Supported methods:
DELETE: /{mount_point}/metadata/{path}. Produces: 204 (empty body)
:param path: Specifies the path of the secret to delete. This is specified as part of the URL.
:type path: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
api_path = utils.format_url(
"/v1/{mount_point}/metadata/{path}", mount_point=mount_point, path=path
)
return self._adapter.delete(
url=api_path,
)