-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
notification_processor.go
138 lines (125 loc) · 4.11 KB
/
notification_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package notification
import (
"context"
"sync/atomic"
"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/model"
"github.com/pingcap/log"
"go.uber.org/zap"
)
type NotificationProcessor interface {
common.Component
Process(ctx context.Context) error
Trigger(ctx context.Context, triggerMsg TriggerMessage)
}
type SimpleNotificationProcessor struct {
ctx context.Context
store NotificationStore
notifer Notifier
channel chan TriggerMessage
doneChannel chan bool
running atomic.Bool
}
type TriggerMessage struct {
Msg model.Notification
ResultChan chan error
}
const triggerChannelSize = 1000
var _ NotificationProcessor = &SimpleNotificationProcessor{}
func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor {
return &SimpleNotificationProcessor{
ctx: ctx,
store: store,
notifer: notifier,
channel: make(chan TriggerMessage, triggerChannelSize),
doneChannel: make(chan bool),
}
}
func (n *SimpleNotificationProcessor) Start() error {
// During startup, first sending all pending notifications in the store to the notification topic
log.Info("Starting notification processor")
err := n.sendPendingNotifications(n.ctx)
if err != nil {
log.Error("Failed to send pending notifications", zap.Error(err))
return err
}
n.running.Store(true)
go n.Process(n.ctx)
return nil
}
func (n *SimpleNotificationProcessor) Stop() error {
n.running.Store(false)
n.doneChannel <- true
return nil
}
func (n *SimpleNotificationProcessor) Process(ctx context.Context) error {
log.Info("Waiting for new notifications")
for {
select {
case triggerMsg := <-n.channel:
msg := triggerMsg.Msg
log.Info("Received notification", zap.Any("msg", msg))
running := n.running.Load()
log.Info("Notification processor is running", zap.Bool("running", running))
// We need to block here until the notifications are sent successfully
for running {
// Check the notification store if this notification is already processed
// If it is already processed, just return
// If it is not processed, send notifications and remove from the store
notifications, err := n.store.GetNotifications(ctx, msg.CollectionID)
if err != nil {
log.Error("Failed to get notifications", zap.Error(err))
triggerMsg.ResultChan <- err
continue
}
if len(notifications) == 0 {
log.Info("No pending notifications found")
triggerMsg.ResultChan <- nil
break
}
log.Info("Got notifications from notification store", zap.Any("notifications", notifications))
err = n.notifer.Notify(ctx, notifications)
if err != nil {
log.Error("Failed to send pending notifications", zap.Error(err))
} else {
n.store.RemoveNotifications(ctx, notifications)
log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications))
triggerMsg.ResultChan <- nil
break
}
}
case <-n.doneChannel:
log.Info("Stopping notification processor")
return nil
}
}
}
func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) {
log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg))
if len(n.channel) == triggerChannelSize {
log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg))
triggerMsg.ResultChan <- nil
return
}
n.channel <- triggerMsg
}
func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error {
notificationMap, err := n.store.GetAllPendingNotifications(ctx)
if err != nil {
log.Error("Failed to get all pending notifications", zap.Error(err))
return err
}
for collectionID, notifications := range notificationMap {
log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications))
for {
err = n.notifer.Notify(ctx, notifications)
if err != nil {
log.Error("Failed to send pending notifications", zap.Error(err))
} else {
n.store.RemoveNotifications(ctx, notifications)
break
}
}
}
return nil
}