forked from ipfs/go-ipfs-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
90 lines (67 loc) · 1.66 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package shell
import (
"encoding/binary"
"encoding/json"
"github.com/libp2p/go-libp2p-peer"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// PubSubRecord is a record received via PubSub.
type PubSubRecord interface {
// From returns the peer ID of the node that published this record
From() peer.ID
// Data returns the data field
Data() []byte
// SeqNo is the sequence number of this record
SeqNo() int64
//TopicIDs is the list of topics this record belongs to
TopicIDs() []string
}
type message struct {
*pb.Message
}
func (m *message) GetFrom() peer.ID {
return peer.ID(m.Message.GetFrom())
}
type floodsubRecord struct {
msg *message
}
func (r floodsubRecord) From() peer.ID {
return r.msg.GetFrom()
}
func (r floodsubRecord) Data() []byte {
return r.msg.GetData()
}
func (r floodsubRecord) SeqNo() int64 {
return int64(binary.BigEndian.Uint64(r.msg.GetSeqno()))
}
func (r floodsubRecord) TopicIDs() []string {
return r.msg.GetTopicIDs()
}
///
// PubSubSubscription allow you to receive pubsub records that where published on the network.
type PubSubSubscription struct {
resp *Response
}
func newPubSubSubscription(resp *Response) *PubSubSubscription {
sub := &PubSubSubscription{
resp: resp,
}
return sub
}
// Next waits for the next record and returns that.
func (s *PubSubSubscription) Next() (PubSubRecord, error) {
if s.resp.Error != nil {
return nil, s.resp.Error
}
d := json.NewDecoder(s.resp.Output)
r := &message{}
err := d.Decode(r)
return floodsubRecord{msg: r}, err
}
// Cancel cancels the given subscription.
func (s *PubSubSubscription) Cancel() error {
if s.resp.Output == nil {
return nil
}
return s.resp.Output.Close()
}