-
Notifications
You must be signed in to change notification settings - Fork 376
/
notifiee.go
106 lines (87 loc) · 2.1 KB
/
notifiee.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package bertymessenger
import (
"sync"
"github.com/gogo/protobuf/proto"
"go.uber.org/multierr"
"berty.tech/berty/v2/go/pkg/messengertypes"
)
// Notifiee is implemented by anything that wants to receive the stream events
// sent through a Dispatcher. The notifiee system is inspired by the one in IPFS.
type Notifiee interface {
// StreamEvent is invoked by Dispatcher.StreamEvent once per dispatched event.
// A non-nil error is collected by the dispatcher and returned to the caller
// of Dispatcher.StreamEvent; it does not stop delivery to other notifiees.
StreamEvent(*messengertypes.StreamEvent) error
}
// Dispatcher fans stream events out to a set of registered notifiees.
// It contains a mutex, so it must be used through a pointer and not copied.
type Dispatcher struct {
// mutex guards notifiees: write-locked by Register, Unregister and
// UnregisterAll, read-locked by StreamEvent while it delivers an event.
mutex sync.RWMutex
// notifiees is the set of currently registered notifiees (the map values
// carry no information).
notifiees map[Notifiee]struct{}
}
// Register adds n to the set of notifiees that receive dispatched events and
// returns a function that unregisters it again (equivalent to calling
// d.Unregister(n)).
//
// Registering the same notifiee more than once has no additional effect: the
// notifiees are stored as a set keyed by the Notifiee value itself.
//
// Register also works on a Dispatcher that was not built with NewDispatcher:
// the underlying set is allocated on first use instead of panicking on a nil
// map.
func (d *Dispatcher) Register(n Notifiee) func() {
	d.mutex.Lock()
	// Deferred so the lock is released even if inserting n panics (a Notifiee
	// whose dynamic type is not comparable cannot be used as a map key).
	defer d.mutex.Unlock()

	if d.notifiees == nil {
		d.notifiees = make(map[Notifiee]struct{})
	}
	d.notifiees[n] = struct{}{}

	return func() { d.Unregister(n) }
}
// Unregister removes n from the set of notifiees. It is a no-op when n is not
// registered and, like UnregisterAll, when the receiver is nil.
func (d *Dispatcher) Unregister(n Notifiee) {
	if d == nil {
		return
	}

	d.mutex.Lock()
	// Deferred so the lock is released even if hashing n panics (a Notifiee
	// whose dynamic type is not comparable cannot be used as a map key).
	defer d.mutex.Unlock()

	delete(d.notifiees, n)
}
// UnregisterAll drops every registered notifiee by swapping in a fresh, empty
// set. Calling it on a nil Dispatcher does nothing.
func (d *Dispatcher) UnregisterAll() {
	if d == nil {
		return
	}

	// Build the replacement set before taking the lock to keep the critical
	// section down to a single assignment.
	fresh := make(map[Notifiee]struct{})

	d.mutex.Lock()
	defer d.mutex.Unlock()
	d.notifiees = fresh
}
// StreamEvent serializes msg, wraps it in a messengertypes.StreamEvent with
// the given type and isNew flag, and hands that event to every registered
// notifiee, one after the other.
//
// If msg cannot be marshaled, the marshaling error is returned and no notifiee
// is called. Otherwise every notifiee is called regardless of earlier
// failures, and the errors they return are combined with multierr; the result
// is nil when none of them failed.
//
// The notifiee callbacks run while the dispatcher's read lock is held, so a
// callback must not call Register, Unregister or UnregisterAll on the same
// dispatcher: those need the write lock and would block forever.
func (d *Dispatcher) StreamEvent(typ messengertypes.StreamEvent_Type, msg proto.Message, isNew bool) error {
	payload, err := proto.Marshal(msg)
	if err != nil {
		return err
	}

	event := &messengertypes.StreamEvent{
		Type:    typ,
		Payload: payload,
		IsNew:   isNew,
	}

	d.mutex.RLock()
	// Deferred so that a panicking notifiee cannot leave the read lock held,
	// which would block every later Register/Unregister call.
	defer d.mutex.RUnlock()

	// can be parallelized if needed
	var errs error
	for n := range d.notifiees {
		if err := n.StreamEvent(event); err != nil {
			errs = multierr.Append(errs, err)
		}
	}
	return errs
}
// Notify dispatches a notification to every registered notifiee. It builds a
// StreamEvent_Notified from typ, title and body, attaches the serialized msg as
// payload when msg is non-nil, and sends it through StreamEvent as a
// StreamEvent_TypeNotified event with isNew set to false.
//
// It returns the marshaling error if msg cannot be serialized, otherwise
// whatever StreamEvent returns.
func (d *Dispatcher) Notify(typ messengertypes.StreamEvent_Notified_Type, title, body string, msg proto.Message) error {
	notified := &messengertypes.StreamEvent_Notified{
		Title: title,
		Body:  body,
		Type:  typ,
	}

	// The payload is optional: without a message the field keeps its nil value.
	if msg != nil {
		encoded, err := proto.Marshal(msg)
		if err != nil {
			return err
		}
		notified.Payload = encoded
	}

	return d.StreamEvent(messengertypes.StreamEvent_TypeNotified, notified, false)
}
// IsEnabled reports whether the dispatcher is enabled. This implementation
// unconditionally returns true and does not read any dispatcher state.
func (d *Dispatcher) IsEnabled() bool {
return true
}
// NewDispatcher returns a Dispatcher with an empty, ready-to-use notifiee set.
func NewDispatcher() *Dispatcher {
	d := new(Dispatcher)
	d.notifiees = make(map[Notifiee]struct{})
	return d
}
// NotifieeBundle adapts a plain function to the Notifiee interface, so a
// notifiee can be built without declaring a dedicated type.
type NotifieeBundle struct {
// StreamEventImpl, when non-nil, is called by StreamEvent with the received
// event. When nil, StreamEvent does nothing and reports success.
StreamEventImpl func(c *messengertypes.StreamEvent) error
}
// StreamEvent implements Notifiee. It forwards c to StreamEventImpl and returns
// its result; when no implementation is set, the event is ignored and nil is
// returned.
func (nb *NotifieeBundle) StreamEvent(c *messengertypes.StreamEvent) error {
	if nb.StreamEventImpl == nil {
		return nil
	}
	return nb.StreamEventImpl(c)
}
// Compile-time check that *NotifieeBundle satisfies the Notifiee interface.
var _ Notifiee = (*NotifieeBundle)(nil)