Skip to content

Commit

Permalink
Get Airflow Variables from AWS Systems Manager Parameter Store
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil committed Mar 28, 2020
1 parent eb4af4f commit f3738c7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
28 changes: 26 additions & 2 deletions airflow/providers/amazon/aws/secrets/systems_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
"""
Retrieves Connection object from AWS SSM Parameter Store
Retrieves Connection or Variables from AWS SSM Parameter Store
Configurable via ``airflow.cfg`` like so:
Expand All @@ -41,15 +41,19 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
For example, if ssm path is ``/airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
And if ssm path is ``/airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "/airflow/variables"}`` and request conn_id ``hello``.
"""

def __init__(
self,
connections_prefix: str = '/airflow/connections',
variables_prefix: str = '/airflow/variables',
profile_name: Optional[str] = None,
**kwargs
):
self.connections_prefix = connections_prefix.rstrip("/")
self.variables_prefix = variables_prefix.rstrip('/')
self.profile_name = profile_name
super().__init__(**kwargs)

Expand All @@ -69,7 +73,27 @@ def get_conn_uri(self, conn_id: str) -> Optional[str]:
:type conn_id: str
"""

ssm_path = self.build_path(self.connections_prefix, conn_id)
return self._get_secret(self.connections_prefix, conn_id)

def get_variable(self, key: str) -> Optional[str]:
"""
Get Airflow Variable from Environment Variable
:param key: Variable Key
:return: Variable Value
"""
return self._get_secret(self.variables_prefix, key)

def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
"""
Get secret value from Parameter Store.
:param path_prefix: Prefix for the Path to get Secret
:type path_prefix: str
:param secret_id: Secret Key
:type secret_id: str
"""
ssm_path = self.build_path(path_prefix, secret_id)
try:
response = self.client.get_parameter(
Name=ssm_path, WithDecryption=False
Expand Down
13 changes: 12 additions & 1 deletion docs/howto/use-alternative-secrets-backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ Here is a sample configuration:
[secrets]
backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": "default"}
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}
Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""

If you have set ``connections_prefix`` as ``/airflow/connections``, then for a connection id of ``smtp_default``,
you would want to store your connection at ``/airflow/connections/smtp_default``.
Expand All @@ -76,6 +79,14 @@ Optionally you can supply a profile name to reference aws profile, e.g. defined
The value of the SSM parameter must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object.

Storing and Retrieving Variables
""""""""""""""""""""""""""""""""

If you have set ``variables_prefix`` as ``/airflow/variables``, then for an Variable key of ``hello``,
you would want to store your Variable at ``/airflow/variables/hello``.

Optionally you can supply a profile name to reference aws profile, e.g. defined in ``~/.aws/config``.

.. _hashicorp_vault_secrets:

Hashicorp Vault Secrets Backend
Expand Down
31 changes: 31 additions & 0 deletions tests/providers/amazon/aws/secrets/test_systems_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,34 @@ def test_get_conn_uri_non_existent_key(self):

self.assertIsNone(ssm_backend.get_conn_uri(conn_id=conn_id))
self.assertEqual([], ssm_backend.get_connections(conn_id=conn_id))

@mock_ssm
def test_get_variable(self):
param = {
'Name': '/airflow/variables/hello',
'Type': 'String',
'Value': 'world'
}

ssm_backend = SystemsManagerParameterStoreBackend()
ssm_backend.client.put_parameter(**param)

returned_uri = ssm_backend.get_variable('hello')
self.assertEqual('world', returned_uri)

@mock_ssm
def test_get_variable_non_existent_key(self):
"""
Test that if Variable key is not present in SSM,
SystemsManagerParameterStoreBackend.get_variables should return None
"""
param = {
'Name': '/airflow/variables/hello',
'Type': 'String',
'Value': 'world'
}

ssm_backend = SystemsManagerParameterStoreBackend()
ssm_backend.client.put_parameter(**param)

self.assertIsNone(ssm_backend.get_variable("test_mysql"))

0 comments on commit f3738c7

Please sign in to comment.