-
Notifications
You must be signed in to change notification settings - Fork 17
/
projection_notification_processor.go
182 lines (154 loc) · 4.83 KB
/
projection_notification_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package sql
import (
"context"
"errors"
"runtime"
"sync"
"github.com/hellofresh/goengine"
)
// ProjectionNotificationProcessor hands queued projection notifications to a ProcessHandler using a fixed
// number of background goroutines.
type ProjectionNotificationProcessor struct {
	// queueProcessors is the number of goroutines launched by Start.
	queueProcessors int
	// logger receives the errors returned by the ProcessHandler.
	logger goengine.Logger
	// metrics is told when the processing of a notification starts and finishes.
	metrics Metrics
	// notificationQueue is the queue the background goroutines take their notifications from.
	notificationQueue NotificationQueuer
}

// ProcessHandler is a func used to handle a notification. On top of the notification it receives a
// ProjectionTrigger so that handling the original notification can trigger other notifications.
type ProcessHandler func(context.Context, *ProjectionNotification, ProjectionTrigger) error
// NewBackgroundProcessor creates a ProjectionNotificationProcessor.
//
// An error is returned when queueProcessors is not greater than zero or when queueBuffer is negative.
// A nil logger is replaced by goengine.NopLogger, nil metrics by NopMetrics, and a nil notificationQueue by
// the result of newNotificationQueue called with queueBuffer and the resolved metrics.
func NewBackgroundProcessor(
	queueProcessors,
	queueBuffer int,
	logger goengine.Logger,
	metrics Metrics,
	notificationQueue NotificationQueuer,
) (*ProjectionNotificationProcessor, error) {
	// Reject invalid sizing before looking at the optional dependencies.
	switch {
	case queueProcessors <= 0:
		return nil, errors.New("queueProcessors must be greater then zero")
	case queueBuffer < 0:
		return nil, errors.New("queueBuffer must be greater or equal to zero")
	}

	// Fall back to the no-op implementations for dependencies that were left out.
	if logger == nil {
		logger = goengine.NopLogger
	}
	if metrics == nil {
		metrics = NopMetrics
	}
	// The default queue is created after metrics is resolved so it is never handed nil metrics.
	if notificationQueue == nil {
		notificationQueue = newNotificationQueue(queueBuffer, 0, metrics)
	}

	processor := &ProjectionNotificationProcessor{
		queueProcessors:   queueProcessors,
		logger:            logger,
		metrics:           metrics,
		notificationQueue: notificationQueue,
	}
	return processor, nil
}
// Execute starts the background processes, queues the provided notification and blocks until the handling of
// that notification and of the notifications still queued afterwards has finished, or until ctx is done.
//
// The notification is forwarded to the queue unchanged, so passing nil queues a nil notification.
// An error is returned only when the notification could not be queued; a done ctx results in a nil error.
// The background processes are stopped before Execute returns.
func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, notification *ProjectionNotification) error {
	// Wrap the handler in order to know when the first trigger and its related messages are finished.
	handler, handlerDone := b.wrapProcessHandlerForSingleRun(handler)

	// Start the background processes and make sure they are stopped again on every return path.
	stopExecutor := b.Start(ctx, handler)
	defer stopExecutor()

	// Queue the notification supplied by the caller instead of unconditionally queueing nil,
	// so the argument is no longer silently dropped.
	if err := b.notificationQueue.Queue(ctx, notification); err != nil {
		return err
	}

	// Wait for the run to finish or the context to be cancelled; neither is reported as an error.
	select {
	case <-handlerDone:
	case <-ctx.Done():
	}
	return nil
}
// Start opens the notification queue and launches queueProcessors goroutines that pass the notifications
// queued through Queue to handler.
//
// The returned func invokes the close func obtained from opening the queue and then blocks until every
// launched goroutine has returned.
func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func() {
	closeQueue := b.notificationQueue.Open()

	var workers sync.WaitGroup
	workers.Add(b.queueProcessors)
	for remaining := b.queueProcessors; remaining > 0; remaining-- {
		go func() {
			defer workers.Done()
			b.startProcessor(ctx, handler)
		}()
	}

	// Yield the processor so the goroutines get a chance to start.
	runtime.Gosched()

	stop := func() {
		closeQueue()
		workers.Wait()
	}
	return stop
}
// Queue hands the notification to the underlying notification queue and returns that queue's result.
func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error {
	queue := b.notificationQueue
	return queue.Queue(ctx, notification)
}
// startProcessor is the loop executed by each background goroutine. It keeps taking the next notification
// from the queue and passing it to handler until the queue reports that it was stopped. Handler errors are
// logged and reported to the metrics; they do not end the loop.
func (b *ProjectionNotificationProcessor) startProcessor(ctx context.Context, handler ProcessHandler) {
	for {
		next, stopped := b.notificationQueue.Next(ctx)
		if stopped {
			return
		}

		// A nil notification gets the queue's Queue func as trigger, any other notification gets ReQueue.
		var trigger ProjectionTrigger
		switch {
		case next == nil:
			trigger = b.notificationQueue.Queue
		default:
			trigger = b.notificationQueue.ReQueue
		}

		// Execute the notification and record the outcome.
		b.metrics.StartNotificationProcessing(next)
		err := handler(ctx, next, trigger)
		if err != nil {
			b.logger.Error("the ProcessHandler produced an error", func(entry goengine.LoggerEntry) {
				entry.Error(err)
				entry.Any("notification", next)
			})
		}
		b.metrics.FinishNotificationProcessing(next, err == nil)
	}
}
// wrapProcessHandlerForSingleRun returns a wrapped ProcessHandler together with a channel that is closed once
// a single run is over.
//
// The wrapper counts the handler calls that are currently in flight. Each time a call returns and no other
// call is in flight, the channel is closed when ctx is done or when the notification queue reports that it
// is empty. The channel is closed at most once.
func (b *ProjectionNotificationProcessor) wrapProcessHandlerForSingleRun(handler ProcessHandler) (ProcessHandler, chan struct{}) {
	var (
		finished  = make(chan struct{})
		closeOnce sync.Once
		mu        sync.Mutex
		inFlight  int32
	)

	// markFinished closes finished, guarded so that repeated calls are harmless.
	markFinished := func() {
		closeOnce.Do(func() {
			close(finished)
		})
	}

	wrapped := func(ctx context.Context, notification *ProjectionNotification, trigger ProjectionTrigger) error {
		mu.Lock()
		inFlight++
		mu.Unlock()

		defer func() {
			mu.Lock()
			defer mu.Unlock()

			inFlight--
			if inFlight != 0 {
				// Other handler calls are still running; the last one to return decides.
				return
			}

			// Only close the channel when the context is done or the queue is empty.
			select {
			case <-finished:
				// Already closed, nothing left to do.
			case <-ctx.Done():
				// Context is expired.
				markFinished()
			default:
				// No more queued messages, so the run is complete.
				if b.notificationQueue.Empty() {
					markFinished()
				}
			}
		}()

		return handler(ctx, notification, trigger)
	}

	return wrapped, finished
}