Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14470] Use Generic Registrations in loadtests. #17673

Merged
merged 7 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
def batchScenarios = {
[
[
title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-first-iterable',
title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-first-iterable',
test : 'sideinput',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
Expand All @@ -41,7 +41,7 @@ def batchScenarios = {
influx_namespace : 'dataflow',
influx_measurement : 'go_batch_sideinput_3',
input_options : '\'{' +
'"num_records": 400000,' +
'"num_records": 10000000,' +
'"key_size": 100,' +
'"value_size": 900}\'',
access_percentage: 1,
Expand All @@ -52,7 +52,7 @@ def batchScenarios = {
]
],
[
title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-iterable',
title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-iterable',
test : 'sideinput',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
Expand All @@ -64,7 +64,7 @@ def batchScenarios = {
influx_namespace : 'dataflow',
influx_measurement : 'go_batch_sideinput_4',
input_options : '\'{' +
'"num_records": 400000,' +
'"num_records": 10000000,' +
'"key_size": 100,' +
'"value_size": 900}\'',
num_workers : 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"links": [],
"panels": [
{
"content": "The following options should be used by default:\n* key size: 100B\n* value size: 900B\n* number of workers: 10\n* size of the window (if fixed windows are used): 1 second\n\n[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)\n\nUntil the issue [BEAM-11427](https://issues.apache.org/jira/browse/BEAM-11427) in Go SDK is resolved, sideinput iteration test have 400MB, instead of 10GB.",
"content": "Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)",
"datasource": null,
"gridPos": {
"h": 8,
Expand Down
4 changes: 3 additions & 1 deletion sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func NewFn(fn interface{}) (*Fn, error) {
}
methods[name] = f
}
return &Fn{Recv: fn, methods: methods, annotations: annotations}, nil
}
// TODO(lostluck): Consider moving this into the reflectx package.
for i := 0; i < val.Type().NumMethod(); i++ {
Expand All @@ -133,6 +132,9 @@ func NewFn(fn interface{}) (*Fn, error) {
if m.Name == "String" {
continue // skip: harmless
}
if _, ok := methods[m.Name]; ok {
continue // skip : already wrapped
}

// CAVEAT(herohde) 5/22/2017: The type val.Type.Method.Type is not
// the same as val.Method.Type: the former has the explicit receiver.
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/io/synthetic/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
"encoding/json"
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem())
register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func([]byte, []byte), error]((*sourceFn)(nil))
register.Emitter2[[]byte, []byte]()
}

// Source creates a synthetic source transform that emits randomly
Expand Down
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem())
register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*stepFn)(nil))
register.DoFn4x0[*sdf.LockRTracker, []byte, []byte, func([]byte, []byte)]((*sdfStepFn)(nil))
register.Emitter2[[]byte, []byte]()
}

// Step creates a synthetic step transform that receives KV<[]byte, []byte>
Expand Down
8 changes: 6 additions & 2 deletions sdks/go/test/load/cogbk/cogbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package main
import (
"context"
"flag"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand All @@ -43,14 +43,18 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem())
register.DoFn4x0[[]byte, func(*[]byte) bool, func(*[]byte) bool, func([]byte, []byte)]((*ungroupAndReiterateFn)(nil))
register.Emitter2[[]byte, []byte]()
register.Iter1[[]byte]()
}

// ungroupAndReiterateFn reiterates given number of times over CoGBK's output.
type ungroupAndReiterateFn struct {
Iterations int
}

// TODO use re-iterators once supported.

func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) {
var value []byte
for i := 0; i < fn.Iterations; i++ {
Expand Down
8 changes: 8 additions & 0 deletions sdks/go/test/load/combine/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
Expand Down Expand Up @@ -52,6 +53,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

func init() {
register.Function2x1(compareLess)
register.Function3x0(getElement)
register.Emitter2[[]byte, []byte]()
}

func compareLess(key []byte, value []byte) bool {
return bytes.Compare(key, value) < 0
}
Expand All @@ -73,6 +80,7 @@ func main() {
pcoll := top.LargestPerKey(s, src, *topCount, compareLess)
pcoll = beam.ParDo(s, getElement, pcoll)
pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
_ = pcoll
}

presult, err := beamx.RunWithMetrics(ctx, p)
Expand Down
38 changes: 27 additions & 11 deletions sdks/go/test/load/group_by_key/group_by_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand Down Expand Up @@ -52,6 +53,30 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

func init() {
register.DoFn2x2[[]byte, func(*[]byte) bool, []byte, []byte]((*ungroupAndReiterateFn)(nil))
register.Iter1[[]byte]()
}

// ungroupAndReiterateFn reiterates given number of times over GBK's output.
type ungroupAndReiterateFn struct {
Iterations int
}

// TODO use re-iterators once supported.

func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
var value []byte
for i := 0; i < fn.Iterations; i++ {
for values(&value) {
if i == fn.Iterations-1 {
return key, value
}
}
}
return key, []byte{0}
}

func main() {
flag.Parse()
beam.Init()
Expand All @@ -63,18 +88,9 @@ func main() {
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)
for i := 0; i < *fanout; i++ {
pcoll := beam.GroupByKey(s, src)
pcoll = beam.ParDo(s, func(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
for i := 0; i < *iterations; i++ {
var value []byte
for values(&value) {
if i == *iterations-1 {
return key, value
}
}
}
return key, []byte{0}
}, pcoll)
pcoll = beam.ParDo(s, &ungroupAndReiterateFn{*iterations}, pcoll)
pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
_ = pcoll
}

presult, err := beamx.RunWithMetrics(ctx, p)
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/test/load/pardo/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"context"
"flag"
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand All @@ -48,7 +48,8 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*counterOperationFn)(nil)).Elem())
register.DoFn4x0[context.Context, []byte, []byte, func([]byte, []byte)]((*counterOperationFn)(nil))
register.Emitter2[[]byte, []byte]()
}

type counterOperationFn struct {
Expand All @@ -57,7 +58,10 @@ type counterOperationFn struct {
}

func newCounterOperationFn(operations, numCounters int) *counterOperationFn {
return &counterOperationFn{operations, numCounters, nil}
return &counterOperationFn{
Operations: operations,
NumCounters: numCounters,
}
}

func (fn *counterOperationFn) Setup() {
Expand Down
32 changes: 22 additions & 10 deletions sdks/go/test/load/sideinput/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package main
import (
"context"
"flag"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)

func init() {
beam.RegisterDoFn(reflect.TypeOf((*doFn)(nil)))
register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*iterSideInputFn)(nil))
register.Emitter2[[]byte, []byte]()
register.Iter2[[]byte, []byte]()
register.Function2x0(impToKV)
}

var (
Expand All @@ -51,11 +54,17 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

type doFn struct {
// impToKV just turns an impulse signal into a KV instead of
// adding a single value input version of RuntimeMonitor
func impToKV(imp []byte, emit func([]byte, []byte)) {
emit(imp, imp)
}

type iterSideInputFn struct {
ElementsToAccess int64
}

func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
func (fn *iterSideInputFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
var key, value []byte
var i int64
for values(&key, &value) {
Expand All @@ -74,18 +83,21 @@ func main() {
p, s := beam.NewPipelineWithRoot()

syntheticConfig := parseSyntheticConfig()
elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100)
elementsToAccess := syntheticConfig.NumElements * int64(float64(*accessPercentage)/float64(100))

src := synthetic.SourceSingle(s, syntheticConfig)
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)

src = beam.ParDo(
imp := beam.Impulse(s)
impKV := beam.ParDo(s, impToKV, imp)
monitored := beam.ParDo(s, &load.RuntimeMonitor{}, impKV)

useSide := beam.ParDo(
s,
&doFn{ElementsToAccess: elementsToAccess},
beam.Impulse(s),
&iterSideInputFn{ElementsToAccess: elementsToAccess},
monitored,
beam.SideInput{Input: src})

beam.ParDo(s, &load.RuntimeMonitor{}, src)
beam.ParDo(s, &load.RuntimeMonitor{}, useSide)

presult, err := beamx.RunWithMetrics(ctx, p)
if err != nil {
Expand Down
21 changes: 14 additions & 7 deletions sdks/go/test/load/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"log"
"net/http"
"os"
"reflect"
"strings"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

const (
Expand Down Expand Up @@ -60,7 +60,8 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*RuntimeMonitor)(nil))
register.Emitter2[[]byte, []byte]()
}

// RuntimeMonitor is a DoFn to record processing time in the pipeline.
Expand Down Expand Up @@ -132,10 +133,16 @@ func newLoadTestResult(value float64) loadTestResult {
// PublishMetrics calculates the runtime and sends the result to InfluxDB database.
func PublishMetrics(results metrics.QueryResults) {
options := newInfluxDBOptions()
ress := toLoadTestResults(results)
for _, res := range ress {
log.Printf("%s %v", res.metric, time.Duration(float64(time.Second)*res.value))
}
if len(ress) == 0 {
log.Print("No metrics returned.")
return
}
if options.validate() {
if res := toLoadTestResults(results); len(res) > 0 {
publishMetricstoInfluxDB(options, toLoadTestResults(results))
}
publishMetricstoInfluxDB(options, ress)
} else {
log.Print("Missing InfluxDB options. Metrics will not be published to InfluxDB")
}
Expand Down Expand Up @@ -212,8 +219,8 @@ func publishMetricstoInfluxDB(options *influxDBOptions, results []loadTestResult
if resp.StatusCode != 204 {
jsonData := make(map[string]string)
json.Unmarshal(body, &jsonData)
log.Print(fmt.Errorf("Failed to publish metrics to InfluxDB. Received status code %v "+
"with an error message: %v", resp.StatusCode, jsonData["error"]))
log.Printf("Failed to publish metrics to InfluxDB. Received status code %v "+
"with an error message: %v", resp.StatusCode, jsonData["error"])
}
}

Expand Down