Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefix lookup implementation #2

Open
wants to merge 41 commits into
base: noot/double-hash
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cc4e42b
wip on prefix lookup
noot Aug 31, 2022
a8900f4
merge
noot Aug 31, 2022
648fd4e
wip
noot Sep 8, 2022
f528622
merge, fix kbucket dependency, start impl
noot Sep 9, 2022
1fc0b02
modify providers manager to return full keys for prefix lookup
noot Sep 9, 2022
681dd33
update pb.Message_Peer to have provides keys for prefix lookup
noot Sep 12, 2022
adda234
discard peers that don't provide desired content
noot Sep 12, 2022
8bba3ce
cleanup comments, update fullrt for prefix lookup
noot Sep 13, 2022
a0577de
cleanup
noot Sep 13, 2022
6c648f1
cleanup
noot Sep 20, 2022
063e99e
Merge branch 'noot/double-hash' of github.com:ChainSafe/go-libp2p-kad…
noot Sep 29, 2022
88d68c7
don't cache for prefix lookups
noot Sep 29, 2022
a9ce60b
update go-datastore dep
noot Oct 18, 2022
5c1eda5
add prefixLength check
noot Oct 18, 2022
675dfaa
slice lookup key by bits instead of bytes
noot Oct 27, 2022
32b9a49
add SetPrefixLength
noot Oct 27, 2022
e2434a9
update prefix lookup to be by bits
noot Nov 1, 2022
2dcb661
update protobuf message
noot Nov 2, 2022
bf1f5b4
use map of keys to list of peers for prefix lookups
noot Nov 2, 2022
780e8b7
fixes and cleanups, unit tests ok
noot Nov 2, 2022
af92e26
merge, fix lint errors
noot Nov 2, 2022
8abd351
cleanup
noot Nov 2, 2022
3129f21
update pb.Message.Key to have prefixBitLength; use this to check if q…
noot Nov 4, 2022
d8b4570
merge
noot Nov 17, 2022
ca5e8ca
small cleanups
noot Nov 17, 2022
6a48f02
cleanup
noot Nov 17, 2022
3708392
update pb not to use map for provider peers, appears to fix prefix lo…
noot Nov 18, 2022
6884c30
fix various prefix lookup bugs; change key encoding in db to hex
noot Nov 22, 2022
236de05
prefix testing
noot Nov 29, 2022
3c81d7f
finally fix routing issue
noot Nov 30, 2022
24e8f3b
cleanup, add unit tests
noot Nov 30, 2022
9a733d2
fix db check
noot Nov 30, 2022
af9c81c
add concurrency safety for dht.prefixLength
noot Nov 30, 2022
2415ab3
cleanup
noot Nov 30, 2022
96cedef
update Sha256Multihas to return extra length
noot Dec 1, 2022
bd3535e
cleanup
noot Dec 1, 2022
19a3d31
prefixLength 0 set to 256
noot Dec 1, 2022
3b82cf8
cleanup
noot Dec 8, 2022
30979a1
update comment
noot Jan 3, 2023
0feb15d
remove chainsafe/go-datastore usage
noot Jan 4, 2023
74ff325
log cleanup
noot Jan 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ type IpfsDHT struct {

// configuration variables for tests
testAddressUpdateProcessing bool

// length, in bytes, of the key prefix used for provider lookups.
// if 0, the whole key is used.
prefixLength int
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -196,6 +200,8 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
return nil, err
}

dht.prefixLength = cfg.PrefixLookupLength

dht.testAddressUpdateProcessing = cfg.TestAddressUpdateProcessing

dht.auto = cfg.Mode
Expand Down Expand Up @@ -685,13 +691,22 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := pmes.GetKey()

if pmes.GetType() == pb.Message_GET_PROVIDERS {
// for GET_PROVIDERS messages, the message key is the hashed multihash, so don't hash it again
closer := dht.routingTable.NearestPeers(kb.ID(string(pmes.GetKey())), count)
return closer
if len(key) < 32 {
noot marked this conversation as resolved.
Show resolved Hide resolved
// prefix lookup
closer := dht.routingTable.NearestPeersToPrefix(kb.ID(string(key)), count)
return closer
} else {
// normal non-prefixed lookup
closer := dht.routingTable.NearestPeers(kb.ID(string(key)), count)
return closer
}
}

closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), count)
return closer
}

Expand Down
8 changes: 8 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ const DefaultPrefix protocol.ID = "/ipfs"

type Option = dhtcfg.Option

// PrefixLookups returns an Option that records prefixLen as the
// configuration's PrefixLookupLength, so that provider lookups are made with
// a prefix of the key rather than the whole key.
func PrefixLookups(prefixLen int) Option {
	applyPrefixLen := func(cfg *dhtcfg.Config) error {
		cfg.PrefixLookupLength = prefixLen
		return nil
	}
	return applyPrefixLen
}

// ProviderStore sets the provider storage manager.
func ProviderStore(ps providers.ProviderStore) Option {
return func(c *dhtcfg.Config) error {
Expand Down
49 changes: 49 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,55 @@ func TestProvidesAsync(t *testing.T) {
}
}

// TestProvides_PrefixLookup checks that providers announced by one node can be
// found by the other nodes when those nodes query with a key prefix.
//
// Topology: dhts[1] is connected to dhts[0], dhts[2] and dhts[3]. dhts[3]
// provides every CID in testCaseCids, and dhts[0], dhts[1] and dhts[2] take
// turns looking them up with prefixLength set to 16.
func TestProvides_PrefixLookup(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhts := setupDHTS(t, ctx, 4)
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
			// Deferred inside the cleanup closure so that the hosts are
			// closed only after every DHT has been closed.
			defer dhts[i].host.Close()
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

	for _, k := range testCaseCids {
		logger.Debugf("announcing provider for %s", k)
		if err := dhts[3].Provide(ctx, k, true); err != nil {
			t.Fatal(err)
		}
	}

	// Brief pause before querying (presumably to let the announcements
	// settle; kept from the original test).
	time.Sleep(time.Millisecond * 6)

	n := 0
	for _, c := range testCaseCids {
		n = (n + 1) % 3

		// Each lookup runs in its own function so that its timeout context is
		// released as soon as the lookup finishes, instead of every context
		// staying alive until the whole test returns.
		func() {
			logger.Debugf("getting providers for %s from %d", c, n)
			ctxT, cancelT := context.WithTimeout(ctx, time.Second)
			defer cancelT()

			dhts[n].prefixLength = 16 // half the hashed CID for now
			provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)

			select {
			case prov := <-provchan:
				if prov.ID == "" {
					t.Fatal("Got back nil provider")
				}
				if prov.ID != dhts[3].self {
					t.Fatal("Got back wrong provider")
				}
			case <-ctxT.Done():
				t.Fatal("Did not get a provider back.")
			}
		}()
	}
}

func TestLayeredGet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
41 changes: 35 additions & 6 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type FullRT struct {
timeoutPerOp time.Duration

bulkSendParallelism int

prefixLength int
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -185,6 +187,8 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
crawlerInterval: time.Minute * 60,

bulkSendParallelism: 20,

prefixLength: dhtcfg.PrefixLookupLength,
}

rt.wg.Add(1)
Expand Down Expand Up @@ -1215,14 +1219,17 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in

keyMH := key.Hash()

logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
return peerOut
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)

// hash multihash for double-hashing implementation
mhHash := internal.Sha256Multihash(key)
logger.Debugw("finding providers", "cid", key, "mhHash", mhHash, "mh", internal.LoggableProviderRecordBytes(key))

findAll := count == 0
ps := make(map[peer.ID]struct{})
psLock := &sync.Mutex{}
Expand All @@ -1242,7 +1249,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return len(ps)
}

provs, err := dht.ProviderManager.GetProviders(ctx, key)
provs, err := dht.ProviderManager.GetProviders(ctx, mhHash[:])
if err != nil {
return
}
Expand Down Expand Up @@ -1278,7 +1285,14 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ID: p,
})

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
var lookupKey []byte
if dht.prefixLength != 0 {
lookupKey = mhHash[:dht.prefixLength]
} else {
lookupKey = mhHash[:]
}

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, lookupKey)
if err != nil {
return err
}
Expand All @@ -1287,12 +1301,27 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.

// Add unique providers from request, up to 'count'
for _, prov := range provs {
dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
// if this is a prefix lookup, the providers might not actually have
// the content we're looking for. discard all that don't
if dht.prefixLength > 0 {
var hasContent bool
for _, key := range prov.Keys {
if bytes.Equal(key, mhHash[:]) {
hasContent = true
break
}
}
if !hasContent {
continue
}
}

dht.maybeAddAddrs(prov.AddrInfo.ID, prov.AddrInfo.Addrs, peerstore.TempAddrTTL)
logger.Debugf("got provider: %s", prov)
if psTryAdd(prov.ID) {
if psTryAdd(prov.AddrInfo.ID) {
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
case peerOut <- *prov.AddrInfo:
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return ctx.Err()
Expand Down
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/ipfs/go-log v1.0.5
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-core v0.20.0
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
Expand Down Expand Up @@ -71,7 +70,7 @@ require (
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.8.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
Expand Down Expand Up @@ -123,3 +122,9 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)

replace github.com/libp2p/go-libp2p-kbucket => ../../ChainSafe/go-libp2p-kbucket

// replace github.com/libp2p/go-libp2p-kbucket => github.com/ChainSafe/go-libp2p-kbucket v0.0.0-20220908203920-02442af318da
// replace github.com/libp2p/go-libp2p-kbucket/keyspace => github.com/ChainSafe/go-libp2p-kbucket/keyspace v0.0.0-20220908203920-02442af318da
// replace github.com/libp2p/go-libp2p-kbucket/peerdiversity => github.com/ChainSafe/go-libp2p-kbucket/peerdiversity v0.0.0-20220908203920-02442af318da
Loading