-
Notifications
You must be signed in to change notification settings - Fork 13.7k
/
vault.py
260 lines (234 loc) · 11.4 KB
/
vault.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Objects relating to sourcing connections & variables from Hashicorp Vault"""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin
class VaultBackend(BaseSecretsBackend, LoggingMixin):
"""
Retrieves Connections and Variables from Hashicorp Vault.
Configurable via ``airflow.cfg`` as follows:
.. code-block:: ini
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
"connections_path": "connections",
"url": "http://127.0.0.1:8200",
"mount_point": "airflow"
}
For example, if your keys are under ``connections`` path in ``airflow`` mount_point, this
would be accessible if you provide ``{"connections_path": "connections"}`` and request
conn_id ``smtp_default``.
:param connections_path: Specifies the path of the secret to read to get Connections.
(default: 'connections'). If set to None (null), requests for connections will not be sent to Vault.
:param variables_path: Specifies the path of the secret to read to get Variable.
(default: 'variables'). If set to None (null), requests for variables will not be sent to Vault.
:param config_path: Specifies the path of the secret to read Airflow Configurations
(default: 'config'). If set to None (null), requests for configurations will not be sent to Vault.
:param url: Base URL for the Vault instance being addressed.
:param auth_type: Authentication Type for Vault. Default is ``token``. Available values are:
('approle', 'aws_iam', 'azure', 'github', 'gcp', 'kubernetes', 'ldap', 'radius', 'token', 'userpass')
:param auth_mount_point: It can be used to define mount_point for authentication chosen
Default depends on the authentication method used.
:param mount_point: The "path" the secret engine was mounted on. Default is "secret". Note that
this mount_point is not used for authentication if authentication is done via a
different engine. If set to None, the mount secret should be provided as a prefix for each
variable/connection_id. For authentication mount_points see, auth_mount_point.
:param kv_engine_version: Select the version of the engine to run (``1`` or ``2``, default: ``2``).
:param token: Authentication token to include in requests sent to Vault.
(for ``token`` and ``github`` auth_type)
:param token_path: path to file containing authentication token to include in requests sent to Vault
(for ``token`` and ``github`` auth_type).
:param username: Username for Authentication (for ``ldap`` and ``userpass`` auth_type).
:param password: Password for Authentication (for ``ldap`` and ``userpass`` auth_type).
:param key_id: Key ID for Authentication (for ``aws_iam`` and ''azure`` auth_type).
:param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types).
:param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types).
:param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type).
:param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default:
``/var/run/secrets/kubernetes.io/serviceaccount/token``).
:param gcp_key_path: Path to Google Cloud Service Account key file (JSON) (for ``gcp`` auth_type).
Mutually exclusive with gcp_keyfile_dict.
:param gcp_keyfile_dict: Dictionary of keyfile parameters. (for ``gcp`` auth_type).
Mutually exclusive with gcp_key_path.
:param gcp_scopes: Comma-separated string containing OAuth2 scopes (for ``gcp`` auth_type).
:param azure_tenant_id: The tenant id for the Azure Active Directory (for ``azure`` auth_type).
:param azure_resource: The configured URL for the application registered in Azure Active Directory
(for ``azure`` auth_type).
:param radius_host: Host for radius (for ``radius`` auth_type).
:param radius_secret: Secret for radius (for ``radius`` auth_type).
:param radius_port: Port for radius (for ``radius`` auth_type).
"""
def __init__(
self,
connections_path: str = "connections",
variables_path: str = "variables",
config_path: str = "config",
url: str | None = None,
auth_type: str = "token",
auth_mount_point: str | None = None,
mount_point: str | None = "secret",
kv_engine_version: int = 2,
token: str | None = None,
token_path: str | None = None,
username: str | None = None,
password: str | None = None,
key_id: str | None = None,
secret_id: str | None = None,
role_id: str | None = None,
kubernetes_role: str | None = None,
kubernetes_jwt_path: str = "/var/run/secrets/kubernetes.io/serviceaccount/token",
gcp_key_path: str | None = None,
gcp_keyfile_dict: dict | None = None,
gcp_scopes: str | None = None,
azure_tenant_id: str | None = None,
azure_resource: str | None = None,
radius_host: str | None = None,
radius_secret: str | None = None,
radius_port: int | None = None,
**kwargs,
):
super().__init__()
if connections_path is not None:
self.connections_path = connections_path.rstrip("/")
else:
self.connections_path = connections_path
if variables_path is not None:
self.variables_path = variables_path.rstrip("/")
else:
self.variables_path = variables_path
if config_path is not None:
self.config_path = config_path.rstrip("/")
else:
self.config_path = config_path
self.mount_point = mount_point
self.kv_engine_version = kv_engine_version
self.vault_client = _VaultClient(
url=url,
auth_type=auth_type,
auth_mount_point=auth_mount_point,
mount_point=mount_point,
kv_engine_version=kv_engine_version,
token=token,
token_path=token_path,
username=username,
password=password,
key_id=key_id,
secret_id=secret_id,
role_id=role_id,
kubernetes_role=kubernetes_role,
kubernetes_jwt_path=kubernetes_jwt_path,
gcp_key_path=gcp_key_path,
gcp_keyfile_dict=gcp_keyfile_dict,
gcp_scopes=gcp_scopes,
azure_tenant_id=azure_tenant_id,
azure_resource=azure_resource,
radius_host=radius_host,
radius_secret=radius_secret,
radius_port=radius_port,
**kwargs,
)
def _parse_path(self, secret_path: str) -> tuple[str | None, str | None]:
if not self.mount_point:
split_secret_path = secret_path.split("/", 1)
if len(split_secret_path) < 2:
return None, None
return split_secret_path[0], split_secret_path[1]
else:
return "", secret_path
def get_response(self, conn_id: str) -> dict | None:
"""
Get data from Vault
:return: The data from the Vault path if exists
"""
mount_point, conn_key = self._parse_path(conn_id)
if self.connections_path is None or conn_key is None:
return None
secret_path = self.build_path(self.connections_path, conn_key)
return self.vault_client.get_secret(
secret_path=(mount_point + "/" if mount_point else "") + secret_path
)
def get_conn_uri(self, conn_id: str) -> str | None:
"""
Get serialized representation of connection
:param conn_id: The connection id
:return: The connection uri retrieved from the secret
"""
# Since VaultBackend implements `get_connection`, `get_conn_uri` is not used. So we
# don't need to implement (or direct users to use) method `get_conn_value` instead
warnings.warn(
f"Method `{self.__class__.__name__}.get_conn_uri` is deprecated and will be removed "
"in a future release.",
DeprecationWarning,
stacklevel=2,
)
response = self.get_response(conn_id)
return response.get("conn_uri") if response else None
# Make sure connection is imported this way for type checking, otherwise when importing
# the backend it will get a circular dependency and fail
if TYPE_CHECKING:
from airflow.models.connection import Connection
def get_connection(self, conn_id: str) -> Connection | None:
"""
Get connection from Vault as secret. Prioritize conn_uri if exists,
if not fall back to normal Connection creation.
:return: A Connection object constructed from Vault data
"""
# The Connection needs to be locally imported because otherwise we get into cyclic import
# problems when instantiating the backend during configuration
from airflow.models.connection import Connection
response = self.get_response(conn_id)
if response is None:
return None
uri = response.get("conn_uri")
if uri:
return Connection(conn_id, uri=uri)
return Connection(conn_id, **response)
def get_variable(self, key: str) -> str | None:
"""
Get Airflow Variable
:param key: Variable Key
:return: Variable Value retrieved from the vault
"""
mount_point, variable_key = self._parse_path(key)
if self.variables_path is None or variable_key is None:
return None
else:
secret_path = self.build_path(self.variables_path, variable_key)
response = self.vault_client.get_secret(
secret_path=(mount_point + "/" if mount_point else "") + secret_path
)
return response.get("value") if response else None
def get_config(self, key: str) -> str | None:
"""
Get Airflow Configuration
:param key: Configuration Option Key
:return: Configuration Option Value retrieved from the vault
"""
mount_point, config_key = self._parse_path(key)
if self.config_path is None or config_key is None:
return None
else:
secret_path = self.build_path(self.config_path, config_key)
response = self.vault_client.get_secret(
secret_path=(mount_point + "/" if mount_point else "") + secret_path
)
return response.get("value") if response else None