Skip to content

Commit

Permalink
[BEAM-11095] Better error handling for iter/reiter/multimap (#16794)
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Feb 10, 2022
1 parent 5a9a1bc commit 8b213c6
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 19 deletions.
10 changes: 9 additions & 1 deletion sdks/go/pkg/beam/core/funcx/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,15 @@ func New(fn reflectx.Func) (*Fn, error) {
if ok, err := IsMalformedEmit(t); ok {
return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
}
// TODO(damccorm) 2022.02.08: Handle IsMalformed[Iter, ReIter, MultiMap] cases (BEAM-11095)
if ok, err := IsMalformedIter(t); ok {
return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
}
if ok, err := IsMalformedReIter(t); ok {
return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
}
if ok, err := IsMalformedMultiMap(t); ok {
return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
}
return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t)
}

Expand Down
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/core/funcx/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,21 @@ func TestNew(t *testing.T) {
},
Err: errors.New(errIllegalParametersInEmit),
},
{
Name: "errIllegalParametersInIter - malformed Iter",
Fn: func(int, func(*nonConcreteType) bool, func(*int, *string) bool) {},
Err: errors.New(errIllegalParametersInIter),
},
{
Name: "errIllegalParametersInIter - malformed ReIter",
Fn: func(int, func() func(*nonConcreteType) bool, func(*int, *string) bool) {},
Err: errors.New(errIllegalParametersInReIter),
},
{
Name: "errIllegalParametersInMultiMap - malformed MultiMap",
Fn: func(int, func(string) func(*nonConcreteType) bool) {},
Err: errors.New(errIllegalParametersInMultiMap),
},
}

for _, test := range tests {
Expand Down
95 changes: 77 additions & 18 deletions sdks/go/pkg/beam/core/funcx/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)

var (
errIllegalParametersInIter = "All parameters in an iter must be universal type, container type, or concrete type"
errIllegalParametersInReIter = "Output of a reiter must be valid iter type"
errIllegalParametersInMultiMap = "Output of a multimap must be valid iter type"
)

// IsIter returns true iff the supplied type is a "single sweep functional iterator".
Expand All @@ -30,10 +37,20 @@ import (
// will be copied into the supplied pointers. The function returns true if
// data was copied, and false if there is no more data available.
func IsIter(t reflect.Type) bool {
_, ok := UnfoldIter(t)
_, ok, _ := unfoldIter(t)
return ok
}

// IsMalformedIter returns true iff the supplied type is an illegal "single sweep
// functional iterator" and an error explaining why it is illegal. For example,
// an iterator is not legal if one of its parameters is not concrete, universal, or
// a container type. If the type does not have the structure of an iter or it is a
// legal iter, IsMalformedIter returns false and no error.
func IsMalformedIter(t reflect.Type) (bool, error) {
_, _, err := unfoldIter(t)
return err != nil, err
}

// UnfoldIter returns the parameter types, if a single sweep functional
// iterator. For example:
//
Expand All @@ -42,15 +59,20 @@ func IsIter(t reflect.Type) bool {
// func (*typex.EventTime, *int) bool returns {typex.EventTime, int}
//
func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
types, ok, _ := unfoldIter(t)
return types, ok
}

func unfoldIter(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
return nil, false
return nil, false, nil
}

if t.NumOut() != 1 || t.Out(0) != reflectx.Bool {
return nil, false
return nil, false, nil
}
if t.NumIn() == 0 {
return nil, false
return nil, false, nil
}

var ret []reflect.Type
Expand All @@ -60,23 +82,26 @@ func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
skip = 1
}
if t.NumIn()-skip > 2 || t.NumIn() == skip {
return nil, false
return nil, false, nil
}

for i := skip; i < t.NumIn(); i++ {
if !isOutParam(t.In(i)) {
return nil, false
if ok, err := isOutParam(t.In(i)); !ok {
return nil, false, errors.Wrap(err, errIllegalParametersInIter)
}
ret = append(ret, t.In(i).Elem())
}
return ret, true
return ret, true, nil
}

func isOutParam(t reflect.Type) bool {
func isOutParam(t reflect.Type) (bool, error) {
if t.Kind() != reflect.Ptr {
return false
return false, errors.Errorf("Type %v of kind %v not allowed, must be ptr type", t, t.Kind())
}
return typex.IsConcrete(t.Elem()) || typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem())
if typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem()) {
return true, nil
}
return typex.CheckConcrete(t.Elem())
}

// IsReIter returns true iff the supplied type is a functional iterator generator.
Expand All @@ -88,15 +113,34 @@ func IsReIter(t reflect.Type) bool {
return ok
}

// IsMalformedReIter returns true iff the supplied type is an illegal functional
// iterator generator and an error explaining why it is illegal. An iterator generator
// is not legal if its output is not of type iterator. If the type does not
// have the structure of an iterator generator or it is a legal iterator generator,
// IsMalformedReIter returns false and no error.
func IsMalformedReIter(t reflect.Type) (bool, error) {
_, _, err := unfoldReIter(t)
return err != nil, err
}

// UnfoldReIter returns the parameter types, if a functional iterator generator.
func UnfoldReIter(t reflect.Type) ([]reflect.Type, bool) {
types, ok, _ := unfoldReIter(t)
return types, ok
}

func unfoldReIter(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
return nil, false
return nil, false, nil
}
if t.NumIn() != 0 || t.NumOut() != 1 {
return nil, false
return nil, false, nil
}
types, ok, err := unfoldIter(t.Out(0))
if err != nil {
err = errors.Wrap(err, errIllegalParametersInReIter)
}
return UnfoldIter(t.Out(0))
return types, ok, err
}

// IsMultiMap returns true iff the supplied type is a keyed functional iterator
Expand All @@ -109,17 +153,32 @@ func IsMultiMap(t reflect.Type) bool {
return ok
}

// IsMalformedMultiMap returns true iff the supplied type is an illegal keyed functional
// iterator generator and an error explaining why it is illegal. A keyed iterator generator
// is not legal if its output is not of type iterator. If the type does not have the
// structure of a keyed iterator generator or it is a legal iterator generator,
// IsMalformedMultiMap returns false and no error.
func IsMalformedMultiMap(t reflect.Type) (bool, error) {
_, _, err := unfoldMultiMap(t)
return err != nil, err
}

// UnfoldMultiMap returns the parameter types for the input key and the output
// values iff the type is a keyed functional iterator generator.
func UnfoldMultiMap(t reflect.Type) ([]reflect.Type, bool) {
types, ok, _ := unfoldMultiMap(t)
return types, ok
}

func unfoldMultiMap(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
return nil, false
return nil, false, nil
}
if t.NumIn() != 1 || t.NumOut() != 1 {
return nil, false
return nil, false, nil
}
types := []reflect.Type{t.In(0)}
iterTypes, is := UnfoldIter(t.Out(0))
iterTypes, is, err := unfoldIter(t.Out(0))
types = append(types, iterTypes...)
return types, is
return types, is, errors.Wrap(err, errIllegalParametersInMultiMap)
}

0 comments on commit 8b213c6

Please sign in to comment.