Skip to content

Commit

Permalink
Allow no state or not start state (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 19, 2023
1 parent 35fed84 commit d0397b3
Show file tree
Hide file tree
Showing 25 changed files with 213 additions and 35 deletions.
2 changes: 1 addition & 1 deletion integ/basic_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type basicWorkflowState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b basicWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/basic_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type basicWorkflowState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b basicWorkflowState2) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 2 additions & 0 deletions integ/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func init() {
&skipWaitUntilWorkflow{},
skipWaitUntilWorkflow2{}, // test register by struct
rpcWorkflow{},
noStateWorkflow{},
noStartStateWorkflow{},
)
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion integ/interstate_workflow_state0.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type interStateWorkflowState0 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b interStateWorkflowState0) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/interstate_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type interStateWorkflowState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b interStateWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/interstate_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type interStateWorkflowState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b interStateWorkflowState2) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
30 changes: 30 additions & 0 deletions integ/no_startstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package integ

import (
"context"
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNoStartStateWorkflow(t *testing.T) {
wfId := "TestNoStartStateWorkflow" + strconv.Itoa(int(time.Now().Unix()))
wf := noStartStateWorkflow{}

runId, err := client.StartWorkflow(context.Background(), wf, wfId, 10, 1, nil)
assert.Nil(t, err)
assert.NotEmpty(t, runId)

var rpcOutput int
err = client.InvokeRPC(context.Background(), wfId, "", wf.TestRPC, 1, &rpcOutput)
assert.Nil(t, err)
assert.Equal(t, 2, rpcOutput)

time.Sleep(time.Second * 2)
info, err := client.DescribeWorkflow(context.Background(), wfId, "")
assert.Nil(t, err)
assert.Equal(t, iwfidl.COMPLETED, info.Status)
}
38 changes: 38 additions & 0 deletions integ/no_startstate_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package integ

import (
"github.com/indeedeng/iwf-golang-sdk/iwf"
)

type noStartStateWorkflow struct {
iwf.WorkflowDefaults
}

func (b noStartStateWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
return []iwf.CommunicationMethodDef{
iwf.RPCMethodDef(b.TestRPC, nil),
}
}

func (b noStartStateWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
iwf.NonStartingStateDef(&noStartStateWorkflowState1{}),
}
}

func (b noStartStateWorkflow) TestRPC(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (interface{}, error) {
var i int
input.Get(&i)
i++
communication.TriggerStateMovements(iwf.NewStateMovement(noStartStateWorkflowState1{}, nil))
return i, nil
}

type noStartStateWorkflowState1 struct {
iwf.WorkflowStateDefaults
iwf.NoWaitUntil
}

func (b noStartStateWorkflowState1) Execute(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
return iwf.GracefulCompletingWorkflow, nil
}
42 changes: 42 additions & 0 deletions integ/no_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package integ

import (
"context"
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"github.com/indeedeng/iwf-golang-sdk/iwf"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNoStateWorkflow(t *testing.T) {
wfId := "TestNoStateWorkflow" + strconv.Itoa(int(time.Now().Unix()))
wf := noStateWorkflow{}

runId, err := client.StartWorkflow(context.Background(), wf, wfId, 10, 1, nil)
assert.Nil(t, err)
assert.NotEmpty(t, runId)

time.Sleep(time.Second)
info, err := client.DescribeWorkflow(context.Background(), wfId, "")
assert.Nil(t, err)
assert.Equal(t, iwfidl.RUNNING, info.Status)

err = client.InvokeRPC(context.Background(), wfId, "", wf.TestErrorRPC, 1, nil)
assert.NotNil(t, err)
assert.True(t, iwf.IsRPCError(err))
rpcErr, _ := err.(*iwf.ApiError)
assert.Equal(t, "worker API error, status:501, errorType:test-error-type", rpcErr.Response.GetDetail())

err = client.StopWorkflow(context.Background(), wfId, "", &iwf.WorkflowStopOptions{
StopType: iwfidl.FAIL,
Reason: "test",
})
assert.Nil(t, err)
time.Sleep(time.Second * 2)
info, err = client.DescribeWorkflow(context.Background(), wfId, "")
assert.Nil(t, err)
assert.Equal(t, iwfidl.FAILED, info.Status)
}
20 changes: 20 additions & 0 deletions integ/no_state_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package integ

import (
"fmt"
"github.com/indeedeng/iwf-golang-sdk/iwf"
)

type noStateWorkflow struct {
iwf.WorkflowDefaults
}

func (b noStateWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
return []iwf.CommunicationMethodDef{
iwf.RPCMethodDef(b.TestErrorRPC, nil),
}
}

func (b noStateWorkflow) TestErrorRPC(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (interface{}, error) {
return nil, fmt.Errorf("test error")
}
2 changes: 1 addition & 1 deletion integ/persistence_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type persistenceWorkflowState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b persistenceWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/persistence_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type persistenceWorkflowState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

const testText = "Hail iWF!"
Expand Down
2 changes: 1 addition & 1 deletion integ/proceed_on_state_start_fail_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type proceedOnStateStartFailWorkflowState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
output string
}

Expand Down
5 changes: 2 additions & 3 deletions integ/rpc_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
)

type rpcWorkflow struct {
iwf.DefaultWorkflowType
iwf.EmptyPersistenceSchema
iwf.WorkflowDefaults
}

func (b rpcWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
Expand Down Expand Up @@ -37,7 +36,7 @@ func (b rpcWorkflow) TestErrorRPC(ctx iwf.WorkflowContext, input iwf.Object, per
}

type rpcWorkflowState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b rpcWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/signal_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type signalWorkflowState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

func (b signalWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
Expand Down
2 changes: 1 addition & 1 deletion integ/signal_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type signalWorkflowState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
}

const timerCommandId = "timerId"
Expand Down
2 changes: 1 addition & 1 deletion integ/skip_wait_until_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type skipWaitUntilState1 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
iwf.NoWaitUntil
}

Expand Down
2 changes: 1 addition & 1 deletion integ/skip_wait_until_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type skipWaitUntilState2 struct {
iwf.DefaultStateIdAndOptions
iwf.WorkflowStateDefaults
iwf.NoWaitUntil
}

Expand Down
32 changes: 20 additions & 12 deletions iwf/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@ type clientImpl struct {

func (c *clientImpl) StartWorkflow(ctx context.Context, workflow ObjectWorkflow, workflowId string, timeoutSecs int32, input interface{}, options *WorkflowOptions) (string, error) {
wfType := GetFinalWorkflowType(workflow)
wf := c.registry.getWorkflow(wfType)
if wf == nil {
return "", NewInvalidArgumentError("workflow is not registered")
}

state := c.registry.getWorkflowStartingState(wfType)

startStateOpt := state.GetStateOptions()
if ShouldSkipWaitUntilAPI(state) {
if startStateOpt == nil {
startStateOpt = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
unregOpt := &UnregisteredWorkflowOptions{}

startStateId := ""
if state != nil {
startStateId = GetFinalWorkflowStateId(state)
startStateOpt := state.GetStateOptions()
if ShouldSkipWaitUntilAPI(state) {
if startStateOpt == nil {
startStateOpt = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
}
} else {
startStateOpt.SkipWaitUntil = ptr.Any(true)
}
} else {
startStateOpt.SkipWaitUntil = ptr.Any(true)
}
}

unregOpt := &UnregisteredWorkflowOptions{
StartStateOptions: startStateOpt,
unregOpt.StartStateOptions = startStateOpt
}

if options != nil {
Expand All @@ -47,7 +55,7 @@ func (c *clientImpl) StartWorkflow(ctx context.Context, workflow ObjectWorkflow,
}
unregOpt.InitialSearchAttributes = convertedSAs
}
return c.UnregisteredClient.StartWorkflow(ctx, wfType, GetFinalWorkflowStateId(state), workflowId, timeoutSecs, input, unregOpt)
return c.UnregisteredClient.StartWorkflow(ctx, wfType, startStateId, workflowId, timeoutSecs, input, unregOpt)
}

func convertToSearchAttributeList(types map[string]iwfidl.SearchAttributeValueType, attributes map[string]interface{}) ([]iwfidl.SearchAttribute, error) {
Expand Down
1 change: 1 addition & 0 deletions iwf/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Registry interface {
GetAllRegisteredWorkflowTypes() []string

// below are all for internal implementation
getWorkflow(wfType string) ObjectWorkflow
getWorkflowStartingState(wfType string) WorkflowState
getWorkflowStateDef(wfType string, id string) StateDef
getWorkflowRPC(wfType string, rpcMethod string) CommunicationMethodDef
Expand Down
10 changes: 6 additions & 4 deletions iwf/registry_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (r *registryImpl) getWorkflowRPC(wfType string, rpcMethod string) Communica
return r.workflowRPCStore[wfType][rpcMethod]
}

func (r *registryImpl) getWorkflow(wfType string) ObjectWorkflow {
return r.workflowStore[wfType]
}

func (r *registryImpl) registerWorkflow(wf ObjectWorkflow) error {
wfType := GetFinalWorkflowType(wf)
_, ok := r.workflowStore[wfType]
Expand All @@ -86,9 +90,6 @@ func (r *registryImpl) registerWorkflow(wf ObjectWorkflow) error {

func (r *registryImpl) registerWorkflowState(wf ObjectWorkflow) error {
wfType := GetFinalWorkflowType(wf)
if len(wf.GetWorkflowStates()) == 0 {
return NewWorkflowDefinitionErrorFmt("Workflow type %s must contain at least one workflow state", wfType)
}
stateMap := map[string]StateDef{}
var startingState WorkflowState
for _, state := range wf.GetWorkflowStates() {
Expand All @@ -106,8 +107,9 @@ func (r *registryImpl) registerWorkflowState(wf ObjectWorkflow) error {
}
}
}
r.workflowStartingState[wfType] = startingState
r.workflowStateStore[wfType] = stateMap
r.workflowStartingState[wfType] = startingState

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions iwf/state_movement.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ const (
ForceCompletingWorkflowStateId = "_SYS_FORCE_COMPLETING_WORKFLOW"
ForceFailingWorkflowStateId = "_SYS_FORCE_FAILING_WORKFLOW"
)

func NewStateMovement(st WorkflowState, in interface{}) StateMovement {
return StateMovement{
NextStateId: GetFinalWorkflowStateId(st),
NextStateInput: in,
}
}
6 changes: 5 additions & 1 deletion iwf/unregistered_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (u *unregisteredClientImpl) StartWorkflow(ctx context.Context, workflowType
}
}

var startStateIdPtr *string
if startStateId != "" {
startStateIdPtr = &startStateId
}
var stateOptions *iwfidl.WorkflowStateOptions
var startOptions *iwfidl.WorkflowStartOptions
if options != nil {
Expand All @@ -47,7 +51,7 @@ func (u *unregisteredClientImpl) StartWorkflow(ctx context.Context, workflowType
IwfWorkflowType: workflowType,
WorkflowTimeoutSeconds: timeoutSecs,
IwfWorkerUrl: u.options.WorkerUrl,
StartStateId: &startStateId,
StartStateId: startStateIdPtr,
StateInput: encodedInput,
StateOptions: stateOptions,
WorkflowStartOptions: startOptions,
Expand Down
Loading

0 comments on commit d0397b3

Please sign in to comment.