-
Notifications
You must be signed in to change notification settings - Fork 50
/
continueAsNewer.go
213 lines (186 loc) · 8.04 KB
/
continueAsNewer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package interpreter
import (
"encoding/json"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/interpreter/env"
"strings"
"time"
)
// ContinueAsNewer holds the pieces of interpreter state that are dumped for,
// and drained before, a continueAsNew of the workflow.
type ContinueAsNewer struct {
	// provider is the workflow engine abstraction used here for awaiting,
	// logging, query-handler registration and thread introspection.
	provider WorkflowProvider

	// StateExecutionToResumeMap maps stateExeId to its StateExecutionResumeInfo.
	StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo
	// inflightUpdateOperations is adjusted by IncreaseInflightOperation and
	// DecreaseInflightOperation; draining only completes when it is zero.
	inflightUpdateOperations int

	// The components below are read when building the continueAsNew dump
	// query response.
	stateRequestQueue     *StateRequestQueue
	interStateChannel     *InterStateChannel
	stateExecutionCounter *StateExecutionCounter
	persistenceManager    *PersistenceManager
	signalReceiver        *SignalReceiver
	outputCollector       *OutputCollector
	timerProcessor        *TimerProcessor
}
// NewContinueAsNewer returns a ContinueAsNewer wired to the given components.
// The returned value starts with an empty StateExecutionToResumeMap and a
// zero in-flight update operation count.
func NewContinueAsNewer(
	provider WorkflowProvider,
	interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
	persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, timerProcessor *TimerProcessor,
) *ContinueAsNewer {
	newer := new(ContinueAsNewer)

	newer.provider = provider
	newer.StateExecutionToResumeMap = make(map[string]service.StateExecutionResumeInfo)

	newer.stateRequestQueue = stateRequestQueue
	newer.interStateChannel = interStateChannel
	newer.signalReceiver = signalReceiver
	newer.stateExecutionCounter = stateExecutionCounter
	newer.persistenceManager = persistenceManager
	newer.outputCollector = collector
	newer.timerProcessor = timerProcessor

	return newer
}
// LoadInternalsFromPreviousRun fetches the continueAsNew dump of the run
// identified by previousRunId (same workflow ID as the current execution) and
// decodes it into a service.ContinueAsNewDumpResponse.
//
// The dump is retrieved page by page through the DumpWorkflowInternal
// activity; the JsonData of every page is concatenated and unmarshalled once
// all pages have been read. Each page carries a checksum: if it differs from
// the checksum of the previous page, everything collected so far is discarded
// and loading restarts from page 0.
//
// The activity runs with a 5s StartToCloseTimeout and a retry policy with a
// 5s maximum interval unless DumpWorkflowInternalActivityConfig is set in the
// shared config, in which case its timeout (and retry policy, when present)
// are used instead. A continueAsNewPageSizeInBytes of 0 selects
// service.DefaultContinueAsNewPageSizeInBytes.
//
// It returns the first activity error or the JSON decoding error, if any.
func LoadInternalsFromPreviousRun(ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32) (*service.ContinueAsNewDumpResponse, error) {
	activityOptions := ActivityOptions{
		StartToCloseTimeout: 5 * time.Second,
		RetryPolicy: &iwfidl.RetryPolicy{
			MaximumIntervalSeconds: iwfidl.PtrInt32(5),
		},
	}
	config := env.GetSharedConfig()
	if config.Interpreter.InterpreterActivityConfig.DumpWorkflowInternalActivityConfig != nil {
		activityConfig := config.Interpreter.InterpreterActivityConfig.DumpWorkflowInternalActivityConfig
		activityOptions.StartToCloseTimeout = activityConfig.StartToCloseTimeout
		if activityConfig.RetryPolicy != nil {
			activityOptions.RetryPolicy = activityConfig.RetryPolicy
		}
	}
	ctx = provider.WithActivityOptions(ctx, activityOptions)

	workflowId := provider.GetWorkflowInfo(ctx).WorkflowExecution.ID
	pageSize := continueAsNewPageSizeInBytes
	if pageSize == 0 {
		pageSize = service.DefaultContinueAsNewPageSizeInBytes
	}

	var sb strings.Builder
	lastChecksum := ""
	pageNum := int32(0)
	for {
		var resp iwfidl.WorkflowDumpResponse
		err := provider.ExecuteActivity(ctx, DumpWorkflowInternal, provider.GetBackendType(),
			iwfidl.WorkflowDumpRequest{
				WorkflowId:      workflowId,
				WorkflowRunId:   previousRunId,
				PageNum:         pageNum,
				PageSizeInBytes: pageSize,
			}).Get(ctx, &resp)
		if err != nil {
			return nil, err
		}

		if lastChecksum != "" && lastChecksum != resp.Checksum {
			// Log BEFORE resetting lastChecksum, otherwise the previous
			// checksum is always reported as an empty string.
			provider.GetLogger(ctx).Error("checksum has changed during the loading", lastChecksum, resp.Checksum)
			// reset to start from beginning
			pageNum = 0
			lastChecksum = ""
			sb.Reset()
			continue
		}

		lastChecksum = resp.Checksum
		sb.WriteString(resp.JsonData)
		pageNum++
		if pageNum >= resp.TotalPages {
			break
		}
	}

	var resp service.ContinueAsNewDumpResponse
	err := json.Unmarshal([]byte(sb.String()), &resp)
	if err != nil {
		return nil, err
	}
	return &resp, nil
}
// SetQueryHandlersForContinueAsNew registers, under
// service.ContinueAsNewDumpQueryType, a query handler that returns the
// current internal state as a service.ContinueAsNewDumpResponse.
// It returns whatever error the provider's SetQueryHandler reports.
func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error {
	// The handler reads the components at query time; StateExecutionsToResume
	// is the map held by c itself, not a copy.
	dumpInternals := func() (*service.ContinueAsNewDumpResponse, error) {
		dump := &service.ContinueAsNewDumpResponse{
			InterStateChannelReceived:  c.interStateChannel.ReadReceived(nil),
			SignalsReceived:            c.signalReceiver.DumpReceived(nil),
			StateExecutionCounterInfo:  c.stateExecutionCounter.Dump(),
			DataObjects:                c.persistenceManager.GetAllDataObjects(),
			SearchAttributes:           c.persistenceManager.GetAllSearchAttributes(),
			StatesToStartFromBeginning: c.stateRequestQueue.GetAllStateStartRequests(),
			StateExecutionsToResume:    c.StateExecutionToResumeMap,
			StateOutputs:               c.outputCollector.GetAll(),
			StaleSkipTimerSignals:      c.timerProcessor.Dump(),
		}
		return dump, nil
	}
	return c.provider.SetQueryHandler(ctx, service.ContinueAsNewDumpQueryType, dumpInternals)
}
// AddPotentialStateExecutionToResume records (or overwrites) the resume
// information for stateExecutionId in StateExecutionToResumeMap: the state
// movement, its execution locals, the command request, and the timer, signal
// and inter-state-channel commands that have already completed.
func (c *ContinueAsNewer) AddPotentialStateExecutionToResume(
	stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue, commandRequest iwfidl.CommandRequest,
	completedTimerCommands map[int]service.InternalTimerStatus, completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
) {
	completed := service.StateExecutionCompletedCommands{
		CompletedTimerCommands:             completedTimerCommands,
		CompletedSignalCommands:            completedSignalCommands,
		CompletedInterStateChannelCommands: completedInterStateChannelCommands,
	}

	var resumeInfo service.StateExecutionResumeInfo
	resumeInfo.StateExecutionId = stateExecutionId
	resumeInfo.State = state
	resumeInfo.StateExecutionLocals = stateExecLocals
	resumeInfo.CommandRequest = commandRequest
	resumeInfo.StateExecutionCompletedCommands = completed

	c.StateExecutionToResumeMap[stateExecutionId] = resumeInfo
}
// HasAnyStateExecutionToResume reports whether StateExecutionToResumeMap
// currently holds at least one entry.
func (c *ContinueAsNewer) HasAnyStateExecutionToResume() bool {
	pending := len(c.StateExecutionToResumeMap)
	return pending != 0
}
// RemoveStateExecutionToResume drops the entry for stateExecutionId from
// StateExecutionToResumeMap; it is a no-op when no such entry exists.
func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string) {
	resumeMap := c.StateExecutionToResumeMap
	delete(resumeMap, stateExecutionId)
}
// DrainThreads blocks (via the provider's Await) until allThreadsDrained
// reports true, logs the outcome, and returns the error from Await.
//
// TODO: add metric for before and after Await to monitor stuck
// NOTE: consider using AwaitWithTimeout to get an alert when workflow stuck due to a bug in the draining logic for continueAsNew
func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error {
	drained := func() bool {
		return c.allThreadsDrained(ctx)
	}
	err := c.provider.Await(ctx, drained)
	c.provider.GetLogger(ctx).Info("done draining threads for continueAsNew", err)
	return err
}
// IncreaseInflightOperation adds one to the in-flight update operation
// counter that allThreadsDrained requires to be zero.
func (c *ContinueAsNewer) IncreaseInflightOperation() {
	c.inflightUpdateOperations += 1
}
// DecreaseInflightOperation subtracts one from the in-flight update operation
// counter that allThreadsDrained requires to be zero.
func (c *ContinueAsNewer) DecreaseInflightOperation() {
	c.inflightUpdateOperations -= 1
}
// inMemoryContinueAsNewMonitor records, in this worker's memory, when each run was first
// observed waiting for its threads to drain for continueAsNew.
// The key is the runId; the value is the time of the first drain check in this worker that
// found the run not yet drained. allThreadsDrained removes the entry once the run is drained.
// If the drain await keeps being evaluated and cannot get through for a long time,
// there is likely something wrong in the continueAsNew logic (unless state API is stuck).
// Using this in-memory record so that we don't have to use AwaitWithTimeout, which would consume a timer.
// TODO add TTL support because we don't have to keep the value in memory forever (likely a few hours or a day is enough)
var inMemoryContinueAsNewMonitor = make(map[string]time.Time)
// warnThreshold is how long a run may wait to drain before allThreadsDrained logs that it "may be stuck".
const warnThreshold = time.Second * 5
// errThreshold is how long a run may wait to drain before allThreadsDrained logs that it is "likely stuck".
const errThreshold = time.Second * 15
// allThreadsDrained is the Await condition used by DrainThreads. It returns
// true only when the provider reports zero remaining threads and there are no
// in-flight update operations; in that case the run's entry in
// inMemoryContinueAsNewMonitor is removed.
//
// Otherwise it returns false and, as a side effect, tracks how long this run
// has been waiting in this worker: the first failed check stores the current
// time, and later checks log a warning once the wait reaches warnThreshold
// or errThreshold.
func (c *ContinueAsNewer) allThreadsDrained(ctx UnifiedContext) bool {
	runId := c.provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID
	pendingThreads := c.provider.GetThreadCount()

	if pendingThreads == 0 && c.inflightUpdateOperations == 0 {
		delete(inMemoryContinueAsNewMonitor, runId)
		return true
	}

	// firstSeen is the zero time.Time when the run is not tracked yet.
	firstSeen, tracked := inMemoryContinueAsNewMonitor[runId]

	// NOTE(review): the message labels the second value "attempt", but what
	// is logged is the first-seen timestamp.
	// TODO using a flag to control this debugging info
	c.provider.GetLogger(ctx).Debug("continueAsNew is in draining remainingThreadCount, attempt, threadNames, inflightUpdateOperations",
		pendingThreads, firstSeen, c.provider.GetPendingThreadNames(), c.inflightUpdateOperations)

	if !tracked {
		inMemoryContinueAsNewMonitor[runId] = time.Now()
		return false
	}

	// NOTE(review): both thresholds log at Warn level; confirm whether the
	// errThreshold branch was meant to use a higher severity.
	switch waited := time.Since(firstSeen); {
	case waited >= errThreshold:
		c.provider.GetLogger(ctx).Warn(
			"continueAsNew is likely stuck (unless state API is stuck) in draining remainingThreadCount, attempt, threadNames, inflightUpdateOperations",
			pendingThreads, firstSeen, c.provider.GetPendingThreadNames(), c.inflightUpdateOperations)
	case waited >= warnThreshold:
		c.provider.GetLogger(ctx).Warn(
			"continueAsNew may be stuck (unless state API is stuck) in draining remainingThreadCount, attempt, threadNames, inflightUpdateOperations",
			pendingThreads, firstSeen, c.provider.GetPendingThreadNames(), c.inflightUpdateOperations)
	}
	return false
}