Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefix lookup implementation #2

Open
wants to merge 41 commits into
base: noot/double-hash
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cc4e42b
wip on prefix lookup
noot Aug 31, 2022
a8900f4
merge
noot Aug 31, 2022
648fd4e
wip
noot Sep 8, 2022
f528622
merge, fix kbucket dependency, start impl
noot Sep 9, 2022
1fc0b02
modify providers manager to return full keys for prefix lookup
noot Sep 9, 2022
681dd33
update pb.Message_Peer to have provides keys for prefix lookup
noot Sep 12, 2022
adda234
discard peers that don't provide desired content
noot Sep 12, 2022
8bba3ce
cleanup comments, update fullrt for prefix lookup
noot Sep 13, 2022
a0577de
cleanup
noot Sep 13, 2022
6c648f1
cleanup
noot Sep 20, 2022
063e99e
Merge branch 'noot/double-hash' of github.com:ChainSafe/go-libp2p-kad…
noot Sep 29, 2022
88d68c7
don't cache for prefix lookups
noot Sep 29, 2022
a9ce60b
update go-datastore dep
noot Oct 18, 2022
5c1eda5
add prefixLength check
noot Oct 18, 2022
675dfaa
slice lookup key by bits instead of bytes
noot Oct 27, 2022
32b9a49
add SetPrefixLength
noot Oct 27, 2022
e2434a9
update prefix lookup to be by bits
noot Nov 1, 2022
2dcb661
update protobuf message
noot Nov 2, 2022
bf1f5b4
use map of keys to list of peers for prefix lookups
noot Nov 2, 2022
780e8b7
fixes and cleanups, unit tests ok
noot Nov 2, 2022
af92e26
merge, fix lint errors
noot Nov 2, 2022
8abd351
cleanup
noot Nov 2, 2022
3129f21
update pb.Message.Key to have prefixBitLength; use this to check if q…
noot Nov 4, 2022
d8b4570
merge
noot Nov 17, 2022
ca5e8ca
small cleanups
noot Nov 17, 2022
6a48f02
cleanup
noot Nov 17, 2022
3708392
update pb not to use map for provider peers, appears to fix prefix lo…
noot Nov 18, 2022
6884c30
fix various prefix lookup bugs; change key encoding in db to hex
noot Nov 22, 2022
236de05
prefix testing
noot Nov 29, 2022
3c81d7f
finally fix routing issue
noot Nov 30, 2022
24e8f3b
cleanup, add unit tests
noot Nov 30, 2022
9a733d2
fix db check
noot Nov 30, 2022
af9c81c
add concurrency safety for dht.prefixLength
noot Nov 30, 2022
2415ab3
cleanup
noot Nov 30, 2022
96cedef
update Sha256Multihas to return extra length
noot Dec 1, 2022
bd3535e
cleanup
noot Dec 1, 2022
19a3d31
prefixLength 0 set to 256
noot Dec 1, 2022
3b82cf8
cleanup
noot Dec 8, 2022
30979a1
update comment
noot Jan 3, 2023
0feb15d
remove chainsafe/go-datastore usage
noot Jan 4, 2023
74ff325
log cleanup
noot Jan 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dht

import (
"context"
"errors"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -149,6 +150,11 @@ type IpfsDHT struct {

// configuration variables for tests
testAddressUpdateProcessing bool

// length of prefix of keys for provider lookups
// if 0, the whole key is used.
prefixLength int
prefixLengthMu sync.RWMutex
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -197,6 +203,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
return nil, err
}

dht.prefixLength = cfg.PrefixLookupLength
if dht.prefixLength >= 256 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: Is the prefix length set globally for the kubo node, or can it be changed for each request (e.g with Options, or argument)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be changed for each request by calling SetPrefixLength right before the lookup. Another option I just thought of (that might be nicer API-wise?) would be to create an option specifically for the FindProviders function that the caller can optionally pass in. what do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that would be awesome if the prefix can be set on a per request basis, with a global default value (either static e.g taken from the conf, or adaptive) for if the option isn't specifically set.

// if prefixLength is greater than the hash length, then just look up the whole hash
dht.prefixLength = 0
}

dht.testAddressUpdateProcessing = cfg.TestAddressUpdateProcessing

dht.auto = cfg.Mode
Expand Down Expand Up @@ -344,7 +356,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
if cfg.ProviderStore != nil {
dht.providerStore = cfg.ProviderStore
} else {
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), h.Peerstore(), cfg.Datastore)
if err != nil {
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
}
Expand Down Expand Up @@ -419,6 +431,21 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
return rt, err
}

// SetPrefixLength sets the prefix length for DHT provider lookups.
func (dht *IpfsDHT) SetPrefixLength(prefixLength int) error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is SetPrefixLength called somewhere in go-libp2p-kad-dht? Or is this function meant to be called by kubo or go-libp2p to configure the DHT settings?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's meant to be called externally. I needed this for testing, but thought it could be useful for users of the library also, since the only other way to set the prefix length is at DHT initialization.

if prefixLength > 256 || prefixLength < 0 {
return errors.New("invalid prefix length")
}

dht.prefixLengthMu.Lock()
if prefixLength == 0 {
prefixLength = 256
}
dht.prefixLength = prefixLength
dht.prefixLengthMu.Unlock()
return nil
}

// ProviderStore returns the provider storage object for storing and retrieving provider records.
func (dht *IpfsDHT) ProviderStore() providers.ProviderStore {
return dht.providerStore
Expand Down Expand Up @@ -685,23 +712,42 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
}

// nearestPeersToQuery returns the routing tables closest peers.Digest
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := pmes.GetKey()
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) ([]peer.ID, error) {
key := pmes.GetKey().GetKey()
prefixBitLength := pmes.GetKey().GetPrefixBitLength()

if prefixBitLength != 0 {
key, err := internal.DecodePrefixedKey(key)
if err != nil {
return nil, err
}

// prefix lookup
closer := dht.routingTable.NearestPeersToPrefix(kb.ID(string(key)), count)
return closer, nil
}

// for GET_PROVIDERS messages, or sometimes FIND_NODE messages,
// the message key is the hashed multihash, so don't hash it again
decodedMH, err := multihash.Decode(key)
if err == nil && decodedMH.Code == multihash.DBL_SHA2_256 {
// normal non-prefixed lookup
closer := dht.routingTable.NearestPeers(kb.ID(string(decodedMH.Digest)), count)
return closer
return closer, nil
}

// non-prefixed, non double-hashed lookup
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), count)
return closer
return closer, nil
}

// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)
closer, err := dht.nearestPeersToQuery(pmes, count)
if err != nil {
logger.Errorf("key decoding error: %w", err)
return nil
}

// no node? nil
if closer == nil {
Expand Down
12 changes: 6 additions & 6 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}
return false
}
err = req.Unmarshal(msgbytes)
err = req.XXX_Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
if c := baseLogger.Check(zap.DebugLevel, "error unmarshaling message"); c != nil {
Expand Down Expand Up @@ -116,15 +116,15 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()))
zap.Binary("key", req.GetKey().GetKey()))
}
resp, err := handler(ctx, mPeer, &req)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
if c := baseLogger.Check(zap.DebugLevel, "error handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Binary("key", req.GetKey().GetKey()),
zap.Error(err))
}
return false
Expand All @@ -133,7 +133,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
if c := baseLogger.Check(zap.DebugLevel, "handled message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Binary("key", req.GetKey().GetKey()),
zap.Duration("time", time.Since(startTime)))
}

Expand All @@ -148,7 +148,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
if c := baseLogger.Check(zap.DebugLevel, "error writing response"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Binary("key", req.GetKey().GetKey()),
zap.Error(err))
}
return false
Expand All @@ -159,7 +159,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
if c := baseLogger.Check(zap.DebugLevel, "responded to message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Binary("key", req.GetKey().GetKey()),
zap.Duration("time", elapsedTime))
}

Expand Down
9 changes: 9 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ const DefaultPrefix protocol.ID = "/ipfs"

type Option = dhtcfg.Option

// PrefixLookups configures the DHT to use prefixes of keys for provider lookups.
// The length is in *bits*, and thus must be less than 256 (32 bytes).
func PrefixLookups(prefixLen int) Option {
return func(c *dhtcfg.Config) error {
c.PrefixLookupLength = prefixLen
return nil
}
}

// ProviderStore sets the provider storage manager.
func ProviderStore(ps providers.ProviderStore) Option {
return func(c *dhtcfg.Config) error {
Expand Down
51 changes: 50 additions & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,55 @@ func TestProvidesAsync(t *testing.T) {
}
}

func TestProvides_PrefixLookup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dhts := setupDHTS(t, ctx, 4)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()

connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])

for _, k := range testCaseCids {
logger.Debugf("announcing provider for %s", k)
if err := dhts[3].Provide(ctx, k, true); err != nil {
t.Fatal(err)
}
}

time.Sleep(time.Millisecond * 6)

n := 0
for _, c := range testCaseCids {
n = (n + 1) % 3

logger.Debugf("getting providers for %s from %d", c, n)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
dhts[n].prefixLength = 16 // half the hashed CID for now
provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)

select {
case prov := <-provchan:
if prov.ID == "" {
t.Fatal("Got back nil provider")
}
if prov.ID != dhts[3].self {
t.Fatal("Got back wrong provider")
}
case <-ctxT.Done():
t.Fatal("Did not get a provider back.")
}
}
}

func TestLayeredGet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1299,7 +1348,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
mhHash := internal.Sha256Multihash(c.Hash())
mhHash, _ := internal.Sha256Multihash(c.Hash())
err := a.ProviderStore().AddProvider(ctx, mhHash[:], peer.AddrInfo{ID: p})
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ func TestGetFailures(t *testing.T) {

rec := record.MakePutRecord(str, []byte("blah"))
req := pb.Message{
Type: typ,
Key: []byte(str),
Type: typ,
Key: &pb.Message_Key{
Key: []byte(str),
},
Record: rec,
}

Expand Down
23 changes: 15 additions & 8 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
return fmt.Errorf("invalid cid: undefined")
}
keyMH := key.Hash()
mhHash := internal.Sha256Multihash(keyMH)
mhHash, _ := internal.Sha256Multihash(keyMH)
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH), "mhHash", mhHash)

// add self locally
Expand Down Expand Up @@ -938,7 +938,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)

fn := func(ctx context.Context, p, k peer.ID) error {
pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0)
pmes.ProviderPeers = pbPeers
pmes.ProviderPeers = dht_pb.PeersToPeersWithKey(pbPeers)

return dht.messageSender.SendMessage(ctx, p, pmes)
}
Expand Down Expand Up @@ -1215,14 +1215,17 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in

keyMH := key.Hash()

logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
return peerOut
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)

// hash multihash for double-hashing implementation
mhHash, _ := internal.Sha256Multihash(key)
logger.Debugw("finding providers", "cid", key, "mhHash", mhHash, "mh", internal.LoggableProviderRecordBytes(key))

findAll := count == 0
ps := make(map[peer.ID]struct{})
psLock := &sync.Mutex{}
Expand All @@ -1242,7 +1245,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return len(ps)
}

provs, err := dht.ProviderManager.GetProviders(ctx, key)
provs, err := dht.ProviderManager.GetProviders(ctx, mhHash[:])
if err != nil {
return
}
Expand Down Expand Up @@ -1278,7 +1281,12 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ID: p,
})

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
var (
provs []*peer.AddrInfo
closer []*peer.AddrInfo
err error
)
provs, closer, err = dht.protoMessenger.GetProviders(ctx, p, key)
if err != nil {
return err
}
Expand All @@ -1300,18 +1308,17 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
}
if !findAll && psSize() >= count {
logger.Debugf("got enough providers (%d/%d)", psSize(), count)
cancelquery()
return nil
}
}

// Give closer peers back to the query to be queried
logger.Debugf("got closer peers: %d %s", len(closest), closest)
logger.Debugf("got closer peers: %d %s", len(closer), closer)

routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: closest,
Responses: closer,
})
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.8.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
Expand Down Expand Up @@ -123,3 +122,7 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)

replace github.com/libp2p/go-libp2p-kbucket => ../../ChainSafe/go-libp2p-kbucket

replace github.com/ipfs/go-datastore => ../../ChainSafe/go-datastore
Loading