Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14511] Growable Tracker for Go SDK #17754

Merged
merged 13 commits into from
Jun 2, 2022
123 changes: 113 additions & 10 deletions sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"reflect"

Expand All @@ -33,6 +34,7 @@ import (
func init() {
runtime.RegisterType(reflect.TypeOf((*Tracker)(nil)))
runtime.RegisterType(reflect.TypeOf((*Restriction)(nil)).Elem())
runtime.RegisterType(reflect.TypeOf((*GrowableTracker)(nil)))
runtime.RegisterFunction(restEnc)
runtime.RegisterFunction(restDec)
coder.RegisterCoder(reflect.TypeOf((*Restriction)(nil)).Elem(), restEnc, restDec)
Expand Down Expand Up @@ -122,19 +124,21 @@ func (r Restriction) Size() float64 {
// no assumptions about the positions of blocks within the range, so users must handle validation
// of block positions if needed.
type Tracker struct {
rest Restriction
claimed int64 // Tracks the last claimed position.
stopped bool // Tracks whether TryClaim has indicated to stop processing elements.
err error
rest Restriction
claimed int64 // Tracks the last claimed position.
stopped bool // Tracks whether TryClaim has indicated to stop processing elements.
attempted int64 // Tracks the last attempted position to claim.
err error
}

// NewTracker is a constructor for a Tracker given a start and end range.
func NewTracker(rest Restriction) *Tracker {
return &Tracker{
rest: rest,
claimed: rest.Start - 1,
stopped: false,
err: nil,
rest: rest,
claimed: rest.Start - 1,
attempted: -1,
stopped: false,
err: nil,
}
}

Expand All @@ -154,7 +158,7 @@ func (tracker *Tracker) TryClaim(rawPos interface{}) bool {
}

pos := rawPos.(int64)

tracker.attempted = pos
if pos < tracker.rest.Start {
tracker.stopped = true
tracker.err = errors.New("position claimed is out of bounds of the restriction")
Expand Down Expand Up @@ -210,7 +214,7 @@ func (tracker *Tracker) GetProgress() (done, remaining float64) {

// IsDone returns true if no error has occurred and either the last position of the restriction has been claimed or the restriction is empty.
func (tracker *Tracker) IsDone() bool {
return tracker.err == nil && tracker.claimed >= tracker.rest.End
return tracker.err == nil && (tracker.claimed+1 >= tracker.rest.End || tracker.rest.Start >= tracker.rest.End)
}

// GetRestriction returns a copy of the tracker's underlying offsetrange.Restriction.
Expand All @@ -223,3 +227,102 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}

// RangeEndEstimator provides the estimated end offset of the range. Users must implement this interface to
// use the offsetrange.GrowableTracker.
type RangeEndEstimator interface {
// Estimate is called by the tracker whenever it needs the current estimated end offset of a
// growable range, for example when computing a split point in TrySplit().
//
// The end offset is exclusive for the range. The estimated end is not required to
// monotonically increase: the tracker only takes it into consideration when it is larger
// than the current position, and otherwise falls back to the position right after the
// current one.
// Returning math.MaxInt64 as the estimate implies the largest possible position for the range
// is math.MaxInt64 - 1. Return math.MinInt64 if an estimate cannot be provided.
//
// Providing a good estimate is important for an accurate progress signal and will impact
// splitting decisions by the runner.
Estimate() int64
}

// GrowableTracker tracks a growable offset range restriction that can be represented as a range of integer values,
// for example for byte offsets in a file, or indices in an array. Note that this tracker makes
// no assumptions about the positions of blocks within the range, so users must handle validation
// of block positions if needed.
//
// The restriction is treated as growable for as long as its End is math.MaxInt64; once the End
// has any other value, splitting and progress reporting are delegated to the embedded Tracker.
type GrowableTracker struct {
// Tracker is the embedded bounded tracker holding the restriction and the claim state.
Tracker
// rangeEndEstimator supplies the estimated end offset while the restriction is still growable.
rangeEndEstimator RangeEndEstimator
}

// NewGrowableTracker is a constructor for an GrowableTracker given a offsetrange.Restriction and RangeEndEstimator.
// This tracker should be used when dealing with streaming use cases where the end of the restriction is
// undefined (math.MaxInt64) in this case. Otherwise, this tracker works the same as offsetrange.Tracker, so it is
// recommended to use that directly for bounded restrictions.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) (*GrowableTracker, error) {
if rangeEndEstimator == nil {
return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. Implementing offsetrange.RangeEndEstimator may be required")
}
return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: rest.End}), rangeEndEstimator}, nil
}

// Start reports the start offset of the restriction currently held by the tracker.
func (tracker *GrowableTracker) Start() int64 {
	rest := tracker.GetRestriction().(Restriction)
	return rest.Start
}

// End reports the end offset of the restriction currently held by the tracker.
func (tracker *GrowableTracker) End() int64 {
	rest := tracker.GetRestriction().(Restriction)
	return rest.End
}

// max returns the larger of the two int64 values.
func max(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}

// TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the
// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1; NaN is treated as 0.
//
// While the restriction is growable (its End is math.MaxInt64) the remainder is measured against
// the end offset reported by the RangeEndEstimator, floored at the position right after the last
// attempted one. Once the restriction is no longer growable, the split is delegated to the
// embedded Tracker.
//
// On a successful split the tracker's restriction is shrunk to end at the split point and
// returned as primary, and residual covers the split point up to the previous End. When no split
// is possible the current restriction is returned as primary with a nil residual.
func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
	if tracker.stopped || tracker.IsDone() {
		return tracker.rest, nil, nil
	}

	// If current tracking range is no longer growable, split it as a normal range.
	if tracker.End() != math.MaxInt64 {
		return tracker.Tracker.TrySplit(fraction)
	}

	// The largest representable position has already been attempted, so there is no more
	// space to split. The primary is still reported, like on every other no-split path.
	if tracker.attempted == math.MaxInt64 {
		return tracker.rest, nil, nil
	}

	// Clamp the fraction as documented; NaN would otherwise poison the arithmetic below.
	if math.IsNaN(fraction) || fraction < 0 {
		fraction = 0
	} else if fraction > 1 {
		fraction = 1
	}

	cur := max(tracker.attempted, tracker.Start()-1)
	estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)

	// Measure the remainder in float64: estimatedEnd-cur overflows int64 when the estimate is
	// math.MaxInt64 and nothing has been attempted yet (cur == -1).
	span := float64(estimatedEnd) - float64(cur)
	step := math.Ceil(math.Max(1, span*fraction))

	// Default to splitting at the estimated end. The step is only converted to int64 when it is
	// strictly inside the span and below 2^63, since an out-of-range float to integer conversion
	// has no defined result. The extra bounds check absorbs float rounding on very large spans.
	splitPt := estimatedEnd
	if step < span && step < math.MaxInt64 {
		if pt := cur + int64(step); pt > cur && pt < estimatedEnd {
			splitPt = pt
		}
	}

	residual = Restriction{Start: splitPt, End: tracker.End()}
	tracker.rest.End = splitPt
	return tracker.rest, residual, nil
}

// GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
// If current tracking range is no longer growable, get its progress as a normal range.
if tracker.End() != math.MaxInt64 {
return tracker.Tracker.GetProgress()
}

done = float64((tracker.claimed + 1) - tracker.Start())
remaining = float64(tracker.End() - (tracker.claimed + 1))
return done, remaining
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}

// IsBounded reports whether the current restriction is bounded, i.e. whether its End is
// anything other than math.MaxInt64.
func (tracker *GrowableTracker) IsBounded() bool {
	unbounded := tracker.End() == math.MaxInt64
	return !unbounded
}