/
handlepardo.go
257 lines (224 loc) · 8.68 KB
/
handlepardo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"fmt"
"reflect"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
)
// This file retains the logic for the pardo handler
// ParDoCharacteristic holds the configuration for ParDos.
type ParDoCharacteristic struct {
	// NOTE(review): this field is not read anywhere in this file's visible
	// code — presumably consulted by the SDF expansion decision elsewhere;
	// confirm against the handler's callers.
	DisableSDF bool // Sets whether a pardo supports SDFs or not.
}
// ParDo constructs a pardo handler from the given configuration.
// The config must be a ParDoCharacteristic; any other type panics.
func ParDo(config any) *pardo {
	characteristic := config.(ParDoCharacteristic)
	return &pardo{config: characteristic}
}
// pardo represents an instance of the pardo handler.
type pardo struct {
	config ParDoCharacteristic // handler configuration set via ParDo().
}
// ConfigURN returns the name for pardo in the configuration file.
func (*pardo) ConfigURN() string {
	return "pardo"
}
// ConfigCharacteristic returns the reflect.Type used to unmarshal this
// handler's configuration section.
func (*pardo) ConfigCharacteristic() reflect.Type {
	var zero ParDoCharacteristic
	return reflect.TypeOf(zero)
}
// Compile-time check that pardo satisfies transformPreparer.
var _ transformPreparer = (*pardo)(nil)

// PrepareUrns returns the transform URNs this handler pre-processes.
func (*pardo) PrepareUrns() []string {
	handled := []string{urns.TransformParDo}
	return handled
}
// PrepareTransform handles special processing with respect to ParDos, since their handling is dependent on supported features
// and requirements.
//
// Simple ParDos (no window-expiration timers, no finalization, no stable or
// time-sorted input, no restriction coder) pass through unchanged, though
// stateful DoFns are marked as forced stage roots. Otherwise the transform is
// expanded into the SplittableDoFn sub-transforms (pair-with-restriction,
// split-and-size, process-sized-elements), with the coders and PCollections
// those stages need.
func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {

	// ParDos are a pain in the butt.
	// Combines, by comparison, are dramatically simpler.
	// This is because for ParDos, how they are handled, and what kinds of transforms are in
	// and around the ParDo, the actual shape of the graph will change.
	// At their simplest, it's something a DoFn will handle on their own.
	// At their most complex, they require intimate interaction with the subgraph
	// bundling process, the data layer, state layers, and control layers.
	// But unlike combines, which have a clear urn for composite + special payload,
	// ParDos have the standard URN for composites with the standard payload.
	// So always, we need to first unmarshal the payload.

	pardoPayload := t.GetSpec().GetPayload()
	pdo := &pipepb.ParDoPayload{}
	if err := (proto.UnmarshalOptions{}).Unmarshal(pardoPayload, pdo); err != nil {
		// Include the decode error so failures are diagnosable, not just located.
		panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]: %v", t.GetUniqueName(), err))
	}

	// Let's check for and remove anything that makes things less simple.
	if pdo.OnWindowExpirationTimerFamilySpec == "" &&
		!pdo.RequestsFinalization &&
		!pdo.RequiresStableInput &&
		!pdo.RequiresTimeSortedInput &&
		pdo.RestrictionCoderId == "" {
		// Which inputs are Side inputs don't change the graph further,
		// so they're not included here. Nearly any ParDo can have them.

		// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.

		// StatefulDoFns need to be marked as being roots.
		var forcedRoots []string
		if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
			forcedRoots = append(forcedRoots, tid)
		}

		return prepareResult{
			SubbedComps: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					tid: t,
				},
			},
			ForcedRoots: forcedRoots,
		}
	}

	// Side inputs add to topology and make fusion harder to deal with
	// (side input producers can't be in the same stage as their consumers)
	// State, Timers, Stable Input, Time Sorted Input, and some parts of SDF
	// Are easier to deal with by including a fusion break. But we can do that with a
	// runner specific transform for stable input, and another for time sorted input.
	// TODO add

	// SplittableDoFns have 3 required phases and a 4th optional phase.
	//
	// PAIR_WITH_RESTRICTION which pairs elements with their restrictions
	// Input: element;   := INPUT
	// Output: KV(element, restriction)  := PWR
	//
	// SPLIT_AND_SIZE_RESTRICTIONS splits the pairs into sub element ranges
	// and a relative size for each, in a float64 format.
	// Input: KV(element, restriction) := PWR
	// Output: KV(KV(element, restriction), float64)  := SPLITnSIZED
	//
	// PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS actually processes the
	// elements. This is also where splits need to be handled.
	// In particular, primary and residual splits have the same format as the input.
	// Input: KV(KV(element, restriction), size) := SPLITnSIZED
	// Output: DoFn's output.  := OUTPUT
	//
	// TRUNCATE_SIZED_RESTRICTION is how the runner has an SDK turn an
	// unbounded transform into a bound one. Not needed until the pipeline
	// is told to drain.
	// Input: KV(KV(element, restriction), float64) := synthetic split results from above
	// Output: KV(KV(element, restriction), float64). := synthetic, truncated results sent as Split n Sized
	//
	// So with that, we can figure out the coders we need.
	//
	// cE - Element Coder  (same as input coder)
	// cR - Restriction Coder
	// cS - Size Coder (float64)
	// ckvER - KV<Element, Restriction>
	// ckvERS - KV<KV<Element, Restriction>, Size>
	//
	// There could be a few output coders, but the outputs can be copied from
	// the original transform directly.

	// First lets get the parallel input coder ID.
	var pcolInID, inputLocalID string
	for localID, globalID := range t.GetInputs() {
		// The parallel input is the one that isn't a side input.
		if _, ok := pdo.SideInputs[localID]; !ok {
			inputLocalID = localID
			pcolInID = globalID
			break
		}
	}
	inputPCol := comps.GetPcollections()[pcolInID]
	cEID := inputPCol.GetCoderId()
	cRID := pdo.RestrictionCoderId
	cSID := "c" + tid + "size"
	ckvERID := "c" + tid + "kv_ele_rest"
	ckvERSID := ckvERID + "_size"

	coder := func(urn string, componentIDs ...string) *pipepb.Coder {
		return &pipepb.Coder{
			Spec: &pipepb.FunctionSpec{
				Urn: urn,
			},
			ComponentCoderIds: componentIDs,
		}
	}

	coders := map[string]*pipepb.Coder{
		ckvERID:  coder(urns.CoderKV, cEID, cRID),
		cSID:     coder(urns.CoderDouble),
		ckvERSID: coder(urns.CoderKV, ckvERID, cSID),
	}

	// PCollections only have two new ones.
	// INPUT -> same as ordinary DoFn
	// PWR, uses ckvER
	// SPLITnSIZED, uses ckvERS
	// OUTPUT -> same as ordinary outputs

	nPWRID := "n" + tid + "_pwr"
	nSPLITnSIZEDID := "n" + tid + "_splitnsized"

	pcol := func(name, coderID string) *pipepb.PCollection {
		return &pipepb.PCollection{
			UniqueName:          name,
			CoderId:             coderID,
			IsBounded:           inputPCol.GetIsBounded(),
			WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
		}
	}

	pcols := map[string]*pipepb.PCollection{
		nPWRID:         pcol(nPWRID, ckvERID),
		nSPLITnSIZEDID: pcol(nSPLITnSIZEDID, ckvERSID),
	}

	// PTransforms have 3 new ones, with process sized elements and restrictions
	// taking the brunt of the complexity, consuming the inputs

	ePWRID := "e" + tid + "_pwr"
	eSPLITnSIZEDID := "e" + tid + "_splitnsize"
	eProcessID := "e" + tid + "_processandsplit"

	tform := func(name, urn, in, out string) *pipepb.PTransform {
		return &pipepb.PTransform{
			UniqueName: name,
			Spec: &pipepb.FunctionSpec{
				Urn:     urn,
				Payload: pardoPayload,
			},
			Inputs: map[string]string{
				inputLocalID: in,
			},
			Outputs: map[string]string{
				"i0": out,
			},
			EnvironmentId: t.GetEnvironmentId(),
		}
	}

	// The process stage keeps the original side inputs, but its parallel
	// input is rewired to the split-and-sized PCollection.
	newInputs := maps.Clone(t.GetInputs())
	newInputs[inputLocalID] = nSPLITnSIZEDID

	tforms := map[string]*pipepb.PTransform{
		ePWRID:         tform(ePWRID, urns.TransformPairWithRestriction, pcolInID, nPWRID),
		eSPLITnSIZEDID: tform(eSPLITnSIZEDID, urns.TransformSplitAndSize, nPWRID, nSPLITnSIZEDID),
		eProcessID: {
			UniqueName: eProcessID,
			Spec: &pipepb.FunctionSpec{
				Urn:     urns.TransformProcessSizedElements,
				Payload: pardoPayload,
			},
			Inputs:        newInputs,
			Outputs:       t.GetOutputs(),
			EnvironmentId: t.GetEnvironmentId(),
		},
	}
	return prepareResult{
		SubbedComps: &pipepb.Components{
			Coders:       coders,
			Pcollections: pcols,
			Transforms:   tforms,
		},
		RemovedLeaves: removeSubTransforms(comps, t.GetSubtransforms()),
		// Force ProcessSized to be a root to ensure SDFs are able to split
		// between elements or within elements.
		// Also this is where a transform would be stateful anyway.
		ForcedRoots: []string{eProcessID},
	}
}