-
Notifications
You must be signed in to change notification settings - Fork 151
/
main.go
145 lines (116 loc) · 2.99 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"fmt"
"time"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type concurrencyLimitEvent struct {
UserId int `json:"user_id"`
}
type stepOneOutput struct {
Message string `json:"message"`
}
func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}
events := make(chan string, 50)
if err := run(cmdutils.InterruptChan(), events); err != nil {
panic(err)
}
}
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
input := &concurrencyLimitEvent{}
err := ctx.WorkflowInput(input)
if err != nil {
return "", fmt.Errorf("error getting input: %w", err)
}
return fmt.Sprintf("%d", input.UserId), nil
}
func run(ch <-chan interface{}, events chan<- string) error {
c, err := client.New()
if err != nil {
return fmt.Errorf("error creating client: %w", err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
return fmt.Errorf("error creating worker: %w", err)
}
testSvc := w.NewService("test")
err = testSvc.On(
worker.Events("concurrency-test-event-rr"),
&worker.WorkflowJob{
Name: "concurrency-limit-round-robin",
Description: "This limits concurrency to 2 runs at a time.",
Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(2).LimitStrategy(types.GroupRoundRobin),
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &concurrencyLimitEvent{}
err = ctx.WorkflowInput(input)
if err != nil {
return nil, fmt.Errorf("error getting input: %w", err)
}
fmt.Println("received event", input.UserId)
time.Sleep(5 * time.Second)
fmt.Println("processed event", input.UserId)
return nil, nil
},
).SetName("step-one"),
},
},
)
if err != nil {
return fmt.Errorf("error registering workflow: %w", err)
}
interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch)
defer cancel()
cleanup, err := w.Start()
if err != nil {
return fmt.Errorf("error starting worker: %w", err)
}
go func() {
// sleep with interrupt context
select {
case <-interruptCtx.Done(): // context cancelled
fmt.Println("interrupted")
return
case <-time.After(2 * time.Second): // timeout
}
for i := 0; i < 20; i++ {
var event concurrencyLimitEvent
if i < 10 {
event = concurrencyLimitEvent{0}
} else {
event = concurrencyLimitEvent{1}
}
c.Event().Push(context.Background(), "concurrency-test-event-rr", event)
}
select {
case <-interruptCtx.Done(): // context cancelled
fmt.Println("interrupted")
return
case <-time.After(10 * time.Second): //timeout
}
}()
for {
select {
case <-interruptCtx.Done():
if err := cleanup(); err != nil {
return fmt.Errorf("error cleaning up: %w", err)
}
return nil
default:
time.Sleep(time.Second)
}
}
}