-
Notifications
You must be signed in to change notification settings - Fork 110
/
controller.go
211 lines (163 loc) · 5.13 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package events
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/logger"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
"github.com/hatchet-dev/hatchet/internal/telemetry"
)
type EventsController interface {
Start(ctx context.Context) error
}
type EventsControllerImpl struct {
mq msgqueue.MessageQueue
l *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
}
type EventsControllerOpt func(*EventsControllerOpts)
type EventsControllerOpts struct {
mq msgqueue.MessageQueue
l *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
}
func defaultEventsControllerOpts() *EventsControllerOpts {
logger := logger.NewDefaultLogger("events-controller")
return &EventsControllerOpts{
l: &logger,
dv: datautils.NewDataDecoderValidator(),
}
}
func WithMessageQueue(mq msgqueue.MessageQueue) EventsControllerOpt {
return func(opts *EventsControllerOpts) {
opts.mq = mq
}
}
func WithLogger(l *zerolog.Logger) EventsControllerOpt {
return func(opts *EventsControllerOpts) {
opts.l = l
}
}
func WithRepository(r repository.EngineRepository) EventsControllerOpt {
return func(opts *EventsControllerOpts) {
opts.repo = r
}
}
func WithDataDecoderValidator(dv datautils.DataDecoderValidator) EventsControllerOpt {
return func(opts *EventsControllerOpts) {
opts.dv = dv
}
}
func New(fs ...EventsControllerOpt) (*EventsControllerImpl, error) {
opts := defaultEventsControllerOpts()
for _, f := range fs {
f(opts)
}
if opts.mq == nil {
return nil, fmt.Errorf("task queue is required. use WithMessageQueue")
}
if opts.repo == nil {
return nil, fmt.Errorf("repository is required. use WithRepository")
}
newLogger := opts.l.With().Str("service", "events-controller").Logger()
opts.l = &newLogger
return &EventsControllerImpl{
mq: opts.mq,
l: opts.l,
repo: opts.repo,
dv: opts.dv,
}, nil
}
func (ec *EventsControllerImpl) Start() (func() error, error) {
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
f := func(task *msgqueue.Message) error {
wg.Add(1)
defer wg.Done()
err := ec.handleTask(ctx, task)
if err != nil {
ec.l.Error().Err(err).Msgf("could not handle event task %s", task.ID)
return err
}
return nil
}
cleanupQueue, err := ec.mq.Subscribe(msgqueue.EVENT_PROCESSING_QUEUE, f, msgqueue.NoOpHook)
if err != nil {
cancel()
return nil, fmt.Errorf("could not subscribe to event processing queue: %w", err)
}
cleanup := func() error {
cancel()
if err := cleanupQueue(); err != nil {
return fmt.Errorf("could not cleanup event processing queue: %w", err)
}
return nil
}
return cleanup, nil
}
func (ec *EventsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Message) error {
payload := tasktypes.EventTaskPayload{}
metadata := tasktypes.EventTaskMetadata{}
err := ec.dv.DecodeAndValidate(task.Payload, &payload)
if err != nil {
return fmt.Errorf("could not decode task payload: %w", err)
}
err = ec.dv.DecodeAndValidate(task.Metadata, &metadata)
if err != nil {
return fmt.Errorf("could not decode task metadata: %w", err)
}
var additionalMetadata map[string]interface{}
if payload.EventAdditionalMetadata != "" {
err = json.Unmarshal([]byte(payload.EventAdditionalMetadata), &additionalMetadata)
if err != nil {
return fmt.Errorf("could not unmarshal additional metadata: %w", err)
}
}
return ec.processEvent(ctx, metadata.TenantId, payload.EventId, payload.EventKey, []byte(payload.EventData), additionalMetadata)
}
func (ec *EventsControllerImpl) processEvent(ctx context.Context, tenantId, eventId, eventKey string, data []byte, additionalMetadata map[string]interface{}) error {
ctx, span := telemetry.NewSpan(ctx, "process-event")
defer span.End()
// query for matching workflows in the system
workflowVersions, err := ec.repo.Workflow().ListWorkflowsForEvent(ctx, tenantId, eventKey)
if err != nil {
return fmt.Errorf("could not query workflows for event: %w", err)
}
// create a new workflow run in the database
var g = new(errgroup.Group)
for _, workflowVersion := range workflowVersions {
workflowCp := workflowVersion
g.Go(func() error {
// create a new workflow run in the database
createOpts, err := repository.GetCreateWorkflowRunOptsFromEvent(eventId, workflowCp, data, additionalMetadata)
if err != nil {
return fmt.Errorf("could not get create workflow run opts: %w", err)
}
workflowRunId, err := ec.repo.WorkflowRun().CreateNewWorkflowRun(ctx, tenantId, createOpts)
if err != nil {
return fmt.Errorf("could not create workflow run: %w", err)
}
// send to workflow processing queue
return ec.mq.AddMessage(
context.Background(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunQueuedToTask(
tenantId,
workflowRunId,
),
)
})
}
if err := g.Wait(); err != nil {
return err
}
return nil
}