// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sdf contains interfaces used specifically for splittable DoFns.
//
// Warning: Splittable DoFns are still experimental, largely untested, and
// likely to have bugs.
package sdf

import (
	"time"
)

// RTracker is an interface used to interact with restrictions while processing elements in
// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
// of a single restriction.
//
// All RTracker methods should be thread-safe for dynamic splits to function correctly.
type RTracker interface {
	// TryClaim attempts to claim the block of work located at the given position of the
	// restriction. This method must be called in ProcessElement to claim work before it can be
	// processed. Processing work without claiming it first can lead to incorrect output.
	//
	// The position type is up to individual implementations, and will usually be related to the
	// kind of restriction used. For example, a simple restriction representing a numeric range
	// might use an int64. A more complex restriction, such as one representing a multidimensional
	// space, might use a more complex type.
	//
	// If the claim is successful, the DoFn must process the entire block. If the claim is
	// unsuccessful, the ProcessElement method of the DoFn must return without performing
	// any additional work or emitting any outputs.
	//
	// If the claim fails due to an error, that error is stored and will be automatically emitted
	// when the RTracker is validated, or can be manually retrieved with GetError.
	//
	// This pseudocode example illustrates the typical usage of TryClaim:
	//
	//	pos = position of first block within the restriction
	//	for TryClaim(pos) {
	//		// Do all work in the claimed block and emit outputs.
	//		pos = position of next block within the restriction
	//	}
	//	return
	TryClaim(pos interface{}) (ok bool)

	// GetError returns the error that made this RTracker stop executing, and returns nil if no
	// error occurred. This is the error that is emitted if automated validation fails.
	GetError() error

	// TrySplit splits the current restriction into a primary (currently executing work) and
	// residual (work to be split off) based on a fraction of work remaining. The split is performed
	// at the first valid split point located after the given fraction of remaining work.
	//
	// For example, a fraction of 0.5 means to split at the halfway point of the remaining work
	// only. If 50% of the work is done and 50% remains, then a fraction of 0.5 would split after
	// 75% of the total work.
	//
	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
	// then returns a copy of the newly modified restriction as the primary, and returns a new
	// restriction for the residual. If the split would produce an empty residual (either because
	// the only split point is the end of the restriction, or the split failed for some recoverable
	// reason), then this function returns nil as the residual.
	//
	// If the split fraction is 0 (e.g. a self-checkpointing split), TrySplit should return a
	// primary restriction that represents no remaining work, and the residual should contain all
	// remaining work. The RTracker should be marked as done (and return true when IsDone is
	// called) after that split. This ensures there is no data loss, which would otherwise cause
	// the pipeline to fail during the checkpoint.
	//
	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
	TrySplit(fraction float64) (primary, residual interface{}, err error)

	// GetProgress returns two abstract scalars representing the amount of done and remaining work.
	// These values have no specific units, but are used to estimate work in relation to each other
	// and should be self-consistent.
	GetProgress() (done float64, remaining float64)

	// IsDone returns a boolean indicating whether all blocks inside the restriction have been
	// claimed. This method is called by the SDK Harness to validate that a splittable DoFn has
	// correctly processed all work in a restriction before finishing. If this method still returns
	// false after processing, then GetError is expected to return a non-nil error.
	//
	// When called immediately following a checkpointing TrySplit call (with a fraction of 0.0),
	// this should return true.
	IsDone() bool

	// GetRestriction returns the restriction this tracker is tracking, or nil if the restriction
	// is unavailable for some reason.
	GetRestriction() interface{}
}
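
/*
Example (illustrative only): the standalone program below is a minimal sketch of a tracker
over a numeric range, the simple case mentioned in the TryClaim documentation. The names
offsetRange, offsetTracker and newOffsetTracker are hypothetical and are not part of this
package or the Beam SDK. The sketch assumes int64 positions, treats each offset as one block,
and splits at the first unclaimed offset plus fraction times the remaining offsets, so a 0.5
split after 50 of 100 offsets lands at 75 and a 0.0 split leaves the tracker done.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetRange is a hypothetical restriction covering the offsets [Start, End).
type offsetRange struct{ Start, End int64 }

// offsetTracker is a hypothetical tracker for offsetRange with the same method
// set as RTracker. Each int64 offset is one block of work.
type offsetTracker struct {
	mu      sync.Mutex // guards all fields, since trackers must be thread-safe
	rest    offsetRange
	claimed int64 // last claimed offset; Start-1 before the first claim
	err     error
}

func newOffsetTracker(r offsetRange) *offsetTracker {
	return &offsetTracker{rest: r, claimed: r.Start - 1}
}

// TryClaim accepts strictly increasing int64 offsets inside the restriction.
func (t *offsetTracker) TryClaim(pos interface{}) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	p, ok := pos.(int64)
	switch {
	case t.err != nil:
		return false
	case !ok:
		t.err = fmt.Errorf("position %v is not an int64", pos)
		return false
	case p <= t.claimed:
		t.err = fmt.Errorf("offset %d claimed out of order", p)
		return false
	case p >= t.rest.End:
		return false // past the end: not an error, the DoFn simply returns
	}
	t.claimed = p
	return true
}

func (t *offsetTracker) GetError() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.err
}

// TrySplit splits at next + fraction*(End-next), where next is the first
// unclaimed offset, so the fraction applies to the remaining work only.
func (t *offsetTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if fraction < 0 || fraction > 1 {
		return nil, nil, fmt.Errorf("fraction %v is outside [0, 1]", fraction)
	}
	next := t.claimed + 1
	split := next + int64(fraction*float64(t.rest.End-next))
	if t.err != nil || split >= t.rest.End {
		return t.rest, nil, nil // the residual would be empty
	}
	res := offsetRange{Start: split, End: t.rest.End}
	t.rest.End = split // the tracker now tracks only the primary
	return t.rest, res, nil
}

func (t *offsetTracker) GetProgress() (done, remaining float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return float64(t.claimed + 1 - t.rest.Start), float64(t.rest.End - t.claimed - 1)
}

// IsDone reports whether every offset in the (possibly split) range was claimed.
func (t *offsetTracker) IsDone() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.claimed+1 >= t.rest.End
}

func (t *offsetTracker) GetRestriction() interface{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.rest
}

func main() {
	t := newOffsetTracker(offsetRange{Start: 0, End: 100})
	// The claim loop from the TryClaim documentation.
	for pos := int64(0); t.TryClaim(pos); pos++ {
		if pos == 49 {
			// Simulate the runner requesting a dynamic split halfway through.
			p, r, _ := t.TrySplit(0.5)
			fmt.Println("primary:", p, "residual:", r) // primary: {0 75} residual: {75 100}
		}
	}
	fmt.Println("done:", t.IsDone(), "error:", t.GetError()) // done: true error: <nil>
}
```

Keeping a single last-claimed offset makes TryClaim, TrySplit and GetProgress easy to reason
about under one mutex. A production tracker would likely also need to handle empty ranges and
overflow near the int64 limits, which this sketch ignores.
*/
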

// BoundableRTracker is an interface used to interact with restrictions that may be bounded or
// unbounded while processing elements in splittable DoFns (specifically, in the ProcessElement
// method and TruncateRestriction method). Each BoundableRTracker tracks the progress of a single
// restriction.
//
// All BoundableRTracker methods should be thread-safe for dynamic splits to function correctly.
type BoundableRTracker interface {
	RTracker

	// IsBounded returns the boundedness of the current restriction. If the current restriction
	// represents a finite amount of work, it should return true. Otherwise, it should return false.
	IsBounded() bool
}
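
/*
Example (illustrative only): boundedness belongs to the current restriction, so it can change
as the tracker splits. This minimal sketch assumes a hypothetical growableRange whose End of
math.MaxInt64 means "no known end". The names growableTracker, splitAt and truncate are
hypothetical, and truncate is only one plausible policy for a TruncateRestriction-style step
(keep finite work, drop unbounded work), not a description of the SDK's behavior.

```go
package main

import (
	"fmt"
	"math"
)

// growableRange is a hypothetical restriction covering [Start, End), where
// End == math.MaxInt64 marks an open-ended range whose size is not yet known.
type growableRange struct{ Start, End int64 }

// growableTracker is a hypothetical tracker that shows only what
// BoundableRTracker adds; the RTracker methods are omitted for brevity.
type growableTracker struct {
	rest growableRange
}

// IsBounded reports whether the current restriction is finite.
func (t *growableTracker) IsBounded() bool {
	return t.rest.End != math.MaxInt64
}

// splitAt is a simplified stand-in for TrySplit: the tracker keeps [Start, pos)
// as the primary and returns [pos, End) as the residual.
func (t *growableTracker) splitAt(pos int64) growableRange {
	residual := growableRange{Start: pos, End: t.rest.End}
	t.rest.End = pos
	return residual
}

// truncate sketches one possible truncation policy: keep a bounded restriction
// so its finite work can finish, and drop an unbounded one by returning nil.
func truncate(t *growableTracker) *growableRange {
	if !t.IsBounded() {
		return nil
	}
	r := t.rest
	return &r
}

func main() {
	t := &growableTracker{rest: growableRange{Start: 0, End: math.MaxInt64}}
	fmt.Println("bounded:", t.IsBounded(), "kept:", truncate(t) != nil) // bounded: false kept: false

	residual := t.splitAt(10)
	// The primary [0, 10) is now finite, while the residual remains open-ended.
	fmt.Println("bounded:", t.IsBounded(), "kept:", truncate(t) != nil) // bounded: true kept: true
	fmt.Println("residual bounded:", residual.End != math.MaxInt64)    // residual bounded: false
}
```

Splitting an open-ended restriction yields a finite primary and an open-ended residual, which
is why IsBounded is asked of the tracker's current restriction rather than fixed per DoFn.
*/
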

// WatermarkEstimator is an interface used to represent a user-defined watermark estimator.
// Watermark estimators allow users to advance the output watermark of the current splittable DoFn.
type WatermarkEstimator interface {
	// CurrentWatermark returns the estimator's current watermark. It is called any time a DoFn
	// splits or checkpoints to advance the output watermark of the restriction's stage.
	CurrentWatermark() time.Time
}
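
/*
Example (illustrative only): a minimal sketch of an estimator that the DoFn advances by hand,
for instance from a source that reports its own progress in event time. The names
manualEstimator and Advance are hypothetical. The sketch assumes a watermark should never move
backwards, so older values are ignored; the local watermarkEstimator interface only mirrors
WatermarkEstimator so the program compiles without Beam.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// watermarkEstimator mirrors the WatermarkEstimator interface so this sketch
// compiles without importing Beam.
type watermarkEstimator interface {
	CurrentWatermark() time.Time
}

// manualEstimator is a hypothetical estimator whose watermark is advanced
// explicitly by the DoFn.
type manualEstimator struct {
	mu sync.Mutex
	wm time.Time
}

var _ watermarkEstimator = (*manualEstimator)(nil)

// Advance moves the watermark forward; earlier times are ignored so the
// reported watermark never regresses.
func (e *manualEstimator) Advance(t time.Time) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if t.After(e.wm) {
		e.wm = t
	}
}

// CurrentWatermark returns the most recent watermark set through Advance.
func (e *manualEstimator) CurrentWatermark() time.Time {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.wm
}

func main() {
	e := &manualEstimator{}
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	e.Advance(base.Add(time.Minute))
	e.Advance(base) // ignored: earlier than the current watermark
	fmt.Println(e.CurrentWatermark().Format(time.RFC3339)) // 2024-01-01T00:01:00Z
}
```

The mutex is a precaution: CurrentWatermark is called when the DoFn splits or checkpoints,
which this sketch assumes may happen while ProcessElement is still calling Advance.
*/
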

// TimestampObservingEstimator is an interface used to represent a user-defined watermark estimator
// that can observe the timestamps of elements output by a ParDo's emit function.
type TimestampObservingEstimator interface {
	WatermarkEstimator

	// ObserveTimestamp is called any time a DoFn emits an element and can use that element's
	// event time to modify the state of the estimator.
	ObserveTimestamp(ts time.Time)
}
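
/*
Example (illustrative only): a minimal sketch of a timestamp-observing estimator that uses a
bounded-out-of-orderness heuristic, where the watermark is the latest observed event time minus
a fixed lag. The name laggingEstimator and its lag field are hypothetical, and the heuristic is
one common choice, not something this package prescribes.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timestampObservingEstimator mirrors TimestampObservingEstimator so this
// sketch compiles without importing Beam.
type timestampObservingEstimator interface {
	CurrentWatermark() time.Time
	ObserveTimestamp(ts time.Time)
}

// laggingEstimator is a hypothetical estimator whose watermark trails the
// latest observed event time by a fixed lag.
type laggingEstimator struct {
	mu     sync.Mutex
	lag    time.Duration
	latest time.Time // zero until the first observation
}

var _ timestampObservingEstimator = (*laggingEstimator)(nil)

// ObserveTimestamp records the event time of an emitted element.
func (e *laggingEstimator) ObserveTimestamp(ts time.Time) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if ts.After(e.latest) {
		e.latest = ts
	}
}

// CurrentWatermark returns the latest observed event time minus the lag, or
// the zero time if nothing has been observed yet.
func (e *laggingEstimator) CurrentWatermark() time.Time {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.latest.IsZero() {
		return time.Time{}
	}
	return e.latest.Add(-e.lag)
}

func main() {
	e := &laggingEstimator{lag: 5 * time.Second}
	base := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	// Event times of emitted elements, arriving out of order.
	for _, sec := range []int{0, 30, 10} {
		e.ObserveTimestamp(base.Add(time.Duration(sec) * time.Second))
	}
	fmt.Println(e.CurrentWatermark().Format(time.RFC3339)) // 2024-01-01T12:00:25Z
}
```

A larger lag tolerates more out-of-order output at the cost of a slower watermark; with a lag
of zero the estimator simply tracks the maximum observed timestamp.
*/
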