-
Notifications
You must be signed in to change notification settings - Fork 7
/
lookup.go
100 lines (81 loc) · 2.89 KB
/
lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
record_pb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-multihash"
"github.com/libp2p/go-libp2p-core/routing"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)
// getValueOrPeers sends a single GET_VALUE request for key to peer p and
// interprets the reply.
//
// Outcomes, in priority order:
//   - the request fails: (nil, nil, err)
//   - the reply carries a record: (record, closer peers, nil); the peer
//     slice may be empty
//   - the reply carries only closer peers: (nil, closer peers, nil)
//   - the reply carries neither: (nil, nil, routing.ErrNotFound)
//
// NOTE(review): this function does not itself write to the peerstore; any
// address bookkeeping would have to happen inside the request path — confirm.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*record_pb.Record, []*peer.AddrInfo, error) {
	resp, err := dht.getValueSingle(ctx, p, key)
	if err != nil {
		return nil, nil, err
	}

	// The remote may hand back closer peers whether or not it has the record.
	closer := pb.PBPeersToPeerInfos(resp.GetCloserPeers())
	rec := resp.GetRecord()

	switch {
	case rec != nil:
		return rec, closer, nil
	case len(closer) > 0:
		return nil, closer, nil
	default:
		return nil, nil, routing.ErrNotFound
	}
}
// getValueSingle issues one GET_VALUE RPC to peer p, using the bytes of key
// as the message key, and returns the raw response message (or the error
// from sendRequest) without interpreting it.
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
	return dht.sendRequest(ctx, p, pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0))
}
// fp performs one GET_PROVIDERS round trip against peer p for key and
// returns the closer peers listed in the reply so the caller can keep
// querying. A routing.SendingQuery event for p is published on ctx before
// the request is sent. Any request error is returned unchanged.
func (dht *IpfsDHT) fp(ctx context.Context, p peer.ID, key multihash.Multihash) ([]*peer.AddrInfo, error) {
	// Let query-event subscribers know which peer is about to be contacted.
	sending := &routing.QueryEvent{
		Type: routing.SendingQuery,
		ID:   p,
	}
	routing.PublishQueryEvent(ctx, sending)

	resp, err := dht.findProvidersSingle(ctx, p, key)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the provider entries are decoded and then thrown away;
	// only the closer peers are handed back. Confirm this is intentional.
	_ = pb.PBPeersToPeerInfos(resp.GetProviderPeers())

	// Closer peers go back to the caller to be queried next.
	return pb.PBPeersToPeerInfos(resp.GetCloserPeers()), nil
}
// findPeerSingle sends a FIND_NODE request for id to peer p and returns the
// closer peers contained in the response. It fails with the sendRequest
// error if the request fails, or with a "null result returned" error if the
// request succeeds but yields a nil message.
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ([]*peer.AddrInfo, error) {
	req := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)

	resp, err := dht.sendRequest(ctx, p, req)
	switch {
	case err != nil:
		return nil, err
	case resp == nil:
		// Guard the field access below against a nil message.
		return nil, fmt.Errorf("null result returned")
	}

	return pb.PBPeersToPeerInfos(resp.CloserPeers), nil
}
// findProvidersSingle issues one GET_PROVIDERS RPC for key to peer p and
// returns the raw response message (or the error from sendRequest) without
// interpreting it.
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
	return dht.sendRequest(ctx, p, pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0))
}
// makeProvRecord builds an ADD_PROVIDER message for key that lists this
// host (its ID and current addresses) as the sole provider peer. It returns
// an error when the host reports no addresses, since a provider entry
// without addresses cannot be put.
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
	local := peer.AddrInfo{
		ID:    dht.host.ID(),
		Addrs: dht.host.Addrs(),
	}

	// All host addresses are advertised as-is. Restricting them to
	// WAN-shareable ones (addrutil.WANShareableAddrs) is left disabled.
	if len(local.Addrs) == 0 {
		return nil, fmt.Errorf("no known addresses for self, cannot put provider")
	}

	msg := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
	msg.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{local})
	return msg, nil
}