/
tracer.go
76 lines (59 loc) 路 1.67 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package start
import (
"sync"
"github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/tracer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)
type Tracer struct {
cidsLk sync.RWMutex
cids map[string]chan<- peer.ID
}
var _ tracer.Tracer = (*Tracer)(nil)
func NewTracer() *Tracer {
return &Tracer{
cidsLk: sync.RWMutex{},
cids: map[string]chan<- peer.ID{},
}
}
func (t *Tracer) Register(contentID cid.Cid) <-chan peer.ID {
log.WithField("cid", contentID).Debugln("Tracer registered CID")
t.cidsLk.Lock()
defer t.cidsLk.Unlock()
ch := make(chan peer.ID)
t.cids[string(contentID.Bytes())] = ch
return ch
}
func (t *Tracer) Unregister(contentID cid.Cid) {
t.cidsLk.Lock()
defer t.cidsLk.Unlock()
log.WithField("cid", contentID).Debugln("Tracer unregistered CID")
ch, ok := t.cids[string(contentID.Bytes())]
if !ok {
return
}
close(ch)
delete(t.cids, string(contentID.Bytes()))
}
func (t *Tracer) MessageReceived(id peer.ID, msg message.BitSwapMessage) {
log.WithField("peerID", id).WithField("size", msg.Size()).Traceln("Received Bitswap message")
t.cidsLk.RLock()
defer t.cidsLk.RUnlock()
for _, e := range msg.Wantlist() {
ch, ok := t.cids[string(e.Cid.Bytes())]
if !ok {
continue
}
select {
case ch <- id:
log.WithField("peerID", id).WithField("cid", e.Cid).Traceln("Tracer delivered matched message")
default:
log.WithField("peerID", id).WithField("cid", e.Cid).Traceln("Tracer dropped matched message")
}
}
}
func (t *Tracer) MessageSent(id peer.ID, msg message.BitSwapMessage) {
log.WithField("peerID", id).WithField("size", msg.Size()).Traceln("Sent Bitswap message")
}