Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/funcx/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (
FnBundleFinalization FnParamKind = 0x800
// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator
FnWatermarkEstimator FnParamKind = 0x1000
// FnState indicates a function input parameter that implements state.Provider
// FnStateProvider indicates a function input parameter that implements state.Provider
FnStateProvider FnParamKind = 0x2000
)

Expand Down
16 changes: 9 additions & 7 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,15 +1272,10 @@ func validateState(fn *DoFn, numIn mainInputs) error {
err := errors.Errorf("Duplicate state key %v", k)
return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+
"unique per DoFn", k, orig, s)
} else {
stateKeys[k] = s
}
}

// TODO(#22736) - Remove this once state is fully supported
err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
"Please try upgrading to a newer release if one exists or wait for state support to be released.")
stateKeys[k] = s
}
} else {
if len(ps) > 0 {
err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+
Expand All @@ -1291,6 +1286,13 @@ func validateState(fn *DoFn, numIn mainInputs) error {
}
}

if len(ps) > 0 {
// TODO(#22736) - Remove this once state is fully supported
err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
"Please try upgrading to a newer release if one exists or wait for state support to be released.")
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ type stateProvider struct {
}

// ReadValueState reads a value state from the State API
func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) {
func (s *stateProvider) ReadValueState(userStateID string) (interface{}, []state.Transaction, error) {
// TODO(#22736) - read from the state api.
return nil, nil, errors.New("Stateful DoFns are not supported yet.")
return nil, nil, errors.New("stateful DoFns are not supported yet")
}

// WriteValueState writes a value state to the State API
func (s *stateProvider) WriteValueState(val state.Transaction) error {
// TODO(#22736) - read from the state api.
return errors.New("Stateful DoFns are not supported yet.")
return errors.New("stateful DoFns are not supported yet")
}

// Invoke invokes the fn with the given values. The extra values must match the non-main
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,15 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
input := unmarshalKeyedValues(transform.GetInputs())

if len(userState) > 0 {
stateIdToCoder := make(map[string]*coder.Coder)
stateIDToCoder := make(map[string]*coder.Coder)
for key, spec := range userState {
// TODO(#22736) - this will eventually need to be aware of which type of state its modifying to support non-Value state types.
cID := spec.GetReadModifyWriteSpec().CoderId
c, err := b.coders.Coder(cID)
if err != nil {
return nil, err
}
stateIdToCoder[key] = c
stateIDToCoder[key] = c
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
m.requirements[URNRequiresStatefulProcessing] = true
stateSpecs := make(map[string]*pipepb.StateSpec)
for _, ps := range edge.Edge.DoFn.PipelineState() {
coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
coderID, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
if err != nil {
return handleErr(err)
}
Expand All @@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
// See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
CoderId: coderId,
CoderId: coderID,
},
},
Protocol: &pipepb.FunctionSpec{
Expand Down
16 changes: 9 additions & 7 deletions sdks/go/pkg/beam/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (
"reflect"
)

type TransactionType_Enum int32
// TransactionTypeEnum represents the type of state manipulation being done (e.g. set, clear, etc...)
type TransactionTypeEnum int32

const (
TransactionType_Set TransactionType_Enum = 0
TransactionType_Clear TransactionType_Enum = 1
TransactionTypeSet TransactionTypeEnum = 0
TransactionTypeClear TransactionTypeEnum = 1
)

var (
// ProviderType is the reflected type of state.Provider
ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
)

Expand All @@ -37,7 +39,7 @@ var (
// it is primarily used for implementations of the Provider interface to talk to the various State objects.
type Transaction struct {
Key string
Type TransactionType_Enum
Type TransactionTypeEnum
Val interface{}
}

Expand Down Expand Up @@ -67,7 +69,7 @@ type Value[T any] struct {
func (s *Value[T]) Write(p Provider, val T) error {
return p.WriteValueState(Transaction{
Key: s.Key,
Type: TransactionType_Set,
Type: TransactionTypeSet,
Val: val,
})
}
Expand All @@ -84,9 +86,9 @@ func (s *Value[T]) Read(p Provider) (T, bool, error) {
}
for _, t := range bufferedTransactions {
switch t.Type {
case TransactionType_Set:
case TransactionTypeSet:
cur = t.Val
case TransactionType_Clear:
case TransactionTypeClear:
cur = nil
}
}
Expand Down
18 changes: 9 additions & 9 deletions sdks/go/pkg/beam/core/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type fakeProvider struct {
err map[string]error
}

func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
if err, ok := s.err[userStateId]; ok {
func (s *fakeProvider) ReadValueState(userStateID string) (interface{}, []Transaction, error) {
if err, ok := s.err[userStateID]; ok {
return nil, nil, err
}
base := s.initialState[userStateId]
trans, ok := s.transactions[userStateId]
base := s.initialState[userStateID]
trans, ok := s.transactions[userStateID]
if !ok {
trans = []Transaction{}
}
Expand All @@ -58,15 +58,15 @@ func TestValueRead(t *testing.T) {
is["no_transactions"] = 1
ts["no_transactions"] = nil
is["basic_set"] = 1
ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}}
ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionTypeSet, Val: 3}}
is["basic_clear"] = 1
ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}}
ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionTypeClear, Val: nil}}
is["set_then_clear"] = 1
ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}}
ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear", Type: TransactionTypeClear, Val: nil}}
is["set_then_clear_then_set"] = 1
ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}}
ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionTypeClear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionTypeSet, Val: 4}}
is["err"] = 1
ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}}
ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val: 3}}
es["err"] = errFake

f := fakeProvider{
Expand Down