-
Notifications
You must be signed in to change notification settings - Fork 10
/
sub.go
89 lines (79 loc) · 2.02 KB
/
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package floodsub
import (
"context"
"sync"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
)
// subscription implements the pubsub subscription handle.
//
// It ties a channel ID and a private key to a FloodSub instance and keeps
// the set of message handlers registered through AddHandler.
type subscription struct {
// ctx is the context for publishing.
// It is passed to FloodSub.Publish on every Publish call.
ctx context.Context
// relOnce guards release.
// It ensures Release removes the subscription from the FloodSub channel
// table at most once.
relOnce sync.Once
// m is the floodsub instance.
// Publish is delegated to it, and Release edits its channels map.
m *FloodSub
// channelID is the channel identifier.
// It is the key used to look up this subscription's set in m.channels.
channelID string
// privKey is the private key for the sub.
// It is handed to FloodSub.Publish along with the data.
privKey crypto.PrivKey
// peerID is the peer ID for the sub, returned by GetPeerId.
// NOTE(review): presumably derived from privKey when the subscription is
// constructed — confirm against the constructor.
peerID peer.ID
// mtx guards handlers.
mtx sync.Mutex
// handlers are the active handlers.
// Entries are added by AddHandler, removed by the function it returns,
// and all cleared by Release.
handlers map[*subscriptionHandler]struct{}
}
// GetPeerId reports the peer ID stored on the subscription.
//
// NOTE(review): the ID is presumably derived from the subscription's private
// key at construction time — confirm against the constructor.
func (s *subscription) GetPeerId() peer.ID {
	id := s.peerID
	return id
}
// GetChannelId reports the identifier of the channel this subscription is
// attached to.
func (s *subscription) GetChannelId() string {
	chid := s.channelID
	return chid
}
// Publish sends data to the subscription's channel.
//
// The call is forwarded to the owning FloodSub together with the
// subscription's stored context, channel ID and private key; whatever error
// FloodSub.Publish returns is passed back unchanged.
func (s *subscription) Publish(data []byte) error {
	fs := s.m
	err := fs.Publish(s.ctx, s.channelID, s.privKey, data)
	return err
}
// AddHandler adds a callback that is called with each received message.
//
// The callback should not block.
// Returns a remove function. The remove function may be called more than
// once; only the first call deletes the handler.
// The handler(s) are also removed when the subscription is released.
func (s *subscription) AddHandler(cb func(m pubsub.Message)) func() {
	sh := &subscriptionHandler{cb: cb}

	s.mtx.Lock()
	// Writing to a nil map panics, so create the handler set on first use in
	// case the subscription was built without one.
	if s.handlers == nil {
		s.handlers = make(map[*subscriptionHandler]struct{})
	}
	s.handlers[sh] = struct{}{}
	s.mtx.Unlock()

	// relOnce makes the returned remove function idempotent.
	var relOnce sync.Once
	return func() {
		relOnce.Do(func() {
			s.mtx.Lock()
			delete(s.handlers, sh)
			s.mtx.Unlock()
		})
	}
}
// Release releases the handle.
//
// Every call clears the registered handlers. The first call additionally
// removes the subscription from the FloodSub's set for its channel and, if
// no subscriptions remain in that set, wakes the FloodSub after its lock has
// been dropped.
func (s *subscription) Release() {
	// Drop all handlers; this part is repeated on every call.
	s.mtx.Lock()
	for handler := range s.handlers {
		delete(s.handlers, handler)
	}
	s.mtx.Unlock()

	// Detach from the channel table at most once.
	s.relOnce.Do(func() {
		fs := s.m

		fs.mtx.Lock()
		chanSubs := fs.channels[s.channelID]
		// delete on a nil map is a no-op, so no explicit nil check is needed.
		delete(chanSubs, s)
		noneLeft := len(chanSubs) == 0
		fs.mtx.Unlock()

		// Wake only once the FloodSub lock is no longer held.
		if noneLeft {
			fs.wake()
		}
	})
}
// Compile-time check that *subscription satisfies pubsub.Subscription.
var _ pubsub.Subscription = (*subscription)(nil)