/
step.go
260 lines (234 loc) · 10.2 KB
/
step.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package synthetic
import (
"fmt"
"math/rand"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)
func init() {
register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*stepFn)(nil))
register.DoFn4x0[*sdf.LockRTracker, []byte, []byte, func([]byte, []byte)]((*sdfStepFn)(nil))
register.Emitter2[[]byte, []byte]()
}
// Step creates a synthetic step transform that consumes KV<[]byte, []byte>
// elements produced by other synthetic transforms and emits KV<[]byte, []byte>
// elements derived from them.
//
// The supplied StepConfig controls the step's behavior. That includes whether
// the step runs as a splittable DoFn (sdfStepFn) or a plain DoFn (stepFn).
//
// StepConfigs are best created through StepConfigBuilder. Usage example:
//
//	cfg := synthetic.DefaultStepConfig().OutputPerInput(10).FilterRatio(0.5).Build()
//	step := synthetic.Step(s, cfg, input)
func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
	scoped := s.Scope("synthetic.Step")
	if !cfg.Splittable {
		return beam.ParDo(scoped, &stepFn{Cfg: cfg}, col)
	}
	return beam.ParDo(scoped, &sdfStepFn{Cfg: cfg}, col)
}
// stepFn is the non-splittable DoFn behind synthetic.Step; see Step for usage.
//
// A stepFn must be constructed with Cfg populated. Cfg decides how many copies
// of each input are emitted and how often inputs are filtered out.
type stepFn struct {
	// Cfg is the step configuration. It is exported, like StepConfig's own
	// fields, to allow encoding.
	Cfg StepConfig
	// rng is the random source used for filtering decisions; Setup fills it in.
	rng randWrapper
}
// Setup initializes this DoFn's random number generator.
// The generator is seeded from the current time in nanoseconds.
func (fn *stepFn) Setup() {
	seed := time.Now().UnixNano()
	fn.rng = rand.New(rand.NewSource(seed))
}
// ProcessElement emits Cfg.OutputPerInput copies of the incoming key/value
// pair, unless the element is randomly filtered out.
//
// The filtering decision is made once per element. It only draws a random
// number when Cfg.FilterRatio is positive.
func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
	drop := false
	if ratio := fn.Cfg.FilterRatio; ratio > 0 {
		drop = fn.rng.Float64() < ratio
	}
	// Iterate the full count even when dropping, so a filtered element still
	// goes through the same loop as a kept one.
	for remaining := fn.Cfg.OutputPerInput; remaining > 0; remaining-- {
		if !drop {
			emit(key, val)
		}
	}
}
// sdfStepFn is the splittable DoFn behind synthetic.Step; see Step for usage.
//
// An sdfStepFn must be constructed with Cfg populated. Cfg decides how
// restrictions are created and split, and how often they are filtered out.
type sdfStepFn struct {
	// Cfg is the step configuration. It is exported, like StepConfig's own
	// fields, to allow encoding.
	Cfg StepConfig
	// rng is the random source used for filtering decisions; Setup fills it in.
	rng randWrapper
}
// CreateInitialRestriction returns the offset range [0, Cfg.OutputPerInput).
// Each offset stands for one output to be emitted for the incoming element.
func (fn *sdfStepFn) CreateInitialRestriction(_, _ []byte) offsetrange.Restriction {
	var rest offsetrange.Restriction // Start is left at its zero value.
	rest.End = int64(fn.Cfg.OutputPerInput)
	return rest
}
// SplitRestriction splits a restriction into Cfg.InitialSplits roughly equal
// parts.
//
// Every restriction returned contains at least one element, so the number of
// splits never exceeds the number of elements. An empty restriction (for
// example, one created with an OutputPerInput of 0) yields no splits at all,
// whatever Cfg.InitialSplits is set to.
func (fn *sdfStepFn) SplitRestriction(_, _ []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
	// EvenSplits returns its input unchanged when asked for one split or fewer,
	// even if that input is empty. It drops empty pieces otherwise. Handle the
	// empty case up front so the result does not depend on InitialSplits.
	if rest.End <= rest.Start {
		return nil
	}
	return rest.EvenSplits(int64(fn.Cfg.InitialSplits))
}
// RestrictionSize reports the restriction's size, as given by rest.Size().
// Since each offset represents one output, this is the number of elements
// the restriction will emit.
func (fn *sdfStepFn) RestrictionSize(_, _ []byte, rest offsetrange.Restriction) float64 {
	size := rest.Size()
	return size
}
// CreateTracker builds an offset range tracker over rest.
// The tracker is wrapped in an sdf.LockRTracker.
func (fn *sdfStepFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	tracker := offsetrange.NewTracker(rest)
	return sdf.NewLockRTracker(tracker)
}
// Setup initializes this DoFn's random number generator.
// The generator is seeded from the current time in nanoseconds.
func (fn *sdfStepFn) Setup() {
	seed := time.Now().UnixNano()
	fn.rng = rand.New(rand.NewSource(seed))
}
// ProcessElement emits one copy of the incoming key/value pair for every
// offset it can claim in the current restriction. It emits nothing for the
// restriction if it is randomly filtered out.
//
// The filtering decision is made once per restriction. It only draws a random
// number when Cfg.FilterRatio is positive.
func (fn *sdfStepFn) ProcessElement(rt *sdf.LockRTracker, key, val []byte, emit func([]byte, []byte)) {
	drop := false
	if ratio := fn.Cfg.FilterRatio; ratio > 0 {
		drop = fn.rng.Float64() < ratio
	}
	rest := rt.GetRestriction().(offsetrange.Restriction)
	// Claim offsets one at a time until the tracker refuses; offsets are still
	// claimed when dropping, only the emit is skipped.
	for pos := rest.Start; rt.TryClaim(pos); pos++ {
		if !drop {
			emit(key, val)
		}
	}
}
// StepConfigBuilder constructs StepConfigs. Each of its setter methods
// documents one StepConfig field and how it may be set.
//
// The intended flow is:
//  1. Start from DefaultStepConfig.
//  2. Chain setters.
//  3. Finish with Build.
//
// Usage example:
//
//	cfg := synthetic.DefaultStepConfig().OutputPerInput(10).FilterRatio(0.5).Build()
type StepConfigBuilder struct {
	// cfg accumulates the values set so far; Build validates and returns it.
	cfg StepConfig
}
// DefaultStepConfig returns a StepConfigBuilder pre-populated with the default
// value of every StepConfig field. Always use it as the starting point when
// building a StepConfig.
//
// The StepConfigBuilder setter methods describe each field and its default.
func DefaultStepConfig() *StepConfigBuilder {
	b := new(StepConfigBuilder)
	b.cfg.OutputPerInput = 1  // Emit each input once so nothing is dropped by default.
	b.cfg.FilterRatio = 0.0   // No random filtering by default.
	b.cfg.Splittable = false  // SDFs only help in some situations, so opt in explicitly.
	b.cfg.InitialSplits = 1   // A single split means no initial splitting.
	return b
}
// OutputPerInput sets how many outputs are emitted for each input received.
// Every output is identical to the input it came from. A value of 0 drops all
// inputs and produces no output.
//
// Valid values are 0 and above, and the default is 1. Negative values have no
// meaning for this field and are rejected by Build.
func (b *StepConfigBuilder) OutputPerInput(val int) *StepConfigBuilder {
	cfg := &b.cfg
	cfg.OutputPerInput = val
	return b
}
// FilterRatio indicates the random chance that an input will be filtered
// out, meaning that no outputs will get emitted for it. For example, a
// FilterRatio of 0.25 means that, on average, 25% of inputs will be filtered
// out. A FilterRatio of 0 means no elements are filtered, and a FilterRatio of
// 1.0 means every element is filtered.
//
// In a non-splittable step, filtering applies to each input element, so all
// outputs for that element are filtered together. In a splittable step,
// filtering applies to each input restriction instead of the entire element.
// This means some outputs for an element may be filtered while others are kept.
//
// Even when elements are filtered out, the work of processing them is still
// performed. This differs from setting an OutputPerInput of 0.
//
// Valid values are in the range [0.0, 1.0], and the default value is 0. To
// avoid precision errors, invalid values do not cause errors. Instead, values
// below 0 behave like 0 and values above 1 behave like 1.
func (b *StepConfigBuilder) FilterRatio(val float64) *StepConfigBuilder {
	b.cfg.FilterRatio = val
	return b
}
// Splittable chooses between the splittable and the non-splittable DoFn
// implementation of the step.
//
// A splittable step splits along restrictions that represent the
// OutputPerInput outputs of each element. It therefore pays off mostly when
// OutputPerInput is high. With an OutputPerInput of 1, a restriction cannot be
// split any further, so enabling this has no effect.
func (b *StepConfigBuilder) Splittable(val bool) *StepConfigBuilder {
	cfg := &b.cfg
	cfg.Splittable = val
	return b
}
// InitialSplits sets how many initial splits the step's SplitRestriction method
// performs. It only matters when Splittable is true.
//
// A synthetic step's restriction covers the outputs to emit for one input
// element, as set by OutputPerInput. The split divides those outputs evenly.
//
// Each resulting restriction holds at least one output, and every output
// belongs to exactly one restriction. If the requested number of splits
// exceeds the OutputPerInput value N, only N restrictions of one output each
// are created.
//
// Valid values are 1 and above, and the default is 1. Values of 0 or less
// would drop outputs that are expected to be emitted, so Build rejects them.
func (b *StepConfigBuilder) InitialSplits(val int) *StepConfigBuilder {
	cfg := &b.cfg
	cfg.InitialSplits = val
	return b
}
// Build validates the accumulated fields and returns the finished StepConfig.
//
// It panics if InitialSplits is below 1 or OutputPerInput is negative. These
// checks run in that order. FilterRatio is deliberately not validated.
func (b *StepConfigBuilder) Build() StepConfig {
	switch {
	case b.cfg.InitialSplits <= 0:
		panic(fmt.Sprintf("StepConfig.InitialSplits must be >= 1. Got: %v", b.cfg.InitialSplits))
	case b.cfg.OutputPerInput < 0:
		panic(fmt.Sprintf("StepConfig.OutputPerInput cannot be negative. Got: %v", b.cfg.OutputPerInput))
	}
	return b.cfg
}
// StepConfig holds every configuration option for a synthetic step.
//
// Create it with StepConfigBuilder rather than a struct literal. The fields are
// exported only so the config can be encoded.
type StepConfig struct {
	// OutputPerInput is the number of outputs emitted for each input.
	OutputPerInput int
	// FilterRatio is the random chance that an input (or, for splittable
	// steps, a restriction) is filtered out.
	FilterRatio float64
	// Splittable selects the splittable DoFn implementation when true.
	Splittable bool
	// InitialSplits is the number of initial restriction splits; it only
	// applies when Splittable is true.
	InitialSplits int
}