Skip to content

Commit

Permalink
Refresh DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Jun 21, 2023
1 parent a01aa23 commit 5b61bdd
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 59 deletions.
14 changes: 1 addition & 13 deletions build/dipdup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ metadata:
delay: ${IPFS_DELAY:-10}
providers:
# Pinata
- id: QmWaik1eJcGHq1ybTWe7sezRfqKNcDRNkeBaLnGwQJz1Cj
addr: /dnsaddr/fra1-1.hostnodes.pinata.cloud
- id: QmNfpLrQQZr5Ns9FAJKpyzgnDL2GgC6xBug1yUZozKFgu4
addr: /dnsaddr/fra1-2.hostnodes.pinata.cloud
- id: QmPo1ygpngghu5it8u4Mr3ym6SEU2Wp2wA66Z91Y1S1g29
addr: /dnsaddr/fra1-3.hostnodes.pinata.cloud
- id: QmRjLSisUCHVpFa5ELVvX3qVPfdxajxWJEHs9kN3EcxAW6
addr: /dnsaddr/nyc1-1.hostnodes.pinata.cloud
- id: QmPySsdmbczdZYBpbi2oq2WMJ8ErbfxtkG8Mo192UHkfGP
addr: /dnsaddr/nyc1-2.hostnodes.pinata.cloud
- id: QmSarArpxemsPESa6FNkmuu9iSE1QWqPX2R3Aw6f5jq4D5
addr: /dnsaddr/nyc1-3.hostnodes.pinata.cloud
- id: Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn
addr: /dns4/production-ipfs-peer.pinata.cloud
# Cloudflare
Expand Down Expand Up @@ -75,7 +63,7 @@ metadata:
addr: /dnsaddr/node-12.ingress.cloudflare-ipfs.com
# web3 storage
- id: QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC
# addr: /dns4/elastic.dag.house
addr: /dns4/elastic.dag.house
# European EPC Competence Center
- id: 12D3KooWGaHbxpDWn4JVYud899Wcpa4iHPa3AMYydfxQDb3MhDME
addr: /dnsaddr/ipfs.ssi.eecc.de
Expand Down
4 changes: 1 addition & 3 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.prom.IncrementErrorCounter(indexer.network, e)
err = e.Err

if e.Type == resolver.ErrorInvalidHTTPURI ||
e.Type == resolver.ErrorTypeInvalidJSON ||
e.Type == resolver.ErrorInvalidCID {
if e.IsFatal() {
cm.RetryCount = int8(indexer.settings.MaxRetryCountOnError)
}
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/metadata/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ErrorTypeInvalidJSON ErrorType = "invalid_json"
ErrorInvalidHTTPURI ErrorType = "invalid_http_uri"
ErrorInvalidCID ErrorType = "invalid_ipfs_cid"
ErrorUnknownStorageType ErrorType = "unknown_storage_type"
)

// ResolvingError -
Expand All @@ -59,6 +60,14 @@ func newResolvingError(code int, typ ErrorType, err error) ResolvingError {
return ResolvingError{code, typ, err}
}

// IsFatal reports whether the error type is permanent, meaning a retry of
// the resolution cannot succeed (malformed HTTP URI, invalid JSON payload,
// invalid IPFS CID, or an unknown storage scheme). Callers use it to spend
// the whole retry budget at once instead of retrying a hopeless link.
func (e ResolvingError) IsFatal() bool {
	switch e.Type {
	case ErrorInvalidHTTPURI, ErrorTypeInvalidJSON, ErrorInvalidCID, ErrorUnknownStorageType:
		return true
	default:
		return false
	}
}

// Resolved -
type Resolved struct {
By ResolverType
Expand Down Expand Up @@ -137,7 +146,7 @@ func (r Receiver) Resolve(ctx context.Context, network, address, link string, at
resolved.Data, err = r.sha.Resolve(ctx, network, address, link)

default:
return resolved, errors.Wrap(ErrUnknownStorageType, link)
return resolved, newResolvingError(0, ErrorUnknownStorageType, errors.Wrap(ErrUnknownStorageType, err.Error()))
}

if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions cmd/metadata/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok
indexer.prom.IncrementErrorCounter(indexer.network, e)
err = e.Err

if e.Type == resolver.ErrorInvalidHTTPURI ||
e.Type == resolver.ErrorTypeInvalidJSON ||
e.Type == resolver.ErrorInvalidCID {
if e.IsFatal() {
tm.RetryCount = int8(indexer.settings.MaxRetryCountOnError)
}
}
Expand Down
96 changes: 57 additions & 39 deletions internal/ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

// Node -
type Node struct {
api icore.CoreAPI
node *core.IpfsNode
dht *kadDHT.IpfsDHT
limit int64
wg *sync.WaitGroup
api icore.CoreAPI
node *core.IpfsNode
dht *kadDHT.IpfsDHT
providers []Provider
limit int64
wg *sync.WaitGroup
}

// NewNode -
Expand All @@ -51,11 +52,12 @@ func NewNode(ctx context.Context, dir string, limit int64, blacklist []string, p
return nil, errors.Wrap(err, "failed to create dht client")
}
return &Node{
api: api,
node: node,
dht: dht,
limit: limit,
wg: new(sync.WaitGroup),
api: api,
node: node,
dht: dht,
providers: providers,
limit: limit,
wg: new(sync.WaitGroup),
}, nil
}

Expand Down Expand Up @@ -95,7 +97,7 @@ func (n *Node) Start(ctx context.Context, bootstrap ...string) error {
"/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6",
"/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic",
"/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn",
"/dns4/elastic.dag.house/tcp/443/wss/p2p/QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC",
"/ip4/104.18.21.126/tcp/4001/p2p/QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC",
"/ip4/141.94.193.54/tcp/4001/p2p/12D3KooWAJJJwXsB5b68cbq69KpXiKqQAgTKssg76heHkg6mo2qB",
"/ip4/18.232.70.69/tcp/4001/p2p/12D3KooWAYgR87jsuQMUno9MXQHGv3A7GGf4wLPVQhSG7jPNtejk",
"/ip4/94.130.71.31/tcp/4001/p2p/12D3KooWJFBXTQaRhU3JJo5JiS4rHusKB7KuWmz1nfRjrkViaaMQ",
Expand All @@ -114,7 +116,7 @@ func (n *Node) Start(ctx context.Context, bootstrap ...string) error {
"/ip4/54.87.223.182/tcp/4001/p2p/12D3KooWHRtiyBEGddhe7wvNz5q8A6gyDReqpzN5aiyT2gktKqXd",
"/ip4/128.199.70.49/tcp/4001/p2p/12D3KooWQySWhisgDXXJEJTHZiewHasbsmfAMYbERdtnAS39397v",
"/ip4/35.171.4.239/tcp/4001/p2p/12D3KooWHJCJJrAjSnJB9Mx9JWMeBAjgdSXrV7FwkCZk61if2bR3",
"/ip4/45.32.130.169/tcp/4001/p2p/12D3KooWA1x69gRbUDaJqpZvQARRCR6H848ZydM6BBnszrTQV4w1",
"/ip4/45.32.130.169/tcp/4001/p2p/12D3KooWPhuXjAH2wprpAQdtUvGmB3A8Hbw18JtL99MRvasWqrjF",
"/ip4/54.147.190.40/tcp/4001/p2p/12D3KooWHR1v13MD6ybgj5T3Ds56MB8LGcaXRH8W9cNLJP19AnRy",
"/ip4/54.80.114.62/tcp/4001/p2p/12D3KooWSMc3sjPAAxdNXPg5nUa9M76WK2Vp3uf9FhfARpnmKjEH",
"/ip4/54.174.102.221/tcp/4001/p2p/12D3KooWQo32RF8QSanP2LUnPnuKshqZdCFuUtypexzpAiUCK3js",
Expand Down Expand Up @@ -336,27 +338,13 @@ func createRepository(dir string, blacklist []string, providers []Provider) (str
cfg.Swarm.Transports.Network.Relay = config.False
// cfg.Swarm.Transports.Network.QUIC = config.False
cfg.Swarm.AddrFilters = blacklist
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(10000)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(450)
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(900)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(600)
cfg.Routing.AcceleratedDHTClient = true

peers := make([]peer.AddrInfo, 0)
for i := range providers {
id, err := peer.Decode(providers[i].ID)
if err != nil {
log.Err(err).Str("peer", providers[i].ID).Msg("invalid identity")
continue
}
info := peer.AddrInfo{
ID: id,
}
if providers[i].Address != "" {
info.Addrs = []ma.Multiaddr{
ma.StringCast(providers[i].Address),
}
}

peers = append(peers, info)
peers, err := providersToAddrInfo(providers)
if err != nil {
return "", errors.Wrap(err, "collecting providers info error")
}
cfg.Peering = config.Peering{
Peers: peers,
Expand Down Expand Up @@ -398,17 +386,17 @@ func (n *Node) reconnect(ctx context.Context) {
return
}

peerInfo := make([]*peer.AddrInfo, len(connected))
peerInfo := make([]peer.AddrInfo, len(connected))
for i := range connected {
peerInfo[i] = &peer.AddrInfo{
peerInfo[i] = peer.AddrInfo{
ID: connected[i].ID(),
Addrs: []ma.Multiaddr{
connected[i].Address(),
},
}
}

ticker := time.NewTicker(time.Minute * 5)
ticker := time.NewTicker(time.Minute * 3)
defer ticker.Stop()

for {
Expand All @@ -422,6 +410,7 @@ func (n *Node) reconnect(ctx context.Context) {
continue
}

var wg sync.WaitGroup
for _, pi := range peerInfo {
var found bool
for i := range peers {
Expand All @@ -434,19 +423,48 @@ func (n *Node) reconnect(ctx context.Context) {
if found {
log.Info().Str("peer_id", pi.ID.String()).Msg("connected to peer")
} else {
n.wg.Add(1)
go func(p *peer.AddrInfo) {
connectCtx, cancel := context.WithTimeout(ctx, time.Second*15)
wg.Add(1)
go func(p peer.AddrInfo, wg *sync.WaitGroup) {
defer wg.Done()

connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

if err := n.api.Swarm().Connect(connectCtx, *p); err == nil {
if err := n.api.Swarm().Connect(connectCtx, p); err == nil {
log.Info().Str("peer_id", p.ID.String()).Msg("reconnected to peer")
} else {
log.Debug().Str("peer_id", p.ID.String()).Msgf("failed to reconnect: %s", err.Error())
}
}(pi)
}(pi, &wg)
}
}

wg.Wait()
log.Info().Msg("reconnection completed")

<-n.dht.RefreshRoutingTable()
log.Info().Msg("refresh routing table completed")
}
}
}

// providersToAddrInfo converts the configured providers into libp2p
// peer.AddrInfo records suitable for the node's peering configuration.
// A provider's ID must decode to a valid peer ID; an empty Address is
// permitted (the peer is then located without a preset multiaddr).
// On a decode failure the returned error names the offending peer ID.
func providersToAddrInfo(providers []Provider) ([]peer.AddrInfo, error) {
	// Happy path yields exactly one AddrInfo per provider, so pre-size.
	peers := make([]peer.AddrInfo, 0, len(providers))
	for i := range providers {
		id, err := peer.Decode(providers[i].ID)
		if err != nil {
			// Include the raw ID so a misconfigured provider entry is easy to spot.
			return nil, errors.Wrapf(err, "providersToAddrInfo: invalid peer id %q", providers[i].ID)
		}
		info := peer.AddrInfo{
			ID: id,
		}
		if providers[i].Address != "" {
			// NOTE(review): ma.StringCast panics on a malformed multiaddr;
			// assumes Address comes from validated config — TODO confirm.
			info.Addrs = []ma.Multiaddr{
				ma.StringCast(providers[i].Address),
			}
		}

		peers = append(peers, info)
	}
	return peers, nil
}

0 comments on commit 5b61bdd

Please sign in to comment.