Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/core/graph/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,14 @@ type MultiEdge struct {
id int
parent *Scope

Op Opcode
DoFn *DoFn // ParDo
CombineFn *CombineFn // Combine
AccumCoder *coder.Coder // Combine
Value []byte // Impulse
Payload *Payload // External
WindowFn *window.Fn // WindowInto
Op Opcode
DoFn *DoFn // ParDo
RestrictionCoder *coder.Coder // SplittableParDo
CombineFn *CombineFn // Combine
AccumCoder *coder.Coder // Combine
Value []byte // Impulse
Payload *Payload // External
WindowFn *window.Fn // WindowInto

Input []*Inbound
Output []*Outbound
Expand Down Expand Up @@ -296,11 +297,11 @@ func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out []typex.F
}

// NewParDo inserts a new ParDo edge into the graph.
func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, typedefs map[string]reflect.Type) (*MultiEdge, error) {
return newDoFnNode(ParDo, g, s, u, in, typedefs)
// NewParDo inserts a new ParDo edge into the graph by delegating to
// newDoFnNode with the ParDo opcode.
//
// rc is the restriction coder; newDoFnNode records it on the returned edge as
// RestrictionCoder. The callers visible in this change pass nil when the DoFn
// is not splittable.
func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.Coder, typedefs map[string]reflect.Type) (*MultiEdge, error) {
return newDoFnNode(ParDo, g, s, u, in, rc, typedefs)
}

func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs map[string]reflect.Type) (*MultiEdge, error) {
func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.Coder, typedefs map[string]reflect.Type) (*MultiEdge, error) {
// TODO(herohde) 5/22/2017: revisit choice of ProcessElement as representative. We should
// perhaps create a synthetic method for binding purposes? The main question is how to
// tell which side input binds to which if the signatures differ, which is a downside of
Expand All @@ -321,6 +322,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
n := g.NewNode(out[i], inputWindow(in), inputBounded(in))
edge.Output = append(edge.Output, &Outbound{To: n, Type: outbound[i]})
}
edge.RestrictionCoder = rc
return edge, nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ func (f *DoFn) Name() string {
return (*Fn)(f).Name()
}

// IsSplittable reports whether the DoFn is a valid Splittable DoFn (SDF).
//
// SDF support is not implemented yet, so this is a stub that always returns
// false.
func (f *DoFn) IsSplittable() bool {
return false // TODO(BEAM-3301): Implement this when we add SDFs.
}

// RestrictionT returns the restriction type from the DoFn if it's splittable.
// Otherwise, returns nil.
//
// SDF support is not implemented yet, so this is a stub that always returns
// nil; callers must nil-check the result or guard on IsSplittable before
// dereferencing it.
// NOTE(review): reflect.Type is an interface and can already be nil; confirm
// whether the extra pointer is needed once this is implemented.
func (f *DoFn) RestrictionT() *reflect.Type {
return nil // TODO(BEAM-3301): Implement this when we add SDFs.
}

// TODO(herohde) 5/19/2017: we can sometimes detect whether the main input must be
// a KV or not based on the other signatures (unless we're more loose about which
// sideinputs are present). Bind should respect that.
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestParDo(t *testing.T) {
cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true)
dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true)

edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil)
edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil)
if err != nil {
t.Fatalf("invalid pardo: %v", err)
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func BenchmarkParDo_EmitSumFn(b *testing.B) {

g := graph.New()
nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true)
edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN}, nil)
edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN}, nil, nil)
if err != nil {
b.Fatalf("invalid pardo: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
},
SideInputs: si,
}
if edge.Edge.DoFn.IsSplittable() {
payload.Splittable = true
payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
}
transformEnvID = m.addDefaultEnv()
spec = &pb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

Expand Down
4 changes: 3 additions & 1 deletion sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func addDoFn(t *testing.T, g *graph.Graph, fn interface{}, scope *graph.Scope, i
if err != nil {
t.Fatal(err)
}
e, err := graph.NewParDo(g, scope, dofn, inputs, nil)
e, err := graph.NewParDo(g, scope, dofn, inputs, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -159,4 +159,6 @@ func TestMarshal(t *testing.T) {
}
})
}

// TODO(BEAM-3301): Add SDF test once we can make SDFs.
}
13 changes: 12 additions & 1 deletion sdks/go/pkg/beam/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package beam

import (
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"

"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
Expand Down Expand Up @@ -44,7 +46,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
for _, s := range side {
in = append(in, s.Input.n)
}
edge, err := graph.NewParDo(s.real, s.scope, fn, in, typedefs)

var rc *coder.Coder
if fn.IsSplittable() {
rc, err = inferCoder(typex.New(*fn.RestrictionT()))
if err != nil {
return nil, addParDoCtx(err, s)
}
}

edge, err := graph.NewParDo(s.real, s.scope, fn, in, rc, typedefs)
if err != nil {
return nil, addParDoCtx(err, s)
}
Expand Down