Skip to content

Commit

Permalink
Add environment variable support to Docker environment payload
Browse files Browse the repository at this point in the history
This allows the environment config to be specified as a JSON string,
in which case an optional map of environment variables can be included,
and will be passed along to the container runtime via `--env=` flags.
  • Loading branch information
nrusch committed Nov 11, 2019
1 parent 7124651 commit 5ca5701
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 37 deletions.
1 change: 1 addition & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Expand Up @@ -1042,6 +1042,7 @@ message StandardEnvironments {
// The payload of a Docker image: the container image to run plus an
// optional set of environment variables for the container.
message DockerPayload {
string container_image = 1; // implicitly linux_amd64.
// Environment variables. Runners pass each entry along to the container
// runtime as an `--env=<key>=<value>` flag.
map<string, string> env = 2;
}

message ProcessPayload {
Expand Down
Expand Up @@ -115,14 +115,36 @@ public static Environment createOrGetDefaultEnvironment(String type, String conf
}
}

public static Environment createDockerEnvironment(String dockerImageUrl) {
if (Strings.isNullOrEmpty(dockerImageUrl)) {
/**
 * Builds a Docker {@link Environment} from a configuration string.
 *
 * <p>A null or empty config yields {@code JAVA_SDK_HARNESS_ENVIRONMENT}. Otherwise the config is
 * first read as JSON into a {@code DockerPayloadReferenceJSON}; if that read fails with an
 * {@link IOException}, the whole string is used as the image URL with no environment variables.
 *
 * @param config a Docker image URL or a JSON document describing the image and its env
 * @return the resulting Docker environment
 */
public static Environment createDockerEnvironment(String config) {
  if (Strings.isNullOrEmpty(config)) {
    return JAVA_SDK_HARNESS_ENVIRONMENT;
  }
  DockerPayloadReferenceJSON parsedConfig;
  try {
    parsedConfig = MAPPER.readValue(config, DockerPayloadReferenceJSON.class);
  } catch (IOException e) {
    // Could not be read as a JSON payload: interpret the config as a simple URL instead.
    return createDockerEnvironment(config, null);
  }
  return createDockerEnvironment(parsedConfig.getDockerImageUrl(), parsedConfig.getEnv());
}

/**
 * Creates a Docker {@link Environment} for the given image and environment variables.
 *
 * @param dockerImageUrl the container image to run; must be non-empty
 * @param env environment variables to store in the payload, or {@code null} for none
 * @return an environment carrying the Docker URN and a serialized {@code DockerPayload}
 * @throws IllegalArgumentException if {@code dockerImageUrl} is null or empty
 */
public static Environment createDockerEnvironment(
    String dockerImageUrl, @Nullable Map<String, String> env) {
  if (Strings.isNullOrEmpty(dockerImageUrl)) {
    // IllegalArgumentException is a RuntimeException, so existing catch sites keep working.
    throw new IllegalArgumentException("Empty Docker image URL");
  }
  DockerPayload.Builder builder = DockerPayload.newBuilder().setContainerImage(dockerImageUrl);
  if (env != null) {
    builder.putAllEnv(env);
  }
  // Set the payload exactly once, from the builder that also carries the env entries.
  return Environment.newBuilder()
      .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
      .setPayload(builder.build().toByteString())
      .build();
}

Expand Down Expand Up @@ -248,6 +270,21 @@ private static String windowExtractor(PTransform transform)
.getEnvironmentId();
}

private static class DockerPayloadReferenceJSON {
@Nullable private String dockerImageUrl;
@Nullable private Map<String, String> env;

@Nullable
public String getDockerImageUrl() {
return dockerImageUrl;
}

@Nullable
public Map<String, String> getEnv() {
return env;
}
}

private static class ProcessPayloadReferenceJSON {
@Nullable private String os;
@Nullable private String arch;
Expand Down
Expand Up @@ -67,6 +67,21 @@ public void createEnvironments() throws IOException {
.setPayload(
DockerPayload.newBuilder().setContainerImage("java").build().toByteString())
.build()));
assertThat(
Environments.createOrGetDefaultEnvironment(
Environments.ENVIRONMENT_DOCKER,
"{\"docker_image\": \"java\", \"env\": {\"k1\": \"v1\", \"k2\": \"v2\"} }"),
is(
Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
DockerPayload.newBuilder()
.setContainerImage("java")
.putEnv("k1", "v1")
.putEnv("k2", "v2")
.build()
.toByteString())
.build()));
assertThat(
Environments.createOrGetDefaultEnvironment(
Environments.ENVIRONMENT_PROCESS,
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
Expand Down Expand Up @@ -133,6 +134,11 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));

Map<String, String> dockerEnvMap = dockerPayload.getEnvMap();
if (!dockerEnvMap.isEmpty()) {
dockerEnvMap.forEach((k, v) -> dockerOptsBuilder.add(String.format("--env=%s=%s", k, v)));
}

Boolean retainDockerContainer =
pipelineOptions.as(ManualDockerEnvironmentOptions.class).getRetainDockerContainers();
if (!retainDockerContainer) {
Expand Down
Expand Up @@ -56,11 +56,15 @@ public interface PortablePipelineOptions extends PipelineOptions {

@Description(
"Set environment configuration for running the user code.\n"
+ " For DOCKER: Url for the docker image.\n"
+ " For DOCKER: Url for the docker image, or json of the form "
+ "{\"docker_image\": \"<URL>\", "
+ "\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }, "
+ " where the image URL is a required field.\n"
+ " For PROCESS: json of the form "
+ "{\"os\": \"<OS>\", \"arch\": \"<ARCHITECTURE>\", \"command\": \"<process to execute>\", "
+ "\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }. "
+ "All fields in the json are optional except command.")
+ "{\"os\": \"<OS>\", \"arch\": \"<ARCHITECTURE>\", "
+ "\"command\": \"<process to execute>\", "
+ "\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }, "
+ "where all fields are optional except command.")
@Nullable
String getDefaultEnvironmentConfig();

Expand Down
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/options/pipeline_options.py
Expand Up @@ -884,11 +884,13 @@ def _add_argparse_args(cls, parser):
parser.add_argument(
'--environment_config', default=None,
help=('Set environment configuration for running the user code.\n For '
'DOCKER: Url for the docker image.\n For PROCESS: json of the '
'DOCKER: Url for the docker image, or json of the form '
'\n {"docker_image": "<URL>", "env":{'
'"<Environment variables 1>": "<ENV_VAL>"} }, where the image '
'URL is a required field.\n For PROCESS: json of the '
'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
'"<process to execute>", "env":{"<Environment variables 1>": '
'"<ENV_VAL>"} }. All fields in the json are optional except '
'command.'))
'"<ENV_VAL>"} }, where all fields are optional except command.'))
parser.add_argument(
'--sdk_worker_parallelism', default=1,
help=('Sets the number of sdk worker processes that will run on each '
Expand Down
31 changes: 18 additions & 13 deletions sdks/python/apache_beam/runners/portability/fn_api_runner.py
Expand Up @@ -1436,6 +1436,7 @@ def __init__(self, payload, state, provision_info, grpc_server):
super(DockerSdkWorkerHandler, self).__init__(state, provision_info,
grpc_server)
self._container_image = payload.container_image
self._container_env = payload.env
self._container_id = None

def host_from_worker(self):
Expand All @@ -1451,19 +1452,23 @@ def start_worker(self):
subprocess.check_call(['docker', 'pull', self._container_image])
except Exception:
logging.info('Unable to pull image %s' % self._container_image)
self._container_id = subprocess.check_output(
['docker',
'run',
'-d',
# TODO: credentials
'--network=host',
self._container_image,
'--id=%s' % self.worker_id,
'--logging_endpoint=%s' % self.logging_api_service_descriptor().url,
'--control_endpoint=%s' % self.control_address,
'--artifact_endpoint=%s' % self.control_address,
'--provision_endpoint=%s' % self.control_address,
]).strip()
command = [
'docker',
'run',
'-d',
# TODO: credentials
'--network=host',
self._container_image,
'--id=%s' % self.worker_id,
'--logging_endpoint=%s' % self.logging_api_service_descriptor().url,
'--control_endpoint=%s' % self.control_address,
'--artifact_endpoint=%s' % self.control_address,
'--provision_endpoint=%s' % self.control_address,
]
if self._container_env:
command.extend(
'--env=%s=%s' % item for item in self._container_env.items())
self._container_id = subprocess.check_output(command).strip()
while True:
status = subprocess.check_output([
'docker',
Expand Down
Expand Up @@ -18,6 +18,7 @@
from __future__ import print_function

import inspect
import json
import logging
import platform
import signal
Expand Down Expand Up @@ -269,6 +270,22 @@ def test__create_docker_environment(self):
'environment_config': docker_image,
})), environments.DockerEnvironment(container_image=docker_image))

def test__create_docker_environment_json_config(self):
  """A JSON config yields image and env; a JSON config without an image fails."""
  image = 'py-docker'
  variables = {'ENVKEY1': 'ENVVALUE1'}

  # Happy path: both the image and the env mapping come from the JSON config.
  full_options = PipelineOptions.from_dictionary({
      'environment_type': 'DOCKER',
      'environment_config': json.dumps(
          {'docker_image': image, 'env': variables}),
  })
  actual = PortableRunner._create_environment(full_options)
  expected = environments.DockerEnvironment(
      container_image=image, env=variables)
  self.assertEqual(actual, expected)

  # A JSON config that omits the image must be rejected.
  with self.assertRaises(ValueError):
    PortableRunner._create_environment(PipelineOptions.from_dictionary({
        'environment_type': 'DOCKER',
        'environment_config': json.dumps({'env': variables}),
    }))

def test__create_process_environment(self):
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({
Expand All @@ -290,7 +307,7 @@ def test__create_external_environment(self):
'environment_type': "EXTERNAL",
'environment_config': 'localhost:50000',
})), environments.ExternalEnvironment('localhost:50000'))
raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"} '
for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({
Expand Down
40 changes: 28 additions & 12 deletions sdks/python/apache_beam/transforms/environments.py
Expand Up @@ -37,6 +37,11 @@
'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder']


def _looks_like_json(config_string):
  """Returns True if ``config_string`` looks like a JSON object literal.

  The check is purely syntactic: optional whitespace, an opening ``{``, any
  content, a closing ``}`` and optional trailing whitespace. It does not
  validate the JSON itself.

  Args:
    config_string: the raw environment config; may be None or empty when the
      option was not set.

  Returns:
    bool: True when the string is brace-delimited, False otherwise (including
    for None and the empty string).
  """
  import re
  if not config_string:
    # An unset option arrives as None; re.match would raise TypeError on it.
    return False
  # DOTALL lets '.' span newlines so multi-line (pretty-printed) JSON matches.
  return re.match(r'\s*\{.*\}\s*$', config_string, re.DOTALL) is not None


class Environment(object):
"""Abstract base class for environments.
Expand Down Expand Up @@ -118,17 +123,19 @@ def from_options(cls, options):
beam_runner_api_pb2.DockerPayload)
class DockerEnvironment(Environment):

def __init__(self, container_image=None):
def __init__(self, container_image=None, env=None):
  """Initializes the environment.

  Args:
    container_image: image to run; when falsy,
      ``PortableRunner.default_docker_image()`` is used instead.
    env: mapping of environment variables; a falsy value is stored as an
      empty dict.
  """
  from apache_beam.runners.portability.portable_runner import PortableRunner

  # The default image is only looked up when no image was supplied.
  self.container_image = (
      container_image or PortableRunner.default_docker_image())
  self.env = env if env else {}

def __eq__(self, other):
  """Two environments are equal when class, container image and env match."""
  # A single parenthesized expression avoids fragile backslash continuations
  # and keeps the env comparison part of the result.
  return (self.__class__ == other.__class__
          and self.container_image == other.container_image
          and self.env == other.env)

def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
Expand All @@ -138,20 +145,33 @@ def __hash__(self):
return hash((self.__class__, self.container_image))

def __repr__(self):
  """Returns a debug string showing the container image and the env mapping."""
  return 'DockerEnvironment(container_image=%s,env=%s)' % (
      self.container_image, self.env)

def to_runner_api_parameter(self, context):
  """Returns the (urn, payload) pair for this environment.

  The payload is a ``DockerPayload`` carrying both the container image and
  the env mapping. ``context`` is not used here.
  """
  return (common_urns.environments.DOCKER.urn,
          beam_runner_api_pb2.DockerPayload(
              container_image=self.container_image,
              env=self.env))

@staticmethod
def from_runner_api_parameter(payload, context):
  """Rebuilds a DockerEnvironment from a ``DockerPayload``.

  Both ``payload.container_image`` and ``payload.env`` are carried over.
  ``context`` is not used here.
  """
  return DockerEnvironment(container_image=payload.container_image,
                           env=payload.env)

@classmethod
def from_options(cls, options):
  """Builds the environment from ``options.environment_config``.

  The config is either a plain Docker image URL or a JSON object with a
  required ``docker_image`` key and an optional ``env`` mapping. An unset or
  empty config is passed through as the image, leaving the choice of image to
  the constructor.

  Raises:
    ValueError: if a JSON config does not provide a ``docker_image``.
  """
  config_string = options.environment_config
  # Guard against an unset config before handing it to the regex-based check,
  # which only accepts strings.
  if config_string and _looks_like_json(config_string):
    config = json.loads(config_string)
    docker_image = config.get('docker_image')
    if not docker_image:
      raise ValueError('Docker image URL must be set.')
    env = config.get('env')
  else:
    docker_image = config_string
    env = None

  return cls(container_image=docker_image, env=env)


@Environment.register_urn(common_urns.environments.PROCESS.urn,
Expand Down Expand Up @@ -203,7 +223,7 @@ def from_runner_api_parameter(payload, context):
def from_options(cls, options):
  """Builds the environment from the JSON in ``options.environment_config``.

  Reads ``command``, ``os``, ``arch`` and ``env`` from the parsed object.
  ``os`` and ``arch`` default to the empty string; a missing ``env`` is passed
  on as None.
  """
  config = json.loads(options.environment_config)
  return cls(config.get('command'), os=config.get('os', ''),
             arch=config.get('arch', ''), env=config.get('env'))


@Environment.register_urn(common_urns.environments.EXTERNAL.urn,
Expand Down Expand Up @@ -245,11 +265,7 @@ def from_runner_api_parameter(payload, context):

@classmethod
def from_options(cls, options):
def looks_like_json(environment_config):
import re
return re.match(r'\s*\{.*\}\s*$', environment_config)

if looks_like_json(options.environment_config):
if _looks_like_json(options.environment_config):
config = json.loads(options.environment_config)
url = config.get('url')
if not url:
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/environments_test.py
Expand Up @@ -39,6 +39,7 @@ def test_environment_encoding(self):
for environment in (
DockerEnvironment(),
DockerEnvironment(container_image='img'),
DockerEnvironment(container_image='img', env={'k1': 'v1'}),
ProcessEnvironment('run.sh'),
ProcessEnvironment('run.sh', os='linux', arch='amd64',
env={'k1': 'v1'}),
Expand Down

0 comments on commit 5ca5701

Please sign in to comment.