/
options.go
171 lines (146 loc) · 6.58 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package jobopts contains shared options for job submission. These options
// are exposed to allow user code to inspect and modify them.
package jobopts
import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"
"sync/atomic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)
// init binds the --sdk_harness_container_image_override command-line flag to
// SdkHarnessContainerImageOverrides.
func init() {
	const usage = "Overrides for SDK harness container images. Could be for the " +
		"local SDK or for a remote SDK that pipeline has to support due " +
		"to a cross-language transform. Each entry consists of two values " +
		"separated by a comma where first value gives a regex to " +
		"identify the container image to override and the second value " +
		"gives the replacement container image. Multiple entries can be " +
		"specified by using this flag multiple times. A container will " +
		"have no more than 1 override applied to it. If multiple " +
		"overrides match a container image it is arbitrary which " +
		"will be applied."

	flag.Var(&SdkHarnessContainerImageOverrides, "sdk_harness_container_image_override", usage)
}
var (
// Endpoint is the job service endpoint. It has no default; GetEndpoint
// returns an error when it is left empty.
Endpoint = flag.String("endpoint", "", "Job service endpoint (required).")
// JobName is the name of the job. When empty, GetJobName generates one.
JobName = flag.String("job_name", "", "Job name (optional).")
// EnvironmentType is the environment type to run the user code. It defaults
// to DOCKER. GetEnvironmentUrn matches the value case-insensitively and
// recognizes process, loopback, external and docker.
EnvironmentType = flag.String("environment_type", "DOCKER",
"Environment Type. Possible options are DOCKER, and LOOPBACK.")
// EnvironmentConfig is the environment configuration for running the user code.
// When empty, GetEnvironmentConfig fills in a default value.
EnvironmentConfig = flag.String("environment_config",
"",
"Set environment configuration for running the user code.\n"+
"For DOCKER: Url for the docker image.\n"+
"For PROCESS: json of the form {\"os\": \"<OS>\", "+
"\"arch\": \"<ARCHITECTURE>\", \"command\": \"<process to execute>\", "+
"\"env\":{\"<Environment variables 1>\": \"<ENV_VAL>\"} }. "+
"All fields in the json are optional except command.")
// SdkHarnessContainerImageOverrides contains patterns for overriding
// container image names in a pipeline. Each entry is expected to have the
// form "<regex>,<replacement image>"; see GetSdkImageOverrides.
SdkHarnessContainerImageOverrides stringSlice
// WorkerBinary is the location of the compiled worker binary. If not
// specified, the binary is produced via go build.
WorkerBinary = flag.String("worker_binary", "", "Worker binary (optional)")
// Experiments is a comma-separated list of experimental features to toggle
// in the runner. GetExperiments returns it split into individual entries.
Experiments = flag.String("experiments", "", "Comma-separated list of experiments (optional).")
// Async determines whether to wait for job completion. When true, the job
// is not waited for.
Async = flag.Bool("async", false, "Do not wait for job completion.")
// Strict mode applies additional validation to user pipelines before
// executing them and fails early if the pipelines don't pass.
Strict = flag.Bool("beam_strict", false, "Apply additional validation to pipelines.")
// RetainDockerContainers is the flag to retain docker containers created by
// the runner. If false, then containers are deleted once the job ends, even
// if it failed.
RetainDockerContainers = flag.Bool("retain_docker_containers", false, "Retain Docker containers created by the runner.")
// Parallelism is the flag to set the degree of parallelism. If not set (the
// default of -1), the configured Flink default is used, or 1 if none can be
// found.
Parallelism = flag.Int("parallelism", -1, "The degree of parallelism to be used when distributing operations onto Flink workers.")
)
// missingFlagError marks an error as being caused by a required flag that was
// not set. GetEndpoint returns one when the endpoint flag is empty.
type missingFlagError error
// GetEndpoint returns the job service endpoint given by the endpoint flag. If
// the flag is empty it returns an empty string and a missingFlagError instead.
// Runners such as Dataflow set a reasonable default. Convenience function.
func GetEndpoint() (string, error) {
	if endpoint := *Endpoint; endpoint != "" {
		return endpoint, nil
	}
	return "", missingFlagError(errors.New("no job service endpoint specified. Use --endpoint=<endpoint>"))
}
// unique is a process-wide counter, incremented atomically, that is embedded
// in autogenerated job names.
var unique int32

// GetJobName returns the job name given by the job_name flag. If the flag is
// empty, it returns a freshly generated name of the form
// "go-job-<counter>-<unix nanoseconds>". Convenience function.
func GetJobName() string {
	if name := *JobName; name != "" {
		return name
	}
	// Bump the counter first so that each generated name gets its own
	// sequence number.
	seq := atomic.AddInt32(&unique, 1)
	return fmt.Sprintf("go-job-%v-%v", seq, time.Now().UnixNano())
}
// GetEnvironmentUrn returns the URN of the environment used to run the SDK
// Harness, selected by the environment_type flag. The flag value is matched
// case-insensitively:
//
//	"process"              -> "beam:env:process:v1"
//	"loopback", "external" -> "beam:env:external:v1"
//	"docker"               -> "beam:env:docker:v1"
//
// Any other value, including an empty one, falls back to the docker
// environment urn "beam:env:docker:v1" and the fallback is logged.
// Convenience function.
func GetEnvironmentUrn(ctx context.Context) string {
	const defaultUrn = "beam:env:docker:v1"

	switch strings.ToLower(*EnvironmentType) {
	case "process":
		return "beam:env:process:v1"
	case "loopback", "external":
		return "beam:env:external:v1"
	case "docker":
		return defaultUrn
	default:
		// Report the value that was rejected and the URN actually used; the
		// flag value itself is not the default environment.
		log.Infof(ctx, "Unrecognized environment type '%v'. Using default environment: '%v'", *EnvironmentType, defaultUrn)
		return defaultUrn
	}
}
// IsLoopback reports whether the environment_type flag is set to loopback.
// The comparison is done on the lower-cased flag value.
func IsLoopback() bool {
	envType := strings.ToLower(*EnvironmentType)
	return envType == "loopback"
}
// GetEnvironmentConfig returns the configuration given by the
// environment_config flag. If the flag is empty, a default of
// "apache/beam_go_sdk:" followed by core.SdkVersion (with environment
// variables expanded) is written back into the flag, logged, and returned.
// Convenience function.
func GetEnvironmentConfig(ctx context.Context) string {
	if *EnvironmentConfig != "" {
		return *EnvironmentConfig
	}

	// Store the default in the flag itself so later reads see the same value.
	*EnvironmentConfig = os.ExpandEnv("apache/beam_go_sdk:" + core.SdkVersion)
	log.Infof(ctx, "No environment config specified. Using default config: '%v'", *EnvironmentConfig)
	return *EnvironmentConfig
}
// GetSdkImageOverrides gets the specified overrides as a map where each key is
// a regular expression pattern to match, and each value is the string to
// replace matching containers with.
//
// Each entry of SdkHarnessContainerImageOverrides is split at its first comma
// into pattern and replacement. An entry without a comma has no replacement
// and is skipped instead of causing an index-out-of-range panic. If the same
// pattern occurs in several entries, the last one wins.
func GetSdkImageOverrides() map[string]string {
	ret := make(map[string]string)
	for _, entry := range SdkHarnessContainerImageOverrides {
		splits := strings.SplitN(entry, ",", 2)
		if len(splits) != 2 {
			// Malformed entry: there is no replacement image to map to.
			continue
		}
		ret[splits[0]] = splits[1]
	}
	return ret
}
// GetExperiments returns the entries of the experiments flag, split on
// commas. It returns nil when the flag is empty.
func GetExperiments() []string {
	if list := *Experiments; list != "" {
		return strings.Split(list, ",")
	}
	return nil
}