Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
Merge 3ca3c0d into 9e17a66
Browse files Browse the repository at this point in the history
  • Loading branch information
hoffmabc committed Jun 25, 2020
2 parents 9e17a66 + 3ca3c0d commit 996f7d2
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions ipfs/pointers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func PutPointerToPeer(dht *routing.IpfsDHT, ctx context.Context, peer peer.ID, p

func GetPointersFromPeer(dht *routing.IpfsDHT, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) {
pmes := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, key.Bytes(), 0)
log.Debugf("Fetching pointers from: %v\n", p.Pretty())
resp, err := dht.SendRequest(ctx, p, pmes)
if err != nil {
return []*ps.PeerInfo{}, err
Expand Down
3 changes: 2 additions & 1 deletion mobile/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,13 @@ func (n *Node) start() error {
SendAck: n.OpenBazaarNode.SendOfflineAck,
SendError: n.OpenBazaarNode.SendError,
})
go MR.ResetPointerList()
go MR.Run()
n.OpenBazaarNode.MessageRetriever = MR
PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator)
go PR.Run()
n.OpenBazaarNode.PointerRepublisher = PR
MR.Wait()
// MR.Wait()

n.OpenBazaarNode.PublishLock.Unlock()
publishUnlocked = true
Expand Down
31 changes: 27 additions & 4 deletions net/retriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ import (

const DefaultPointerPrefixLength = 14

var log = logging.MustGetLogger("retriever")
var (
// Initialize a clear pointerList for the DHT on start
pointerList = []string{}
log = logging.MustGetLogger("retriever")
)

type MRConfig struct {
Db repo.Datastore
Expand Down Expand Up @@ -66,6 +70,20 @@ type offlineMessage struct {
env pb.Envelope
}

func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}

// Reset on startup
func (m *MessageRetriever) ResetPointerList() {
pointerList = []string{}
}

func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
var client *http.Client
if cfg.Dialer != nil {
Expand Down Expand Up @@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() {
peers := time.NewTicker(time.Minute)
defer dht.Stop()
defer peers.Stop()
go m.fetchPointersFromDHT()
go m.fetchPointersFromPushNodes()
go m.fetchPointersFromDHT()
for {
select {
case <-dht.C:
Expand Down Expand Up @@ -159,7 +177,9 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
inFlight := make(map[string]bool)
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
for p := range peerOut {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !inFlight[p.Addrs[0].String()] {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] {
pointerList = append(pointerList, p.Addrs[0].String())
log.Debugf("Looking for pointer [%v] at %v\n", p.ID.Pretty(), p.Addrs)
inFlight[p.Addrs[0].String()] = true
log.Debugf("Found pointer with location %s", p.Addrs[0].String())
// IPFS
Expand Down Expand Up @@ -215,12 +235,15 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI
wg.Add(1)
go func(pid peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*35)
defer cancel()
time.Sleep(time.Second * 15)
provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k)
if err != nil {
log.Errorf("Could not get pointers from push node because: %v", err)
return
}
log.Debugf("Successfully queried %s for pointers", pid.Pretty())
for _, pi := range provs {
peerOut <- *pi
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
pb "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
Expand Down Expand Up @@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
Expand Down

0 comments on commit 996f7d2

Please sign in to comment.