From 5ca5701263225cae2fccc63bd8236bd92ed62818 Mon Sep 17 00:00:00 2001 From: nrusch Date: Mon, 4 Nov 2019 18:06:46 -0800 Subject: [PATCH] Add environment variable support to Docker environment payload This allows the environment config to be specific as a JSON string, in which case an optional map of environment variables can be included, and will be passed along to the container runtime via `--env=` flags. --- .../src/main/proto/beam_runner_api.proto | 1 + .../core/construction/Environments.java | 45 +++++++++++++++++-- .../core/construction/EnvironmentsTest.java | 15 +++++++ .../environment/DockerEnvironmentFactory.java | 6 +++ .../sdk/options/PortablePipelineOptions.java | 12 +++-- .../apache_beam/options/pipeline_options.py | 8 ++-- .../runners/portability/fn_api_runner.py | 31 +++++++------ .../portability/portable_runner_test.py | 19 +++++++- .../apache_beam/transforms/environments.py | 40 ++++++++++++----- .../transforms/environments_test.py | 1 + 10 files changed, 141 insertions(+), 37 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 90f52fc6c7762..d4e62b4e5690b 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1042,6 +1042,7 @@ message StandardEnvironments { // The payload of a Docker image message DockerPayload { string container_image = 1; // implicitly linux_amd64. + map env = 2; // Environment variables } message ProcessPayload { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java index 79b01116eb2fc..2177de5016796 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java @@ -115,14 +115,36 @@ public static Environment createOrGetDefaultEnvironment(String type, String conf } } - public static Environment createDockerEnvironment(String dockerImageUrl) { - if (Strings.isNullOrEmpty(dockerImageUrl)) { + public static Environment createDockerEnvironment(String config) { + if (Strings.isNullOrEmpty(config)) { return JAVA_SDK_HARNESS_ENVIRONMENT; } + // Support a JSON config, but fall back to interpreting it as a simple URL + // in case of a parse error. + try { + DockerPayloadReferenceJSON payloadReferenceJSON = + MAPPER.readValue(config, DockerPayloadReferenceJSON.class); + return createDockerEnvironment( + payloadReferenceJSON.getDockerImageUrl(), + payloadReferenceJSON.getEnv()); + } catch (IOException e) { + return createDockerEnvironment(config, null); + } + } + + public static Environment createDockerEnvironment( + String dockerImageUrl, Map env) { + if (Strings.isNullOrEmpty(dockerImageUrl)) { + throw new RuntimeException("Empty Docker image URL"); + } + DockerPayload.Builder builder = DockerPayload.newBuilder() + .setContainerImage(dockerImageUrl); + if (env != null) { + builder.putAllEnv(env); + } return Environment.newBuilder() .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER)) - .setPayload( - DockerPayload.newBuilder().setContainerImage(dockerImageUrl).build().toByteString()) + .setPayload(builder.build().toByteString()) .build(); } @@ -248,6 +270,21 @@ private static String windowExtractor(PTransform transform) .getEnvironmentId(); } + private static class DockerPayloadReferenceJSON { + @Nullable private String dockerImageUrl; + @Nullable private Map env; + + @Nullable + public String getDockerImageUrl() { + return dockerImageUrl; + } + + @Nullable + public Map getEnv() { + return env; + } + } + private static class ProcessPayloadReferenceJSON { @Nullable private String os; @Nullable private String arch; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java index 5cbe98a426dbe..94d52b160b259 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java @@ -67,6 +67,21 @@ public void createEnvironments() throws IOException { .setPayload( DockerPayload.newBuilder().setContainerImage("java").build().toByteString()) .build())); + assertThat( + Environments.createOrGetDefaultEnvironment( + Environments.ENVIRONMENT_DOCKER, + "{\"docker_image\": \"java\", \"env\": {\"k1\": \"v1\", \"k2\": \"v2\"} }"), + is( + Environment.newBuilder() + .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER)) + .setPayload( + DockerPayload.newBuilder() + .setContainerImage("java") + .putEnv("k1", "v1") + .putEnv("k2", "v2") + .build() + .toByteString()) + .build())); assertThat( Environments.createOrGetDefaultEnvironment( Environments.ENVIRONMENT_PROCESS, diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java index 154aaecc5bb94..61b5addb3c962 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java @@ -23,6 +23,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -133,6 +134,11 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep // host networking on Mac) .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER")); + Map dockerEnvMap = dockerPayload.getEnvMap(); + if (!dockerEnvMap.isEmpty()) { + dockerEnvMap.forEach((k, v) -> dockerOptsBuilder.add(String.format("--env=%s=%s", k, v))); + } + Boolean retainDockerContainer = pipelineOptions.as(ManualDockerEnvironmentOptions.class).getRetainDockerContainers(); if (!retainDockerContainer) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index 2eb4d56493d47..09607f00e84d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -56,11 +56,15 @@ public interface PortablePipelineOptions extends PipelineOptions { @Description( "Set environment configuration for running the user code.\n" - + " For DOCKER: Url for the docker image.\n" + + " For DOCKER: Url for the docker image, or json of the form " + + "{\"docker_image\": \"\", " + + "\"env\":{\"\": \"\"} }, " + + " where the image URL is a required field.\n" + " For PROCESS: json of the form " - + "{\"os\": \"\", \"arch\": \"\", \"command\": \"\", " - + "\"env\":{\"\": \"\"} }. " - + "All fields in the json are optional except command.") + + "{\"os\": \"\", \"arch\": \"\", " + + "\"command\": \"\", " + + "\"env\":{\"\": \"\"} }, " + + "where all fields are optional except command.") @Nullable String getDefaultEnvironmentConfig(); diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 5ade96fd677b5..b4a107aeda5c0 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -884,11 +884,13 @@ def _add_argparse_args(cls, parser): parser.add_argument( '--environment_config', default=None, help=('Set environment configuration for running the user code.\n For ' - 'DOCKER: Url for the docker image.\n For PROCESS: json of the ' + 'DOCKER: Url for the docker image, or json of the form ' + '\n {"docker_image": "", "env":{' + '"": ""} }, where the image ' + 'URL is a required field.\n For PROCESS: json of the ' 'form {"os": "", "arch": "", "command": ' '"", "env":{"": ' - '""} }. All fields in the json are optional except ' - 'command.')) + '""} }, where all fields are optional except command.')) parser.add_argument( '--sdk_worker_parallelism', default=1, help=('Sets the number of sdk worker processes that will run on each ' diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 0735f307e1823..5138aff8c5862 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1436,6 +1436,7 @@ def __init__(self, payload, state, provision_info, grpc_server): super(DockerSdkWorkerHandler, self).__init__(state, provision_info, grpc_server) self._container_image = payload.container_image + self._container_env = payload.env self._container_id = None def host_from_worker(self): @@ -1451,19 +1452,23 @@ def start_worker(self): subprocess.check_call(['docker', 'pull', self._container_image]) except Exception: logging.info('Unable to pull image %s' % self._container_image) - self._container_id = subprocess.check_output( - ['docker', - 'run', - '-d', - # TODO: credentials - '--network=host', - self._container_image, - '--id=%s' % self.worker_id, - '--logging_endpoint=%s' % self.logging_api_service_descriptor().url, - '--control_endpoint=%s' % self.control_address, - '--artifact_endpoint=%s' % self.control_address, - '--provision_endpoint=%s' % self.control_address, - ]).strip() + command = [ + 'docker', + 'run', + '-d', + # TODO: credentials + '--network=host', + self._container_image, + '--id=%s' % self.worker_id, + '--logging_endpoint=%s' % self.logging_api_service_descriptor().url, + '--control_endpoint=%s' % self.control_address, + '--artifact_endpoint=%s' % self.control_address, + '--provision_endpoint=%s' % self.control_address, + ] + if self._container_env: + command.extend( + '--env=%s=%s' % item for item in self._container_env.items()) + self._container_id = subprocess.check_output(command).strip() while True: status = subprocess.check_output([ 'docker', diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 24c6b87abf580..821b40f4a20e2 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -18,6 +18,7 @@ from __future__ import print_function import inspect +import json import logging import platform import signal @@ -269,6 +270,22 @@ def test__create_docker_environment(self): 'environment_config': docker_image, })), environments.DockerEnvironment(container_image=docker_image)) + def test__create_docker_environment_json_config(self): + docker_image = 'py-docker' + env_map = {'ENVKEY1': 'ENVVALUE1'} + config_map = {'docker_image': docker_image, 'env': env_map} + self.assertEqual( + PortableRunner._create_environment(PipelineOptions.from_dictionary({ + 'environment_type': 'DOCKER', + 'environment_config': json.dumps(config_map), + })), environments.DockerEnvironment(container_image=docker_image, + env=env_map)) + with self.assertRaises(ValueError): + PortableRunner._create_environment(PipelineOptions.from_dictionary({ + 'environment_type': 'DOCKER', + 'environment_config': json.dumps({'env': env_map}), + })) + def test__create_process_environment(self): self.assertEqual( PortableRunner._create_environment(PipelineOptions.from_dictionary({ @@ -290,7 +307,7 @@ def test__create_external_environment(self): 'environment_type': "EXTERNAL", 'environment_config': 'localhost:50000', })), environments.ExternalEnvironment('localhost:50000')) - raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} ' + raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"} ' for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()): self.assertEqual( PortableRunner._create_environment(PipelineOptions.from_dictionary({ diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 8758ab8ca8b55..c4153255862ab 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -37,6 +37,11 @@ 'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder'] +def _looks_like_json(config_string): + import re + return re.match(r'\s*\{.*\}\s*$', config_string) + + class Environment(object): """Abstract base class for environments. @@ -118,17 +123,19 @@ def from_options(cls, options): beam_runner_api_pb2.DockerPayload) class DockerEnvironment(Environment): - def __init__(self, container_image=None): + def __init__(self, container_image=None, env=None): from apache_beam.runners.portability.portable_runner import PortableRunner if container_image: self.container_image = container_image else: self.container_image = PortableRunner.default_docker_image() + self.env = env or {} def __eq__(self, other): return self.__class__ == other.__class__ \ - and self.container_image == other.container_image + and self.container_image == other.container_image \ + and self.env == other.env def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. @@ -138,20 +145,33 @@ def __hash__(self): return hash((self.__class__, self.container_image)) def __repr__(self): - return 'DockerEnvironment(container_image=%s)' % self.container_image + return 'DockerEnvironment(container_image=%s,env=%s)' % ( + self.container_image, self.env) def to_runner_api_parameter(self, context): return (common_urns.environments.DOCKER.urn, beam_runner_api_pb2.DockerPayload( - container_image=self.container_image)) + container_image=self.container_image, + env=self.env)) @staticmethod def from_runner_api_parameter(payload, context): - return DockerEnvironment(container_image=payload.container_image) + return DockerEnvironment(container_image=payload.container_image, + env=payload.env) @classmethod def from_options(cls, options): - return cls(container_image=options.environment_config) + if _looks_like_json(options.environment_config): + config = json.loads(options.environment_config) + docker_image = config.get('docker_image') + if not docker_image: + raise ValueError('Docker image URL must be set.') + env = config.get('env') + else: + docker_image = options.environment_config + env = None + + return cls(container_image=docker_image, env=env) @Environment.register_urn(common_urns.environments.PROCESS.urn, @@ -203,7 +223,7 @@ def from_runner_api_parameter(payload, context): def from_options(cls, options): config = json.loads(options.environment_config) return cls(config.get('command'), os=config.get('os', ''), - arch=config.get('arch', ''), env=config.get('env', '')) + arch=config.get('arch', ''), env=config.get('env')) @Environment.register_urn(common_urns.environments.EXTERNAL.urn, @@ -245,11 +265,7 @@ def from_runner_api_parameter(payload, context): @classmethod def from_options(cls, options): - def looks_like_json(environment_config): - import re - return re.match(r'\s*\{.*\}\s*$', environment_config) - - if looks_like_json(options.environment_config): + if _looks_like_json(options.environment_config): config = json.loads(options.environment_config) url = config.get('url') if not url: diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 0fd568c5f5dc7..1464d57c26b80 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -39,6 +39,7 @@ def test_environment_encoding(self): for environment in ( DockerEnvironment(), DockerEnvironment(container_image='img'), + DockerEnvironment(container_image='img', env={'k1': 'v1'}), ProcessEnvironment('run.sh'), ProcessEnvironment('run.sh', os='linux', arch='amd64', env={'k1': 'v1'}),