diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 61439301b3e46..d83c92366fffc 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1085,6 +1085,7 @@ message StandardEnvironments { // The payload of a Docker image message DockerPayload { string container_image = 1; // implicitly linux_amd64. + map env = 2; // Environment variables } message ProcessPayload { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java index 79b01116eb2fc..1db77db76c612 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.construction; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; @@ -115,14 +116,34 @@ public static Environment createOrGetDefaultEnvironment(String type, String conf } } - public static Environment createDockerEnvironment(String dockerImageUrl) { - if (Strings.isNullOrEmpty(dockerImageUrl)) { + public static Environment createDockerEnvironment(String config) { + if (Strings.isNullOrEmpty(config)) { return JAVA_SDK_HARNESS_ENVIRONMENT; } + // Support a JSON config, but fall back to interpreting it as a simple URL + // in case of a parse error. + try { + DockerPayloadReferenceJSON payloadReferenceJSON = + MAPPER.readValue(config, DockerPayloadReferenceJSON.class); + return createDockerEnvironment( + payloadReferenceJSON.getDockerImageUrl(), payloadReferenceJSON.getEnv()); + } catch (IOException e) { + return createDockerEnvironment(config, null); + } + } + + public static Environment createDockerEnvironment( + String dockerImageUrl, Map env) { + if (Strings.isNullOrEmpty(dockerImageUrl)) { + throw new RuntimeException("Empty Docker image URL"); + } + DockerPayload.Builder builder = DockerPayload.newBuilder().setContainerImage(dockerImageUrl); + if (env != null) { + builder.putAllEnv(env); + } return Environment.newBuilder() .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER)) - .setPayload( - DockerPayload.newBuilder().setContainerImage(dockerImageUrl).build().toByteString()) + .setPayload(builder.build().toByteString()) .build(); } @@ -248,6 +269,24 @@ private static String windowExtractor(PTransform transform) .getEnvironmentId(); } + private static class DockerPayloadReferenceJSON { + @JsonProperty("docker_image") + @Nullable + private String dockerImageUrl; + + @Nullable private Map env; + + @Nullable + public String getDockerImageUrl() { + return dockerImageUrl; + } + + @Nullable + public Map getEnv() { + return env; + } + } + private static class ProcessPayloadReferenceJSON { @Nullable private String os; @Nullable private String arch; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java index 5cbe98a426dbe..94d52b160b259 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java @@ -67,6 +67,21 @@ public void createEnvironments() throws IOException { .setPayload( DockerPayload.newBuilder().setContainerImage("java").build().toByteString()) .build())); + assertThat( + Environments.createOrGetDefaultEnvironment( + Environments.ENVIRONMENT_DOCKER, + "{\"docker_image\": \"java\", \"env\": {\"k1\": \"v1\", \"k2\": \"v2\"} }"), + is( + Environment.newBuilder() + .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER)) + .setPayload( + DockerPayload.newBuilder() + .setContainerImage("java") + .putEnv("k1", "v1") + .putEnv("k2", "v2") + .build() + .toByteString()) + .build())); assertThat( Environments.createOrGetDefaultEnvironment( Environments.ENVIRONMENT_PROCESS, diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java index 154aaecc5bb94..61b5addb3c962 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java @@ -23,6 +23,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -133,6 +134,11 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep // host networking on Mac) .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER")); + Map dockerEnvMap = dockerPayload.getEnvMap(); + if (!dockerEnvMap.isEmpty()) { + dockerEnvMap.forEach((k, v) -> dockerOptsBuilder.add(String.format("--env=%s=%s", k, v))); + } + Boolean retainDockerContainer = pipelineOptions.as(ManualDockerEnvironmentOptions.class).getRetainDockerContainers(); if (!retainDockerContainer) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index a531e8605ba50..35bb31b3e41fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -57,11 +57,15 @@ public interface PortablePipelineOptions extends PipelineOptions { @Description( "Set environment configuration for running the user code.\n" - + " For DOCKER: Url for the docker image.\n" + + " For DOCKER: Url for the docker image, or json of the form " + + "{\"docker_image\": \"\", " + + "\"env\":{\"\": \"\"} }, " + + " where the image URL is a required field.\n" + " For PROCESS: json of the form " - + "{\"os\": \"\", \"arch\": \"\", \"command\": \"\", " - + "\"env\":{\"\": \"\"} }. " - + "All fields in the json are optional except command.") + + "{\"os\": \"\", \"arch\": \"\", " + + "\"command\": \"\", " + + "\"env\":{\"\": \"\"} }, " + + "where all fields are optional except command.") @Nullable String getDefaultEnvironmentConfig(); diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 4de4b519adfa4..6b1372d42797a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -899,11 +899,13 @@ def _add_argparse_args(cls, parser): parser.add_argument( '--environment_config', default=None, help=('Set environment configuration for running the user code.\n For ' - 'DOCKER: Url for the docker image.\n For PROCESS: json of the ' + 'DOCKER: Url for the docker image, or json of the form ' + '\n {"docker_image": "", "env":{' + '"": ""} }, where the image ' + 'URL is a required field.\n For PROCESS: json of the ' 'form {"os": "", "arch": "", "command": ' '"", "env":{"": ' - '""} }. All fields in the json are optional except ' - 'command.')) + '""} }, where all fields are optional except command.')) parser.add_argument( '--sdk_worker_parallelism', default=1, help=('Sets the number of sdk worker processes that will run on each ' diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 134976efd083a..a10dd0025b1f3 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1434,6 +1434,7 @@ def __init__(self, payload, state, provision_info, grpc_server): super(DockerSdkWorkerHandler, self).__init__(state, provision_info, grpc_server) self._container_image = payload.container_image + self._container_env = payload.env self._container_id = None def host_from_worker(self): @@ -1449,19 +1450,23 @@ def start_worker(self): subprocess.check_call(['docker', 'pull', self._container_image]) except Exception: _LOGGER.info('Unable to pull image %s' % self._container_image) - self._container_id = subprocess.check_output( - ['docker', - 'run', - '-d', - # TODO: credentials - '--network=host', - self._container_image, - '--id=%s' % self.worker_id, - '--logging_endpoint=%s' % self.logging_api_service_descriptor().url, - '--control_endpoint=%s' % self.control_address, - '--artifact_endpoint=%s' % self.control_address, - '--provision_endpoint=%s' % self.control_address, - ]).strip() + command = [ + 'docker', + 'run', + '-d', + # TODO: credentials + '--network=host', + self._container_image, + '--id=%s' % self.worker_id, + '--logging_endpoint=%s' % self.logging_api_service_descriptor().url, + '--control_endpoint=%s' % self.control_address, + '--artifact_endpoint=%s' % self.control_address, + '--provision_endpoint=%s' % self.control_address, + ] + if self._container_env: + command.extend( + '--env=%s=%s' % item for item in self._container_env.items()) + self._container_id = subprocess.check_output(command).strip() while True: status = subprocess.check_output([ 'docker', diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index f8b6cf892f95f..83d1e3503c068 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -18,6 +18,7 @@ from __future__ import print_function import inspect +import json import logging import platform import signal @@ -311,6 +312,22 @@ def test__create_docker_environment(self): 'environment_config': docker_image, })), environments.DockerEnvironment(container_image=docker_image)) + def test__create_docker_environment_json_config(self): + docker_image = 'py-docker' + env_map = {'ENVKEY1': 'ENVVALUE1'} + config_map = {'docker_image': docker_image, 'env': env_map} + self.assertEqual( + PortableRunner._create_environment(PipelineOptions.from_dictionary({ + 'environment_type': 'DOCKER', + 'environment_config': json.dumps(config_map), + })), environments.DockerEnvironment(container_image=docker_image, + env=env_map)) + with self.assertRaises(ValueError): + PortableRunner._create_environment(PipelineOptions.from_dictionary({ + 'environment_type': 'DOCKER', + 'environment_config': json.dumps({'env': env_map}), + })) + def test__create_process_environment(self): self.assertEqual( PortableRunner._create_environment(PipelineOptions.from_dictionary({ diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 6f67266fddbf3..6341b3f64f394 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -39,6 +39,12 @@ 'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder'] +def _looks_like_json(config): + import re + return isinstance(config, (str, bytes)) \ + and re.match(r'\s*\{.*\}\s*$', config) is not None + + class Environment(object): """Abstract base class for environments. @@ -120,15 +126,17 @@ def from_options(cls, options): beam_runner_api_pb2.DockerPayload) class DockerEnvironment(Environment): - def __init__(self, container_image=None): + def __init__(self, container_image=None, env=None): if container_image: self.container_image = container_image else: self.container_image = self.default_docker_image() + self.env = env or {} def __eq__(self, other): return self.__class__ == other.__class__ \ - and self.container_image == other.container_image + and self.container_image == other.container_image \ + and self.env == other.env def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. @@ -138,20 +146,33 @@ def __hash__(self): return hash((self.__class__, self.container_image)) def __repr__(self): - return 'DockerEnvironment(container_image=%s)' % self.container_image + return 'DockerEnvironment(container_image=%s,env=%s)' % ( + self.container_image, self.env) def to_runner_api_parameter(self, context): return (common_urns.environments.DOCKER.urn, beam_runner_api_pb2.DockerPayload( - container_image=self.container_image)) + container_image=self.container_image, + env=self.env)) @staticmethod def from_runner_api_parameter(payload, context): - return DockerEnvironment(container_image=payload.container_image) + return DockerEnvironment(container_image=payload.container_image, + env=payload.env) @classmethod def from_options(cls, options): - return cls(container_image=options.environment_config) + if _looks_like_json(options.environment_config): + config = json.loads(options.environment_config) + docker_image = config.get('docker_image') + if not docker_image: + raise ValueError('Docker image URL must be set.') + env = config.get('env') + else: + docker_image = options.environment_config + env = None + + return cls(container_image=docker_image, env=env) @staticmethod def default_docker_image(): @@ -221,7 +242,7 @@ def from_runner_api_parameter(payload, context): def from_options(cls, options): config = json.loads(options.environment_config) return cls(config.get('command'), os=config.get('os', ''), - arch=config.get('arch', ''), env=config.get('env', '')) + arch=config.get('arch', ''), env=config.get('env')) @Environment.register_urn(common_urns.environments.EXTERNAL.urn, @@ -263,11 +284,7 @@ def from_runner_api_parameter(payload, context): @classmethod def from_options(cls, options): - def looks_like_json(environment_config): - import re - return re.match(r'\s*\{.*\}\s*$', environment_config) - - if looks_like_json(options.environment_config): + if _looks_like_json(options.environment_config): config = json.loads(options.environment_config) url = config.get('url') if not url: diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 46868e8f1d3bb..c589918fa7298 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -39,6 +39,7 @@ def test_environment_encoding(self): for environment in ( DockerEnvironment(), DockerEnvironment(container_image='img'), + DockerEnvironment(container_image='img', env={'k1': 'v1'}), ProcessEnvironment('run.sh'), ProcessEnvironment('run.sh', os='linux', arch='amd64', env={'k1': 'v1'}),