Skip to content

Commit

Permalink
fix(network): emit to sync
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Mar 8, 2019
1 parent 87e1f83 commit 6b1ce91
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 43 deletions.
Expand Up @@ -20,6 +20,7 @@
android:networkSecurityConfig="@xml/network_security_config"
>


<activity
android:name=".MainActivity"
android:launchMode="singleTop"
Expand Down
42 changes: 14 additions & 28 deletions core/network/driver.go
Expand Up @@ -214,43 +214,34 @@ func (net *Network) EmitTo(ctx context.Context, channel string, e *entity.Envelo
ctx = tracer.Context()

logger().Debug("looking for peers", zap.String("channel", channel))
ss, err := net.FindProvidersAndWait(ctx, channel, true)
c, err := net.createCid(channel)
if err != nil {
return err
}

ss := net.host.Routing.FindProvidersAsync(ctx, c, 100)

// @TODO: we need to split this, and let the node do the logic to try
// back if the send fail with the given peer

logger().Debug("found peers", zap.String("channel", channel), zap.Int("number", len(ss)))

mu := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(ss))

ok := false
for i, s := range ss {
go func(pi pstore.PeerInfo, index int) {
gotracer := tracing.EnterFunc(ctx, index)
goctx := tracer.Context()

defer gotracer.Finish()
defer wg.Done()
for pi := range ss {
if pi.ID == "" {
break
}
logger().Debug(fmt.Sprintf("send to peer: %+v", pi))

if err := net.SendTo(goctx, pi, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
return
}
if err := net.SendTo(ctx, pi, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
continue
}

mu.Lock()
ok = true
mu.Unlock()
return
}(s, i)
ok = true
break
}

// wait until all goroutines are done
wg.Wait()
if !ok {
return fmt.Errorf("unable to send evenlope to at last one peer")
}
Expand All @@ -264,11 +255,6 @@ func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.En
return fmt.Errorf("cannot dial to self")
}

logger().Debug("connecting", zap.String("peerID", peerID))
if err := net.Connect(ctx, pi); err != nil {
return fmt.Errorf("failed to connect: `%s`", err.Error())
}

s, err := net.host.NewStream(ctx, pi.ID, ProtocolID)
if err != nil {
return fmt.Errorf("new stream failed: `%s`", err.Error())
Expand Down
17 changes: 16 additions & 1 deletion core/network/host/connmanager.go
Expand Up @@ -9,6 +9,7 @@ import (
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func (cm *BertyConnMgr) Disconnected(net inet.Network, c inet.Conn) {
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
case <-time.After(time.Second * 1):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
Expand Down Expand Up @@ -145,6 +146,20 @@ func (cm *BertyConnMgr) Disconnected(net inet.Network, c inet.Conn) {
}
}

// TODO
func (cm *BertyConnMgr) getRelayPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
func (cm *BertyConnMgr) getBootstrapPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
func (cm *BertyConnMgr) getKbucketPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}

// Listen is no-op in this implementation.
func (cm *BertyConnMgr) Listen(n inet.Network, addr ma.Multiaddr) {
cm.BasicConnMgr.Notifee().Listen(n, addr)
Expand Down
2 changes: 2 additions & 0 deletions core/network/host/host.go
Expand Up @@ -71,6 +71,7 @@ func (bh *BertyHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
var err error
addrs, err = bh.findPeerAddrs(ctx, pi.ID)
if err != nil {
logger().Error(fmt.Sprintf("failed to find peer addrs: peer: %+v, error: %+v", pi, err.Error()))
return err
}
}
Expand Down Expand Up @@ -113,6 +114,7 @@ func (bh *BertyHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {

// if we're here, we got some addrs. let's use our wrapped host to connect.
pi.Addrs = addrs
logger().Debug("BertyHost::Connect: try to connect with addrs :" + fmt.Sprintf("%+v", pi.Addrs))
return bh.Host.Connect(ctx, pi)
}

Expand Down
26 changes: 12 additions & 14 deletions core/network/host/routing.go
Expand Up @@ -107,13 +107,17 @@ func (br *BertyRouting) GetValue(ctx context.Context, ns string, opts ...ropts.O
func (br *BertyRouting) SearchValue(ctx context.Context, ns string, opts ...ropts.Option) (<-chan []byte, error) {
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
return nil, err
}

return br.dht.SearchValue(ctx, ns, opts...)
}

func (br *BertyRouting) FindPeer(ctx context.Context, pid peer.ID) (pstore.PeerInfo, error) {
br.waitIsReady(ctx)
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
return pstore.PeerInfo{}, err
}
return br.dht.FindPeer(ctx, pid)

}
Expand All @@ -122,13 +126,18 @@ func (br *BertyRouting) FindPeer(ctx context.Context, pid peer.ID) (pstore.PeerI
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
func (br *BertyRouting) Provide(ctx context.Context, id cid.Cid, brd bool) error {
br.waitIsReady(ctx)
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
return err
}
return br.dht.Provide(ctx, id, brd)
}

// Search for peers who are able to provide a given key
func (br *BertyRouting) FindProvidersAsync(ctx context.Context, id cid.Cid, n int) <-chan pstore.PeerInfo {
br.waitIsReady(ctx)
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
}
return br.dht.FindProvidersAsync(ctx, id, n)
}

Expand All @@ -141,17 +150,6 @@ func (br *BertyRouting) isReady(ctx context.Context) error {
}
}

func (br *BertyRouting) waitIsReady(ctx context.Context) {
for {
if err := br.isReady(ctx); err != nil {
logger().Error("routing isn't ready", zap.Error(err))
time.Sleep(time.Second)
continue
}
break
}
}

func (br *BertyRouting) getBackoffDelay(currentDelay time.Duration) time.Duration {
delay := float64(currentDelay) * factor
if delay > float64(maxDelay) {
Expand Down

0 comments on commit 6b1ce91

Please sign in to comment.