Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions sdks/go/pkg/beam/core/graph/window/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package window

import (
"fmt"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
Expand All @@ -30,6 +31,7 @@ const (
FixedWindows Kind = "FIX"
SlidingWindows Kind = "SLI"
Sessions Kind = "SES"
CustomWindows Kind = "CUS" // User-defined custom WindowFn
)

// NewGlobalWindows returns the default WindowFn, which places all elements
Expand All @@ -53,13 +55,53 @@ func NewSessions(gap time.Duration) *Fn {
return &Fn{Kind: Sessions, Gap: gap}
}

// NewCustom returns a WindowFn backed by a user-defined custom window
// function. The fn value must be a pointer-to-struct whose concrete type
// has been registered with [RegisterWindowFn] during init. Custom window
// functions are non-merging and must return [IntervalWindow] values from
// their AssignWindows method.
//
// NewCustom panics if fn is nil or its type was not registered.
func NewCustom(fn any) *Fn {
if fn == nil {
panic("window.NewCustom: fn must not be nil")
}
t := reflect.TypeOf(fn)
st := t
if t.Kind() == reflect.Pointer {
st = t.Elem()
}
if LookupWindowFnMeta(st) == nil {
panic(fmt.Sprintf("window.NewCustom: type %v is not registered; call window.RegisterWindowFn during init()", t))
}
return &Fn{Kind: CustomWindows, CustomFn: fn}
}

// Fn defines the window fn.
type Fn struct {
Kind Kind

Size time.Duration // FixedWindows, SlidingWindows
Period time.Duration // SlidingWindows
Gap time.Duration // Sessions

CustomFn any // CustomWindows (nil for built-in kinds)
}

// NeedsElement reports whether a CustomWindows Fn has an element-aware
// AssignWindows signature. Returns false for all built-in window kinds.
func (w *Fn) NeedsElement() bool {
if w.Kind != CustomWindows || w.CustomFn == nil {
return false
}
t := reflect.TypeOf(w.CustomFn)
if t.Kind() == reflect.Pointer {
t = t.Elem()
}
if meta := LookupWindowFnMeta(t); meta != nil {
return meta.NeedsElement()
}
return false
}

// TODO(herohde) 4/17/2018: do we need to expose the window type as well?
Expand All @@ -82,6 +124,8 @@ func (w *Fn) String() string {
return fmt.Sprintf("%v[%v@%v]", w.Kind, w.Size, w.Period)
case Sessions:
return fmt.Sprintf("%v[%v]", w.Kind, w.Gap)
case CustomWindows:
return fmt.Sprintf("%v[%v]", w.Kind, reflect.TypeOf(w.CustomFn))
default:
return string(w.Kind)
}
Expand All @@ -105,6 +149,8 @@ func (w *Fn) Equals(o *Fn) bool {
return w.Period == o.Period && w.Size == o.Size
case Sessions:
return w.Gap == o.Gap
case CustomWindows:
return reflect.DeepEqual(w.CustomFn, o.CustomFn)
default:
panic(fmt.Sprintf("unknown window type: %v", w))
}
Expand Down
75 changes: 75 additions & 0 deletions sdks/go/pkg/beam/core/graph/window/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
package window

import (
"strings"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

func init() {
RegisterWindowFn[*testWindowFn]()
}

func TestEquals(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -81,6 +89,24 @@ func TestEquals(t *testing.T) {
NewSlidingWindows(10*time.Millisecond, 100*time.Millisecond),
false,
},
{
"custom equal",
NewCustom(&testWindowFn{BucketSize: 3000}),
NewCustom(&testWindowFn{BucketSize: 3000}),
true,
},
{
"custom inequal",
NewCustom(&testWindowFn{BucketSize: 3000}),
NewCustom(&testWindowFn{BucketSize: 5000}),
false,
},
{
"custom vs fixed",
NewCustom(&testWindowFn{BucketSize: 3000}),
NewFixedWindows(3 * time.Second),
false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -90,3 +116,52 @@ func TestEquals(t *testing.T) {
})
}
}

// testWindowFn is a minimal custom WindowFn for testing.
type testWindowFn struct {
BucketSize int64
}

func (f *testWindowFn) AssignWindows(ts typex.EventTime) []typex.Window {
bucket := typex.EventTime(f.BucketSize)
// Euclidean remainder; correct floor for negative ts.
start := ts - ((ts%bucket)+bucket)%bucket
end := start + bucket
return []typex.Window{IntervalWindow{Start: start, End: end}}
}

func TestNewCustom(t *testing.T) {
fn := NewCustom(&testWindowFn{BucketSize: 3000})
if fn.Kind != CustomWindows {
t.Errorf("NewCustom().Kind = %v, want %v", fn.Kind, CustomWindows)
}
if fn.CustomFn == nil {
t.Fatal("NewCustom().CustomFn is nil")
}
}

func TestNewCustomPanicsOnNil(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("NewCustom(nil) did not panic")
}
}()
NewCustom(nil)
}

func TestCustomCoder(t *testing.T) {
fn := NewCustom(&testWindowFn{BucketSize: 3000})
got := fn.Coder()
want := coder.NewIntervalWindow()
if got.Kind != want.Kind {
t.Errorf("Coder().Kind = %v, want %v", got.Kind, want.Kind)
}
}

func TestCustomString(t *testing.T) {
fn := NewCustom(&testWindowFn{BucketSize: 3000})
s := fn.String()
if !strings.HasPrefix(s, "CUS[") {
t.Errorf("String() = %q, want prefix CUS[", s)
}
}
117 changes: 117 additions & 0 deletions sdks/go/pkg/beam/core/graph/window/invoke.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package window

import (
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

// tsOnlyAssigner is the fast-path interface for timestamp-only custom WindowFns.
type tsOnlyAssigner interface {
AssignWindows(typex.EventTime) []typex.Window
}

// anyElemAssigner is the fast-path interface for element-aware WindowFns
// whose element parameter is typed as any (interface{}).
type anyElemAssigner interface {
AssignWindows(typex.EventTime, any) []typex.Window
}

// WindowFnInvoker wraps a custom WindowFn instance and dispatches
// AssignWindows calls through one of three paths, chosen once at
// construction:
//
// 1. Type-assert to tsOnlyAssigner — zero allocation.
// 2. Type-assert to anyElemAssigner — zero allocation.
// 3. reflect.Value.Method.Call — small per-call allocation.
type WindowFnInvoker struct {
call func(typex.EventTime, any) []typex.Window
needsElement bool
}

// NewWindowFnInvoker builds an invoker for fn. The concrete type of fn
// must have been previously registered via RegisterWindowFn.
// Panics if fn's type is not registered.
func NewWindowFnInvoker(fn any) *WindowFnInvoker {
t := reflect.TypeOf(fn)
if t == nil {
panic("window.NewWindowFnInvoker: fn must not be nil")
}
structType := t
if t.Kind() == reflect.Pointer {
structType = t.Elem()
}

meta := LookupWindowFnMeta(structType)
if meta == nil {
panic(fmt.Sprintf("window.NewWindowFnInvoker: type %v is not registered; call window.RegisterWindowFn during init()", t))
}

inv := &WindowFnInvoker{needsElement: meta.NeedsElement()}

if !meta.NeedsElement() {
// Fast path 1: timestamp-only.
if a, ok := fn.(tsOnlyAssigner); ok {
inv.call = func(ts typex.EventTime, _ any) []typex.Window {
return a.AssignWindows(ts)
}
return inv
}
// Unreachable for well-typed registrations, but fall through to reflect.
} else {
// Fast path 2: element typed as any.
if a, ok := fn.(anyElemAssigner); ok {
inv.call = func(ts typex.EventTime, elem any) []typex.Window {
return a.AssignWindows(ts, elem)
}
return inv
}
}

// Slow path 3: concrete element type — use reflect.
rv := reflect.ValueOf(fn)
m := rv.MethodByName("AssignWindows")
if !m.IsValid() {
panic(fmt.Sprintf("window.NewWindowFnInvoker: %v has no AssignWindows method", t))
}

if !meta.NeedsElement() {
inv.call = func(ts typex.EventTime, _ any) []typex.Window {
out := m.Call([]reflect.Value{reflect.ValueOf(ts)})
return out[0].Interface().([]typex.Window)
}
} else {
inv.call = func(ts typex.EventTime, elem any) []typex.Window {
out := m.Call([]reflect.Value{reflect.ValueOf(ts), reflect.ValueOf(elem)})
return out[0].Interface().([]typex.Window)
}
}
return inv
}

// Invoke calls AssignWindows on the underlying WindowFn.
// If the WindowFn is timestamp-only, elem is ignored.
func (inv *WindowFnInvoker) Invoke(ts typex.EventTime, elem any) []typex.Window {
return inv.call(ts, elem)
}

// NeedsElement reports whether the underlying WindowFn accepts an element.
func (inv *WindowFnInvoker) NeedsElement() bool {
return inv.needsElement
}
Loading
Loading