Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14499] Fix TestStream integration tests #17733

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 35 additions & 8 deletions sdks/go/pkg/beam/testing/passert/equals.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

// Equals verifies the given collection has the same values as the given
Expand All @@ -33,11 +35,11 @@ func Equals(s beam.Scope, col beam.PCollection, values ...interface{}) beam.PCol
return Empty(subScope, col)
}
if other, ok := values[0].(beam.PCollection); ok && len(values) == 1 {
return equals(subScope, col, other)
return windowedEquals(subScope, window.NewGlobalWindows(), col, other)
}

other := beam.Create(subScope, values...)
return equals(subScope, col, other)
return windowedEquals(subScope, window.NewGlobalWindows(), col, other)
}

// EqualsList verifies that the given collection has the same values as a
Expand All @@ -50,13 +52,33 @@ func EqualsList(s beam.Scope, col beam.PCollection, list interface{}) beam.PColl
return Empty(subScope, col)
}
listCollection := beam.CreateList(subScope, list)
return equals(subScope, col, listCollection)
return windowedEquals(subScope, window.NewGlobalWindows(), col, listCollection)
}

// equals verifies that the actual values match the expected ones.
func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
unexpected, correct, missing := Diff(s, actual, expected)
beam.ParDo0(s, failIfBadEntries, beam.Impulse(s), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
// WindowedEqualsList verifies that the given collection has the same values as a
// given list, under coder equality. The values must be provided as an array or a slice.
// This function also takes a window function to window the list into for cases where
// the PCollections cannot be globally windowed (e.g. tests in unbounded pipelines.)
// This windowing function is applied to both the PCollection created from the list
// and the impulse used to trigger the Diff function.
//
// Both PCollections should have EventTimes associated with them to place expected elements
// in the correct windows or an extremely large window to cover all elements.
func WindowedEqualsList(s beam.Scope, wfn *window.Fn, col beam.PCollection, list interface{}) beam.PCollection {
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
subScope := s.Scope("passert.WindowedEqualsList")
if list == nil {
return Empty(subScope, col)
}
inter := beam.CreateList(subScope, list)
winList := beam.WindowInto(s, wfn, inter)
return windowedEquals(subScope, wfn, col, winList)
}

// windowedEquals verifies that the actual values match the expected ones. The impulse that triggers
// the check is windowed with wfn, so it also works for PCollections that are not globally windowed.
func windowedEquals(s beam.Scope, wfn *window.Fn, actual, expected beam.PCollection) beam.PCollection {
unexpected, correct, missing := WindowedDiff(s, wfn, actual, expected)
beam.ParDo0(s, failIfBadEntries, beam.WindowInto(s, wfn, beam.Impulse(s)), beam.SideInput{Input: unexpected}, beam.SideInput{Input: correct}, beam.SideInput{Input: missing})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I don't think failIfBadEntries prints out windowing information - it probably should since that can now make a difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can be window function aware at that level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I meant information about the actual window, not the windowing function. Basically, if you get different results because of windowing, it would be nice if that were obvious from the error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay that was more doable. We have a windowing call-out now.

return actual
}

Expand All @@ -68,7 +90,7 @@ const (
// 'missing' PCollections, and fails if so. The returned error message contains
// a full list of each unexpected or missing entry.
// If all the entries are in place, returns nil.
func failIfBadEntries(_ []byte, unexpected, correct, missing func(*beam.T) bool) error {
func failIfBadEntries(win typex.Window, _ []byte, unexpected, correct, missing func(*beam.T) bool) error {
goodCount := 0
var dummy beam.T
for correct(&dummy) {
Expand Down Expand Up @@ -101,6 +123,11 @@ func failIfBadEntries(_ []byte, unexpected, correct, missing func(*beam.T) bool)
for _, entry := range missingStrings {
outStrings = append(outStrings, "---", entry)
}
outStrings = append(
outStrings,
partSeparator,
fmt.Sprintf("windowing: %#v", win),
)
return errors.New(strings.Join(outStrings, "\n"))
}

Expand Down
22 changes: 22 additions & 0 deletions sdks/go/pkg/beam/testing/passert/equals_test.go
Expand Up @@ -35,6 +35,16 @@ func TestEquals_Good(t *testing.T) {
}
}

// TestEquals_Empty calls Equals with no expected values on a PCollection built
// from an empty int slice and requires the pipeline to run without error.
func TestEquals_Empty(t *testing.T) {
	pipeline, root := beam.NewPipelineWithRoot()
	emptyCol := beam.CreateList(root, []int{})
	Equals(root, emptyCol)

	err := ptest.Run(pipeline)
	if err != nil {
		t.Errorf("Pipeline failed: %v", err)
	}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test or two for windowing equality? Ideally at least 1 success and 1+ failures

func TestEqualsList_Good(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
wantL := [3]string{"c", "b", "a"}
Expand All @@ -46,6 +56,16 @@ func TestEqualsList_Good(t *testing.T) {
}
}

// TestEqualsList_Empty calls EqualsList with a nil expected list on a
// PCollection built from an empty int slice and requires the pipeline to run
// without error.
func TestEqualsList_Empty(t *testing.T) {
	pipeline, root := beam.NewPipelineWithRoot()
	emptyCol := beam.CreateList(root, []int{})
	EqualsList(root, emptyCol, nil)

	err := ptest.Run(pipeline)
	if err != nil {
		t.Errorf("Pipeline failed: %v", err)
	}
}

var badEqualsTests = []struct {
name string
actual []string
Expand Down Expand Up @@ -197,4 +217,6 @@ func ExampleEqualsList_mismatch() {
// 1 missing entries (missing in actual, present in expected)
// ---
// wrong
// =========
// windowing: window.GlobalWindow{}
}
16 changes: 16 additions & 0 deletions sdks/go/pkg/beam/testing/passert/passert.go
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
Expand All @@ -44,6 +45,21 @@ func Diff(s beam.Scope, a, b beam.PCollection) (left, both, right beam.PCollecti
return beam.ParDo3(s, &diffFn{Type: beam.EncodedType{T: t.Type()}}, imp, beam.SideInput{Input: a}, beam.SideInput{Input: b})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to call out that we no longer actually use this ourselves, so it's theoretically a candidate for deprecation/removal. With that said, I think this is both public and usable enough that people might have dependencies on it, so I don't really think that's a good idea. Mostly just had the thought and wanted to see how others feel about it.


// WindowedDiff splits 2 incoming PCollections into 3: left only, both, and right
// only. Duplicates are preserved, so a value may appear multiple times and in
// multiple collections. Equality is determined by coder equality. Because all
// values are held in memory at the same time, it should only be used for small
// collections.
//
// The impulse that drives the comparison is windowed with wfn, which lets
// WindowedDiff operate on PCollections that are not globally windowed, provided
// both are windowed using that same window function.
func WindowedDiff(s beam.Scope, wfn *window.Fn, a, b beam.PCollection) (left, both, right beam.PCollection) {
	// Build the windowed trigger first, then validate both inputs; only a's
	// element type is needed to configure the diff function.
	trigger := beam.WindowInto(s, wfn, beam.Impulse(s))

	elemType := beam.ValidateNonCompositeType(a)
	beam.ValidateNonCompositeType(b)

	differ := &diffFn{Type: beam.EncodedType{T: elemType.Type()}}
	return beam.ParDo3(
		s,
		differ,
		trigger,
		beam.SideInput{Input: a},
		beam.SideInput{Input: b},
	)
}

// diffFn computes the symmetrical multi-set difference of 2 collections, under
// coder equality. The Go values returned may be any of the coder-equal ones.
type diffFn struct {
Expand Down
50 changes: 39 additions & 11 deletions sdks/go/test/integration/primitives/teststream.go
Expand Up @@ -16,11 +16,16 @@
package primitives

import (
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
)

// bigWinFn is a fixed-window function with 10000-hour windows. The TestStream
// pipelines here apply it to the stream output and pass it to
// passert.WindowedEqualsList so both sides of the assertion share one windowing.
var bigWinFn = window.NewFixedWindows(10000 * time.Hour)

// TestStreamStrings tests the TestStream primitive by inserting string elements
// then advancing the watermark past the point where they were inserted.
func TestStreamStrings() *beam.Pipeline {
Expand All @@ -37,16 +42,21 @@ func TestStreamStrings() *beam.Pipeline {
}

// TestStreamByteSliceSequence tests the TestStream primitive by inserting byte slice elements
// then advancing the watermark to infinity and comparing the output..
// then advancing the watermark to infinity and comparing the output.
func TestStreamByteSliceSequence() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
b := []byte{91, 92, 93}
con.AddElements(1, b)
bOne := []byte{1, 91}
bTwo := []byte{1, 92}
bThree := []byte{1, 93}
con.AddElements(1, bOne, bTwo, bThree)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream byte", 1)
passert.Equals(s, col, append([]byte{3}, b...))
passert.Count(s, col, "teststream byte", 3)

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, append([][]byte{}, bOne, bTwo, bThree))
return p
}

Expand All @@ -62,7 +72,10 @@ func TestStreamInt64Sequence() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream int64", 3)
passert.EqualsList(s, col, ele)

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, ele)
return p
}

Expand All @@ -81,7 +94,10 @@ func TestStreamTwoInt64Sequences() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream int64", 6)
passert.EqualsList(s, col, append(eo, et...))

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...))
return p
}

Expand All @@ -97,7 +113,10 @@ func TestStreamFloat64Sequence() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream float64", 3)
passert.EqualsList(s, col, ele)

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, ele)
return p
}

Expand All @@ -116,7 +135,10 @@ func TestStreamTwoFloat64Sequences() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream float64", 6)
passert.EqualsList(s, col, append(eo, et...))

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...))
return p
}

Expand All @@ -132,7 +154,10 @@ func TestStreamBoolSequence() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream bool", 3)
passert.EqualsList(s, col, ele)

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, ele)
return p
}

Expand All @@ -151,6 +176,9 @@ func TestStreamTwoBoolSequences() *beam.Pipeline {
col := teststream.Create(s, con)

passert.Count(s, col, "teststream bool", 6)
passert.EqualsList(s, col, append(eo, et...))

winCol := beam.WindowInto(s, bigWinFn, col)

passert.WindowedEqualsList(s, bigWinFn, winCol, append(eo, et...))
return p
}