-
Notifications
You must be signed in to change notification settings - Fork 376
/
pubsub_api.go
156 lines (125 loc) · 3.62 KB
/
pubsub_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package ipfsutil
import (
"context"
"sync"
ipfs_interface "github.com/ipfs/interface-go-ipfs-core"
ipfs_iopts "github.com/ipfs/interface-go-ipfs-core/options"
p2p_disc "github.com/libp2p/go-libp2p-core/discovery"
p2p_peer "github.com/libp2p/go-libp2p-core/peer"
p2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/zap"
)
// PubSubAPI bundles a libp2p PubSub instance with a discovery service, a
// logger, and a cache of the topic handles it has already joined.
type PubSubAPI struct {
	// Embedded PubSub; Join, GetTopics and ListPeers are called on it.
	*p2p_pubsub.PubSub
	// disc is stored by NewPubSubAPI; it is not otherwise referenced in this file.
	disc p2p_disc.Discovery
	// logger receives the debug message emitted by Subscribe.
	logger *zap.Logger
	// muTopics is held by topicJoin while it reads or writes topics.
	muTopics sync.RWMutex
	// topics maps a topic name to the handle returned by Join for that name.
	topics map[string]*p2p_pubsub.Topic
}
// NewPubSubAPI wraps ps, disc and logger in a PubSubAPI with an empty topic
// cache and returns it as an ipfs_interface.PubSubAPI.
//
// ctx is accepted but not used by the constructor.
func NewPubSubAPI(ctx context.Context, logger *zap.Logger, disc p2p_disc.Discovery, ps *p2p_pubsub.PubSub) ipfs_interface.PubSubAPI {
	api := &PubSubAPI{
		topics: make(map[string]*p2p_pubsub.Topic),
	}
	api.PubSub = ps
	api.disc = disc
	api.logger = logger
	return api
}
// topicJoin returns the handle for topic, joining it on first use.
//
// The whole lookup-or-join sequence runs with muTopics held. A topic that is
// already cached is returned as is and opts is ignored for it. Otherwise the
// topic is joined with opts, Relay is called on the new handle, and the
// handle is cached only if both steps succeed.
func (ps *PubSubAPI) topicJoin(topic string, opts ...p2p_pubsub.TopicOpt) (*p2p_pubsub.Topic, error) {
	ps.muTopics.Lock()
	defer ps.muTopics.Unlock()

	if cached, found := ps.topics[topic]; found {
		return cached, nil
	}

	joined, err := ps.PubSub.Join(topic, opts...)
	if err != nil {
		return nil, err
	}

	// The cancel function returned by Relay is intentionally dropped.
	_, err = joined.Relay()
	if err != nil {
		// Best-effort cleanup: the Relay error is the one reported, so a
		// failure to close the freshly joined topic is deliberately ignored.
		_ = joined.Close()
		return nil, err
	}

	ps.topics[topic] = joined
	return joined, nil
}
// func (ps *PubSubAPI) topicLeave(topic string) (err error) {
// ps.muTopics.Lock()
// if t, ok := ps.topics[topic]; ok {
// err = t.Close()
// delete(ps.topics, topic)
// }
// ps.muTopics.Unlock()
// return
// }
// Ls returns the topic names reported by the embedded PubSub's GetTopics.
// The returned error is always nil and ctx is not used.
func (ps *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
	names := ps.PubSub.GetTopics()
	return names, nil
}
// Peers lists the peers we are currently pubsubbing with.
//
// opts is resolved through PubSubPeersOptions; if that fails its error is
// returned. Otherwise the result of ListPeers for the resolved Topic setting
// is returned. ctx is not used.
func (ps *PubSubAPI) Peers(ctx context.Context, opts ...ipfs_iopts.PubSubPeersOption) ([]p2p_peer.ID, error) {
	settings, err := ipfs_iopts.PubSubPeersOptions(opts...)
	if err != nil {
		return nil, err
	}

	peers := ps.PubSub.ListPeers(settings.Topic)
	return peers, nil
}
// minTopicSize is the publish option handed to every Topic.Publish call made
// by Publish: a readiness condition built from MinTopicSize(1).
// NOTE(review): presumably this makes Publish wait until the topic has at
// least one peer — confirm against the go-libp2p-pubsub documentation.
var minTopicSize = p2p_pubsub.WithReadiness(p2p_pubsub.MinTopicSize(1))
// Publish sends msg on the given pubsub topic.
//
// The topic handle is obtained through topicJoin (joining it on first use,
// without topic options) and the message is published with the minTopicSize
// option. Any error from joining or publishing is returned unchanged.
func (ps *PubSubAPI) Publish(ctx context.Context, topic string, msg []byte) error {
	handle, err := ps.topicJoin(topic)
	if err == nil {
		err = handle.Publish(ctx, msg, minTopicSize)
	}
	return err
}
// Subscribe opens a subscription to messages on the given topic.
//
// The topic handle comes from topicJoin; a debug line is logged before
// subscribing. The resulting subscription is wrapped in a
// pubsubSubscriptionAPI that shares this API's logger. Neither ctx nor opts
// is consulted by this implementation.
func (ps *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...ipfs_iopts.PubSubSubscribeOption) (ipfs_interface.PubSubSubscription, error) {
	handle, err := ps.topicJoin(topic)
	if err != nil {
		return nil, err
	}

	ps.logger.Debug("subscribing", zap.String("topic", topic))

	subscription, err := handle.Subscribe()
	if err != nil {
		return nil, err
	}

	wrapped := &pubsubSubscriptionAPI{
		logger:       ps.logger,
		Subscription: subscription,
	}
	return wrapped, nil
}
// pubsubSubscriptionAPI is an active PubSub subscription: a libp2p
// Subscription paired with a logger. Its Close and Next methods adapt the
// embedded subscription to the return types used by Subscribe.
type pubsubSubscriptionAPI struct {
	// logger is the logger of the PubSubAPI that created the subscription.
	logger *zap.Logger
	// Embedded libp2p subscription; Cancel and Next are called on it.
	*p2p_pubsub.Subscription
}
// Close cancels the embedded subscription. It has the io.Closer method shape
// and always returns nil.
func (pss *pubsubSubscriptionAPI) Close() error {
	pss.Subscription.Cancel()
	return nil
}
// Next returns the next incoming message, wrapped in a pubsubMessageAPI.
// An error from the embedded subscription's Next is returned unchanged with
// a nil message.
func (pss *pubsubSubscriptionAPI) Next(ctx context.Context) (ipfs_interface.PubSubMessage, error) {
	incoming, err := pss.Subscription.Next(ctx)
	if err != nil {
		return nil, err
	}

	return &pubsubMessageAPI{Message: incoming}, nil
}
// pubsubMessageAPI is a single PubSub message: it embeds a libp2p Message
// and exposes its sender, payload, sequence number and topic IDs through the
// From, Data, Seq and Topics methods.
type pubsubMessageAPI struct {
	// Embedded libp2p message that every accessor reads from.
	*p2p_pubsub.Message
}
// From returns the ID of the peer from which the message has arrived, as
// reported by the embedded message's GetFrom.
func (psm *pubsubMessageAPI) From() p2p_peer.ID {
	// GetFrom is reached through the single embedded Message field.
	return psm.GetFrom()
}
// Data returns the message body, as reported by the embedded message's
// GetData.
func (psm *pubsubMessageAPI) Data() []byte {
	// GetData is reached through the single embedded Message field.
	return psm.GetData()
}
// Seq returns the message identifier, as reported by the embedded message's
// GetSeqno.
func (psm *pubsubMessageAPI) Seq() []byte {
	// GetSeqno is reached through the single embedded Message field.
	return psm.GetSeqno()
}
// Topics returns the list of topics this message was sent to, as reported by
// the embedded message's GetTopicIDs.
func (psm *pubsubMessageAPI) Topics() []string {
	// GetTopicIDs is reached through the single embedded Message field.
	return psm.GetTopicIDs()
}