-
Notifications
You must be signed in to change notification settings - Fork 1
/
lookup.go
134 lines (114 loc) · 3.26 KB
/
lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package dht
import (
"context"
"fmt"
"strings"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)
func tryFormatLoggableKey(k string) (string, error) {
if len(k) == 0 {
return "", fmt.Errorf("loggableKey is empty")
}
var proto, cstr string
if k[0] == '/' {
// it's a path (probably)
protoEnd := strings.IndexByte(k[1:], '/')
if protoEnd < 0 {
return k, fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
}
proto = k[1 : protoEnd+1]
cstr = k[protoEnd+2:]
} else {
proto = "provider"
cstr = k
}
var encStr string
c, err := cid.Cast([]byte(cstr))
if err == nil {
encStr = c.String()
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
}
return fmt.Sprintf("/%s/%s", proto, encStr), nil
}
func loggableKey(k string) logging.LoggableMap {
newKey, err := tryFormatLoggableKey(k)
if err != nil {
logger.Debug(err)
} else {
k = newKey
}
return logging.LoggableMap{
"key": k,
}
}
func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
return logging.LoggableMap{
"multihash": base32.RawStdEncoding.EncodeToString(mh),
}
}
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
if len(tablepeers) == 0 {
return nil, kb.ErrLookupFailure
}
out := make(chan peer.ID, dht.bucketSize)
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
// For DHT query command
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: peers,
})
return &dhtQueryResult{closerPeers: peers}, nil
})
go func() {
defer close(out)
defer e.Done()
timedCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
// run it!
res, err := query.Run(timedCtx, tablepeers)
if err != nil {
logger.Debugf("closestPeers query run error: %s", err)
}
if res != nil && res.queriedSet != nil {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
l := len(sorted)
if l > dht.bucketSize {
sorted = sorted[:dht.bucketSize]
}
for _, p := range sorted {
out <- p
}
}
}()
return out, nil
}