Skip to content

Commit

Permalink
Handle stateful windows correctly + integration test (#22918)
Browse files Browse the repository at this point in the history
* Add a windowed value state integration test

* Window into global window before passert

* Fix issues with test and windowing
  • Loading branch information
damccorm committed Aug 29, 2022
1 parent 4ae54b2 commit 7cc48e9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
11 changes: 6 additions & 5 deletions sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ func (n *ParDo) ProcessElement(_ context.Context, elm *FullValue, values ...ReSt
func (n *ParDo) processMainInput(mainIn *MainInput) error {
elm := &mainIn.Key

// If the function observes windows, we must invoke it for each window. The expected fast path
// is that either there is a single window or the function doesn't observe windows, so we can
// optimize it by treating all windows as a single one.
// If the function observes windows or uses per window state, we must invoke it for each window.
// The expected fast path is that either there is a single window or the function doesn't observe
// windows, so we can optimize it by treating all windows as a single one.
if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
// The ProcessContinuation return value is ignored because only SDFs can return ProcessContinuations.
_, processResult := n.processSingleWindow(mainIn)
Expand Down Expand Up @@ -209,13 +209,14 @@ func rtErrHelper(err error) error {

// mustExplodeWindows returns true iff we need to call the function
// for each window. It is needed if the function observes the window,
// either directly or indirectly via (windowed) side inputs or state.
//
// An element that carries fewer than two windows never needs exploding,
// regardless of what the function observes.
func mustExplodeWindows(fn *funcx.Fn, elm *FullValue, usesSideInput bool) bool {
	if len(elm.Windows) < 2 {
		return false
	}
	_, explode := fn.Window()
	// A function with a state provider uses per-window state, so it must be
	// invoked once per window just like a function that observes the window.
	_, observesState := fn.StateProvider()
	return explode || usesSideInput || observesState
}

// FinishBundle does post-bundle processing operations for the DoFn.
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var directFilters = []string{
"TestOomParDo",
// The direct runner does not support user state.
"TestValueState",
"TestValueState_Windowed",
"TestValueState_Clear",
"TestBagState",
"TestBagState_Clear",
Expand Down Expand Up @@ -118,6 +119,7 @@ var portableFilters = []string{
"TestOomParDo",
// The portable runner does not support user state.
"TestValueState",
"TestValueState_Windowed",
"TestValueState_Clear",
"TestBagState",
"TestBagState_Clear",
Expand Down Expand Up @@ -166,6 +168,7 @@ var samzaFilters = []string{
"TestOomParDo",
// The samza runner does not support user state.
"TestValueState",
"TestValueState_Windowed",
"TestValueState_Clear",
"TestBagState",
"TestBagState_Clear",
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/test/integration/primitives/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
Expand Down Expand Up @@ -87,6 +89,19 @@ func ValueStateParDo() *beam.Pipeline {
return p
}

// ValueStateParDo_Windowed tests a DoFn that uses windowed value state.
//
// The input is assigned to 3-second fixed windows before the stateful DoFn
// runs. The expected output counts restart at 1 several times rather than
// running from 1 to 10, i.e. the state is not shared across the whole input.
func ValueStateParDo_Windowed() *beam.Pipeline {
	p, s := beam.NewPipelineWithRoot()

	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 1, 1, 2, 2, 3, 4, 4, 4, 4}}, beam.Impulse(s))
	wData := beam.WindowInto(s, window.NewFixedWindows(3*time.Second), timestampedData)
	counts := beam.ParDo(s, &valueStateFn{State1: state.MakeValueState[int]("key1"), State2: state.MakeValueState[string]("key2")}, wData)
	// Re-window into the global window before asserting on the results.
	globalCounts := beam.WindowInto(s, window.NewGlobalWindows(), counts)
	passert.Equals(s, globalCounts, "magic: 1, I", "magic: 2, II", "magic: 3, III", "magic: 1, I", "magic: 2, II", "magic: 3, III", "magic: 1, I", "magic: 2, II", "magic: 3, III", "magic: 1, I")

	return p
}

type valueStateClearFn struct {
State1 state.Value[int]
}
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/test/integration/primitives/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func TestValueState(t *testing.T) {
ptest.RunAndValidate(t, ValueStateParDo())
}

// TestValueState_Windowed builds the windowed value-state pipeline and runs
// it through ptest.RunAndValidate, after integration.CheckFilters has been
// applied to the test.
func TestValueState_Windowed(t *testing.T) {
	integration.CheckFilters(t)
	pipeline := ValueStateParDo_Windowed()
	ptest.RunAndValidate(t, pipeline)
}

func TestValueState_Clear(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, ValueStateParDo_Clear())
Expand Down

0 comments on commit 7cc48e9

Please sign in to comment.