Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13262] Forward for metrics.SingleResult #15993

Merged
merged 2 commits into from Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdks/go/examples/snippets/10metrics.go
Expand Up @@ -27,7 +27,7 @@ import (
// [START metrics_query]

func queryMetrics(pr beam.PipelineResult, ns, n string) metrics.QueryResults {
return pr.Metrics().Query(func(r metrics.SingleResult) bool {
return pr.Metrics().Query(func(r beam.MetricResult) bool {
return r.Namespace() == ns && r.Name() == n
})
}
Expand All @@ -49,7 +49,7 @@ func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metric
}

// Request the metric called "counter1" in namespace called "namespace"
ms := pr.Metrics().Query(func(r metrics.SingleResult) bool {
ms := pr.Metrics().Query(func(r beam.MetricResult) bool {
return r.Namespace() == "namespace" && r.Name() == "counter1"
})

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/metrics/metrics.go
Expand Up @@ -525,7 +525,7 @@ type SingleResult interface {
}

// Query allows metrics querying with filter. The filter takes the form of predicate function. Example:
// qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
// qr = pr.Metrics().Query(func(mr beam.MetricResult) bool {
// return sr.Namespace() == test.namespace
// })
func (mr Results) Query(f func(SingleResult) bool) QueryResults {
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/forward.go
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
Expand Down Expand Up @@ -215,3 +216,6 @@ var (

// EventTimeType is the reflect.Type of EventTime.
var EventTimeType = typex.EventTimeType

riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
// MetricResult represents a single metric value, for use in writing predicate functions to query PipelineResults.
type MetricResult = metrics.SingleResult
8 changes: 4 additions & 4 deletions sdks/go/test/integration/wordcount/wordcount_test.go
Expand Up @@ -113,8 +113,8 @@ func TestWordCount(t *testing.T) {
t.Errorf("WordCount(\"%v\") failed: %v", strings.Join(test.lines, "|"), err)
}

qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
return sr.Name() == "smallWords"
qr := pr.Metrics().Query(func(mr beam.MetricResult) bool {
return mr.Name() == "smallWords"
})
counter := metrics.CounterResult{}
if len(qr.Counters()) != 0 {
Expand All @@ -124,8 +124,8 @@ func TestWordCount(t *testing.T) {
t.Errorf("Metrics().Query(by Name) failed. Got %d counters, Want %d counters", counter.Result(), test.smallWordsCount)
}

qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
return sr.Name() == "lineLenDistro"
qr = pr.Metrics().Query(func(mr beam.MetricResult) bool {
return mr.Name() == "lineLenDistro"
})
distribution := metrics.DistributionResult{}
if len(qr.Distributions()) != 0 {
Expand Down