/
handlecombine.go
216 lines (188 loc) · 6.81 KB
/
handlecombine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"fmt"
"reflect"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"google.golang.org/protobuf/proto"
)
// This file contains the logic for the combine handler.
// CombineCharacteristic holds the configuration for Combines.
//
// It is the type that Combine asserts its config argument to be, and the
// type reported by (*combine).ConfigCharacteristic.
type CombineCharacteristic struct {
// When EnableLifting is false, PrepareTransform hands the composite back
// as the only substituted transform instead of expanding it.
EnableLifting bool // Sets whether a combine composite does combiner lifting or not.
}
// Combine builds a combine handler from the given configuration.
//
// config must hold a CombineCharacteristic value; any other dynamic type
// makes the type assertion below panic.
//
// TODO figure out the factory we'd like.
func Combine(config any) *combine {
	characteristic := config.(CombineCharacteristic)
	return &combine{config: characteristic}
}
// combine represents an instance of the combine handler.
type combine struct {
// config is the characteristic handed to Combine when this handler was built.
config CombineCharacteristic
}
// ConfigURN reports the name that identifies the combine handler
// in the configuration file.
func (*combine) ConfigURN() string {
	const name = "combine"
	return name
}
// ConfigCharacteristic returns the reflected type of CombineCharacteristic,
// the configuration struct this handler is built from.
func (*combine) ConfigCharacteristic() reflect.Type {
	var zero CombineCharacteristic
	return reflect.TypeOf(zero)
}
// Compile-time assertion that *combine implements transformPreparer.
var _ transformPreparer = (*combine)(nil)
// PrepareUrns lists the transform URNs this handler prepares:
// only the CombinePerKey composite.
func (*combine) PrepareUrns() []string {
	handled := []string{urns.TransformCombinePerKey}
	return handled
}
// PrepareTransform returns lifted combines and removes the leaves if enabled.
// Otherwise it returns the composite unchanged as the only substituted transform.
//
// tid is the ID of the CombinePerKey composite, t is the composite itself, and
// comps holds the pipeline components used to look up t's input PCollection
// and that PCollection's coder.
//
// It panics if t's payload cannot be decoded as a CombinePayload, or if the
// coder of the input PCollection has no component coders to take the key
// coder from.
func (h *combine) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {
	// If we aren't lifting, the "default impl" for combines should be sufficient.
	if !h.config.EnableLifting {
		return prepareResult{
			SubbedComps: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					tid: t,
				},
			},
		}
	}

	// To lift a combine, the spec should contain a CombinePayload.
	// That contains the actual FunctionSpec for the DoFn, and the
	// id for the accumulator coder.
	// We can synthetically produce/determine the remaining coders for
	// the Input and Output types from the existing PCollections.
	//
	// This means we also need to synthesize pcollections with the accumulator coder too.

	// What we have:
	//  Input PCol: KV<K, I>         -- INPUT
	//  -> GBK := KV<K, Iter<I>>     -- GROUPED_I
	//  -> Combine := KV<K, O>       -- OUTPUT
	//
	// What we want:
	//  Input PCol: KV<K, I>             -- INPUT
	//  -> PreCombine := KV<K, A>        -- LIFTED
	//  -> GBK -> KV<K, Iter<A>>         -- GROUPED_A
	//  -> MergeAccumulators := KV<K, A> -- MERGED_A
	//  -> ExtractOutput -> KV<K, O>     -- OUTPUT
	//
	// First we need to produce new coders for Iter<A>, KV<K, Iter<A>>, and KV<K, A>.
	// The A coder ID is in the combine payload.
	//
	// Then we can produce the PCollections.
	// We can reuse the INPUT and OUTPUT PCollections.
	// We need LIFTED to have KV<K, A>          kv_k_a
	// We need GROUPED_A to have KV<K, Iter<A>> kv_k_iter_a
	// We need MERGED_A to have KV<K, A>        kv_k_a
	//
	// GROUPED_I ends up unused.
	//
	// The PCollections inherit the properties of the Input PCollection
	// such as Boundedness, and Windowing Strategy.
	//
	// With these, we can produce the PTransforms with the appropriate URNs for the
	// different parts of the composite, and return the new components.

	cmbPayload := t.GetSpec().GetPayload()
	cmb := &pipepb.CombinePayload{}
	if err := (proto.UnmarshalOptions{}).Unmarshal(cmbPayload, cmb); err != nil {
		// Name the payload type actually being decoded and keep the cause.
		panic(fmt.Sprintf("unable to decode CombinePayload for transform[%v]: %v", t.GetUniqueName(), err))
	}

	// First lets get the key coder ID.
	var pcolInID string
	// There's only one input.
	for _, pcol := range t.GetInputs() {
		pcolInID = pcol
	}
	inputPCol := comps.GetPcollections()[pcolInID]
	kvkiID := inputPCol.GetCoderId()
	// The key coder is the first component of the input's coder. Fail with a
	// descriptive message rather than a bare index-out-of-range when the coder
	// is missing or has no components.
	kvComponents := comps.GetCoders()[kvkiID].GetComponentCoderIds()
	if len(kvComponents) == 0 {
		panic(fmt.Sprintf("unable to find key coder for transform[%v]: coder %q of input PCollection %q has no component coders", t.GetUniqueName(), kvkiID, pcolInID))
	}
	kID := kvComponents[0]

	// Now we can start synthesis!
	// Coder IDs
	aID := cmb.AccumulatorCoderId

	ckvprefix := "c" + tid + "_kv_"

	iterACID := "c" + tid + "_iter_" + aID
	kvkaCID := ckvprefix + kID + "_" + aID
	// NOTE: unlike iterACID there is no underscore after "iter" here; the
	// spelling is kept so the generated coder IDs do not change.
	kvkIterACID := ckvprefix + kID + "_iter" + aID

	// PCollection IDs
	nprefix := "n" + tid + "_"
	liftedNID := nprefix + "lifted"
	groupedNID := nprefix + "grouped"
	mergedNID := nprefix + "merged"

	// Now we need the output collection ID
	var pcolOutID string
	// There's only one output.
	for _, pcol := range t.GetOutputs() {
		pcolOutID = pcol
	}

	// Transform IDs
	eprefix := "e" + tid + "_"
	liftEID := eprefix + "lift"
	gbkEID := eprefix + "gbk"
	mergeEID := eprefix + "merge"
	extractEID := eprefix + "extract"

	// coder builds a Coder with the given URN over the given component coder IDs.
	coder := func(urn string, componentIDs ...string) *pipepb.Coder {
		return &pipepb.Coder{
			Spec: &pipepb.FunctionSpec{
				Urn: urn,
			},
			ComponentCoderIds: componentIDs,
		}
	}

	// pcol builds a PCollection that copies boundedness and windowing
	// strategy from the input PCollection.
	pcol := func(name, coderID string) *pipepb.PCollection {
		return &pipepb.PCollection{
			UniqueName:          name,
			CoderId:             coderID,
			IsBounded:           inputPCol.GetIsBounded(),
			WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
		}
	}

	// tform builds a single-input, single-output PTransform that carries the
	// original combine payload. Both local names are "i0".
	tform := func(name, urn, in, out, env string) *pipepb.PTransform {
		return &pipepb.PTransform{
			UniqueName: name,
			Spec: &pipepb.FunctionSpec{
				Urn:     urn,
				Payload: cmbPayload,
			},
			Inputs: map[string]string{
				"i0": in,
			},
			Outputs: map[string]string{
				"i0": out,
			},
			EnvironmentId: env,
		}
	}

	newComps := &pipepb.Components{
		Coders: map[string]*pipepb.Coder{
			iterACID:    coder(urns.CoderIterable, aID),
			kvkaCID:     coder(urns.CoderKV, kID, aID),
			kvkIterACID: coder(urns.CoderKV, kID, iterACID),
		},
		Pcollections: map[string]*pipepb.PCollection{
			liftedNID:  pcol(liftedNID, kvkaCID),
			groupedNID: pcol(groupedNID, kvkIterACID),
			mergedNID:  pcol(mergedNID, kvkaCID),
		},
		Transforms: map[string]*pipepb.PTransform{
			liftEID: tform(liftEID, urns.TransformPreCombine, pcolInID, liftedNID, t.GetEnvironmentId()),
			// The GBK is the only synthesized transform with an empty environment ID.
			gbkEID:     tform(gbkEID, urns.TransformGBK, liftedNID, groupedNID, ""),
			mergeEID:   tform(mergeEID, urns.TransformMerge, groupedNID, mergedNID, t.GetEnvironmentId()),
			extractEID: tform(extractEID, urns.TransformExtract, mergedNID, pcolOutID, t.GetEnvironmentId()),
		},
	}

	// We don't need to remove the composite, since we don't add it in
	// when we return the new transforms, so it's not in the topology.
	return prepareResult{
		SubbedComps:   newComps,
		RemovedLeaves: removeSubTransforms(comps, t.GetSubtransforms()),
	}
}