Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3612] Closurize method invocations #7161

Merged
merged 1 commit into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ func NewFn(fn interface{}) (*Fn, error) {

case reflect.Struct:
methods := make(map[string]*funcx.Fn)
if methodsFuncs, ok := reflectx.WrapMethods(fn); ok {
for name, mfn := range methodsFuncs {
f, err := funcx.New(mfn)
if err != nil {
return nil, fmt.Errorf("method %v invalid: %v", name, err)
}
methods[name] = f
}
return &Fn{Recv: fn, methods: methods}, nil
}
// TODO(lostluck): Consider moving this into the reflectx package.
for i := 0; i < val.Type().NumMethod(); i++ {
m := val.Type().Method(i)
if m.PkgPath != "" {
Expand Down
186 changes: 117 additions & 69 deletions sdks/go/pkg/beam/core/runtime/exec/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,21 +505,33 @@ func BenchmarkMethodCalls(b *testing.B) {

indirectFunc := reflect.ValueOf(WhatsB).Interface().(func(int) int)

nrF := fV.Method(0)
nrFi := nrF.Interface().(func(int) int)
rxnrF := reflectx.MakeFunc(nrFi)
rx0x1nrF := reflectx.ToFunc1x1(rxnrF)
shimnrF := funcMakerInt(nrFi) // as if this shim were registered
shim0x1nrF := reflectx.ToFunc1x1(shimnrF) // would be MakeFunc0x1 if registered

wrF := fV.Type().Method(0).Func
wrFi := wrF.Interface().(func(*Foo, int) int)

rxF := reflectx.MakeFunc(wrFi)
rx1x1F := reflectx.ToFunc2x1(rxF)
shimF := funcMakerFooRInt(wrFi) // as if this shim were registered
shim1x1F := reflectx.ToFunc2x1(shimF) // would be MakeFunc1x1 if registered

// Implicit Receivers
impRF := fV.Method(0)
impRFi := impRF.Interface().(func(int) int)
impRxF := reflectx.MakeFunc(impRFi)
impRx1x1F := reflectx.ToFunc1x1(impRxF)
impRShimF := funcMakerInt(impRFi) // as if this shim were registered
impRShim1x1F := reflectx.ToFunc1x1(impRShimF) // would be MakeFunc1x1 if registered

// Explicit Receivers
expRF := fV.Type().Method(0).Func
expRFi := expRF.Interface().(func(*Foo, int) int)

expRxF := reflectx.MakeFunc(expRFi)
expRx2c1F := reflectx.ToFunc2x1(expRxF)
expRShimF := funcMakerFooRInt(expRFi) // as if this shim were registered
expRShim2x1F := reflectx.ToFunc2x1(expRShimF) // would be MakeFunc2x1 if registered

// Closured Receivers
wrappedWhatsA := func(a int) int { return f.WhatsA(a) }
clsrRF := reflect.ValueOf(wrappedWhatsA)
clsrRFi := clsrRF.Interface().(func(int) int)
clsrRxF := reflectx.MakeFunc(clsrRFi)
clsrRx1x1F := reflectx.ToFunc1x1(clsrRxF)
clsrRShimF := funcMakerInt(clsrRFi) // as if this shim were registered
clsrRShim1x1F := reflectx.ToFunc1x1(clsrRShimF) // would be MakeFunc1x1 if registered

// Parameters
var a int
var ai interface{} = a
aV := reflect.ValueOf(a)
Expand All @@ -532,39 +544,57 @@ func BenchmarkMethodCalls(b *testing.B) {
name string
fn func()
}{
{"DirectMethod", func() { a = g.WhatsA(a) }}, // Baseline as low as we can go.
{"DirectFunc", func() { a = WhatsB(a) }}, // For comparison purposes
{"DirectMethod", func() { a = g.WhatsA(a) }}, // Baseline as low as we can go.
{"DirectFunc", func() { a = WhatsB(a) }}, // For comparison purposes
{"IndirectFunc", func() { a = indirectFunc(a) }}, // For comparison purposes

// Implicits
{"IndirectImplicit", func() { a = impRFi(a) }}, // Measures the indirection through reflect.Value cost.
{"TypeAssertedImplicit", func() { ai = impRFi(ai.(int)) }}, // Measures the type assertion cost over the above.

{"ReflectCallImplicit", func() { a = impRF.Call([]reflect.Value{reflect.ValueOf(a)})[0].Interface().(int) }},
{"ReflectCallImplicit-NoWrap", func() { a = impRF.Call([]reflect.Value{aV})[0].Interface().(int) }},
{"ReflectCallImplicit-NoReallocSlice", func() { a = impRF.Call(rvSlice)[0].Interface().(int) }},

{"ReflectXCallImplicit", func() { a = impRxF.Call([]interface{}{a})[0].(int) }},
{"ReflectXCallImplicit-NoReallocSlice", func() { a = impRxF.Call(efaceSlice)[0].(int) }},
{"ReflectXCall1x1Implicit", func() { a = impRx1x1F.Call1x1(a).(int) }}, // Measures the default shimfunc overhead.

{"ShimedCallImplicit", func() { a = impRShimF.Call([]interface{}{a})[0].(int) }}, // What we're currently using for invoking methods
{"ShimedCallImplicit-NoReallocSlice", func() { a = impRShimF.Call(efaceSlice)[0].(int) }}, // Closer to what we're using now.
{"ShimedCall1x1Implicit", func() { a = impRShim1x1F.Call1x1(a).(int) }},

{"IndirectFunc", func() { a = indirectFunc(a) }}, // For comparison purposes
{"IndirectImplicit", func() { a = nrFi(a) }}, // Measures the indirection through reflect.Value cost.
{"TypeAssertedImplicit", func() { ai = nrFi(ai.(int)) }}, // Measures the type assertion cost over the above.
// Explicit
{"IndirectExplicit", func() { a = expRFi(g, a) }}, // Measures the indirection through reflect.Value cost.
{"TypeAssertedExplicit", func() { ai = expRFi(gi.(*Foo), ai.(int)) }}, // Measures the type assertion cost over the above.

{"ReflectCallImplicit", func() { a = nrF.Call([]reflect.Value{reflect.ValueOf(a)})[0].Interface().(int) }},
{"ReflectCallImplicit-NoWrap", func() { a = nrF.Call([]reflect.Value{aV})[0].Interface().(int) }},
{"ReflectCallImplicit-NoReallocSlice", func() { a = nrF.Call(rvSlice)[0].Interface().(int) }},
{"ReflectCallExplicit", func() { a = expRF.Call([]reflect.Value{reflect.ValueOf(g), reflect.ValueOf(a)})[0].Interface().(int) }},
{"ReflectCallExplicit-NoWrap", func() { a = expRF.Call([]reflect.Value{gV, aV})[0].Interface().(int) }},
{"ReflectCallExplicit-NoReallocSlice", func() { a = expRF.Call(grvSlice)[0].Interface().(int) }},

{"ReflectXCallImplicit", func() { a = rxnrF.Call([]interface{}{a})[0].(int) }},
{"ReflectXCallImplicit-NoReallocSlice", func() { a = rxnrF.Call(efaceSlice)[0].(int) }},
{"ReflectXCall1x1Implicit", func() { a = rx0x1nrF.Call1x1(a).(int) }}, // Measures the default shimfunc overhead.
{"ReflectXCallExplicit", func() { a = expRxF.Call([]interface{}{g, a})[0].(int) }},
{"ReflectXCallExplicit-NoReallocSlice", func() { a = expRxF.Call(gEfaceSlice)[0].(int) }},
{"ReflectXCall2x1Explicit", func() { a = expRx2c1F.Call2x1(g, a).(int) }},

{"ShimedCallImplicit", func() { a = shimnrF.Call([]interface{}{a})[0].(int) }}, // What we're currently using for invoking methods
{"ShimedCallImplicit-NoReallocSlice", func() { a = shimnrF.Call(efaceSlice)[0].(int) }}, // Closer to what we're using now.
{"ShimedCall1x1Implicit", func() { a = shim0x1nrF.Call1x1(a).(int) }},
{"ShimedCallExplicit", func() { a = expRShimF.Call([]interface{}{g, a})[0].(int) }},
{"ShimedCallExplicit-NoReallocSlice", func() { a = expRShimF.Call(gEfaceSlice)[0].(int) }},
{"ShimedCall2x1Explicit", func() { a = expRShim2x1F.Call2x1(g, a).(int) }},

{"IndirectExplicit", func() { a = wrFi(g, a) }}, // Measures the indirection through reflect.Value cost.
{"TypeAssertedExplicit", func() { ai = wrFi(gi.(*Foo), ai.(int)) }}, // Measures the type assertion cost over the above.
// Closured
{"IndirectClosured", func() { a = clsrRFi(a) }}, // Measures the indirection through reflect.Value cost.
{"TypeAssertedClosured", func() { ai = clsrRFi(ai.(int)) }}, // Measures the type assertion cost over the above.

{"ReflectCallExplicit", func() { a = wrF.Call([]reflect.Value{reflect.ValueOf(g), reflect.ValueOf(a)})[0].Interface().(int) }},
{"ReflectCallExplicit-NoWrap", func() { a = wrF.Call([]reflect.Value{gV, aV})[0].Interface().(int) }},
{"ReflectCallExplicit-NoReallocSlice", func() { a = wrF.Call(grvSlice)[0].Interface().(int) }},
{"ReflectCallClosured", func() { a = clsrRF.Call([]reflect.Value{reflect.ValueOf(a)})[0].Interface().(int) }},
{"ReflectCallClosured-NoWrap", func() { a = clsrRF.Call([]reflect.Value{aV})[0].Interface().(int) }},
{"ReflectCallClosured-NoReallocSlice", func() { a = clsrRF.Call(rvSlice)[0].Interface().(int) }},

{"ReflectXCallExplicit", func() { a = rxF.Call([]interface{}{g, a})[0].(int) }},
{"ReflectXCallExplicit-NoReallocSlice", func() { a = rxF.Call(gEfaceSlice)[0].(int) }},
{"ReflectXCall2x1Explicit", func() { a = rx1x1F.Call2x1(g, a).(int) }},
{"ReflectXCallClosured", func() { a = clsrRxF.Call([]interface{}{a})[0].(int) }},
{"ReflectXCallClosured-NoReallocSlice", func() { a = clsrRxF.Call(efaceSlice)[0].(int) }},
{"ReflectXCall1x1Closured", func() { a = clsrRx1x1F.Call1x1(a).(int) }}, // Measures the default shimfunc overhead.

{"ShimedCallExplicit", func() { a = shimF.Call([]interface{}{g, a})[0].(int) }},
{"ShimedCallExplicit-NoReallocSlice", func() { a = shimF.Call(gEfaceSlice)[0].(int) }},
{"ShimedCall2x1Explicit", func() { a = shim1x1F.Call2x1(g, a).(int) }},
{"ShimedCallClosured", func() { a = clsrRShimF.Call([]interface{}{a})[0].(int) }}, // What we're currently using for invoking methods
{"ShimedCallClosured-NoReallocSlice", func() { a = clsrRShimF.Call(efaceSlice)[0].(int) }}, // Closer to what we're using now.
{"ShimedCall1x1Closured", func() { a = clsrRShim1x1F.Call1x1(a).(int) }},
}
for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
Expand All @@ -577,33 +607,51 @@ func BenchmarkMethodCalls(b *testing.B) {
}

/*
@lostluck 2018/10/30 on a desktop machine.

BenchmarkMethodCalls/DirectMethod-12 1000000000 2.02 ns/op
BenchmarkMethodCalls/DirectFunc-12 2000000000 1.81 ns/op
BenchmarkMethodCalls/IndirectFunc-12 300000000 4.66 ns/op
BenchmarkMethodCalls/IndirectImplicit-12 10000000 185 ns/op
BenchmarkMethodCalls/TypeAssertedImplicit-12 10000000 228 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-12 3000000 479 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-NoWrap-12 3000000 451 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-NoReallocSlice-12 3000000 424 ns/op
BenchmarkMethodCalls/ReflectXCallImplicit-12 2000000 756 ns/op
BenchmarkMethodCalls/ReflectXCallImplicit-NoReallocSlice-12 2000000 662 ns/op **Default**
BenchmarkMethodCalls/ReflectXCall1x1Implicit-12 2000000 762 ns/op
BenchmarkMethodCalls/ShimedCallImplicit-12 5000000 374 ns/op
BenchmarkMethodCalls/ShimedCallImplicit-NoReallocSlice-12 5000000 289 ns/op **With specialized shims**
BenchmarkMethodCalls/ShimedCall1x1Implicit-12 5000000 249 ns/op **Arity specialized re-work of the invoker**

** Everything below requires an overhaul of structural DoFn invocation code, and regeneration of all included shims. **
BenchmarkMethodCalls/IndirectExplicit-12 300000000 4.81 ns/op
BenchmarkMethodCalls/TypeAssertedExplicit-12 50000000 35.4 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-12 3000000 434 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-NoWrap-12 5000000 397 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-NoReallocSlice-12 5000000 390 ns/op
BenchmarkMethodCalls/ReflectXCallExplicit-12 2000000 755 ns/op
BenchmarkMethodCalls/ReflectXCallExplicit-NoReallocSlice-12 2000000 601 ns/op
BenchmarkMethodCalls/ReflectXCall2x1Explicit-12 2000000 735 ns/op
BenchmarkMethodCalls/ShimedCallExplicit-12 10000000 198 ns/op
BenchmarkMethodCalls/ShimedCallExplicit-NoReallocSlice-12 20000000 93.5 ns/op
BenchmarkMethodCalls/ShimedCall2x1Explicit-12 20000000 68.3 ns/op **Best we could do**
@lostluck 2018/11/29 on an Intel(R) Core(TM) i7-7Y75 CPU @ 1.30GHz Pixelbook.
The actual times will vary per machine, but the relative differences are unlikely to change
between machines.

*** Standard candles ***
BenchmarkMethodCalls/DirectMethod-4 1000000000 2.00 ns/op
BenchmarkMethodCalls/DirectFunc-4 2000000000 1.87 ns/op
BenchmarkMethodCalls/IndirectFunc-4 300000000 4.68 ns/op

*** Implicit receiver variants ***
BenchmarkMethodCalls/IndirectImplicit-4 10000000 187 ns/op
BenchmarkMethodCalls/TypeAssertedImplicit-4 10000000 207 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-4 5000000 362 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-NoWrap-4 5000000 350 ns/op
BenchmarkMethodCalls/ReflectCallImplicit-NoReallocSlice-4 5000000 365 ns/op
BenchmarkMethodCalls/ReflectXCallImplicit-4 2000000 874 ns/op
BenchmarkMethodCalls/ReflectXCallImplicit-NoReallocSlice-4 2000000 1227 ns/op **Default**
BenchmarkMethodCalls/ReflectXCall1x1Implicit-4 1000000 1184 ns/op
BenchmarkMethodCalls/ShimedCallImplicit-4 2000000 647 ns/op
BenchmarkMethodCalls/ShimedCallImplicit-NoReallocSlice-4 3000000 589 ns/op
BenchmarkMethodCalls/ShimedCall1x1Implicit-4 3000000 446 ns/op

*** Explicit receiver variants ***
BenchmarkMethodCalls/IndirectExplicit-4 200000000 7.64 ns/op
BenchmarkMethodCalls/TypeAssertedExplicit-4 50000000 26.9 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-4 3000000 430 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-NoWrap-4 5000000 394 ns/op
BenchmarkMethodCalls/ReflectCallExplicit-NoReallocSlice-4 3000000 375 ns/op
BenchmarkMethodCalls/ReflectXCallExplicit-4 2000000 621 ns/op
BenchmarkMethodCalls/ReflectXCallExplicit-NoReallocSlice-4 3000000 552 ns/op
BenchmarkMethodCalls/ReflectXCall2x1Explicit-4 2000000 839 ns/op
BenchmarkMethodCalls/ShimedCallExplicit-4 5000000 208 ns/op
BenchmarkMethodCalls/ShimedCallExplicit-NoReallocSlice-4 20000000 70.1 ns/op
BenchmarkMethodCalls/ShimedCall2x1Explicit-4 30000000 48.9 ns/op

*** Closured direct invocation variants ***
BenchmarkMethodCalls/IndirectClosured-4 300000000 4.93 ns/op
BenchmarkMethodCalls/TypeAssertedClosured-4 100000000 25.7 ns/op
BenchmarkMethodCalls/ReflectCallClosured-4 5000000 318 ns/op
BenchmarkMethodCalls/ReflectCallClosured-NoWrap-4 5000000 269 ns/op
BenchmarkMethodCalls/ReflectCallClosured-NoReallocSlice-4 5000000 266 ns/op
BenchmarkMethodCalls/ReflectXCallClosured-4 3000000 440 ns/op
BenchmarkMethodCalls/ReflectXCallClosured-NoReallocSlice-4 5000000 377 ns/op
BenchmarkMethodCalls/ReflectXCall1x1Closured-4 3000000 460 ns/op
BenchmarkMethodCalls/ShimedCallClosured-4 20000000 113 ns/op
BenchmarkMethodCalls/ShimedCallClosured-NoReallocSlice-4 20000000 61.5 ns/op **With specialized shims**
BenchmarkMethodCalls/ShimedCall1x1Closured-4 30000000 45.5 ns/op **Arity specialized re-work of the invoker**
*/
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/util/reflectx/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func RegisterFunc(t reflect.Type, maker func(interface{}) Func) {

key := t.String()
if _, exists := funcs[key]; exists {
log.Warnf(context.Background(), "Func for %v already registered. Overwriting.", key)
log.Debugf(context.Background(), "Func for %v already registered. Overwriting.", key)
}
funcs[key] = maker
}
Expand Down
73 changes: 73 additions & 0 deletions sdks/go/pkg/beam/core/util/reflectx/structs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reflectx

import (
"context"
"reflect"
"sync"

"github.com/apache/beam/sdks/go/pkg/beam/log"
)

var (
	// structFuncs holds the registered struct wrappers, keyed by the
	// String() form of the struct's reflect.Type. Each wrapper takes an
	// instance of the struct and returns a map of method names to Funcs.
	structFuncs = make(map[string]func(interface{}) map[string]Func)
	// structFuncsMu is held while structFuncs is read or written.
	structFuncsMu sync.Mutex
)

// RegisterStructWrapper takes in the reflect.Type of a structural DoFn, and
// a wrapping function that will take an instance of that struct type and
// produce a map of method names to of closured Funcs that call the method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... method names to closured Funcs...

// on the instance of the struct.
//
// The goal is to avoid the implicit reflective method invocation penalty
// that occurs when passing a method through the reflect package.
func RegisterStructWrapper(t reflect.Type, wrapper func(interface{}) map[string]Func) {
structFuncsMu.Lock()
defer structFuncsMu.Unlock()

if t.Kind() != reflect.Struct {
log.Fatalf(context.Background(), "RegisterStructWrapper for %v should be a struct type, but was %v", t, t.Kind())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the extent of validity checking that can be performed on the struct? No required members?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we could be more strict here. The generator does lifecycle method filtering so only the life cycle methods have code generated for them. Beam at pipeline construction time should be doing that required method checking at construction time anyway.

Short of always using the tool, the development flow would be using the reflection based paths, so one hopes that beam catches those errors then.

Finally, I'd rather have any more dedicated checks in a vetting runner of some kind, to catch typos and the like, rather than in the code generation code.

}

key := t.String()
if _, exists := funcs[key]; exists {
log.Warnf(context.Background(), "StructWrapper for %v already registered. Overwriting.", key)
}
structFuncs[key] = wrapper
}

// WrapMethods looks up the struct wrapper registered for the dynamic type of
// fn and applies it to fn, yielding a map from method names to Funcs that
// are closed over that instance. The boolean result reports whether a
// wrapper was found for the type.
func WrapMethods(fn interface{}) (map[string]Func, bool) {
	typ := reflect.TypeOf(fn)
	return wrapMethodsKeyed(typ, fn)
}

// wrapMethodsKeyed looks up the struct wrapper registered for t and, if one
// exists, applies it to fn. A pointer type is dereferenced once, so *T finds
// the wrapper registered for T.
//
// It returns the wrapper's result and true when a wrapper is registered, or
// nil and false when none is registered or t is nil. The wrapper is invoked
// while structFuncsMu is held.
func wrapMethodsKeyed(t reflect.Type, fn interface{}) (map[string]Func, bool) {
	// reflect.TypeOf(nil) yields a nil Type; calling Kind on it would panic,
	// and nothing can be registered for it anyway.
	if t == nil {
		return nil, false
	}

	structFuncsMu.Lock()
	defer structFuncsMu.Unlock()

	// Wrappers are registered under struct types, so a pointer is looked up
	// by its element type.
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	key := t.String()
	wrap, ok := structFuncs[key]
	if !ok {
		return nil, false
	}
	log.Debugf(context.Background(), "EXTRACTING StructWrapper for %v", key)
	return wrap(fn), true
}
Loading