Skip to content

Commit

Permalink
support container kwargs for celery-docker (#7335)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Apr 8, 2022
1 parent d344eed commit ca8a616
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ def aws_configmap(namespace, should_cleanup):
)
aws_data["AWS_ACCESS_KEY_ID"] = creds["aws_access_key_id"]
aws_data["AWS_SECRET_ACCESS_KEY"] = creds["aws_secret_access_key"]
except:
except Exception as e:
raise Exception(
"Must have AWS credentials set in AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
"to be able to run Helm tests locally"
"Must have AWS credentials set to be able to run Helm tests locally. Run "
f"'aws sso login' to authenticate. Original error: {e}"
)

print("Creating ConfigMap %s with AWS credentials" % (TEST_AWS_CONFIGMAP_NAME))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
solids:
get_environment_solid:
config:
looking_for: FIND_ME
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ def hanging_solid(_):
time.sleep(0.1)


@solid(config_schema={"looking_for": str})
def get_environment_solid(context):
return os.environ.get(context.solid_config["looking_for"])


@pipeline(
mode_defs=[
ModeDefinition(
Expand Down Expand Up @@ -249,6 +254,7 @@ def define_docker_celery_pipeline():
)
def docker_celery_pipeline():
count_letters(multiply_the_word())
get_environment_solid()

return docker_celery_pipeline

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Executor,
Field,
MetadataEntry,
Permissive,
StringSource,
check,
executor,
Expand Down Expand Up @@ -55,6 +56,11 @@ def celery_docker_config():
is_required=False,
description="Name of the network this container will be connected to at creation time",
),
"container_kwargs": Field(
Permissive(),
is_required=False,
description="Additional keyword args for the docker container",
),
},
is_required=True,
description="The configuration for interacting with docker in the celery worker.",
Expand Down Expand Up @@ -93,7 +99,7 @@ def celery_docker_executor(init_context):
.. code-block:: python
from dagster import job
from dagster_celery_docker.executor import celery_executor
from dagster_celery_docker.executor import celery_docker_executor
@job(executor_def=celery_docker_executor)
def celery_enabled_job():
Expand All @@ -112,6 +118,9 @@ def celery_enabled_job():
username: 'my_user'
password: {env: 'DOCKER_PASSWORD'}
env_vars: ["DAGSTER_HOME"] # environment vars to pass from celery worker to docker
container_kwargs: # keyword args to be passed to the container. example:
volumes: ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
broker: 'pyamqp://guest@localhost//' # Optional[str]: The URL of the Celery broker
backend: 'rpc://' # Optional[str]: The URL of the Celery results backend
include: ['my_module'] # Optional[List[str]]: Modules every worker should import
Expand All @@ -124,7 +133,7 @@ def celery_enabled_job():
different broker than the one your workers are listening to, the workers will never be able to
pick up tasks for execution.
In deployments where the celery_k8s_job_executor is used all appropriate celery and dagster_celery
In deployments where the celery_docker_job_executor is used all appropriate celery and dagster_celery
commands must be invoked with the `-A dagster_celery_docker.app` argument.
"""

Expand Down Expand Up @@ -272,15 +281,33 @@ def _execute_step_docker(
if docker_config.get("env_vars"):
docker_env = {env_name: os.getenv(env_name) for env_name in docker_config["env_vars"]}

container_kwargs = check.opt_dict_param(
docker_config.get("container_kwargs"), "container_kwargs", key_type=str
)

# set defaults for detach and auto_remove
container_kwargs["detach"] = container_kwargs.get("detach", False)
container_kwargs["auto_remove"] = container_kwargs.get("auto_remove", True)

# if environment variables are provided via container_kwargs, merge with env_vars
if container_kwargs.get("environment") is not None:
e_vars = container_kwargs.get("environment")
if isinstance(e_vars, dict):
docker_env.update(e_vars)
else:
for v in e_vars:
key, val = v.split("=")
docker_env[key] = val
del container_kwargs["environment"]

try:
docker_response = client.containers.run(
docker_image,
command=command,
detach=False,
auto_remove=True,
# pass through this worker's environment for things like AWS creds etc.
environment=docker_env,
network=docker_config.get("network", None),
**container_kwargs,
)

res = docker_response.decode("utf-8")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json
import os

import boto3
import pytest


@pytest.fixture
def aws_creds():
sm_client = boto3.client("secretsmanager", region_name="us-west-1")

if os.environ.get("AWS_ACCESS_KEY_ID") and os.environ.get("AWS_SECRET_ACCESS_KEY"):
return {
"aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
}

try:
creds = json.loads(
sm_client.get_secret_value(
SecretId=os.getenv("AWS_SSM_REFERENCE", "development/DOCKER_AWS_CREDENTIAL")
).get("SecretString")
)
return creds
except Exception as e:
raise Exception(
"Must have AWS credentials set to be able to run celery docker tests locally. Run "
f"'aws sso login' to authenticate. Original error: {e}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ def celery_docker_postgres_instance(overrides=None):
yield instance


def test_execute_celery_docker_image_on_executor_config():
def test_execute_celery_docker_image_on_executor_config(aws_creds):
docker_image = get_test_project_docker_image()
docker_config = {
"image": docker_image,
"env_vars": [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
],
"network": "container:test-postgres-db-celery-docker",
"container_kwargs": {
"environment": {
"FIND_ME": "here!",
"AWS_ACCESS_KEY_ID": aws_creds["aws_access_key_id"],
"AWS_SECRET_ACCESS_KEY": aws_creds["aws_secret_access_key"],
},
# "auto_remove": False # uncomment when debugging to view container logs after execution
},
}

if IS_BUILDKITE:
Expand All @@ -49,6 +53,7 @@ def test_execute_celery_docker_image_on_executor_config():
[
os.path.join(get_test_project_environments_path(), "env.yaml"),
os.path.join(get_test_project_environments_path(), "env_s3.yaml"),
os.path.join(get_test_project_environments_path(), "env_environment_vars.yaml"),
]
),
{
Expand All @@ -71,16 +76,21 @@ def test_execute_celery_docker_image_on_executor_config():
instance=instance,
)
assert result.success
assert result.result_for_solid("get_environment_solid").output_value("result") == "here!"


def test_execute_celery_docker_image_on_pipeline_config():
def test_execute_celery_docker_image_on_pipeline_config(aws_creds):
docker_image = get_test_project_docker_image()
docker_config = {
"env_vars": [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
],
"network": "container:test-postgres-db-celery-docker",
"container_kwargs": {
"environment": [
"FIND_ME=here!",
f"AWS_ACCESS_KEY_ID={aws_creds['aws_access_key_id']}",
f"AWS_SECRET_ACCESS_KEY={aws_creds['aws_secret_access_key']}",
],
# "auto_remove": False # uncomment when debugging to view container logs after execution
},
}

if IS_BUILDKITE:
Expand All @@ -94,6 +104,7 @@ def test_execute_celery_docker_image_on_pipeline_config():
[
os.path.join(get_test_project_environments_path(), "env.yaml"),
os.path.join(get_test_project_environments_path(), "env_s3.yaml"),
os.path.join(get_test_project_environments_path(), "env_environment_vars.yaml"),
]
),
{
Expand All @@ -115,3 +126,4 @@ def test_execute_celery_docker_image_on_pipeline_config():
instance=instance,
)
assert result.success
assert result.result_for_solid("get_environment_solid").output_value("result") == "here!"

0 comments on commit ca8a616

Please sign in to comment.