/
utils.go
35 lines (29 loc) · 999 Bytes
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package host
import (
"context"
"encoding/binary"
"time"
"github.com/araskachoi/go-libp2p-pubsub-benchmark-tools/pkg/logger"
pb "github.com/araskachoi/go-libp2p-pubsub-benchmark-tools/pkg/pb/publisher"
"github.com/davecgh/go-spew/spew"
peer "github.com/libp2p/go-libp2p-peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// pubsubHandler consumes messages from sub and logs one line per message
// received. It blocks until sub.Next reports an error, then returns.
//
// For every message it decodes the payload into a pb.Message, dumps the
// decoded message with spew, and logs a comma-separated record containing:
// hostID, the sender, the message id, the pubsub seqno, the local receive
// time in Unix nanoseconds, and the message sequence.
//
// Payloads that fail to unmarshal are logged and skipped; they do not stop
// the handler.
func pubsubHandler(ctx context.Context, hostID peer.ID, sub *pubsub.Subscription) {
	for {
		nxt, err := sub.Next(ctx)
		if err != nil {
			// Stop rather than retry: Next fails when ctx is done or the
			// subscription has been cancelled, and neither condition clears
			// on its own. Retrying would spin and flood the log.
			logger.Errorf("err reading next:\n%v", err)
			return
		}

		logger.Info("msg received on pubsub channel")

		msg := pb.Message{}
		if err = msg.XXX_Unmarshal(nxt.GetData()); err != nil {
			logger.Errorf("err unmarshaling next message:\n%v", err)
			continue
		}

		spew.Dump(msg)

		// binary.BigEndian.Uint64 panics on slices shorter than 8 bytes. The
		// seqno originates from a remote peer, so guard the length and fall
		// back to 0 for missing or malformed values.
		var seqno uint64
		if raw := nxt.GetSeqno(); len(raw) >= 8 {
			seqno = binary.BigEndian.Uint64(raw)
		}

		// TODO: how to increment sequence before sending out?
		logger.Infof("Pubsub message received: %v,%v,%v,%v,%d,%d", hostID, nxt.GetFrom(), msg.GetId(), seqno, time.Now().UnixNano(), msg.GetSequence())
	}
}