-
Notifications
You must be signed in to change notification settings - Fork 49
/
continueasnew.go
86 lines (77 loc) · 2.45 KB
/
continueasnew.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package command
import (
"github.com/benbjohnson/clock"
"github.com/cschleiden/go-workflows/internal/core"
"github.com/cschleiden/go-workflows/internal/history"
"github.com/cschleiden/go-workflows/internal/payload"
"github.com/google/uuid"
)
type ContinueAsNewCommand struct {
command
Instance *core.WorkflowInstance
Name string
Metadata *core.WorkflowMetadata
Inputs []payload.Payload
Result payload.Payload
}
var _ Command = (*ContinueAsNewCommand)(nil)
func NewContinueAsNewCommand(id int64, instance *core.WorkflowInstance, result payload.Payload, name string, metadata *core.WorkflowMetadata, inputs []payload.Payload) *ContinueAsNewCommand {
return &ContinueAsNewCommand{
command: command{
id: id,
name: "ContinueAsNew",
state: CommandState_Pending,
},
Instance: instance,
Name: name,
Metadata: metadata,
Inputs: inputs,
Result: result,
}
}
func (c *ContinueAsNewCommand) Execute(clock clock.Clock) *CommandResult {
switch c.state {
case CommandState_Pending:
continuedExecutionID := uuid.NewString()
var continuedInstance *core.WorkflowInstance
if c.Instance.SubWorkflow() {
// If the current workflow execution was a sub-workflow, ensure the new workflow execution is also a sub-workflow.
// This will guarantee that finished event for the new execution will be delivered to the right parent instance
continuedInstance = core.NewSubWorkflowInstance(
c.Instance.InstanceID, continuedExecutionID, c.Instance.Parent, c.Instance.ParentEventID)
} else {
continuedInstance = core.NewWorkflowInstance(c.Instance.InstanceID, continuedExecutionID)
}
c.state = CommandState_Committed
return &CommandResult{
State: core.WorkflowInstanceStateContinuedAsNew,
Events: []*history.Event{
// End the current workflow execution
history.NewPendingEvent(
clock.Now(),
history.EventType_WorkflowExecutionContinuedAsNew,
&history.ExecutionContinuedAsNewAttributes{
Result: c.Result,
ContinuedExecutionID: continuedExecutionID,
},
),
},
WorkflowEvents: []history.WorkflowEvent{
// Schedule a new workflow execution
{
WorkflowInstance: continuedInstance,
HistoryEvent: history.NewPendingEvent(
clock.Now(),
history.EventType_WorkflowExecutionStarted,
&history.ExecutionStartedAttributes{
Name: c.Name,
Metadata: c.Metadata,
Inputs: c.Inputs,
},
),
},
},
}
}
return nil
}