Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StateOptions and execute failure policy #74

Merged
merged 4 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions integ/execute_api_fail_recovery_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import (
"github.com/indeedeng/iwf-golang-sdk/iwf"
)

type executeApiFailRecoveryWorkflowState1 struct{}
type executeApiFailRecoveryWorkflowState1 struct {
iwf.WorkflowStateDefaultsNoWaitUntil
}

func (b executeApiFailRecoveryWorkflowState1) GetStateId() string {
return "execute_api_fail_recovery_workflow_state1"
}

func (b executeApiFailRecoveryWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
options := iwf.NewWorkflowStateOptionsExtension(nil).SetProceedOnExecuteFailure(executeApiFailRecoveryWorkflowState2{}, nil)
options.ExecuteApiRetryPolicy = &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(1),
func (b executeApiFailRecoveryWorkflowState1) GetStateOptions() *iwf.StateOptions {
options := &iwf.StateOptions{
ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(1),
},
ExecuteApiFailureProceedState: &executeApiFailRecoveryWorkflowState2{},
}

return options
}

func (b executeApiFailRecoveryWorkflowState1) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.EmptyCommandRequest(), nil
}

func (b executeApiFailRecoveryWorkflowState1) Execute(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
return nil, errors.New("error")
}
10 changes: 1 addition & 9 deletions integ/execute_api_fail_recovery_workflow_state2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@ import (
)

type executeApiFailRecoveryWorkflowState2 struct {
iwf.DefaultStateOptions
}

func (b executeApiFailRecoveryWorkflowState2) GetStateId() string {
return "execute_api_fail_recovery_workflow_state2"
}

func (b executeApiFailRecoveryWorkflowState2) WaitUntil(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.EmptyCommandRequest(), nil
iwf.WorkflowStateDefaultsNoWaitUntil
}

func (b executeApiFailRecoveryWorkflowState2) Execute(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
Expand Down
4 changes: 2 additions & 2 deletions integ/proceed_on_state_start_fail_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (b *proceedOnStateStartFailWorkflowState1) Execute(ctx iwf.WorkflowContext,
return iwf.SingleNextState(&proceedOnStateStartFailWorkflowState2{}, b.output), nil
}

func (b *proceedOnStateStartFailWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b *proceedOnStateStartFailWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(1),
MaximumAttempts: iwfidl.PtrInt32(2),
Expand Down
4 changes: 2 additions & 2 deletions integ/state_api_fail_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func (b stateApiFailWorkflowState1) Execute(ctx iwf.WorkflowContext, input iwf.O
return iwf.ForceFailWorkflow("a failing message"), nil
}

func (b stateApiFailWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b stateApiFailWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
MaximumAttempts: iwfidl.PtrInt32(1),
},
Expand Down
4 changes: 2 additions & 2 deletions integ/state_api_timeout_workflow_state1.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func (b stateApiTimeoutWorkflowState1) Execute(ctx iwf.WorkflowContext, input iw
return iwf.ForceFailWorkflow("a failing message"), nil
}

func (b stateApiTimeoutWorkflowState1) GetStateOptions() *iwfidl.WorkflowStateOptions {
return &iwfidl.WorkflowStateOptions{
func (b stateApiTimeoutWorkflowState1) GetStateOptions() *iwf.StateOptions {
return &iwf.StateOptions{
WaitUntilApiRetryPolicy: &iwfidl.RetryPolicy{
MaximumAttempts: iwfidl.PtrInt32(1),
},
Expand Down
11 changes: 1 addition & 10 deletions iwf/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,7 @@ func (c *clientImpl) StartWorkflow(ctx context.Context, workflow ObjectWorkflow,
if state != nil {
startStateId = GetFinalWorkflowStateId(state)
startStateOpt := state.GetStateOptions()
if ShouldSkipWaitUntilAPI(state) {
if startStateOpt == nil {
startStateOpt = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
}
} else {
startStateOpt.SkipWaitUntil = ptr.Any(true)
}
}
unregOpt.StartStateOptions = startStateOpt
unregOpt.StartStateOptions = toIdlStateOptions(ShouldSkipWaitUntilAPI(state), startStateOpt)
}

if options != nil {
Expand Down
45 changes: 35 additions & 10 deletions iwf/internal_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,7 @@
var options *iwfidl.WorkflowStateOptions
if !strings.HasPrefix(fromMv.NextStateId, ReservedStateIdPrefix) {
stateDef := registry.getWorkflowStateDef(wfType, fromMv.NextStateId)
options = stateDef.State.GetStateOptions()
if ShouldSkipWaitUntilAPI(stateDef.State) {
if options == nil {
options = &iwfidl.WorkflowStateOptions{
SkipWaitUntil: ptr.Any(true),
}
} else {
options.SkipWaitUntil = ptr.Any(true)
}
}
options = toIdlStateOptions(ShouldSkipWaitUntilAPI(stateDef.State), stateDef.State.GetStateOptions())
}
mv := iwfidl.StateMovement{
StateId: fromMv.NextStateId,
Expand All @@ -125,3 +116,37 @@
NextStates: mvs,
}, nil
}

func toIdlStateOptions(skipWaitUntil bool, stateOptions *StateOptions) *iwfidl.WorkflowStateOptions {
if stateOptions == nil {
stateOptions = &StateOptions{}
}

idlStOptions := &iwfidl.WorkflowStateOptions{
SearchAttributesLoadingPolicy: stateOptions.SearchAttributesLoadingPolicy,
DataAttributesLoadingPolicy: stateOptions.DataAttributesLoadingPolicy,
WaitUntilApiTimeoutSeconds: stateOptions.WaitUntilApiTimeoutSeconds,
ExecuteApiTimeoutSeconds: stateOptions.ExecuteApiTimeoutSeconds,
WaitUntilApiRetryPolicy: stateOptions.WaitUntilApiRetryPolicy,
ExecuteApiRetryPolicy: stateOptions.ExecuteApiRetryPolicy,
WaitUntilApiFailurePolicy: stateOptions.WaitUntilApiFailurePolicy,
}

if skipWaitUntil {
idlStOptions.SkipWaitUntil = ptr.Any(true)
}

if stateOptions.ExecuteApiFailureProceedState != nil {
idlStOptions.ExecuteApiFailurePolicy = iwfidl.PROCEED_TO_CONFIGURED_STATE.Ptr()
idlStOptions.ExecuteApiFailureProceedStateId = ptr.Any(GetFinalWorkflowStateId(stateOptions.ExecuteApiFailureProceedState))

proceedStateOptions := stateOptions.ExecuteApiFailureProceedState.GetStateOptions()
if proceedStateOptions != nil && proceedStateOptions.ExecuteApiFailureProceedState != nil {
panic("nested failure handling/recovery is not supported: ExecuteApiFailureProceedState cannot have ExecuteApiFailureProceedState")

Check warning on line 145 in iwf/internal_mapper.go

View check run for this annotation

Codecov / codecov/patch

iwf/internal_mapper.go#L145

Added line #L145 was not covered by tests
}
idlStOptions.ExecuteApiFailureProceedStateOptions =
toIdlStateOptions(ShouldSkipWaitUntilAPI(stateOptions.ExecuteApiFailureProceedState), proceedStateOptions)
}

return idlStOptions
}
16 changes: 16 additions & 0 deletions iwf/state_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package iwf

import (
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
)

type StateOptions struct {
SearchAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy
DataAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy
WaitUntilApiTimeoutSeconds *int32
ExecuteApiTimeoutSeconds *int32
WaitUntilApiRetryPolicy *iwfidl.RetryPolicy
ExecuteApiRetryPolicy *iwfidl.RetryPolicy
WaitUntilApiFailurePolicy *iwfidl.WaitUntilApiFailurePolicy
ExecuteApiFailureProceedState WorkflowState
}
5 changes: 2 additions & 3 deletions iwf/workflow_state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package iwf

import (
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"reflect"
)

Expand Down Expand Up @@ -51,7 +50,7 @@ type WorkflowState interface {

// GetStateOptions can just return nil to use the default Options
// StateOptions is optional configuration to adjust the state behaviors
GetStateOptions() *iwfidl.WorkflowStateOptions
GetStateOptions() *StateOptions
}

// GetFinalWorkflowStateId returns the stateId that will be registered and used
Expand Down Expand Up @@ -95,7 +94,7 @@ func (d DefaultStateId) GetStateId() string {

type DefaultStateOptions struct{}

func (d DefaultStateOptions) GetStateOptions() *iwfidl.WorkflowStateOptions {
func (d DefaultStateOptions) GetStateOptions() *StateOptions {
return nil
}

Expand Down
37 changes: 0 additions & 37 deletions iwf/workflow_state_options_extension.go

This file was deleted.