Skip to content

Commit

Permalink
providers search
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Jun 19, 2023
1 parent 0b2dd21 commit f03c9f9
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/metadata/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.logContractMetadata(*cm, "trying to resolve")
cm.RetryCount += 1

resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link, cm.RetryCount)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
Expand Down
8 changes: 8 additions & 0 deletions cmd/metadata/resolver/ipfs_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ func (s IpfsNode) Resolve(ctx context.Context, network, address, link string) (i
func (s IpfsNode) Is(link string) bool {
return strings.HasPrefix(link, prefixIpfs)
}

// FindPeers -
func (s IpfsNode) FindPeers(ctx context.Context, link string) error {
requestCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

return s.node.FindPeersForContent(requestCtx, strings.TrimPrefix(link, prefixIpfs))
}
9 changes: 8 additions & 1 deletion cmd/metadata/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/dipdup-net/metadata/internal/tezos"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand Down Expand Up @@ -94,14 +95,20 @@ func New(ctx context.Context, settings config.Settings, tezosKeys *tezoskeys.Tez
}

// Resolve -
func (r Receiver) Resolve(ctx context.Context, network, address, link string) (resolved Resolved, err error) {
func (r Receiver) Resolve(ctx context.Context, network, address, link string, attempt int8) (resolved Resolved, err error) {
if len(link) < 7 { // the shortest prefix is http://
return resolved, errors.Wrap(ErrUnknownStorageType, link)
}

switch {
case r.ipfs.Is(link):
resolved.By = ResolverTypeIPFS
if attempt == 3 && network == "mainnet" {
if err := r.ipfs.FindPeers(ctx, link); err != nil {
log.Err(err).Str("link", link).Str("network", network).Msg("can't find peers for CID")
}
}

data, err := r.ipfs.Resolve(ctx, network, address, link)
if err != nil {
if errors.Is(err, ipfs.ErrInvalidCID) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/metadata/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok
indexer.logTokenMetadata(*tm, "trying to resolve")
tm.RetryCount += 1

resolved, err := indexer.resolver.Resolve(ctx, tm.Network, tm.Contract, tm.Link)
resolved, err := indexer.resolver.Resolve(ctx, tm.Network, tm.Contract, tm.Link, tm.RetryCount)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/labstack/echo/v4 v4.9.0
github.com/libp2p/go-libp2p v0.28.0
github.com/libp2p/go-libp2p-kad-dht v0.24.0
github.com/multiformats/go-multiaddr v0.9.0
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.26.1
Expand Down Expand Up @@ -140,7 +141,6 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.24.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.6.1 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
Expand Down
68 changes: 65 additions & 3 deletions internal/ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
icore "github.com/ipfs/boxo/coreiface"
icorepath "github.com/ipfs/boxo/coreiface/path"
"github.com/ipfs/boxo/files"
"github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
Expand All @@ -22,13 +23,16 @@ import (
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/plugin/loader" // This package is needed so that all the preloaded plugins are loaded automatically
"github.com/ipfs/kubo/repo/fsrepo"
p2p "github.com/libp2p/go-libp2p"
kadDHT "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
)

// Node -
type Node struct {
api icore.CoreAPI
node *core.IpfsNode
dht *kadDHT.IpfsDHT
limit int64
}

Expand All @@ -38,13 +42,25 @@ func NewNode(ctx context.Context, dir string, limit int64, blacklist []string, p
if err != nil {
return nil, errors.Wrap(err, "failed to spawn node")
}
return &Node{api, node, limit}, nil
host, err := p2p.New()
if err != nil {
return nil, errors.Wrap(err, "failed to create p2p host")
}
dht, err := kadDHT.New(ctx, host, kadDHT.Mode(kadDHT.ModeClient), kadDHT.BootstrapPeers(kadDHT.GetDefaultBootstrapPeerAddrInfos()...))
if err != nil {
return nil, errors.Wrap(err, "failed to create dht client")
}
return &Node{api, node, dht, limit}, nil
}

// Start -
func (n *Node) Start(ctx context.Context, bootstrap ...string) error {
log.Info().Msg("going to connect to bootstrap nodes...")

if err := n.dht.Bootstrap(ctx); err != nil {
return errors.Wrap(err, "dht client connection to bootstrap")
}

bootstrapNodes := []string{
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
Expand Down Expand Up @@ -170,6 +186,51 @@ func (n *Node) Get(ctx context.Context, cid string) (Data, error) {
}, nil
}

// FindPeersForContent -
func (n *Node) FindPeersForContent(ctx context.Context, cidString string) error {
c, err := cid.Decode(cidString)
if err != nil {
return errors.Wrapf(err, "cid decoding: %s", cidString)
}
providers, err := n.dht.FindProviders(ctx, c)
if err != nil {
return errors.Wrapf(err, "finding peers for cid: %s", cidString)
}
if len(providers) == 0 {
return nil
}

peers, err := n.api.Swarm().Peers(ctx)
if err != nil {
return errors.Wrap(err, "receiving current peers")
}

for i := range providers {
connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

var connected bool
for j := range peers {
if peers[j].ID() == providers[i].ID {
connected = true
break
}
}
if connected {
continue
}

if err := n.api.Swarm().Connect(connectCtx, providers[i]); err != nil {
log.Warn().
Str("peer", providers[i].ID.String()).
Msgf("failed to connect: %s", err)
} else {
log.Info().Str("peer", providers[i].ID.String()).Msg("connected")
}
}
return nil
}

var loadPluginsOnce sync.Once

func spawn(ctx context.Context, dir string, blacklist []string, providers []Provider) (icore.CoreAPI, *core.IpfsNode, error) {
Expand Down Expand Up @@ -232,8 +293,8 @@ func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) err

connectCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := ipfs.Swarm().Connect(connectCtx, *peerInfo)
if err != nil {

if err := ipfs.Swarm().Connect(connectCtx, *peerInfo); err != nil {
log.Warn().
Str("peer", peerInfo.ID.String()).
Msgf("failed to connect: %s", err)
Expand All @@ -251,6 +312,7 @@ func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) err
for i := range connected {
log.Info().Str("peer_id", connected[i].ID().String()).Str("address", connected[i].Address().String()).Msg("connected to peer")
}

return nil
}

Expand Down

0 comments on commit f03c9f9

Please sign in to comment.