forked from temporalio/omes
/
state_transitions_steady.go
144 lines (133 loc) · 4.83 KB
/
state_transitions_steady.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package scenarios
import (
"context"
"fmt"
"sync"
"time"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/kitchensink"
"go.temporal.io/api/workflowservice/v1"
)
// init registers this scenario (invoked as `--scenario state_transitions_steady`
// per its description) with loadgen. The executor wraps the ScenarioInfo it is
// handed in a stateTransitionsSteady and delegates to its run method.
//
// NOTE(review): loadgen may derive the scenario name from the calling file, so
// keep the MustRegisterScenario call directly in this file — confirm.
func init() {
	const description = "Run a certain number of state transitions per second. This requires duration option to be set " +
		"and the state-transitions-per-second scenario option to be set. Any iteration configuration is ignored. For " +
		"example, can be run with: run-scenario-with-worker --scenario state_transitions_steady --language go " +
		"--embedded-server --duration 5m --option state-transitions-per-second=3"

	executor := func(ctx context.Context, info loadgen.ScenarioInfo) error {
		scenario := &stateTransitionsSteady{ScenarioInfo: info}
		return scenario.run(ctx)
	}

	loadgen.MustRegisterScenario(loadgen.Scenario{
		Description: description,
		Executor:    loadgen.ExecutorFunc(executor),
	})
}
// stateTransitionsSteady is the executor state for this scenario. It embeds
// loadgen.ScenarioInfo so run can reach the client, logger, configuration and
// scenario options directly on the receiver.
type stateTransitionsSteady struct {
	loadgen.ScenarioInfo
}
// run drives the scenario in three phases:
//
//  1. Calibration: execute one kitchen-sink workflow containing a single no-op
//     activity and read how many state transitions it took.
//  2. Steady load: from that count and the requested
//     state-transitions-per-second rate, derive how often a workflow must be
//     started, then start one per interval until the configured duration
//     elapses.
//  3. Drain: wait for all in-flight starts, then wait up to one minute via
//     loadgen.VisibilityCountIsEventually for the count of running workflows on
//     this run's task queue to reach 0.
//
// It returns an error when:
//   - the duration is unset;
//   - the state-transitions-per-second option is missing or not positive;
//   - the calibration workflow fails;
//   - the computed start interval is not positive;
//   - maxConsecutiveErrors starts fail in a row;
//   - ctx is done (ctx.Err());
//   - the final visibility check fails.
//
// TODO(cretz): Note, this is a known naive approach because if workflows are
// backed up and/or slow down this won't match.
func (s *stateTransitionsSteady) run(ctx context.Context) error {
	if s.Configuration.Duration <= 0 {
		return fmt.Errorf("duration required for this scenario")
	}
	stateTransitionsPerSecond := s.ScenarioOptionInt("state-transitions-per-second", 0)
	switch {
	case stateTransitionsPerSecond == 0:
		return fmt.Errorf("state-transitions-per-second scenario option required")
	case stateTransitionsPerSecond < 0:
		return fmt.Errorf("state-transitions-per-second scenario option must be positive, got %v",
			stateTransitionsPerSecond)
	}
	durationPerStateTransition := time.Second / time.Duration(stateTransitionsPerSecond)
	s.Logger.Infof(
		"State transitions per second is %v, which means %v between every state transition",
		stateTransitionsPerSecond,
		durationPerStateTransition,
	)

	// Phase 1: execute one simple workflow to completion and read its
	// transition count.
	workflowParams := &kitchensink.TestInput{
		WorkflowInput: &kitchensink.WorkflowInput{
			InitialActions: []*kitchensink.ActionSet{
				kitchensink.NoOpSingleActivityActionSet(),
			},
		},
	}
	workflowRun, err := s.Client.ExecuteWorkflow(
		ctx,
		s.NewRun(0).DefaultStartWorkflowOptions(),
		"kitchenSink",
		workflowParams,
	)
	if err != nil {
		return fmt.Errorf("failed starting initial workflow: %w", err)
	}
	if err := workflowRun.Get(ctx, nil); err != nil {
		return fmt.Errorf("failed executing initial workflow: %w", err)
	}
	resp, err := s.Client.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
	if err != nil {
		return fmt.Errorf("failed describing workflow: %w", err)
	}
	// Getters are nil-safe: a missing execution info yields a count of 0,
	// which the interval check below rejects instead of panicking here.
	stateTransitionsPerWorkflow := resp.GetWorkflowExecutionInfo().GetStateTransitionCount()

	// Multiply before dividing so the per-transition truncation is not scaled
	// up by the transition count.
	workflowStartInterval := time.Duration(stateTransitionsPerWorkflow) * time.Second /
		time.Duration(stateTransitionsPerSecond)
	if workflowStartInterval <= 0 {
		// time.NewTicker panics on a non-positive interval.
		return fmt.Errorf(
			"computed non-positive workflow start interval %v (state transitions per workflow: %v, per second: %v)",
			workflowStartInterval, stateTransitionsPerWorkflow, stateTransitionsPerSecond)
	}
	s.Logger.Infof(
		"Simple workflow takes %v state transitions, which means we need to start a workflow every %v. "+
			"Running for %v now...",
		stateTransitionsPerWorkflow,
		workflowStartInterval,
		s.Configuration.Duration,
	)

	// Phase 2: start a workflow every interval until the duration is reached
	// or there are maxConsecutiveErrors start failures in a row.
	const maxConsecutiveErrors = 5
	// Start goroutines report their result here with a non-blocking send, so
	// a full buffer drops results rather than stalling a goroutine.
	errCh := make(chan error, 10000)

	// If run returns early, in-flight starts are canceled and then awaited so
	// no goroutine outlives this call. Deferred calls run LIFO: cancel first,
	// then wait.
	startCtx, cancelStarts := context.WithCancel(ctx)
	var startWG sync.WaitGroup
	defer startWG.Wait()
	defer cancelStarts()

	ticker := time.NewTicker(workflowStartInterval)
	defer ticker.Stop()
	deadline := time.NewTimer(s.Configuration.Duration)
	defer deadline.Stop()

	// started counts only ticker-driven starts; it doubles as the iteration
	// number (run 0 was the calibration workflow).
	var consecutiveErrCount, started int
startLoop:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			break startLoop
		case err := <-errCh:
			if err == nil {
				consecutiveErrCount = 0
				continue
			}
			consecutiveErrCount++
			if consecutiveErrCount >= maxConsecutiveErrors {
				return fmt.Errorf("got %v consecutive errors, most recent: %w", maxConsecutiveErrors, err)
			}
		case <-ticker.C:
			started++
			s.Logger.Debugf("Running iteration %v", started)
			startWG.Add(1)
			go func(iter int) {
				defer startWG.Done()
				_, err := s.Client.ExecuteWorkflow(
					startCtx,
					s.NewRun(iter).DefaultStartWorkflowOptions(),
					"kitchenSink",
					workflowParams,
				)
				// Only send to err ch if there is room just in case
				select {
				case errCh <- err:
				default:
				}
			}(started)
		}
	}

	// Phase 3: let workflows complete. This part of the scenario is not steady
	// state transitions; wait a minute max for no workflows on the task queue
	// to be running.
	s.Logger.Infof("Run complete, ran %v iterations, waiting on all workflows to complete", started)
	// First, wait for all starts to have started (they are done in goroutines).
	startWG.Wait()
	return loadgen.VisibilityCountIsEventually(
		ctx,
		s.Client,
		&workflowservice.CountWorkflowExecutionsRequest{
			Namespace: s.Namespace,
			Query: fmt.Sprintf("TaskQueue = %q and ExecutionStatus = 'Running'",
				loadgen.TaskQueueForRun(s.ScenarioName, s.RunID)),
		},
		0,
		time.Minute,
	)
}