Skip to content

Commit

Permalink
[BEAM-14470] Use Generic Registrations in loadtests. (#17673)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed May 14, 2022
1 parent 780ad62 commit 5064cc2
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 45 deletions.
8 changes: 4 additions & 4 deletions .test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
def batchScenarios = {
[
[
title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-first-iterable',
title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-first-iterable',
test : 'sideinput',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
Expand All @@ -41,7 +41,7 @@ def batchScenarios = {
influx_namespace : 'dataflow',
influx_measurement : 'go_batch_sideinput_3',
input_options : '\'{' +
'"num_records": 400000,' +
'"num_records": 10000000,' +
'"key_size": 100,' +
'"value_size": 900}\'',
access_percentage: 1,
Expand All @@ -52,7 +52,7 @@ def batchScenarios = {
]
],
[
title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-iterable',
title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-iterable',
test : 'sideinput',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
Expand All @@ -64,7 +64,7 @@ def batchScenarios = {
influx_namespace : 'dataflow',
influx_measurement : 'go_batch_sideinput_4',
input_options : '\'{' +
'"num_records": 400000,' +
'"num_records": 10000000,' +
'"key_size": 100,' +
'"value_size": 900}\'',
num_workers : 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"links": [],
"panels": [
{
"content": "The following options should be used by default:\n* key size: 100B\n* value size: 900B\n* number of workers: 10\n* size of the window (if fixed windows are used): 1 second\n\n[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)\n\nUntil the issue [BEAM-11427](https://issues.apache.org/jira/browse/BEAM-11427) in Go SDK is resolved, sideinput iteration test have 400MB, instead of 10GB.",
"content": "[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)",
"datasource": null,
"gridPos": {
"h": 8,
Expand Down
4 changes: 3 additions & 1 deletion sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func NewFn(fn interface{}) (*Fn, error) {
}
methods[name] = f
}
return &Fn{Recv: fn, methods: methods, annotations: annotations}, nil
}
// TODO(lostluck): Consider moving this into the reflectx package.
for i := 0; i < val.Type().NumMethod(); i++ {
Expand All @@ -133,6 +132,9 @@ func NewFn(fn interface{}) (*Fn, error) {
if m.Name == "String" {
continue // skip: harmless
}
if _, ok := methods[m.Name]; ok {
continue // skip : already wrapped
}

// CAVEAT(herohde) 5/22/2017: The type val.Type.Method.Type is not
// the same as val.Method.Type: the former has the explicit receiver.
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/io/synthetic/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
"encoding/json"
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem())
register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func([]byte, []byte), error]((*sourceFn)(nil))
register.Emitter2[[]byte, []byte]()
}

// Source creates a synthetic source transform that emits randomly
Expand Down
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/io/synthetic/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package synthetic
import (
"fmt"
"math/rand"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

func init() {
beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem())
register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*stepFn)(nil))
register.DoFn4x0[*sdf.LockRTracker, []byte, []byte, func([]byte, []byte)]((*sdfStepFn)(nil))
register.Emitter2[[]byte, []byte]()
}

// Step creates a synthetic step transform that receives KV<[]byte, []byte>
Expand Down
8 changes: 6 additions & 2 deletions sdks/go/test/load/cogbk/cogbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package main
import (
"context"
"flag"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand All @@ -43,14 +43,18 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem())
register.DoFn4x0[[]byte, func(*[]byte) bool, func(*[]byte) bool, func([]byte, []byte)]((*ungroupAndReiterateFn)(nil))
register.Emitter2[[]byte, []byte]()
register.Iter1[[]byte]()
}

// ungroupAndReiterateFn reiterates a given number of times over CoGBK's output.
type ungroupAndReiterateFn struct {
// Iterations bounds the outer loop in ProcessElement, i.e. how many
// passes are attempted over the grouped values.
Iterations int
}

// TODO use re-iterators once supported.

func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) {
var value []byte
for i := 0; i < fn.Iterations; i++ {
Expand Down
8 changes: 8 additions & 0 deletions sdks/go/test/load/combine/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
Expand Down Expand Up @@ -52,6 +53,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

// init registers the plain functions this load test hands to Beam, plus the
// ([]byte, []byte) emitter type, with the register package.
func init() {
// compareLess is the comparison function passed to top.LargestPerKey in main.
register.Function2x1(compareLess)
// getElement is the function applied with beam.ParDo in main.
register.Function3x0(getElement)
// Emitter taking a ([]byte, []byte) pair.
register.Emitter2[[]byte, []byte]()
}

// compareLess reports whether key sorts strictly before value in
// lexicographic byte order. Equal slices (including nil versus empty)
// yield false.
func compareLess(key []byte, value []byte) bool {
	// bytes.Compare yields -1, 0 or +1; only -1 means "strictly less".
	ordering := bytes.Compare(key, value)
	return ordering == -1
}
Expand All @@ -73,6 +80,7 @@ func main() {
pcoll := top.LargestPerKey(s, src, *topCount, compareLess)
pcoll = beam.ParDo(s, getElement, pcoll)
pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
_ = pcoll
}

presult, err := beamx.RunWithMetrics(ctx, p)
Expand Down
38 changes: 27 additions & 11 deletions sdks/go/test/load/group_by_key/group_by_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand Down Expand Up @@ -52,6 +53,30 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

// init registers ungroupAndReiterateFn and the []byte iterator type it
// consumes with the register package.
func init() {
// The type parameters mirror ProcessElement's signature: two parameters
// (key []byte, values func(*[]byte) bool) and two return values
// ([]byte, []byte).
register.DoFn2x2[[]byte, func(*[]byte) bool, []byte, []byte]((*ungroupAndReiterateFn)(nil))
// Iterator over single []byte values, the type of ProcessElement's values argument.
register.Iter1[[]byte]()
}

// ungroupAndReiterateFn is a DoFn that walks over GBK's grouped values a
// configurable number of times.
type ungroupAndReiterateFn struct {
	// Iterations is the number of passes attempted over the values iterator.
	Iterations int
}

// TODO use re-iterators once supported.

// ProcessElement makes up to fn.Iterations passes over values. Every pass
// except the last one simply drains the iterator; on the last pass the first
// value produced is returned together with key. If the last pass produces
// nothing (or Iterations is less than one), key is returned with a
// single zero byte as the value.
func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
	var current []byte
	for pass := 0; pass < fn.Iterations; pass++ {
		finalPass := pass == fn.Iterations-1
		for values(&current) {
			if finalPass {
				return key, current
			}
		}
	}
	return key, []byte{0}
}

func main() {
flag.Parse()
beam.Init()
Expand All @@ -63,18 +88,9 @@ func main() {
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)
for i := 0; i < *fanout; i++ {
pcoll := beam.GroupByKey(s, src)
pcoll = beam.ParDo(s, func(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
for i := 0; i < *iterations; i++ {
var value []byte
for values(&value) {
if i == *iterations-1 {
return key, value
}
}
}
return key, []byte{0}
}, pcoll)
pcoll = beam.ParDo(s, &ungroupAndReiterateFn{*iterations}, pcoll)
pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
_ = pcoll
}

presult, err := beamx.RunWithMetrics(ctx, p)
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/test/load/pardo/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"context"
"flag"
"fmt"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)
Expand All @@ -48,7 +48,8 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*counterOperationFn)(nil)).Elem())
register.DoFn4x0[context.Context, []byte, []byte, func([]byte, []byte)]((*counterOperationFn)(nil))
register.Emitter2[[]byte, []byte]()
}

type counterOperationFn struct {
Expand All @@ -57,7 +58,10 @@ type counterOperationFn struct {
}

func newCounterOperationFn(operations, numCounters int) *counterOperationFn {
return &counterOperationFn{operations, numCounters, nil}
return &counterOperationFn{
Operations: operations,
NumCounters: numCounters,
}
}

func (fn *counterOperationFn) Setup() {
Expand Down
32 changes: 22 additions & 10 deletions sdks/go/test/load/sideinput/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package main
import (
"context"
"flag"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/test/load"
)

func init() {
beam.RegisterDoFn(reflect.TypeOf((*doFn)(nil)))
register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*iterSideInputFn)(nil))
register.Emitter2[[]byte, []byte]()
register.Iter2[[]byte, []byte]()
register.Function2x0(impToKV)
}

var (
Expand All @@ -51,11 +54,17 @@ func parseSyntheticConfig() synthetic.SourceConfig {
}
}

type doFn struct {
// impToKV turns an impulse signal into a KV by emitting the impulse bytes as
// both the key and the value. This is done instead of adding a single-value
// input version of RuntimeMonitor.
func impToKV(imp []byte, emit func([]byte, []byte)) {
emit(imp, imp)
}

type iterSideInputFn struct {
ElementsToAccess int64
}

func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
func (fn *iterSideInputFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
var key, value []byte
var i int64
for values(&key, &value) {
Expand All @@ -74,18 +83,21 @@ func main() {
p, s := beam.NewPipelineWithRoot()

syntheticConfig := parseSyntheticConfig()
elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100)
elementsToAccess := syntheticConfig.NumElements * int64(float64(*accessPercentage)/float64(100))

src := synthetic.SourceSingle(s, syntheticConfig)
src = beam.ParDo(s, &load.RuntimeMonitor{}, src)

src = beam.ParDo(
imp := beam.Impulse(s)
impKV := beam.ParDo(s, impToKV, imp)
monitored := beam.ParDo(s, &load.RuntimeMonitor{}, impKV)

useSide := beam.ParDo(
s,
&doFn{ElementsToAccess: elementsToAccess},
beam.Impulse(s),
&iterSideInputFn{ElementsToAccess: elementsToAccess},
monitored,
beam.SideInput{Input: src})

beam.ParDo(s, &load.RuntimeMonitor{}, src)
beam.ParDo(s, &load.RuntimeMonitor{}, useSide)

presult, err := beamx.RunWithMetrics(ctx, p)
if err != nil {
Expand Down
21 changes: 14 additions & 7 deletions sdks/go/test/load/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"log"
"net/http"
"os"
"reflect"
"strings"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

const (
Expand Down Expand Up @@ -60,7 +60,8 @@ var (
)

func init() {
beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*RuntimeMonitor)(nil))
register.Emitter2[[]byte, []byte]()
}

// RuntimeMonitor is a DoFn to record processing time in the pipeline.
Expand Down Expand Up @@ -132,10 +133,16 @@ func newLoadTestResult(value float64) loadTestResult {
// PublishMetrics calculates the runtime and sends the result to InfluxDB database.
func PublishMetrics(results metrics.QueryResults) {
options := newInfluxDBOptions()
ress := toLoadTestResults(results)
for _, res := range ress {
log.Printf("%s %v", res.metric, time.Duration(float64(time.Second)*res.value))
}
if len(ress) == 0 {
log.Print("No metrics returned.")
return
}
if options.validate() {
if res := toLoadTestResults(results); len(res) > 0 {
publishMetricstoInfluxDB(options, toLoadTestResults(results))
}
publishMetricstoInfluxDB(options, ress)
} else {
log.Print("Missing InfluxDB options. Metrics will not be published to InfluxDB")
}
Expand Down Expand Up @@ -212,8 +219,8 @@ func publishMetricstoInfluxDB(options *influxDBOptions, results []loadTestResult
if resp.StatusCode != 204 {
jsonData := make(map[string]string)
json.Unmarshal(body, &jsonData)
log.Print(fmt.Errorf("Failed to publish metrics to InfluxDB. Received status code %v "+
"with an error message: %v", resp.StatusCode, jsonData["error"]))
log.Printf("Failed to publish metrics to InfluxDB. Received status code %v "+
"with an error message: %v", resp.StatusCode, jsonData["error"])
}
}

Expand Down

0 comments on commit 5064cc2

Please sign in to comment.