Description
The root cause is that the Go SDK doesn't use the "beam:transform:combine_globally:v1" URN. It always uses "beam:transform:combine_per_key:v1", even for global combines, together with an AddFixedKey DoFn.
URN in the proto: https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L347
The Go SDK only defines combine_per_key: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L42
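To make the current behaviour concrete, here is a minimal standalone sketch of a global combine expressed as "add a fixed key, then combine per key". It does not use the Beam Go SDK: KV, addFixedKey, combinePerKey and combineGloballyViaFixedKey are hypothetical stand-ins, and summing integers is just an example combiner.

```go
package main

import "fmt"

// KV is a hypothetical stand-in for a keyed element (not the Beam type).
type KV struct {
	Key   int
	Value int
}

// addFixedKey plays the role of the AddFixedKey DoFn: every element
// is paired with the same constant key (0 here, chosen arbitrarily).
func addFixedKey(values []int) []KV {
	out := make([]KV, 0, len(values))
	for _, v := range values {
		out = append(out, KV{Key: 0, Value: v})
	}
	return out
}

// combinePerKey sums the values for each key, standing in for a
// per-key combine.
func combinePerKey(kvs []KV) map[int]int {
	sums := make(map[int]int)
	for _, kv := range kvs {
		sums[kv.Key] += kv.Value
	}
	return sums
}

// combineGloballyViaFixedKey expresses a global combine as
// "add a fixed key, combine per key, drop the key".
func combineGloballyViaFixedKey(values []int) int {
	return combinePerKey(addFixedKey(values))[0]
}

func main() {
	fmt.Println(combineGloballyViaFixedKey([]int{1, 2, 3, 4})) // prints 10
}
```

Because everything is funnelled through a single key, a runner only ever sees a per-key combine and cannot tell that the combine was global.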
We currently "detect" combines via a CombinePerKey scope https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L434
added at beam.TryCombinePerKey
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L58
We convert combines into the CombinePayload here
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L253
called above here:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L241
We probably want to either add a graph.CombineGlobal op (vs. the existing combine node), or modify the "CombinePerKey" scope hack to have a CombineCombineGlobal variant, or do something else that is cleaner than what currently exists.
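One possible shape for the first option, as a rough standalone sketch: the two URN strings are the ones quoted above, but Opcode, its constants and combineURN are hypothetical names for illustration and are not the actual graph or graphx API.

```go
package main

import "fmt"

// URNs as quoted in this issue.
const (
	urnCombinePerKey   = "beam:transform:combine_per_key:v1"
	urnCombineGlobally = "beam:transform:combine_globally:v1"
)

// Opcode is a hypothetical stand-in for the node kind in the Go SDK graph.
type Opcode string

const (
	// Combine stands for the existing combine node.
	Combine Opcode = "Combine"
	// CombineGlobal is the proposed new op; it does not exist today.
	CombineGlobal Opcode = "CombineGlobal"
)

// combineURN picks the URN to emit during translation based on the op,
// instead of always emitting the per-key URN.
func combineURN(op Opcode) (string, error) {
	switch op {
	case Combine:
		return urnCombinePerKey, nil
	case CombineGlobal:
		return urnCombineGlobally, nil
	default:
		return "", fmt.Errorf("not a combine op: %q", op)
	}
}

func main() {
	for _, op := range []Opcode{Combine, CombineGlobal} {
		urn, err := combineURN(op)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s\n", op, urn)
	}
}
```

With a distinct op, the translation step would no longer need to infer the combine kind from a scope name.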
We'd also want to verify that the optimization actually takes effect. That should be simple enough to detect through timing, at least as a one-off measurement, if not as a regular benchmark.
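A one-off timing check could look roughly like the sketch below. It uses testing.Benchmark from the standard library. The two workloads (sumDirect and sumViaFixedKey) are hypothetical in-process stand-ins and would have to be replaced by actual pipeline runs before and after the change, so the numbers it prints say nothing about Beam itself.

```go
package main

import (
	"fmt"
	"testing"
)

// sumDirect is a stand-in for the optimized path: values are combined
// as they are seen, with no grouping step.
func sumDirect(values []int) int {
	total := 0
	for _, v := range values {
		total += v
	}
	return total
}

// sumViaFixedKey is a stand-in for the current path: all values are
// first grouped under one fixed key and only then combined.
func sumViaFixedKey(values []int) int {
	grouped := make(map[int][]int)
	for _, v := range values {
		grouped[0] = append(grouped[0], v)
	}
	total := 0
	for _, v := range grouped[0] {
		total += v
	}
	return total
}

// measure runs fn under the standard benchmark driver and returns the result.
func measure(fn func()) testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			fn()
		}
	})
}

func main() {
	values := make([]int, 10000)
	for i := range values {
		values[i] = i
	}
	direct := measure(func() { _ = sumDirect(values) })
	keyed := measure(func() { _ = sumViaFixedKey(values) })
	fmt.Printf("direct:    %d ns/op\n", direct.NsPerOp())
	fmt.Printf("fixed key: %d ns/op\n", keyed.NsPerOp())
}
```

The same measure helper could later be turned into a regular benchmark if a one-off comparison shows a clear difference.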
Imported from Jira BEAM-11928. Original Jira may contain additional context.
Reported by: lostluck.