Skip to content

Commit

Permalink
[dagster-aws] Add secretsmanager resources (#6802)
Browse files Browse the repository at this point in the history
* [dagster-aws] Add secretsmanager resources

* rebuild sphinx docs

* Expand test suite, isort

* Regen docs

* Update docs a bit
  • Loading branch information
benpankow committed Mar 1, 2022
1 parent 3408263 commit 0158c07
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
13 changes: 13 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
AWS (dagster-aws)
=================

Utilities for interfacing with AWS with Dagster.

.. currentmodule:: dagster_aws

S3
Expand Down Expand Up @@ -82,3 +84,14 @@ CloudWatch

.. autoconfigurable:: dagster_aws.cloudwatch.cloudwatch_logger
:annotation: LoggerDefinition

SecretsManager
--------------

Resources which surface SecretsManager secrets for use in Dagster resources and jobs.

.. autoconfigurable:: dagster_aws.secretsmanager.secretsmanager_resource
:annotation: ResourceDefinition

.. autoconfigurable:: dagster_aws.secretsmanager.secretsmanager_secrets_resource
:annotation: ResourceDefinition
16 changes: 2 additions & 14 deletions python_modules/libraries/dagster-aws/dagster_aws/s3/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import boto3
from botocore import __version__ as botocore_version
from botocore.config import Config
from botocore.handlers import disable_signing
from packaging import version

from dagster import check

from ..utils import construct_boto_client_retry_config


class S3Callback:
def __init__(self, logger, bucket, key, filename, size):
Expand Down Expand Up @@ -51,14 +50,3 @@ def construct_s3_client(
s3_client.meta.events.register("choose-signer.s3.*", disable_signing)

return s3_client


def construct_boto_client_retry_config(max_attempts):
check.int_param(max_attempts, "max_attempts")

# retry mode option was introduced in botocore 1.15.0
# https://botocore.amazonaws.com/v1/documentation/api/1.15.0/reference/config.html
retry_config = {"max_attempts": max_attempts}
if version.parse(botocore_version) >= version.parse("1.15.0"):
retry_config["mode"] = "standard"
return Config(retries=retry_config)
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .resources import secretsmanager_resource, secretsmanager_secrets_resource
from .secrets import get_secrets_from_arns, get_tagged_secrets
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from contextlib import contextmanager

from dagster import Array, Field, Noneable, check, resource
from dagster.core.test_utils import environ
from dagster.utils.merger import merge_dicts

from .secrets import construct_secretsmanager_client, get_secrets_from_arns, get_tagged_secrets

SECRETSMANAGER_SESSION_CONFIG = {
"region_name": Field(
str,
description="Specifies a custom region for the SecretsManager session",
is_required=False,
),
"max_attempts": Field(
int,
description="This provides Boto3's retry handler with a value of maximum retry attempts, "
"where the initial call counts toward the max_attempts value that you provide",
is_required=False,
default_value=5,
),
"profile_name": Field(
str,
description="Specifies a profile to connect that session",
is_required=False,
),
}


@resource(SECRETSMANAGER_SESSION_CONFIG)
def secretsmanager_resource(context):
"""Resource that gives access to AWS SecretsManager.
The underlying SecretsManager session is created by calling
:py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`.
The returned resource object is a SecretsManager client, an instance of `botocore.client.SecretsManager`.
Example:
.. code-block:: python
from dagster import build_op_context, job, op
from dagster_aws.secretsmanager import secretsmanager_resource
@op(required_resource_keys={'secretsmanager'})
def example_secretsmanager_op(context):
return context.resources.secretsmanager.get_secret_value(
SecretId='arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf'
)
@job(resource_defs={'secretsmanager': secretsmanager_resource})
def example_job(context):
example_secretsmanager_op()
example_job.execute_in_process(
run_config={
'resources': {
'secretsmanager': {
'config': {
'region_name': 'us-west-1',
}
}
}
}
)
Note that your ops must also declare that they require this resource with
`required_resource_keys`, or it will not be initialized for the execution of their compute
functions.
You may configure this resource as follows:
.. code-block:: YAML
resources:
secretsmanager:
config:
region_name: "us-west-1"
# Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
# through the ordinary boto credential chain.
profile_name: "dev"
# Optional[str]: Specifies a custom profile for SecretsManager session. Default is default
# profile as specified in ~/.aws/credentials file
"""
return construct_secretsmanager_client(
max_attempts=context.resource_config["max_attempts"],
region_name=context.resource_config.get("region_name"),
profile_name=context.resource_config.get("profile_name"),
)


@resource(
merge_dicts(
SECRETSMANAGER_SESSION_CONFIG,
{
"secrets": Field(
Array(str),
is_required=False,
default_value=[],
description=("An array of AWS Secrets Manager secrets arns to fetch."),
),
"secrets_tag": Field(
Noneable(str),
is_required=False,
default_value=None,
description=(
"AWS Secrets Manager secrets with this tag will be fetched and made available."
),
),
"add_to_environment": Field(
bool,
is_required=False,
default_value=False,
description=("Whether to mount the secrets as environment variables."),
),
},
)
)
@contextmanager
def secretsmanager_secrets_resource(context):
"""Resource that provides a dict which maps selected SecretsManager secrets to
their string values. Also optionally sets chosen secrets as environment variables.
Example:
.. code-block:: python
import os
from dagster import build_op_context, job, op
from dagster_aws.secretsmanager import secretsmanager_secrets_resource
@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op(context):
return context.resources.secrets.get("my-secret-name")
@op(required_resource_keys={'secrets'})
def example_secretsmanager_secrets_op_2(context):
return os.getenv("my-other-secret-name")
@job(resource_defs={'secrets': secretsmanager_secrets_resource})
def example_job(context):
example_secretsmanager_secrets_op()
example_secretsmanager_secrets_op_2()
example_job.execute_in_process(
run_config={
'resources': {
'secrets': {
'config': {
'region_name': 'us-west-1',
'secrets_tag': 'dagster',
'add_to_environment': True,
}
}
}
}
)
Note that your ops must also declare that they require this resource with
`required_resource_keys`, or it will not be initialized for the execution of their compute
functions.
You may configure this resource as follows:
.. code-block:: YAML
resources:
secretsmanager:
config:
region_name: "us-west-1"
# Optional[str]: Specifies a custom region for the SecretsManager session. Default is chosen
# through the ordinary boto credential chain.
profile_name: "dev"
# Optional[str]: Specifies a custom profile for SecretsManager session. Default is default
# profile as specified in ~/.aws/credentials file
secrets: ["arn:aws:secretsmanager:region:aws_account_id:secret:appauthexample-AbCdEf"]
# Optional[List[str]]: Specifies a list of secret ARNs to pull from SecretsManager.
secrets_tag: "dagster"
# Optional[str]: Specifies a tag, all secrets which have the tag set will be pulled
# from SecretsManager.
add_to_environment: true
# Optional[bool]: Whether to set the selected secrets as environment variables. Defaults
# to false.
"""
add_to_environment = check.bool_param(
context.resource_config["add_to_environment"], "add_to_environment"
)
secrets_tag = check.opt_str_param(context.resource_config["secrets_tag"], "secrets_tag")
secrets = check.list_param(context.resource_config["secrets"], "secrets", of_type=str)

secrets_manager = construct_secretsmanager_client(
max_attempts=context.resource_config["max_attempts"],
region_name=context.resource_config.get("region_name"),
profile_name=context.resource_config.get("profile_name"),
)

secret_arns = merge_dicts(
(get_tagged_secrets(secrets_manager, secrets_tag) if secrets_tag else {}),
get_secrets_from_arns(secrets_manager, secrets),
)

secrets_map = {
name: secrets_manager.get_secret_value(SecretId=arn).get("SecretString")
for name, arn in secret_arns.items()
}
with environ(secrets_map if add_to_environment else {}):
yield secrets_map
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
from typing import Dict, List
from typing import Dict, List, Optional

import boto3

from dagster import check

from ..utils import construct_boto_client_retry_config


def construct_secretsmanager_client(
max_attempts: int, region_name: Optional[str] = None, profile_name: Optional[str] = None
):
check.int_param(max_attempts, "max_attempts")
check.opt_str_param(region_name, "region_name")
check.opt_str_param(profile_name, "profile_name")

client_session = boto3.session.Session(profile_name=profile_name)
secrets_manager = client_session.client(
"secretsmanager",
region_name=region_name,
config=construct_boto_client_retry_config(max_attempts),
)

return secrets_manager


def get_tagged_secrets(secrets_manager, secrets_tag: str) -> Dict[str, str]:
Expand All @@ -17,7 +40,6 @@ def get_tagged_secrets(secrets_manager, secrets_tag: str) -> Dict[str, str]:
},
],
):

for secret in page["SecretList"]:
secrets[secret["Name"]] = secret["ARN"]

Expand Down
15 changes: 15 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from botocore import __version__ as botocore_version
from botocore.config import Config
from dagster import check
from packaging import version


def construct_boto_client_retry_config(max_attempts):
check.int_param(max_attempts, "max_attempts")

# retry mode option was introduced in botocore 1.15.0
# https://botocore.amazonaws.com/v1/documentation/api/1.15.0/reference/config.html
retry_config = {"max_attempts": max_attempts}
if version.parse(botocore_version) >= version.parse("1.15.0"):
retry_config["mode"] = "standard"
return Config(retries=retry_config)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import boto3
import pytest
from moto import mock_s3
from moto import mock_s3, mock_secretsmanager


# Make sure unit tests never connect to real AWS
Expand All @@ -19,3 +19,9 @@ def mock_s3_resource():
@pytest.fixture
def mock_s3_bucket(mock_s3_resource): # pylint: disable=redefined-outer-name
yield mock_s3_resource.create_bucket(Bucket="test-bucket")


@pytest.fixture
def mock_secretsmanager_resource():
with mock_secretsmanager():
yield boto3.client("secretsmanager")
Empty file.

1 comment on commit 0158c07

@vercel
Copy link

@vercel vercel bot commented on 0158c07 Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.