Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DHT query subcommand to get closest peers #2198

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 64 additions & 0 deletions commands/dht.go
Expand Up @@ -9,6 +9,7 @@ import (
cmds "gx/ipfs/QmQtQrtNioesAWtrx8csBvfY37gTe94d6wQ3VikZUjxD39/go-ipfs-cmds"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
pstore "gx/ipfs/QmRhFARzTHcFh8wUxwN5KvyTGq73FLC65EfFAhz8Ng7aGb/go-libp2p-peerstore"
"gx/ipfs/QmTu65MVbemtUxJEWgsTtzv9Zv9P8rvmqNA4eG9TrTRGYc/go-libp2p-peer"
notif "gx/ipfs/QmWaDSNoSdSXU9b6udyaq9T8y6LkzMwqWxECznFqvtcTsk/go-libp2p-routing/notifications"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
Expand All @@ -27,9 +28,72 @@ var dhtCmd = &cmds.Command{
},

Subcommands: map[string]*cmds.Command{
"query": queryDhtCmd,
"findprovs": findProvidersDhtCmd,
},
}
var queryDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
ShortDescription: "Outputs a list of newline-delimited Peer IDs.",
},

Arguments: []cmdkit.Argument{
cmdkit.StringArg("peerID", true, true, "The peerID to run the query against."),
},
Options: []cmdkit.Option{
cmdkit.BoolOption(dhtVerboseOptionName, "v", "Print extra information."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

id, err := peer.IDB58Decode(req.Arguments[0])
if err != nil {
return cmds.ClientError("invalid peer ID")
}

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

closestPeers, err := GetPorcelainAPI(env).NetworkGetClosestPeers(ctx, string(id))
if err != nil {
cancel()
return err
}

go func() {
defer cancel()
for p := range closestPeers {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
ID: p,
Type: notif.FinalPeer,
})
}
}()

for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{
notif.PeerResponse: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
for _, p := range obj.Responses {
fmt.Fprintf(out, "%s\n", p.ID.Pretty()) // nolint: errcheck
}
},
}
verbose, _ := req.Options[dhtVerboseOptionName].(bool)
printEvent(out, w, verbose, pfm)
return nil
}),
},
Type: notif.QueryEvent{},
}

var findProvidersDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Expand Down
12 changes: 12 additions & 0 deletions filnet/router.go
Expand Up @@ -2,10 +2,13 @@ package filnet

import (
"context"
"errors"

"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
pstore "gx/ipfs/QmRhFARzTHcFh8wUxwN5KvyTGq73FLC65EfFAhz8Ng7aGb/go-libp2p-peerstore"
"gx/ipfs/QmTu65MVbemtUxJEWgsTtzv9Zv9P8rvmqNA4eG9TrTRGYc/go-libp2p-peer"
routing "gx/ipfs/QmWaDSNoSdSXU9b6udyaq9T8y6LkzMwqWxECznFqvtcTsk/go-libp2p-routing"
"gx/ipfs/QmfM7kwroZsKhKFmnJagPvM28MZMyKxG3QV2AqfvZvEEqS/go-libp2p-kad-dht"
)

// This struct wraps the filecoin nodes router. This router is a
Expand Down Expand Up @@ -39,3 +42,12 @@ func NewRouter(r routing.IpfsRouting) *Router {
func (r *Router) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan pstore.PeerInfo {
return r.routing.FindProvidersAsync(ctx, key, count)
}

// GetClosestPeers returns peers of the K closest peers to the given key
func (r *Router) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
ipfsDHT, ok := r.routing.(*dht.IpfsDHT)
if !ok {
return nil, errors.New("underlying routing should be pointer of IpfsDHT")
}
return ipfsDHT.GetClosestPeers(ctx, key)
}
5 changes: 5 additions & 0 deletions plumbing/api.go
Expand Up @@ -187,6 +187,11 @@ func (api *API) NetworkFindProvidersAsync(ctx context.Context, key cid.Cid, coun
return api.network.FindProvidersAsync(ctx, key, count)
}

// NetworkGetClosestPeers issues a getClosestPeers query to the filecoin network.
func (api *API) NetworkGetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
return api.network.GetClosestPeers(ctx, key)
}

// SignBytes uses private key information associated with the given address to sign the given bytes.
func (api *API) SignBytes(data []byte, addr address.Address) (types.Signature, error) {
return api.wallet.SignBytes(data, addr)
Expand Down