Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2067 from OpenBazaar/chrisfixes
Browse files Browse the repository at this point in the history
Merge in some changes from that were in the mobile branch
  • Loading branch information
cpacia committed May 6, 2020
2 parents f3f0b76 + fc05296 commit 755e025
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 71 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -22,11 +22,11 @@ help:

.PHONY: ios_framework
ios_framework: ## Build iOS Framework for mobile
gomobile bind -target=ios github.com/OpenBazaar/openbazaar-go/mobile
gomobile bind -target=ios/arm64,ios/amd64 -iosversion=10 -ldflags="-s -w" github.com/OpenBazaar/openbazaar-go/mobile

.PHONY: android_framework
android_framework: ## Build Android Framework for mobile
gomobile bind -target=android github.com/OpenBazaar/openbazaar-go/mobile
gomobile bind -target=android/arm,android/arm64,android/amd64 -ldflags="-s -w" github.com/OpenBazaar/openbazaar-go/mobile

##
## Protobuf compilation
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Expand Up @@ -265,7 +265,7 @@ func (n *OpenBazaarNode) retryableSeedStoreToPeer(pid peer.ID, graphHash string,
}
err := n.SendStore(pid.Pretty(), graph)
if err != nil {
if retryTimeout > 60*time.Second {
if retryTimeout > 8*time.Second {
log.Errorf("error pushing to peer %s: %s", pid.Pretty(), err.Error())
return
}
Expand Down
31 changes: 20 additions & 11 deletions core/net.go
Expand Up @@ -98,23 +98,32 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
}
}
log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String())
OfflineMessageWaitGroup.Add(2)
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
if err != nil {
log.Error(err)
}

// Push provider to our push nodes for redundancy
for _, p := range n.PushNodes {
// We publish our pointers to three different locations:
// 1. The pushnodes
// 2. The DHT
// 3. Pubsub
// Each one is done in a separate goroutine so as to not block but we
// do increment the OfflineMessageWaitGroup which is used to block
// shutdown until all publishing is finished.
OfflineMessageWaitGroup.Add(2 + len(n.PushNodes))
for _, p := range n.PushNodes {
go func(pid peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer)
err := ipfs.PutPointerToPeer(n.DHT, ctx, pid, pointer)
if err != nil {
log.Error(err)
}
OfflineMessageWaitGroup.Done()
}(p)
}
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PublishPointer(n.DHT, ctx, pointer)
if err != nil {
log.Error(err)
}

OfflineMessageWaitGroup.Done()
Expand Down
24 changes: 4 additions & 20 deletions mobile/cmd/main.go
Expand Up @@ -2,12 +2,10 @@ package main

import (
"fmt"
"os"
"sync"
"time"

"github.com/OpenBazaar/openbazaar-go/mobile"
"github.com/jessevdk/go-flags"
"os"
"path"
)

type Options struct {
Expand All @@ -21,7 +19,7 @@ var (
)

func main() {
var dataPath = "/Users/mg/work/ob/openbazaar-go/config_mobile_test"
var dataPath = path.Join(os.TempDir(), "ob-mobile")
if _, err := parser.Parse(); err != nil {
if len(os.Args) > 1 && os.Args[1] == "-h" {
os.Exit(0)
Expand All @@ -35,7 +33,6 @@ func main() {
}

var (
wg sync.WaitGroup
n, err = mobile.NewNodeWithConfig(&mobile.NodeConfig{
RepoPath: dataPath,
Testnet: options.TestnetEnabled,
Expand All @@ -48,18 +45,5 @@ func main() {
fmt.Println(err.Error())
}

time.Sleep(time.Second * 10)
fmt.Println("restarting...", time.Now())

wg.Add(1)

go func() {
err := n.Restart()
if err != nil {
panic(fmt.Sprintf("failed to restart: %s", err.Error()))
}
}()

wg.Wait()

select {}
}
15 changes: 12 additions & 3 deletions mobile/node.go
Expand Up @@ -70,7 +70,7 @@ type Node struct {

var (
fileLogFormat = logging.MustStringFormatter(
`%{time:2006-01-02 15:04:05.000} [%{level}] [%{module}/%{shortfunc}] %{message}`,
`[Haven] %{time:2006-01-02 15:04:05.000} [%{level}] [%{module}/%{shortfunc}] %{message}`,
)
publishUnlocked = false
mainLoggingBackend logging.Backend
Expand Down Expand Up @@ -122,7 +122,9 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N
}
obFileBackend := logging.NewLogBackend(obLog, "", 0)
obFileBackendFormatted := logging.NewBackendFormatter(obFileBackend, fileLogFormat)
mainLoggingBackend = logging.SetBackend(obFileBackendFormatted)
stdoutBackend := logging.NewLogBackend(os.Stdout, "", 0)
stdoutBackendFormatted := logging.NewBackendFormatter(stdoutBackend, fileLogFormat)
mainLoggingBackend = logging.SetBackend(obFileBackendFormatted, stdoutBackendFormatted)
logging.SetLevel(logging.INFO, "")

sqliteDB, err := initializeRepo(config.RepoPath, "", "", true, time.Now(), wi.Bitcoin)
Expand Down Expand Up @@ -282,7 +284,7 @@ func NewNodeWithConfig(config *NodeConfig, password string, mnemonic string) (*N
Datastore: sqliteDB,
MasterPrivateKey: mPrivKey,
Multiwallet: mw,
OfflineMessageFailoverTimeout: 5 * time.Second,
OfflineMessageFailoverTimeout: 3 * time.Second,
PushNodes: pushNodes,
RepoPath: config.RepoPath,
UserAgent: core.USERAGENT,
Expand Down Expand Up @@ -324,6 +326,13 @@ func (n *Node) startIPFSNode(repoPath string, config *ipfscore.BuildCfg) (*ipfsc
n.cancel = cancel

ctx := commands.Context{}

ipfscore.DefaultBootstrapConfig = ipfscore.BootstrapConfig{
MinPeerThreshold: 8,
Period: time.Second * 10,
ConnectionTimeout: time.Second * 10 / 3,
}

nd, err := ipfscore.NewNode(cctx, config)
if err != nil {
return nil, ctx, err
Expand Down
64 changes: 34 additions & 30 deletions net/retriever/retriever.go
Expand Up @@ -91,66 +91,70 @@ func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
WaitGroup: new(sync.WaitGroup),
}

mr.Add(1)
mr.Add(2)
return &mr
}

func (m *MessageRetriever) Run() {
dht := time.NewTicker(time.Hour)
peers := time.NewTicker(time.Minute * 10)
peers := time.NewTicker(time.Minute)
defer dht.Stop()
defer peers.Stop()
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
go m.fetchPointersFromPushNodes()
for {
select {
case <-dht.C:
m.Add(1)
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
case <-peers.C:
m.Add(1)
go m.fetchPointers(false)
go m.fetchPointersFromPushNodes()
}
}
}

// RunOnce - used to fetch messages only once
func (m *MessageRetriever) RunOnce() {
m.Add(1)
go m.fetchPointers(true)
go m.fetchPointersFromDHT()
m.Add(1)
go m.fetchPointers(false)
go m.fetchPointersFromPushNodes()
}

func (m *MessageRetriever) fetchPointers(useDHT bool) {
func (m *MessageRetriever) fetchPointersFromDHT() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := new(sync.WaitGroup)
downloaded := 0
mh, _ := multihash.FromB58String(m.node.Identity.Pretty())
peerOut := make(chan ps.PeerInfo)
go func(c chan ps.PeerInfo) {
pwg := new(sync.WaitGroup)
pwg.Add(1)
go func(c chan ps.PeerInfo) {
out := m.getPointersDataPeers()
for p := range out {
c <- p
}
pwg.Done()
}(c)
if useDHT {
pwg.Add(1)
go func(c chan ps.PeerInfo) {
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
for p := range iout {
c <- p
}
pwg.Done()
}(c)
iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
for p := range iout {
c <- p
}
close(c)

}(peerOut)

m.downloadMessages(peerOut)
}

func (m *MessageRetriever) fetchPointersFromPushNodes() {
peerOut := make(chan ps.PeerInfo)
go func(c chan ps.PeerInfo) {
out := m.getPointersDataPeers()
for p := range out {
c <- p
}
pwg.Wait()
close(c)

}(peerOut)
m.downloadMessages(peerOut)
}

func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
wg := new(sync.WaitGroup)
downloaded := 0

inFlight := make(map[string]bool)
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
Expand Down Expand Up @@ -239,7 +243,7 @@ func (m *MessageRetriever) fetchIPFS(pid peer.ID, n *core.IpfsNode, addr ma.Mult
var err error

go func() {
ciphertext, err = ipfs.Cat(n, addr.String(), time.Minute*5)
ciphertext, err = ipfs.Cat(n, addr.String(), time.Second*10)
c <- struct{}{}
}()

Expand Down
5 changes: 3 additions & 2 deletions net/service/messagesender.go
Expand Up @@ -111,8 +111,9 @@ func (ms *messageSender) prep() error {
if ms.s != nil {
return nil
}

nstr, err := ms.service.host.NewStream(ms.service.ctx, ms.p, ipfs.IPFSProtocolAppMainnetOne)
ctx, cancel := context.WithTimeout(ms.service.ctx, 3*time.Second)
defer cancel()
nstr, err := ms.service.host.NewStream(ctx, ms.p, ipfs.IPFSProtocolAppMainnetOne)
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions repo/listing.go
Expand Up @@ -185,6 +185,18 @@ func UpdateListing(r []byte, isTestnet bool, dstore *Datastore, repoPath string)
if err != nil {
return Listing{}, err
}
skus := ld.Item.Skus
for _, sku := range skus {
if sku.BigSurcharge == "" {
sku.BigSurcharge = "0"
}
if sku.BigQuantity == "" {
sku.BigQuantity = "0"
}
}

ld.Item.Skus = skus

slug := ld.Slug
exists, err := listingExists(slug, repoPath, isTestnet)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions wallet/listeners/transaction_listener.go
Expand Up @@ -90,8 +90,6 @@ func (l *TransactionListener) cleanupOrderState(isSale bool, contract *pb.Ricard
}

func (l *TransactionListener) OnTransactionReceived(cb wallet.TransactionCallback) {
log.Info("Transaction received", cb.Txid, cb.Height)

l.Lock()
defer l.Unlock()
for _, output := range cb.Outputs {
Expand Down

0 comments on commit 755e025

Please sign in to comment.