-
Notifications
You must be signed in to change notification settings - Fork 145
/
run.go
122 lines (98 loc) · 2.4 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package rampup
import (
"context"
"fmt"
"sync"
"time"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
// stepOneOutput is the value produced by the load-test workflow step.
// Its single field is serialized under the JSON key "message".
type stepOneOutput struct {
	// Message is a free-form text payload describing the step run.
	Message string `json:"message"`
}
// getConcurrencyKey supplies the concurrency key for a workflow run.
// It ignores ctx and always returns the same constant key with a nil error.
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
	const key = "my-key"
	return key, nil
}
// run starts a Hatchet worker that handles workflow runs triggered by the
// "load-test:event" event, blocks until ctx is done, then shuts the worker
// down and reports totals.
//
// For every step execution the elapsed time since the event's CreatedAt is
// measured. When it exceeds maxAcceptableDuration the elapsed time is sent on
// hook. Every executed event ID is sent on executedCh. Both sends are
// blocking, so the caller must keep receiving from these channels while the
// worker is running. After the bookkeeping the step sleeps for delay before
// returning its output.
//
// When concurrency is greater than zero the workflow is registered with a
// concurrency limit of that many runs, keyed by getConcurrencyKey; otherwise
// no concurrency options are set.
//
// It returns the total number of step executions and the number of executions
// whose event ID had not been seen before. Failure to create the client or the
// worker, to register the workflow, to start the worker, or to clean it up
// results in a panic.
func run(ctx context.Context, delay time.Duration, concurrency int, maxAcceptableDuration time.Duration, hook chan<- time.Duration, executedCh chan<- int64) (int64, int64) {
	c, err := client.New(
		client.WithLogLevel("warn"),
	)
	if err != nil {
		panic(err)
	}

	w, err := worker.NewWorker(
		worker.WithClient(
			c,
		),
		worker.WithLogLevel("warn"),
		worker.WithMaxRuns(200),
	)
	if err != nil {
		panic(err)
	}

	// mx guards count, uniques and seen, which are mutated by concurrently
	// running step executions and read once more after shutdown.
	var mx sync.Mutex
	var count int64
	var uniques int64
	// seen holds every event ID that has been executed so far. A set gives a
	// constant-time duplicate check and, unlike the previous slice scan that
	// stopped one element short, never misses the most recent ID.
	seen := make(map[int64]struct{})

	var concurrencyOpts *worker.WorkflowConcurrency
	if concurrency > 0 {
		concurrencyOpts = worker.Concurrency(getConcurrencyKey).MaxRuns(int32(concurrency))
	}

	err = w.On(
		worker.Event("load-test:event"),
		&worker.WorkflowJob{
			Name:        "load-test",
			Description: "Load testing",
			Concurrency: concurrencyOpts,
			Steps: []*worker.WorkflowStep{
				worker.Fn(func(ctx worker.HatchetContext) (*stepOneOutput, error) {
					var input Event
					if err := ctx.WorkflowInput(&input); err != nil {
						return nil, err
					}

					took := time.Since(input.CreatedAt)
					l.Debug().Msgf("executing %d took %s", input.ID, took)

					// Report executions that started later than acceptable.
					if took > maxAcceptableDuration {
						hook <- took
					}

					executedCh <- input.ID

					mx.Lock()
					if _, duplicate := seen[input.ID]; duplicate {
						l.Warn().Str("step-run-id", ctx.StepRunId()).Msgf("duplicate %d", input.ID)
					} else {
						seen[input.ID] = struct{}{}
						uniques++
					}
					count++
					mx.Unlock()

					time.Sleep(delay)

					return &stepOneOutput{
						Message: "This ran at: " + time.Now().Format(time.RFC3339Nano),
					}, nil
				}).SetName("step-one"),
			},
		},
	)
	if err != nil {
		panic(err)
	}

	cleanup, err := w.Start()
	if err != nil {
		panic(fmt.Errorf("error starting worker: %w", err))
	}

	<-ctx.Done()

	if err := cleanup(); err != nil {
		panic(fmt.Errorf("error cleaning up: %w", err))
	}

	// Take the lock for the final read in case a step is still finishing.
	mx.Lock()
	defer mx.Unlock()
	return count, uniques
}