From fb563514fa5f22e6afc22a2002b0fda1bd91cf2d Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Wed, 18 Feb 2026 00:18:43 +0000 Subject: [PATCH] Add custom window support in Go SDK Implement custom WindowFn registration for the Go SDK, reaching capability parity with Java/Python AssignContext. Custom WindowFns are plain structs registered via RegisterWindowFn and validated structurally at registration time, matching the DoFn pattern used elsewhere in the SDK. Two AssignWindows shapes are accepted: AssignWindows(typex.EventTime) []typex.Window AssignWindows(typex.EventTime, T) []typex.Window The second form is element-aware: the element value drives which window(s) it lands in. Validation happens via reflection at registration, so misuse fails at pipeline construction rather than at runtime. A package-level registry records the struct type and optional element type for cross-package lookup via LookupWindowFnMeta. An interface-based shape (WindowAssigner) was explored first. Structural typing was chosen instead because it keeps custom WindowFns consistent with DoFns, avoids forcing users to satisfy a Go-specific interface, and lets the same registry carry the element-type metadata that the dispatch and translation paths need. WindowFnInvoker dispatches in three tiers: typed interface (zero alloc), element-aware typed interface (zero alloc), and reflect.Value.Method.Call as a fallback (2 allocs/element). Serialization piggybacks on the Beam model proto (FunctionSpec with URN + JSON payload), so the internal v1 proto needs no new fields. The side-input mapping path panics for element-aware WindowFns, matching the existing sessions guard, since side-input windows have no element to feed AssignWindows. Integration coverage exercises an element-aware WindowFn end-to-end through the direct runner, verifying that elements with different SizeMs values land in the windows dictated by their own data. --- sdks/go/pkg/beam/core/graph/window/fn.go | 46 ++++++ sdks/go/pkg/beam/core/graph/window/fn_test.go | 75 +++++++++ sdks/go/pkg/beam/core/graph/window/invoke.go | 117 ++++++++++++++ .../pkg/beam/core/graph/window/invoke_test.go | 147 ++++++++++++++++++ .../go/pkg/beam/core/graph/window/register.go | 147 ++++++++++++++++++ .../pkg/beam/core/runtime/exec/translate.go | 49 ++++++ sdks/go/pkg/beam/core/runtime/exec/window.go | 17 +- .../pkg/beam/core/runtime/exec/window_test.go | 47 +++++- .../core/runtime/graphx/serialize_test.go | 35 +++++ .../pkg/beam/core/runtime/graphx/translate.go | 38 ++++- .../test/integration/primitives/windowinto.go | 136 ++++++++++++++++ .../integration/primitives/windowinto_test.go | 15 ++ 12 files changed, 864 insertions(+), 5 deletions(-) create mode 100644 sdks/go/pkg/beam/core/graph/window/invoke.go create mode 100644 sdks/go/pkg/beam/core/graph/window/invoke_test.go create mode 100644 sdks/go/pkg/beam/core/graph/window/register.go diff --git a/sdks/go/pkg/beam/core/graph/window/fn.go b/sdks/go/pkg/beam/core/graph/window/fn.go index df32a97b89c2..a1b890e214de 100644 --- a/sdks/go/pkg/beam/core/graph/window/fn.go +++ b/sdks/go/pkg/beam/core/graph/window/fn.go @@ -17,6 +17,7 @@ package window import ( "fmt" + "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -30,6 +31,7 @@ const ( FixedWindows Kind = "FIX" SlidingWindows Kind = "SLI" Sessions Kind = "SES" + CustomWindows Kind = "CUS" // User-defined custom WindowFn ) // NewGlobalWindows returns the default WindowFn, which places all elements @@ -53,6 +55,28 @@ func NewSessions(gap time.Duration) *Fn { return &Fn{Kind: Sessions, Gap: gap} } +// NewCustom returns a WindowFn backed by a user-defined custom window +// function. The fn value must be a pointer-to-struct whose concrete type +// has been registered with [RegisterWindowFn] during init. Custom window +// functions are non-merging and must return [IntervalWindow] values from +// their AssignWindows method. +// +// NewCustom panics if fn is nil or its type was not registered. +func NewCustom(fn any) *Fn { + if fn == nil { + panic("window.NewCustom: fn must not be nil") + } + t := reflect.TypeOf(fn) + st := t + if t.Kind() == reflect.Pointer { + st = t.Elem() + } + if LookupWindowFnMeta(st) == nil { + panic(fmt.Sprintf("window.NewCustom: type %v is not registered; call window.RegisterWindowFn during init()", t)) + } + return &Fn{Kind: CustomWindows, CustomFn: fn} +} + // Fn defines the window fn. type Fn struct { Kind Kind @@ -60,6 +84,24 @@ type Fn struct { Size time.Duration // FixedWindows, SlidingWindows Period time.Duration // SlidingWindows Gap time.Duration // Sessions + + CustomFn any // CustomWindows (nil for built-in kinds) +} + +// NeedsElement reports whether a CustomWindows Fn has an element-aware +// AssignWindows signature. Returns false for all built-in window kinds. +func (w *Fn) NeedsElement() bool { + if w.Kind != CustomWindows || w.CustomFn == nil { + return false + } + t := reflect.TypeOf(w.CustomFn) + if t.Kind() == reflect.Pointer { + t = t.Elem() + } + if meta := LookupWindowFnMeta(t); meta != nil { + return meta.NeedsElement() + } + return false } // TODO(herohde) 4/17/2018: do we need to expose the window type as well? @@ -82,6 +124,8 @@ func (w *Fn) String() string { return fmt.Sprintf("%v[%v@%v]", w.Kind, w.Size, w.Period) case Sessions: return fmt.Sprintf("%v[%v]", w.Kind, w.Gap) + case CustomWindows: + return fmt.Sprintf("%v[%v]", w.Kind, reflect.TypeOf(w.CustomFn)) default: return string(w.Kind) } @@ -105,6 +149,8 @@ func (w *Fn) Equals(o *Fn) bool { return w.Period == o.Period && w.Size == o.Size case Sessions: return w.Gap == o.Gap + case CustomWindows: + return reflect.DeepEqual(w.CustomFn, o.CustomFn) default: panic(fmt.Sprintf("unknown window type: %v", w)) } diff --git a/sdks/go/pkg/beam/core/graph/window/fn_test.go b/sdks/go/pkg/beam/core/graph/window/fn_test.go index 6baa37c41ea0..55a3bf697258 100644 --- a/sdks/go/pkg/beam/core/graph/window/fn_test.go +++ b/sdks/go/pkg/beam/core/graph/window/fn_test.go @@ -16,10 +16,18 @@ package window import ( + "strings" "testing" "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" ) +func init() { + RegisterWindowFn[*testWindowFn]() +} + func TestEquals(t *testing.T) { tests := []struct { name string @@ -81,6 +89,24 @@ func TestEquals(t *testing.T) { NewSlidingWindows(10*time.Millisecond, 100*time.Millisecond), false, }, + { + "custom equal", + NewCustom(&testWindowFn{BucketSize: 3000}), + NewCustom(&testWindowFn{BucketSize: 3000}), + true, + }, + { + "custom inequal", + NewCustom(&testWindowFn{BucketSize: 3000}), + NewCustom(&testWindowFn{BucketSize: 5000}), + false, + }, + { + "custom vs fixed", + NewCustom(&testWindowFn{BucketSize: 3000}), + NewFixedWindows(3 * time.Second), + false, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -90,3 +116,52 @@ func TestEquals(t *testing.T) { }) } } + +// testWindowFn is a minimal custom WindowFn for testing. +type testWindowFn struct { + BucketSize int64 +} + +func (f *testWindowFn) AssignWindows(ts typex.EventTime) []typex.Window { + bucket := typex.EventTime(f.BucketSize) + // Euclidean remainder; correct floor for negative ts. + start := ts - ((ts%bucket)+bucket)%bucket + end := start + bucket + return []typex.Window{IntervalWindow{Start: start, End: end}} +} + +func TestNewCustom(t *testing.T) { + fn := NewCustom(&testWindowFn{BucketSize: 3000}) + if fn.Kind != CustomWindows { + t.Errorf("NewCustom().Kind = %v, want %v", fn.Kind, CustomWindows) + } + if fn.CustomFn == nil { + t.Fatal("NewCustom().CustomFn is nil") + } +} + +func TestNewCustomPanicsOnNil(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Error("NewCustom(nil) did not panic") + } + }() + NewCustom(nil) +} + +func TestCustomCoder(t *testing.T) { + fn := NewCustom(&testWindowFn{BucketSize: 3000}) + got := fn.Coder() + want := coder.NewIntervalWindow() + if got.Kind != want.Kind { + t.Errorf("Coder().Kind = %v, want %v", got.Kind, want.Kind) + } +} + +func TestCustomString(t *testing.T) { + fn := NewCustom(&testWindowFn{BucketSize: 3000}) + s := fn.String() + if !strings.HasPrefix(s, "CUS[") { + t.Errorf("String() = %q, want prefix CUS[", s) + } +} diff --git a/sdks/go/pkg/beam/core/graph/window/invoke.go b/sdks/go/pkg/beam/core/graph/window/invoke.go new file mode 100644 index 000000000000..95bacc2c288a --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/window/invoke.go @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "fmt" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +// tsOnlyAssigner is the fast-path interface for timestamp-only custom WindowFns. +type tsOnlyAssigner interface { + AssignWindows(typex.EventTime) []typex.Window +} + +// anyElemAssigner is the fast-path interface for element-aware WindowFns +// whose element parameter is typed as any (interface{}). +type anyElemAssigner interface { + AssignWindows(typex.EventTime, any) []typex.Window +} + +// WindowFnInvoker wraps a custom WindowFn instance and dispatches +// AssignWindows calls through one of three paths, chosen once at +// construction: +// +// 1. Type-assert to tsOnlyAssigner — zero allocation. +// 2. Type-assert to anyElemAssigner — zero allocation. +// 3. reflect.Value.Method.Call — small per-call allocation. +type WindowFnInvoker struct { + call func(typex.EventTime, any) []typex.Window + needsElement bool +} + +// NewWindowFnInvoker builds an invoker for fn. The concrete type of fn +// must have been previously registered via RegisterWindowFn. +// Panics if fn's type is not registered. +func NewWindowFnInvoker(fn any) *WindowFnInvoker { + t := reflect.TypeOf(fn) + if t == nil { + panic("window.NewWindowFnInvoker: fn must not be nil") + } + structType := t + if t.Kind() == reflect.Pointer { + structType = t.Elem() + } + + meta := LookupWindowFnMeta(structType) + if meta == nil { + panic(fmt.Sprintf("window.NewWindowFnInvoker: type %v is not registered; call window.RegisterWindowFn during init()", t)) + } + + inv := &WindowFnInvoker{needsElement: meta.NeedsElement()} + + if !meta.NeedsElement() { + // Fast path 1: timestamp-only. + if a, ok := fn.(tsOnlyAssigner); ok { + inv.call = func(ts typex.EventTime, _ any) []typex.Window { + return a.AssignWindows(ts) + } + return inv + } + // Unreachable for well-typed registrations, but fall through to reflect. + } else { + // Fast path 2: element typed as any. + if a, ok := fn.(anyElemAssigner); ok { + inv.call = func(ts typex.EventTime, elem any) []typex.Window { + return a.AssignWindows(ts, elem) + } + return inv + } + } + + // Slow path 3: concrete element type — use reflect. + rv := reflect.ValueOf(fn) + m := rv.MethodByName("AssignWindows") + if !m.IsValid() { + panic(fmt.Sprintf("window.NewWindowFnInvoker: %v has no AssignWindows method", t)) + } + + if !meta.NeedsElement() { + inv.call = func(ts typex.EventTime, _ any) []typex.Window { + out := m.Call([]reflect.Value{reflect.ValueOf(ts)}) + return out[0].Interface().([]typex.Window) + } + } else { + inv.call = func(ts typex.EventTime, elem any) []typex.Window { + out := m.Call([]reflect.Value{reflect.ValueOf(ts), reflect.ValueOf(elem)}) + return out[0].Interface().([]typex.Window) + } + } + return inv +} + +// Invoke calls AssignWindows on the underlying WindowFn. +// If the WindowFn is timestamp-only, elem is ignored. +func (inv *WindowFnInvoker) Invoke(ts typex.EventTime, elem any) []typex.Window { + return inv.call(ts, elem) +} + +// NeedsElement reports whether the underlying WindowFn accepts an element. +func (inv *WindowFnInvoker) NeedsElement() bool { + return inv.needsElement +} diff --git a/sdks/go/pkg/beam/core/graph/window/invoke_test.go b/sdks/go/pkg/beam/core/graph/window/invoke_test.go new file mode 100644 index 000000000000..f70f4750ca27 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/window/invoke_test.go @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +// elemAwareAnyWindowFn accepts an element typed as any — fast path 2. +type elemAwareAnyWindowFn struct { + SizeMs int64 +} + +func (f *elemAwareAnyWindowFn) AssignWindows(ts typex.EventTime, _ any) []typex.Window { + size := typex.EventTime(f.SizeMs) + start := ts - (ts % size) + return []typex.Window{IntervalWindow{Start: start, End: start + size}} +} + +// elemAwareConcreteWindowFn accepts a concrete element type — reflect path. +type elemAwareConcreteWindowFn struct { + DefaultSizeMs int64 +} + +func (f *elemAwareConcreteWindowFn) AssignWindows(ts typex.EventTime, elem int64) []typex.Window { + size := typex.EventTime(elem) + if size <= 0 { + size = typex.EventTime(f.DefaultSizeMs) + } + start := ts - (ts % size) + return []typex.Window{IntervalWindow{Start: start, End: start + size}} +} + +func init() { + RegisterWindowFn[*elemAwareAnyWindowFn]() + RegisterWindowFn[*elemAwareConcreteWindowFn]() +} + +func TestWindowFnInvoker_TimestampOnly(t *testing.T) { + fn := &testWindowFn{BucketSize: 3000} + inv := NewWindowFnInvoker(fn) + + if inv.NeedsElement() { + t.Fatal("NeedsElement() = true, want false") + } + + windows := inv.Invoke(1500, nil) + if len(windows) != 1 { + t.Fatalf("got %d windows, want 1", len(windows)) + } + want := IntervalWindow{Start: 0, End: 3000} + if !windows[0].Equals(want) { + t.Errorf("Invoke(1500, nil) = %v, want %v", windows[0], want) + } +} + +func TestWindowFnInvoker_AnyElem(t *testing.T) { + fn := &elemAwareAnyWindowFn{SizeMs: 5000} + inv := NewWindowFnInvoker(fn) + + if !inv.NeedsElement() { + t.Fatal("NeedsElement() = false, want true") + } + + windows := inv.Invoke(7500, "ignored") + if len(windows) != 1 { + t.Fatalf("got %d windows, want 1", len(windows)) + } + want := IntervalWindow{Start: 5000, End: 10000} + if !windows[0].Equals(want) { + t.Errorf("Invoke(7500, ignored) = %v, want %v", windows[0], want) + } +} + +func TestWindowFnInvoker_ConcreteElem(t *testing.T) { + fn := &elemAwareConcreteWindowFn{DefaultSizeMs: 1000} + inv := NewWindowFnInvoker(fn) + + if !inv.NeedsElement() { + t.Fatal("NeedsElement() = false, want true") + } + + // Element provides window size of 5000ms. + windows := inv.Invoke(7500, int64(5000)) + if len(windows) != 1 { + t.Fatalf("got %d windows, want 1", len(windows)) + } + want := IntervalWindow{Start: 5000, End: 10000} + if !windows[0].Equals(want) { + t.Errorf("Invoke(7500, 5000) = %v, want %v", windows[0], want) + } + + // Element <= 0: falls back to default. + windows = inv.Invoke(1500, int64(0)) + if len(windows) != 1 { + t.Fatalf("got %d windows, want 1", len(windows)) + } + want = IntervalWindow{Start: 1000, End: 2000} + if !windows[0].Equals(want) { + t.Errorf("Invoke(1500, 0) = %v, want %v", windows[0], want) + } +} + +func TestWindowFnInvoker_NeedsElementCorrectness(t *testing.T) { + tests := []struct { + name string + fn any + want bool + }{ + {"timestamp-only", &testWindowFn{BucketSize: 1000}, false}, + {"any-elem", &elemAwareAnyWindowFn{SizeMs: 1000}, true}, + {"concrete-elem", &elemAwareConcreteWindowFn{DefaultSizeMs: 1000}, true}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + inv := NewWindowFnInvoker(tc.fn) + if got := inv.NeedsElement(); got != tc.want { + t.Errorf("NeedsElement() = %v, want %v", got, tc.want) + } + }) + } +} + +func TestWindowFnInvoker_PanicOnUnregistered(t *testing.T) { + type unregisteredFn struct{} + defer func() { + if r := recover(); r == nil { + t.Error("NewWindowFnInvoker did not panic on unregistered type") + } + }() + NewWindowFnInvoker(&unregisteredFn{}) +} diff --git a/sdks/go/pkg/beam/core/graph/window/register.go b/sdks/go/pkg/beam/core/graph/window/register.go new file mode 100644 index 000000000000..840c842636a4 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/window/register.go @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "fmt" + "reflect" + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +// windowFnMeta holds validated metadata about a registered custom WindowFn type. +type windowFnMeta struct { + // Type is the struct type (with pointer stripped), e.g. myWindowFn. + Type reflect.Type + // ElemType is the concrete element parameter type, or nil for + // timestamp-only signatures. + ElemType reflect.Type +} + +// NeedsElement reports whether this WindowFn signature accepts an element. +func (m *windowFnMeta) NeedsElement() bool { + return m.ElemType != nil +} + +var ( + windowFnRegistryMu sync.RWMutex + windowFnRegistry = map[reflect.Type]*windowFnMeta{} +) + +// LookupWindowFnMeta returns the validated metadata for a registered custom +// WindowFn type. The key is the struct type (pointer stripped). +// Returns nil if not registered. +func LookupWindowFnMeta(t reflect.Type) *windowFnMeta { + windowFnRegistryMu.RLock() + defer windowFnRegistryMu.RUnlock() + return windowFnRegistry[t] +} + +var ( + eventTimeType = reflect.TypeFor[typex.EventTime]() + windowSliceType = reflect.TypeFor[[]typex.Window]() +) + +// RegisterWindowFn registers a custom WindowFn type so it can be serialized +// and deserialized across process boundaries. Call RegisterWindowFn during +// init for every custom window function type used in the pipeline. +// +// The type parameter T must be a pointer-to-struct type with an +// AssignWindows method of one of the following shapes: +// +// func (f *MyFn) AssignWindows(ts typex.EventTime) []typex.Window +// func (f *MyFn) AssignWindows(ts typex.EventTime, elem T) []typex.Window +// +// RegisterWindowFn panics if the type is invalid or already registered. +// +// Example: +// +// func init() { +// window.RegisterWindowFn[*myWindowFn]() +// } +func RegisterWindowFn[T any]() { + var v T + t := reflect.TypeOf(v) + if t == nil { + panic("window.RegisterWindowFn: T must not be an untyped nil interface") + } + if t.Kind() != reflect.Pointer || t.Elem().Kind() != reflect.Struct { + panic(fmt.Sprintf("window.RegisterWindowFn: T must be a pointer to struct, got %v", t)) + } + + structType := t.Elem() + + m, ok := t.MethodByName("AssignWindows") + if !ok { + panic(fmt.Sprintf("window.RegisterWindowFn: %v has no AssignWindows method", t)) + } + + meta := validateAssignWindows(t, m) + + windowFnRegistryMu.Lock() + defer windowFnRegistryMu.Unlock() + + if _, dup := windowFnRegistry[structType]; dup { + panic(fmt.Sprintf("window.RegisterWindowFn: %v is already registered", t)) + } + windowFnRegistry[structType] = meta + + runtime.RegisterType(reflect.TypeOf(v)) +} + +// validateAssignWindows checks that the method has a valid signature and +// returns the corresponding metadata. +func validateAssignWindows(ptrType reflect.Type, m reflect.Method) *windowFnMeta { + mt := m.Type + // Method type includes the receiver as first param. + // Valid shapes: + // (receiver, typex.EventTime) -> []typex.Window numIn=2 + // (receiver, typex.EventTime, elemType) -> []typex.Window numIn=3 + + if mt.NumOut() != 1 || mt.Out(0) != windowSliceType { + panic(fmt.Sprintf( + "window.RegisterWindowFn: %v.AssignWindows must return []typex.Window, got %v", + ptrType, mt)) + } + + switch mt.NumIn() { + case 2: + // (receiver, typex.EventTime) + if mt.In(1) != eventTimeType { + panic(fmt.Sprintf( + "window.RegisterWindowFn: %v.AssignWindows first param must be typex.EventTime, got %v", + ptrType, mt.In(1))) + } + return &windowFnMeta{Type: ptrType.Elem(), ElemType: nil} + + case 3: + // (receiver, typex.EventTime, elemType) + if mt.In(1) != eventTimeType { + panic(fmt.Sprintf( + "window.RegisterWindowFn: %v.AssignWindows first param must be typex.EventTime, got %v", + ptrType, mt.In(1))) + } + elemType := mt.In(2) + return &windowFnMeta{Type: ptrType.Elem(), ElemType: elemType} + + default: + panic(fmt.Sprintf( + "window.RegisterWindowFn: %v.AssignWindows must take (typex.EventTime) or (typex.EventTime, T), got %d params (excluding receiver)", + ptrType, mt.NumIn()-1)) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 13b40ea0d1c6..257dd9bd809c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -16,8 +16,10 @@ package exec import ( + "encoding/json" "fmt" "math/rand" + "reflect" "strconv" "strings" @@ -25,10 +27,12 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -275,6 +279,27 @@ func unmarshalWindowFn(wfn *pipepb.FunctionSpec) (*window.Fn, error) { gap := gapPB.AsDuration() return window.NewSessions(gap), nil + case graphx.URNCustomWindowFn: + var envelope struct { + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + } + if err := json.Unmarshal(wfn.GetPayload(), &envelope); err != nil { + return nil, errors.Wrapf(err, "unmarshaling custom WindowFn envelope") + } + t, ok := runtime.LookupType(envelope.Type) + if !ok { + return nil, errors.Errorf("custom WindowFn type key %q not found in registry", envelope.Type) + } + if window.LookupWindowFnMeta(t) == nil { + return nil, errors.Errorf("type %v is not registered via window.RegisterWindowFn", t) + } + val := reflect.New(t) + if err := jsonx.Unmarshal(val.Interface(), envelope.Payload); err != nil { + return nil, errors.Wrapf(err, "unmarshaling custom WindowFn %v", t) + } + return window.NewCustom(val.Interface()), nil + default: return nil, errors.Errorf("unsupported window type: %v", urn) } @@ -312,6 +337,30 @@ func unmarshalAndMakeWindowMapping(wmfn *pipepb.FunctionSpec) (WindowMapper, err } size := sizePB.AsDuration() return &windowMapper{wfn: window.NewSlidingWindows(period, size)}, nil + case graphx.URNWindowMappingCustom: + var envelope struct { + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + } + if err := json.Unmarshal(wmfn.GetPayload(), &envelope); err != nil { + return nil, errors.Wrapf(err, "unmarshaling custom window mapping envelope") + } + t, ok := runtime.LookupType(envelope.Type) + if !ok { + return nil, errors.Errorf("custom WindowFn type key %q not found in registry", envelope.Type) + } + meta := window.LookupWindowFnMeta(t) + if meta == nil { + return nil, errors.Errorf("type %v is not registered via window.RegisterWindowFn", t) + } + if meta.NeedsElement() { + return nil, errors.Errorf("element-aware custom WindowFn %v cannot be used for side input window mapping", t) + } + val := reflect.New(t) + if err := jsonx.Unmarshal(val.Interface(), envelope.Payload); err != nil { + return nil, errors.Wrapf(err, "unmarshaling custom WindowFn %v for window mapping", t) + } + return &windowMapper{wfn: window.NewCustom(val.Interface())}, nil default: return nil, fmt.Errorf("unsupported window mapping fn URN %v", urn) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go b/sdks/go/pkg/beam/core/runtime/exec/window.go index fabd6af933d8..e0c2af422804 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/window.go +++ b/sdks/go/pkg/beam/core/runtime/exec/window.go @@ -31,6 +31,8 @@ type WindowInto struct { UID UnitID Fn *window.Fn Out Node + + invoker *window.WindowFnInvoker // non-nil for CustomWindows } // ID returns the UnitID for this unit. @@ -39,6 +41,9 @@ func (w *WindowInto) ID() UnitID { } func (w *WindowInto) Up(ctx context.Context) error { + if w.Fn.Kind == window.CustomWindows { + w.invoker = window.NewWindowFnInvoker(w.Fn.CustomFn) + } return nil } @@ -48,7 +53,7 @@ func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataContex func (w *WindowInto) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { windowed := &FullValue{ - Windows: assignWindows(w.Fn, elm.Timestamp), + Windows: assignWindows(w.Fn, elm.Timestamp, elm), Timestamp: elm.Timestamp, Elm: elm.Elm, Elm2: elm.Elm2, @@ -57,7 +62,7 @@ func (w *WindowInto) ProcessElement(ctx context.Context, elm *FullValue, values return w.Out.ProcessElement(ctx, windowed, values...) } -func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window { +func assignWindows(wfn *window.Fn, ts typex.EventTime, elm *FullValue) []typex.Window { switch wfn.Kind { case window.GlobalWindows: return window.SingleGlobalWindow @@ -82,6 +87,12 @@ func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window { // each other) will be merged. return []typex.Window{window.IntervalWindow{Start: ts, End: ts.Add(wfn.Gap)}} + case window.CustomWindows: + // Timestamp-only fast path for window mapping (side inputs). + // Element-aware path is handled by WindowInto.invoker. + inv := window.NewWindowFnInvoker(wfn.CustomFn) + return inv.Invoke(ts, elm) + default: panic(fmt.Sprintf("Unexpected window fn: %v", wfn)) } @@ -173,7 +184,7 @@ type windowMapper struct { } func (f *windowMapper) MapWindow(w typex.Window) (typex.Window, error) { - candidates := assignWindows(f.wfn, w.MaxTimestamp()) + candidates := assignWindows(f.wfn, w.MaxTimestamp(), nil) if len(candidates) == 0 { return nil, fmt.Errorf("failed to map main input window to side input window with WindowFn %v", f.wfn.String()) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/window_test.go b/sdks/go/pkg/beam/core/runtime/exec/window_test.go index e0bca2a74f4d..1fc47f5352a7 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/window_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/window_test.go @@ -113,10 +113,32 @@ func TestAssignWindow(t *testing.T) { window.IntervalWindow{Start: 60000, End: 120000}, }, }, + { + // Custom window that mimics 3-second fixed windows. + window.NewCustom(&fixedCustomWindowFn{SizeMs: 3000}), + 0, + []typex.Window{ + window.IntervalWindow{Start: 0, End: 3000}, + }, + }, + { + window.NewCustom(&fixedCustomWindowFn{SizeMs: 3000}), + 2999, + []typex.Window{ + window.IntervalWindow{Start: 0, End: 3000}, + }, + }, + { + window.NewCustom(&fixedCustomWindowFn{SizeMs: 3000}), + 3000, + []typex.Window{ + window.IntervalWindow{Start: 3000, End: 6000}, + }, + }, } for _, test := range tests { - out := assignWindows(test.fn, test.in) + out := assignWindows(test.fn, test.in, nil) if !window.IsEqualList(out, test.out) { t.Errorf("assignWindows(%v, %v) = %v, want %v", test.fn, test.in, out, test.out) } @@ -219,6 +241,29 @@ func TestMapWindows(t *testing.T) { } } +func init() { + window.RegisterWindowFn[*fixedCustomWindowFn]() +} + +// fixedCustomWindowFn is a test custom WindowFn that mimics fixed windows. +type fixedCustomWindowFn struct { + SizeMs int64 +} + +func (f *fixedCustomWindowFn) AssignWindows(ts typex.EventTime) []typex.Window { + size := typex.EventTime(f.SizeMs) + start := ts - (ts % size) + if ts < 0 { + // Go's % truncates toward zero, so for negative dividends + // ts%size is non-positive and ts-(ts%size) rounds toward + // zero instead of toward -inf. The double-mod expression + // computes the Euclidean (non-negative) remainder, giving + // a correct floor to the window boundary. + start = ts - (ts%size+size)%size + } + return []typex.Window{window.IntervalWindow{Start: start, End: start + size}} +} + func makeNoncedWindowValues(in []typex.Window, expect []typex.Window) ([]MainInput, []FullValue) { if len(in) != len(expect) { panic("provided window slices must be the same length") diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go index 265d936a71ec..3adf628671c4 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go @@ -21,7 +21,9 @@ import ( "reflect" "strings" "testing" + "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" ) @@ -89,3 +91,36 @@ func TestEncodeType(t *testing.T) { } }) } + +func TestWindowFnRoundTrip(t *testing.T) { + tests := []struct { + name string + fn *window.Fn + }{ + {"global", window.NewGlobalWindows()}, + {"fixed", window.NewFixedWindows(5 * time.Second)}, + {"sliding", window.NewSlidingWindows(1*time.Second, 3*time.Second)}, + {"sessions", window.NewSessions(10 * time.Second)}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pb := encodeWindowFn(tc.fn) + got := decodeWindowFn(pb) + if !tc.fn.Equals(got) { + t.Errorf("roundtrip mismatch: got %v, want %v", got, tc.fn) + } + }) + } +} + +func TestWindowFnRoundTrip_CustomKind(t *testing.T) { + // Custom WindowFns are serialized via the Beam model proto + // (FunctionSpec), not the internal v1 proto. The v1 path only + // preserves the Kind so that EncodeMultiEdge does not fail. + fn := &window.Fn{Kind: window.CustomWindows} + pb := encodeWindowFn(fn) + got := decodeWindowFn(pb) + if got.Kind != window.CustomWindows { + t.Errorf("kind mismatch: got %v, want %v", got.Kind, window.CustomWindows) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 3994397e7ba5..5e57690eca39 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -17,7 +17,9 @@ package graphx import ( "context" + "encoding/json" "fmt" + "reflect" "sort" "strings" @@ -26,11 +28,14 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/contextreg" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource" @@ -69,6 +74,8 @@ const ( URNWindowMappingGlobal = "beam:go:windowmapping:global:v1" URNWindowMappingFixed = "beam:go:windowmapping:fixed:v1" URNWindowMappingSliding = "beam:go:windowmapping:sliding:v1" + URNWindowMappingCustom = "beam:go:windowmapping:custom:v1" + URNCustomWindowFn = "beam:go:windowfn:custom:v1" URNProgressReporting = "beam:protocol:progress_reporting:v1" URNMultiCore = "beam:protocol:multi_core_bundle_processing:v1" @@ -358,6 +365,11 @@ func getSideWindowMappingUrn(winFn *window.Fn) string { mappingUrn = URNWindowMappingSliding case window.Sessions: panic("session windowing is not supported for side inputs") + case window.CustomWindows: + if winFn.NeedsElement() { + panic("element-aware custom WindowFn is not supported for side inputs") + } + mappingUrn = URNWindowMappingCustom } return mappingUrn } @@ -1461,6 +1473,30 @@ func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) { }, ), }, nil + case window.CustomWindows: + t := reflect.TypeOf(w.CustomFn) + key, ok := runtime.TypeKey(reflectx.SkipPtr(t)) + if !ok { + return nil, errors.Errorf("custom WindowFn type %v is not registered", t) + } + structPayload, err := jsonx.Marshal(w.CustomFn) + if err != nil { + return nil, errors.Wrapf(err, "marshaling custom WindowFn %v", t) + } + envelope, err := json.Marshal(struct { + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + }{ + Type: key, + Payload: structPayload, + }) + if err != nil { + return nil, errors.Wrapf(err, "marshaling custom WindowFn envelope") + } + return &pipepb.FunctionSpec{ + Urn: URNCustomWindowFn, + Payload: envelope, + }, nil default: return nil, errors.Errorf("unexpected windowing strategy: %v", w) } @@ -1470,7 +1506,7 @@ func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) { switch w.Kind { case window.GlobalWindows: return coder.NewGlobalWindow(), nil - case window.FixedWindows, window.SlidingWindows, window.Sessions, URNSlidingWindowsWindowFn: + case window.FixedWindows, window.SlidingWindows, window.Sessions, window.CustomWindows, URNSlidingWindowsWindowFn: return coder.NewIntervalWindow(), nil default: return nil, errors.Errorf("unexpected windowing strategy for coder: %v", w) diff --git a/sdks/go/test/integration/primitives/windowinto.go b/sdks/go/test/integration/primitives/windowinto.go index f5d01bdfbba5..29f6ec79c1be 100644 --- a/sdks/go/test/integration/primitives/windowinto.go +++ b/sdks/go/test/integration/primitives/windowinto.go @@ -16,12 +16,14 @@ package primitives import ( + "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream" @@ -36,6 +38,13 @@ func init() { register.Emitter3[beam.EventTime, string, int]() register.Emitter1[int]() register.Iter1[int]() + + window.RegisterWindowFn[*customFixedWindowFn]() + window.RegisterWindowFn[*elemAwareWindowFn]() + + register.DoFn2x0[[]byte, func(beam.EventTime, elemWithSize)](&createElemAwareData{}) + register.Emitter2[beam.EventTime, elemWithSize]() + register.Function1x1(extractValue) } // createTimestampedData produces data timestamped with the ordinal. @@ -413,3 +422,130 @@ func TriggerOrFinally(s beam.Scope) { beam.Trigger(trigger), }, 4) } + +// customFixedWindowFn is a user-defined custom WindowFn that replicates +// 3-second fixed windows, used to validate the custom WindowFn path +// end-to-end against the known-correct output from WindowSums_GBK. +type customFixedWindowFn struct { + SizeMs int64 +} + +func (f *customFixedWindowFn) AssignWindows(ts typex.EventTime) []typex.Window { + size := typex.EventTime(f.SizeMs) + start := ts - (ts.Add(time.Duration(f.SizeMs)*time.Millisecond) % mtime.FromDuration(time.Duration(f.SizeMs)*time.Millisecond)) + end := start + size + return []typex.Window{window.IntervalWindow{Start: start, End: end}} +} + +// ValidateCustomWindowedSideInputs checks that side inputs windowed with +// a custom WindowFn produce the same results as the equivalent fixed windows. +// Uses 1s custom fixed windows for the side input, same as "Fixed-Same" in +// ValidateWindowedSideInputs, expecting output 2, 4, 6. +func ValidateCustomWindowedSideInputs(s beam.Scope) { + timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s)) + timestampedData = beam.DropKey(s, timestampedData) + + windowSize := 1 * time.Second + customSideFn := window.NewCustom(&customFixedWindowFn{SizeMs: 1000}) + + // Main in standard 1s fixed windows, side in custom 1s fixed windows. + // Each window has one element; the side input in the same window adds + // the element to itself: 1+1=2, 2+2=4, 3+3=6. + wData := beam.WindowInto(s.Scope("MainWindow"), window.NewFixedWindows(windowSize), timestampedData) + wSide := beam.WindowInto(s.Scope("SideWindow"), customSideFn, timestampedData) + sums := beam.ParDo(s.Scope("Combine"), sumSideInputs, wData, beam.SideInput{Input: wSide}) + sums = beam.WindowInto(s.Scope("Rewindow"), window.NewGlobalWindows(), sums) + passert.Equals(s, sums, 2, 4, 6) +} + +// WindowSums_Custom validates that a custom WindowFn produces the same +// results as a 3-second fixed window. The magic square rows sum to 15 +// in each window, same as WindowSums_GBK. +func WindowSums_Custom(s beam.Scope) { + timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s)) + + wfn := window.NewCustom(&customFixedWindowFn{SizeMs: 3000}) + windowed := beam.WindowInto(s.Scope("Custom"), wfn, timestampedData) + sums := gbkSumPerKey(s.Scope("Sum"), windowed) + sums = beam.WindowInto(s.Scope("Rewindow"), window.NewGlobalWindows(), sums) + sums = beam.DropKey(s, sums) + passert.Equals(s, sums, 15, 15, 15) +} + +// elemWithSize carries a value and a window size in milliseconds. +// Each element dictates which window it belongs to. +type elemWithSize struct { + Value int + SizeMs int64 +} + +func init() { + beam.RegisterType(reflect.TypeOf((*elemWithSize)(nil)).Elem()) +} + +// elemAwareWindowFn uses the element's SizeMs field to determine the +// window size, demonstrating data-driven window assignment. +type elemAwareWindowFn struct{} + +func (f *elemAwareWindowFn) AssignWindows(ts typex.EventTime, elem elemWithSize) []typex.Window { + size := typex.EventTime(elem.SizeMs) + if size <= 0 { + size = 1000 // fallback: 1s + } + start := ts - (ts % size) + return []typex.Window{window.IntervalWindow{Start: start, End: start + size}} +} + +// createElemAwareData produces elemWithSize values with timestamps. +// The PCollection is single-valued (not KV) so that WindowInto sees +// the elemWithSize directly via elm.Elm. +type createElemAwareData struct { + Data []elemWithSize +} + +func (f *createElemAwareData) ProcessElement(_ []byte, emit func(beam.EventTime, elemWithSize)) { + for i, v := range f.Data { + timestamp := mtime.FromMilliseconds(int64((i + 1) * 1000)).Subtract(10 * time.Millisecond) + emit(timestamp, v) + } +} + +// WindowSums_ElementAware validates that an element-aware custom WindowFn +// correctly routes elements into different windows based on their content. +// +// We emit 6 elements at timestamps 990ms, 1990ms, ..., 5990ms. +// The first 3 elements carry SizeMs=3000 (3s windows) and values 4, 9, 2. +// The last 3 elements carry SizeMs=6000 (6s windows) and values 3, 5, 7. +// +// With 3s windows: ts 990->[0,3000), ts 1990->[0,3000), ts 2990->[0,3000) +// +// -> sum = 4+9+2 = 15 +// +// With 6s windows: ts 3990->[0,6000), ts 4990->[0,6000), ts 5990->[0,6000) +// +// -> sum = 3+5+7 = 15 +// +// After windowing, we extract values, add a fixed key, GBK, and sum. +func WindowSums_ElementAware(s beam.Scope) { + data := []elemWithSize{ + {Value: 4, SizeMs: 3000}, + {Value: 9, SizeMs: 3000}, + {Value: 2, SizeMs: 3000}, + {Value: 3, SizeMs: 6000}, + {Value: 5, SizeMs: 6000}, + {Value: 7, SizeMs: 6000}, + } + timestampedData := beam.ParDo(s, &createElemAwareData{Data: data}, beam.Impulse(s)) + + wfn := window.NewCustom(&elemAwareWindowFn{}) + windowed := beam.WindowInto(s.Scope("ElemAware"), wfn, timestampedData) + // Extract the Value field for summation. + values := beam.ParDo(s.Scope("ExtractValue"), extractValue, windowed) + sums := stats.Sum(s.Scope("Sum"), values) + sums = beam.WindowInto(s.Scope("Rewindow"), window.NewGlobalWindows(), sums) + passert.Equals(s, sums, 15, 15) +} + +func extractValue(e elemWithSize) int { + return e.Value +} diff --git a/sdks/go/test/integration/primitives/windowinto_test.go b/sdks/go/test/integration/primitives/windowinto_test.go index 39a1df6e9e74..02502846c2f5 100644 --- a/sdks/go/test/integration/primitives/windowinto_test.go +++ b/sdks/go/test/integration/primitives/windowinto_test.go @@ -97,3 +97,18 @@ func TestTriggerOrFinally(t *testing.T) { integration.CheckFilters(t) ptest.BuildAndRun(t, TriggerOrFinally) } + +func TestWindowSums_Custom(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, WindowSums_Custom) +} + +func TestValidateCustomWindowedSideInputs(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, ValidateCustomWindowedSideInputs) +} + +func TestWindowSums_ElementAware(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, WindowSums_ElementAware) +}