From 647fc6a2f151e9945f814036d80b4da2a7f51709 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Mon, 23 May 2022 11:46:16 -0400 Subject: [PATCH 1/3] [BEAM-14499] Fix TestStream integration tests --- sdks/go/pkg/beam/testing/passert/equals.go | 25 ++++++++++ sdks/go/pkg/beam/testing/passert/passert.go | 16 ++++++ .../test/integration/primitives/teststream.go | 50 +++++++++++++++---- 3 files changed, 80 insertions(+), 11 deletions(-) diff --git a/sdks/go/pkg/beam/testing/passert/equals.go b/sdks/go/pkg/beam/testing/passert/equals.go index 2311e961e4326..5c71a353239fd 100644 --- a/sdks/go/pkg/beam/testing/passert/equals.go +++ b/sdks/go/pkg/beam/testing/passert/equals.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" ) // Equals verifies the given collection has the same values as the given @@ -53,6 +54,22 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl return equals(subScope, col, listCollection) } +// WindowedEqualsList verifies that the given collection has the same values as a +// given list, under coder equality. The values must be provided as an array or a slice. +// This function also takes a window function to window the list into for cases where +// the PCollections cannot be globally windowed (e.g. tests in unbounded pipelines.) +// This windowing function is applied to both the PCollection created from the list +// and the impulse used to trigger the Diff function. +func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list interface{}) beam.PCollection { + subScope := s.Scope("passert.WindowedEqualsList") + if list == nil { + return Empty(subScope, col) + } + inter := beam.CreateList(subScope, list) + winList := beam.WindowInto(s, wfn, inter) + return windowedEquals(subScope, wfn, col, winList) +} + // equals verifies that the actual values match the expected ones. func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection { unexpected, correct, missing := Diff(s, actual, expected) @@ -60,6 +77,14 @@ func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection { return actual } +// windowedEquals verifies that the actual values match the expected ones in cases where the PCollections +// cannot be globally windowed. +func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected beam.PCollection) beam.PCollection { + unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected) + beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn, beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing}) + return actual +} + const ( partSeparator = "=========" ) diff --git a/sdks/go/pkg/beam/testing/passert/passert.go b/sdks/go/pkg/beam/testing/passert/passert.go index 6530c67e2d6ab..1a4d013cd09f8 100644 --- a/sdks/go/pkg/beam/testing/passert/passert.go +++ b/sdks/go/pkg/beam/testing/passert/passert.go @@ -23,6 +23,7 @@ import ( "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" @@ -44,6 +45,21 @@ func Diff(s beam.Scope, a, b beam.PCollection) (left, both, right beam.PCollecti return beam.ParDo3(s, &diffFn{Type: beam.EncodedType{T: t.Type()}}, imp, beam.SideInput{Input: a}, beam.SideInput{Input: b}) } +// WindowedDiff splits 2 incoming PCollections into 3: left only, both, right only. Duplicates are +// preserved, so a value may appear multiple times and in multiple collections. Coder +// equality is used to determine equality. Should only be used for small collections, +// because all values are held in memory at the same time. WindowedDiff accepts a window function +// to apply to the Impulse so it can operate on PCollections that are not globally windowed but are windowed using +// the same window function. +func WindowedDiff(s beam.Scope, wfn *window.Fn, a, b beam.PCollection) (left, both, right beam.PCollection) { + imp := beam.Impulse(s) + wImp := beam.WindowInto(s, wfn, imp) + + t := beam.ValidateNonCompositeType(a) + beam.ValidateNonCompositeType(b) + return beam.ParDo3(s, &diffFn{Type: beam.EncodedType{T: t.Type()}}, wImp, beam.SideInput{Input: a}, beam.SideInput{Input: b}) +} + // diffFn computes the symmetrical multi-set difference of 2 collections, under // coder equality. The Go values returned may be any of the coder-equal ones. type diffFn struct { diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index 7df6a485804f5..feafc547cf9da 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -16,11 +16,16 @@ package primitives import ( + "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream" ) +var bigWinFn *window.Fn = window.NewFixedWindows(10000 * time.Hour) + // TestStreamSequence tests the TestStream primitive by inserting string elements // then advancing the watermark past the point where they were inserted. func TestStreamStrings() *beam.Pipeline { @@ -37,16 +42,21 @@ func TestStreamStrings() *beam.Pipeline { } // TestStreamByteSliceSequence tests the TestStream primitive by inserting byte slice elements -// then advancing the watermark to infinity and comparing the output.. +// then advancing the watermark to infinity and comparing the output. func TestStreamByteSliceSequence() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() - b := []byte{91, 92, 93} - con.AddElements(1, b) + bOne := []byte{1, 91} + bTwo := []byte{1, 92} + bThree := []byte{1, 93} + con.AddElements(1, bOne, bTwo, bThree) con.AdvanceWatermarkToInfinity() col := teststream.Create(s, con) - passert.Count(s, col, "teststream byte", 1) - passert.Equals(s, col, append([]byte{3}, b...)) + passert.Count(s, col, "teststream byte", 3) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, append([][]byte{}, bOne, bTwo, bThree)) return p } @@ -62,7 +72,10 @@ func TestStreamInt64Sequence() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream int64", 3) - passert.EqualsList(s, col, ele) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, ele) return p } @@ -81,7 +94,10 @@ func TestStreamTwoInt64Sequences() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream int64", 6) - passert.EqualsList(s, col, append(eo, et...)) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...)) return p } @@ -97,7 +113,10 @@ func TestStreamFloat64Sequence() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream float64", 3) - passert.EqualsList(s, col, ele) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, ele) return p } @@ -116,7 +135,10 @@ func TestStreamTwoFloat64Sequences() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream float64", 6) - passert.EqualsList(s, col, append(eo, et...)) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...)) return p } @@ -132,7 +154,10 @@ func TestStreamBoolSequence() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream bool", 3) - passert.EqualsList(s, col, ele) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, ele) return p } @@ -151,6 +176,9 @@ func TestStreamTwoBoolSequences() *beam.Pipeline { col := teststream.Create(s, con) passert.Count(s, col, "teststream bool", 6) - passert.EqualsList(s, col, append(eo, et...)) + + winCol := beam.WindowInto(s, bigWinFn, col) + + passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...)) return p } From 762b859cf4f1d086e736c66c16916c2895f94c20 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Mon, 23 May 2022 13:04:16 -0400 Subject: [PATCH 2/3] Reroute Equals() through windowedEquals --- sdks/go/pkg/beam/testing/passert/equals.go | 13 +++--------- .../pkg/beam/testing/passert/equals_test.go | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/sdks/go/pkg/beam/testing/passert/equals.go b/sdks/go/pkg/beam/testing/passert/equals.go index 5c71a353239fd..08b7b4176cef2 100644 --- a/sdks/go/pkg/beam/testing/passert/equals.go +++ b/sdks/go/pkg/beam/testing/passert/equals.go @@ -34,11 +34,11 @@ func Equals(s beam.Scope, col beam.PCollection, values ...interface{}) beam.PCol return Empty(subScope, col) } if other, ok := values[0].(beam.PCollection); ok && len(values) == 1 { - return equals(subScope, col, other) + return windowedEquals(subScope, window.NewGlobalWindows(), col, other) } other := beam.Create(subScope, values...) - return equals(subScope, col, other) + return windowedEquals(subScope, window.NewGlobalWindows(), col, other) } // EqualsList verifies that the given collection has the same values as a @@ -51,7 +51,7 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl return Empty(subScope, col) } listCollection := beam.CreateList(subScope, list) - return equals(subScope, col, listCollection) + return windowedEquals(subScope, window.NewGlobalWindows(), col, listCollection) } // WindowedEqualsList verifies that the given collection has the same values as a @@ -70,13 +70,6 @@ func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list return windowedEquals(subScope, wfn, col, winList) } -// equals verifies that the actual values match the expected ones. -func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection { - unexpected, correct, missing := Diff(s, actual, expected) - beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing}) - return actual -} - // windowedEquals verifies that the actual values match the expected ones in cases where the PCollections // cannot be globally windowed. func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected beam.PCollection) beam.PCollection { diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index b0ddeae8d6f75..4f3da23bb39e0 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -35,6 +35,16 @@ func TestEquals_Good(t *testing.T) { } } +func TestEquals_Empty(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + gotC := beam.CreateList(s, []int{}) + + Equals(s, gotC) + if err := ptest.Run(p); err != nil { + t.Errorf("Pipeline failed: %v", err) + } +} + func TestEqualsList_Good(t *testing.T) { p, s := beam.NewPipelineWithRoot() wantL := [3]string{"c", "b", "a"} @@ -46,6 +56,16 @@ func TestEqualsList_Good(t *testing.T) { } } +func TestEqualsList_Empty(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + gotC := beam.CreateList(s, []int{}) + + EqualsList(s, gotC, nil) + if err := ptest.Run(p); err != nil { + t.Errorf("Pipeline failed: %v", err) + } +} + var badEqualsTests = []struct { name string actual []string From 4d699ffc4aaf090fd62365b03d8c4431a896d892 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Mon, 23 May 2022 14:10:19 -0400 Subject: [PATCH 3/3] Add windowing information to failIfBadEntries --- sdks/go/pkg/beam/testing/passert/equals.go | 11 ++++++++++- sdks/go/pkg/beam/testing/passert/equals_test.go | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/testing/passert/equals.go b/sdks/go/pkg/beam/testing/passert/equals.go index 08b7b4176cef2..ea8e62729dabb 100644 --- a/sdks/go/pkg/beam/testing/passert/equals.go +++ b/sdks/go/pkg/beam/testing/passert/equals.go @@ -23,6 +23,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" ) // Equals verifies the given collection has the same values as the given @@ -60,6 +61,9 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl // the PCollections cannot be globally windowed (e.g. tests in unbounded pipelines.) // This windowing function is applied to both the PCollection created from the list // and the impulse used to trigger the Diff function. +// +// Both PCollections should have EventTimes associated with them to place expected elements +// in the correct windows or an extremely large window to cover all elements. func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list interface{}) beam.PCollection { subScope := s.Scope("passert.WindowedEqualsList") if list == nil { @@ -86,7 +90,7 @@ const ( // 'missing' PCollections, and fails if so. The returned error message contains // a full list of each unexpected or missing entry. // If all the entries are in place, returns nil. -func failIfBadEntries(_ []byte, unexpected, correct, missing func(*beam.T) bool) error { +func failIfBadEntries(win typex.Window, _ []byte, unexpected, correct, missing func(*beam.T) bool) error { goodCount := 0 var dummy beam.T for correct(&dummy) { @@ -119,6 +123,11 @@ func failIfBadEntries(_ []byte, unexpected, correct, missing func(*beam.T) bool) for _, entry := range missingStrings { outStrings = append(outStrings, "---", entry) } + outStrings = append( + outStrings, + partSeparator, + fmt.Sprintf("windowing: %#v", win), + ) return errors.New(strings.Join(outStrings, "\n")) } diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index 4f3da23bb39e0..ee7c1d95dea2f 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -217,4 +217,6 @@ func ExampleEqualsList_mismatch() { // 1 missing entries (missing in actual, present in expected) // --- // wrong + // ========= + // windowing: window.GlobalWindow{} }