-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
FlinkMultiOutputDoFnFunction.java
126 lines (110 loc) · 4.66 KB
/
FlinkMultiOutputDoFnFunction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
 * Encapsulates a {@link DoFn} (adapted to an {@link OldDoFn}) that can emit to multiple
 * outputs inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
 *
 * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
 * and must tag all outputs with the output number. Afterwards a filter will filter out
 * those elements that are not to be in a specific output.
 */
public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
    extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> {

  /** The user function, adapted from the {@link DoFn} passed to the constructor. */
  private final OldDoFn<InputT, OutputT> doFn;
  /** Serializable wrapper so the pipeline options can be shipped to Flink task managers. */
  private final SerializedPipelineOptions serializedOptions;
  /** Maps each output {@link TupleTag} to the index used to tag emitted {@link RawUnionValue}s. */
  private final Map<TupleTag<?>, Integer> outputMap;
  /** Side inputs available to the function, with their windowing strategies. */
  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
  /** Whether the function's {@code @ProcessElement} method observes the element's window. */
  private final boolean requiresWindowAccess;
  /** Whether any side inputs were supplied. */
  private final boolean hasSideInputs;
  /** Windowing strategy of the main input. */
  private final WindowingStrategy<?, ?> windowingStrategy;

  /**
   * Creates the Flink function.
   *
   * @param doFn the user function; it is converted with {@link DoFnAdapters#toOldDoFn}
   * @param windowingStrategy windowing strategy of the main input
   * @param sideInputs side inputs and their windowing strategies (may be empty)
   * @param options pipeline options, serialized for use on the workers
   * @param outputMap mapping from output tag to output index
   */
  public FlinkMultiOutputDoFnFunction(
      DoFn<InputT, OutputT> doFn,
      WindowingStrategy<?, ?> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions options,
      Map<TupleTag<?>, Integer> outputMap) {
    this.doFn = DoFnAdapters.toOldDoFn(doFn);
    this.serializedOptions = new SerializedPipelineOptions(options);
    this.outputMap = outputMap;
    this.requiresWindowAccess =
        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
    this.hasSideInputs = !sideInputs.isEmpty();
    this.windowingStrategy = windowingStrategy;
    this.sideInputs = sideInputs;
  }

  /**
   * Runs one bundle: {@code startBundle}, then {@code processElement} for every input
   * (exploded per window when required), then {@code finishBundle}.
   *
   * @param values the partition's input elements
   * @param out collector receiving the tagged outputs
   * @throws Exception if the user function throws
   */
  @Override
  public void mapPartition(
      Iterable<WindowedValue<InputT>> values,
      Collector<WindowedValue<RawUnionValue>> out) throws Exception {

    FlinkMultiOutputProcessContext<InputT, OutputT> context =
        new FlinkMultiOutputProcessContext<>(
            serializedOptions.getPipelineOptions(),
            getRuntimeContext(),
            doFn,
            windowingStrategy,
            sideInputs, out,
            outputMap
        );

    doFn.startBundle(context);

    if (requiresWindowAccess || hasSideInputs) {
      // We need to explode the windows because we have per-window side inputs,
      // and window access also only works if an element is in only one window.
      for (WindowedValue<InputT> value : values) {
        for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
          // Use the single-window value, not the original multi-window one.
          context.setWindowedValue(explodedValue);
          doFn.processElement(context);
        }
      }
    } else {
      // Neither window access nor side inputs: no need to explode the windows.
      for (WindowedValue<InputT> value : values) {
        context.setWindowedValue(value);
        doFn.processElement(context);
      }
    }

    // Set the windowed value to null so that the special logic for outputting
    // in startBundle/finishBundle kicks in.
    context.setWindowedValue(null);
    doFn.finishBundle(context);
  }

  /** Invokes the user function's {@code setup} method. */
  @Override
  public void open(Configuration parameters) throws Exception {
    doFn.setup();
  }

  /** Invokes the user function's {@code teardown} method. */
  @Override
  public void close() throws Exception {
    doFn.teardown();
  }
}