Skip to content

Commit

Permalink
Get Airflow Variables from AWS Systems Manager Parameter Store (#7945)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7239d9a)
  • Loading branch information
kaxil committed Mar 30, 2020
1 parent 6578d82 commit f1108c1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
30 changes: 28 additions & 2 deletions airflow/contrib/secrets/aws_systems_manager.py
Expand Up @@ -29,7 +29,7 @@

class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
"""
Retrieves Connection object from AWS SSM Parameter Store
Retrieves Connection or Variables from AWS SSM Parameter Store
Configurable via ``airflow.cfg`` like so:
Expand All @@ -41,15 +41,19 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
For example, if ssm path is ``/airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
And if ssm path is ``/airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "/airflow/variables"}`` and request conn_id ``hello``.
"""

def __init__(
self,
connections_prefix='/airflow/connections', # type: str
variables_prefix='/airflow/variables', # type: str
profile_name=None, # type: Optional[str]
**kwargs
):
self.connections_prefix = connections_prefix.rstrip("/")
self.variables_prefix = variables_prefix.rstrip('/')
self.profile_name = profile_name
super(SystemsManagerParameterStoreBackend, self).__init__(**kwargs)

Expand All @@ -71,7 +75,29 @@ def get_conn_uri(self, conn_id):
:rtype: str
"""

ssm_path = self.build_path(self.connections_prefix, conn_id)
return self._get_secret(self.connections_prefix, conn_id)

def get_variable(self, key):
# type: (str) -> Optional[str]
"""
Get Airflow Variable from Environment Variable
:param key: Variable Key
:return: Variable Value
"""
return self._get_secret(self.variables_prefix, key)

def _get_secret(self, path_prefix, secret_id):
# type: (str, str) -> Optional[str]
"""
Get secret value from Parameter Store.
:param path_prefix: Prefix for the Path to get Secret
:type path_prefix: str
:param secret_id: Secret Key
:type secret_id: str
"""
ssm_path = self.build_path(path_prefix, secret_id)
try:
response = self.client.get_parameter(
Name=ssm_path, WithDecryption=False
Expand Down
14 changes: 13 additions & 1 deletion docs/howto/use-alternative-secrets-backend.rst
Expand Up @@ -65,8 +65,12 @@ Here is a sample configuration:
.. code-block:: ini
[secrets]
backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": "default"}
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}
Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""

If you have set ``connections_prefix`` as ``/airflow/connections``, then for a connection id of ``smtp_default``,
you would want to store your connection at ``/airflow/connections/smtp_default``.
Expand All @@ -76,6 +80,14 @@ Optionally you can supply a profile name to reference aws profile, e.g. defined
The value of the SSM parameter must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object.

Storing and Retrieving Variables
""""""""""""""""""""""""""""""""

If you have set ``variables_prefix`` as ``/airflow/variables``, then for an Variable key of ``hello``,
you would want to store your Variable at ``/airflow/variables/hello``.

Optionally you can supply a profile name to reference aws profile, e.g. defined in ``~/.aws/config``.

.. _hashicorp_vault_secrets:

Hashicorp Vault Secrets Backend
Expand Down
31 changes: 31 additions & 0 deletions tests/contrib/secrets/test_aws_systems_manager.py
Expand Up @@ -65,3 +65,34 @@ def test_get_conn_uri_non_existent_key(self):

self.assertIsNone(ssm_backend.get_conn_uri(conn_id=conn_id))
self.assertEqual([], ssm_backend.get_connections(conn_id=conn_id))

@mock_ssm
def test_get_variable(self):
param = {
'Name': '/airflow/variables/hello',
'Type': 'String',
'Value': 'world'
}

ssm_backend = SystemsManagerParameterStoreBackend()
ssm_backend.client.put_parameter(**param)

returned_uri = ssm_backend.get_variable('hello')
self.assertEqual('world', returned_uri)

@mock_ssm
def test_get_variable_non_existent_key(self):
"""
Test that if Variable key is not present in SSM,
SystemsManagerParameterStoreBackend.get_variables should return None
"""
param = {
'Name': '/airflow/variables/hello',
'Type': 'String',
'Value': 'world'
}

ssm_backend = SystemsManagerParameterStoreBackend()
ssm_backend.client.put_parameter(**param)

self.assertIsNone(ssm_backend.get_variable("test_mysql"))

0 comments on commit f1108c1

Please sign in to comment.