/
Pipeline.java
444 lines (403 loc) · 15.6 KB
/
Pipeline.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk;
import static com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the
* {@link PCollection PCollections} that the {@link PTransform}s consume and produce.
*
* <p>A {@link Pipeline} is initialized with a {@link PipelineRunner} that will later
* execute the {@link Pipeline}.
*
* <p>{@link Pipeline Pipelines} are independent, so they can be constructed and executed
* concurrently.
*
* <p>Each {@link Pipeline} is self-contained and isolated from any other
* {@link Pipeline}. The {@link PValue PValues} that are inputs and outputs of each of a
* {@link Pipeline Pipeline's} {@link PTransform PTransforms} are also owned by that
* {@link Pipeline}. A {@link PValue} owned by one {@link Pipeline} can be read only by
* {@link PTransform PTransforms} also owned by that {@link Pipeline}.
*
* <p>Here is a typical example of use:
* <pre> {@code
* // Start by defining the options for the pipeline.
* PipelineOptions options = PipelineOptionsFactory.create();
* // Then create the pipeline. The runner is determined by the options.
* Pipeline p = Pipeline.create(options);
*
* // A root PTransform, like TextIO.Read or Create, gets added
* // to the Pipeline by being applied:
* PCollection<String> lines =
* p.apply(TextIO.Read.from("gs://bucket/dir/file*.txt"));
*
* // A Pipeline can have multiple root transforms:
* PCollection<String> moreLines =
* p.apply(TextIO.Read.from("gs://bucket/other/dir/file*.txt"));
* PCollection<String> yetMoreLines =
* p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of()));
*
* // Further PTransforms can be applied, in an arbitrary (acyclic) graph.
* // Subsequent PTransforms (and intermediate PCollections etc.) are
* // implicitly part of the same Pipeline.
* PCollection<String> allLines =
* PCollectionList.of(lines).and(moreLines).and(yetMoreLines)
* .apply(new Flatten<String>());
* PCollection<KV<String, Integer>> wordCounts =
* allLines
* .apply(ParDo.of(new ExtractWords()))
* .apply(new Count<String>());
* PCollection<String> formattedWordCounts =
* wordCounts.apply(ParDo.of(new FormatCounts()));
* formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt"));
*
* // PTransforms aren't executed when they're applied, rather they're
* // just added to the Pipeline. Once the whole Pipeline of PTransforms
* // is constructed, the Pipeline's PTransforms can be run using a
* // PipelineRunner. The default PipelineRunner executes the Pipeline
* // directly, sequentially, in this one process, which is useful for
* // unit tests and simple experiments:
* p.run();
*
* } </pre>
*/
public class Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
/**
 * Unchecked exception raised while a {@link Pipeline} is executing, when user code inside that
 * {@link Pipeline} fails with an exception.
 *
 * <p>The exception originally thrown by the user code can be obtained from {@link #getCause}.
 */
public static class PipelineExecutionException extends RuntimeException {
  /**
   * Creates a {@link PipelineExecutionException} whose cause is {@code cause}.
   *
   * @param cause the underlying failure, later returned by {@link #getCause}
   */
  public PipelineExecutionException(Throwable cause) {
    super(cause);
  }
}
/////////////////////////////////////////////////////////////////////////////
// Public operations.
/**
 * Builds a new {@link Pipeline} from the supplied options.
 *
 * <p>The runner is obtained through {@link PipelineRunner#fromOptions}, and the creation is
 * logged at debug level.
 *
 * @param options the options used to pick the runner; also retained by the new pipeline
 * @return the newly created pipeline
 */
public static Pipeline create(PipelineOptions options) {
  PipelineRunner<?> selectedRunner = PipelineRunner.fromOptions(options);
  Pipeline created = new Pipeline(selectedRunner, options);
  LOG.debug("Creating {}", created);
  return created;
}
/**
 * Produces a {@link PBegin} owned by this {@link Pipeline}.
 *
 * <p>The result is the natural input for a root {@link PTransform} such as {@link Read} or
 * {@link Create}.
 *
 * @return a {@link PBegin} created via {@link PBegin#in} for this pipeline
 */
public PBegin begin() {
  PBegin start = PBegin.in(this);
  return start;
}
/**
 * Adds a root {@link PTransform} to this {@link Pipeline} without an explicit name; as with
 * {@link #apply(String, PTransform)}, except that the node in the {@link Pipeline} graph is named
 * according to {@link PTransform#getName}.
 *
 * <p>Alias for {@code begin().apply(root)}.
 *
 * @param root the root transform to apply
 * @return the output of applying {@code root}
 * @see #apply(String, PTransform)
 */
public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> root) {
  PBegin start = begin();
  return start.apply(root);
}
/**
 * Adds a root {@link PTransform}, such as {@link Read} or {@link Create}, to this
 * {@link Pipeline} under an explicit name.
 *
 * <p>The provided {@code name} becomes the name of the node in the {@link Pipeline} graph. It
 * shows up in various places, including the monitoring UI and logging, and is used to stably
 * identify the node in the {@link Pipeline} graph upon update.
 *
 * <p>Alias for {@code begin().apply(name, root)}.
 *
 * @param name the name to give the node
 * @param root the root transform to apply
 * @return the output of applying {@code root}
 */
public <OutputT extends POutput> OutputT apply(
    String name, PTransform<? super PBegin, OutputT> root) {
  PBegin start = begin();
  return start.apply(name, root);
}
/**
 * Runs this {@link Pipeline} by handing it to its {@link PipelineRunner}.
 *
 * @return the result reported by the runner
 * @throws PipelineExecutionException if the runner throws a {@link UserCodeException}; the thrown
 *     exception carries that {@link UserCodeException}'s cause
 */
public PipelineResult run() {
  LOG.debug("Running {} via {}", this, runner);
  final PipelineResult result;
  try {
    result = runner.run(this);
  } catch (UserCodeException e) {
    // Build a fresh exception at this point so that its own stack trace ends here, and attach
    // the cause of the UserCodeException (not the wrapper itself). This splices out the frames
    // between the PipelineRunner and the place where the worker calls into the user's code.
    throw new PipelineExecutionException(e.getCause());
  }
  return result;
}
/////////////////////////////////////////////////////////////////////////////
// Below here are operations that aren't normally called by users.
/**
 * Returns the {@link CoderRegistry} in use by this {@link Pipeline}.
 *
 * <p>If no registry has been set yet, a new one is created, asked to register the standard
 * coders, and remembered for subsequent calls.
 *
 * @return the registry for this pipeline, never {@code null}
 */
public CoderRegistry getCoderRegistry() {
  if (coderRegistry != null) {
    return coderRegistry;
  }
  // First access (or the registry was cleared): install a default registry.
  coderRegistry = new CoderRegistry();
  coderRegistry.registerStandardCoders();
  return coderRegistry;
}
/**
* Sets the {@link CoderRegistry} that this {@link Pipeline} uses.
*
* <p>The given registry is what {@link #getCoderRegistry} returns from now on. Passing
* {@code null} clears the stored registry, in which case the next call to
* {@link #getCoderRegistry} creates a new default one.
*
* @param coderRegistry the registry to use
*/
public void setCoderRegistry(CoderRegistry coderRegistry) {
this.coderRegistry = coderRegistry;
}
/**
 * Callback interface accepted by {@link Pipeline#traverseTopologically}; its methods are called
 * for each of the transforms and values in the {@link Pipeline}.
 */
public interface PipelineVisitor {
  /**
   * Invoked for a composite transform once all of its topological predecessors have been
   * visited, and before any of its component transforms.
   *
   * @param node the composite transform being entered
   * @return whether or not the child transforms should be visited
   */
  CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node);

  /**
   * Invoked for a composite transform once all of its component transforms, and their outputs,
   * have been visited.
   *
   * @param node the composite transform being left
   */
  void leaveCompositeTransform(TransformHierarchy.Node node);

  /**
   * Invoked for a primitive transform once all of its topological predecessors and inputs have
   * been visited.
   *
   * @param node the primitive transform being visited
   */
  void visitPrimitiveTransform(TransformHierarchy.Node node);

  /**
   * Invoked for a value once the transform that produced it has been visited.
   *
   * @param value the value being visited
   * @param producer the transform that produced {@code value}
   */
  void visitValue(PValue value, TransformHierarchy.Node producer);

  /**
   * Tells a traversal whether or not it should process the contents of a composite transform.
   */
  enum CompositeBehavior {
    ENTER_TRANSFORM,
    DO_NOT_ENTER_TRANSFORM
  }

  /**
   * No-op {@link PipelineVisitor} that enters every composite transform. Implementations can
   * extend it and override only the callbacks they care about.
   */
  class Defaults implements PipelineVisitor {
    @Override
    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
      return CompositeBehavior.ENTER_TRANSFORM;
    }

    @Override
    public void leaveCompositeTransform(TransformHierarchy.Node node) {
      // Intentionally empty.
    }

    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      // Intentionally empty.
    }

    @Override
    public void visitValue(PValue value, TransformHierarchy.Node producer) {
      // Intentionally empty.
    }
  }
}
/**
 * Walks this {@link Pipeline Pipeline's} transform and value nodes in forward topological order,
 * invoking the {@link PipelineVisitor PipelineVisitor's}
 * {@link PipelineVisitor#visitPrimitiveTransform} and {@link PipelineVisitor#visitValue}
 * operations on them.
 *
 * <p>Traversal of the {@link Pipeline} causes {@link PTransform PTransforms} and
 * {@link PValue PValues} owned by the {@link Pipeline} to be marked as finished, at which point
 * they may no longer be modified.
 *
 * <p>Typically invoked by {@link PipelineRunner} subclasses.
 *
 * @param visitor the visitor to call back during the traversal
 * @throws IllegalStateException if the traversal did not visit every value tracked by this
 *     pipeline
 */
public void traverseTopologically(PipelineVisitor visitor) {
  // Visit all the transforms, which should implicitly visit all the values.
  Set<PValue> visitedValues = transforms.visit(visitor);

  // NOTE(review): nothing in this file ever adds to `values`, so this check looks like it
  // passes trivially at present — confirm whether `values` is still meant to be populated.
  boolean sawEveryValue = visitedValues.containsAll(values);
  checkState(
      sawEveryValue,
      "internal error: should have visited all the values after visiting all the transforms");
}
/**
 * Same as {@link #applyTransform(String, PInput, PTransform)}, using the name reported by
 * {@link PTransform#getName} instead of an explicit one.
 *
 * @param input the input the transform is applied to; its pipeline performs the application
 * @param transform the transform to apply
 * @return the output of the application
 */
public static <InputT extends PInput, OutputT extends POutput> OutputT applyTransform(
    InputT input, PTransform<? super InputT, OutputT> transform) {
  Pipeline owner = input.getPipeline();
  String defaultName = transform.getName();
  return owner.applyInternal(defaultName, input, transform);
}
/**
 * Applies the given {@code PTransform} to the given {@code InputT} and returns the resulting
 * {@code OutputT}, identifying this particular application by {@code name}. The name is used in
 * various places, including the monitoring UI and logging, and to stably identify this
 * application node in the {@link Pipeline} graph during update.
 *
 * <p>Every {@link PInput} subclass offering an {@code apply} method should delegate here so
 * that the application is properly registered with the {@link PipelineRunner}.
 *
 * @param name the name identifying this application of the transform
 * @param input the input the transform is applied to; its pipeline performs the application
 * @param transform the transform to apply
 * @return the output of the application
 */
public static <InputT extends PInput, OutputT extends POutput> OutputT applyTransform(
    String name, InputT input, PTransform<? super InputT, OutputT> transform) {
  Pipeline owner = input.getPipeline();
  return owner.applyInternal(name, input, transform);
}
/////////////////////////////////////////////////////////////////////////////
// Below here are internal operations and accessors that users do not normally call.
/** Runner that executes this pipeline and through which transforms are applied. */
private final PipelineRunner<?> runner;
/** Options supplied at construction; exposed through {@link #getOptions}. */
private final PipelineOptions options;
/** Hierarchy of the transforms applied to this pipeline. */
private final TransformHierarchy transforms = new TransformHierarchy();
/**
 * Values that {@link #traverseTopologically} requires the traversal to have visited. The
 * reference is never reassigned, so it is final.
 */
private final Collection<PValue> values = new ArrayList<>();
/** Full transform names already handed out; the reference is never reassigned, so it is final. */
private final Set<String> usedFullNames = new HashSet<>();
/** Created lazily by {@link #getCoderRegistry} or replaced by the setter, hence not final. */
private CoderRegistry coderRegistry;
/**
* Constructs a pipeline for {@code runner}, using options obtained from
* {@link PipelineOptionsFactory#create()}.
*
* @param runner the runner that will execute this pipeline
* @deprecated replaced by {@link #Pipeline(PipelineRunner, PipelineOptions)}
*/
@Deprecated
protected Pipeline(PipelineRunner<?> runner) {
this(runner, PipelineOptionsFactory.create());
}
/**
 * Constructs a pipeline that is executed by {@code runner} and configured by {@code options}.
 *
 * @param runner the runner returned by {@link #getRunner}
 * @param options the options returned by {@link #getOptions}
 */
protected Pipeline(PipelineRunner<?> runner, PipelineOptions options) {
  this.options = options;
  this.runner = runner;
}
/** Returns {@code "Pipeline#"} followed by this object's {@link #hashCode()}. */
@Override
public String toString() {
  int identity = hashCode();
  return "Pipeline#" + identity;
}
/**
 * Applies a {@link PTransform} to the given {@link PInput}.
 *
 * <p>The application is given a unique full name derived from the current hierarchy node's full
 * name and {@code name}. If that name had to be altered to become unique, the stable-unique-names
 * option decides whether this is ignored, logged as a warning, or rejected. The transform is
 * then pushed onto the hierarchy, validated, applied through the runner, and popped again.
 *
 * @param name the requested name for this application
 * @param input the input to apply the transform to
 * @param transform the transform to apply
 * @return the output produced by the runner for this application
 * @throws IllegalStateException if the name is not unique and the option is set to {@code ERROR}
 * @see Pipeline#apply
 */
private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal(
    String name, InputT input, PTransform<? super InputT, OutputT> transform) {
  String enclosingName = transforms.getCurrent().getFullName();
  String uniqueName = uniquifyInternal(enclosingName, name);
  String requestedName = buildName(enclosingName, name);
  if (!uniqueName.equals(requestedName)) {
    // The requested full name was already taken, so a suffixed one was handed out instead.
    handleUnstableName(uniqueName);
  }

  LOG.debug("Adding {} to {}", transform, this);
  transforms.pushNode(uniqueName, input, transform);
  try {
    transforms.finishSpecifyingInput();
    transform.validate(input);
    OutputT result = runner.apply(transform, input);
    transforms.setOutput(result);
    return result;
  } finally {
    // Always pop the node pushed above, even when validation or application throws.
    transforms.popNode();
  }
}

/**
 * Reacts to a transform name that needed uniquifying, according to the stable-unique-names
 * option: does nothing, logs a warning, or throws.
 */
private void handleUnstableName(String uniqueName) {
  switch (getOptions().getStableUniqueNames()) {
    case OFF:
      return;
    case WARNING:
      LOG.warn(
          "Transform {} does not have a stable unique name. "
              + "This will prevent updating of pipelines.",
          uniqueName);
      return;
    case ERROR:
      throw new IllegalStateException(
          "Transform "
              + uniqueName
              + " does not have a stable unique name. "
              + "This will prevent updating of pipelines.");
    default:
      throw new IllegalArgumentException(
          "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
  }
}
/**
 * Gives access to the {@link PipelineRunner} this pipeline was constructed with.
 *
 * @return the configured {@link PipelineRunner}
 */
public PipelineRunner<?> getRunner() {
  return this.runner;
}
/**
 * Gives access to the {@link PipelineOptions} this pipeline was constructed with.
 *
 * @return the configured {@link PipelineOptions}
 */
public PipelineOptions getOptions() {
  return this.options;
}
/**
 * Picks a full name, not handed out before, for a transform with the given prefix (from
 * enclosing transforms) and initial name, and records it as used.
 *
 * <p>The first candidate is built from {@code origName} itself; on a clash, {@code origName}
 * followed by 2, 3, ... is tried until an unused full name is found.
 *
 * <p>For internal use only.
 *
 * @param namePrefix the full name of the enclosing transform, possibly empty
 * @param origName the requested name
 * @return the unique full name that was recorded
 */
private String uniquifyInternal(String namePrefix, String origName) {
  String candidate = buildName(namePrefix, origName);
  // Set.add returns false for a duplicate, in which case the next numeric suffix is tried.
  for (int suffix = 2; !usedFullNames.add(candidate); suffix++) {
    candidate = buildName(namePrefix, origName + suffix);
  }
  return candidate;
}
/**
 * Maps each {@link Aggregator} in the {@link Pipeline} to the {@link PTransform PTransforms} in
 * which it is used.
 *
 * @return the mapping computed by a new {@code AggregatorPipelineExtractor} for this pipeline
 */
public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
  AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(this);
  return extractor.getAggregatorSteps();
}
/**
 * Joins a "/"-delimited prefix and a name into a full name.
 *
 * @param namePrefix the prefix; when empty, no delimiter is added
 * @param name the name to append
 * @return {@code name} alone for an empty prefix, otherwise {@code namePrefix + "/" + name}
 */
private String buildName(String namePrefix, String name) {
  if (namePrefix.isEmpty()) {
    return name;
  }
  return namePrefix + "/" + name;
}
}