Skip to content

Commit

Permalink
fix(provider): cache no subscribed peer
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Jan 29, 2019
1 parent 45e6590 commit 8765c48
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
5 changes: 3 additions & 2 deletions core/network/p2p/driver.go
Expand Up @@ -452,19 +452,20 @@ func (d *Driver) SendTo(ctx context.Context, pi pstore.PeerInfo, e *p2p.Envelope
return fmt.Errorf("write stream: `%s`", err.Error())
}

go inet.FullClose(s)
return nil
}

func (d *Driver) handleEnvelope(s inet.Stream) {
logger().Debug("receiving envelope")

if d.handler == nil {
logger().Error("handler is not set")
return
}

e := &p2p.Envelope{}
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)

e := &p2p.Envelope{}
switch err := pbr.ReadMsg(e); err {
case io.EOF:
s.Close()
Expand Down
3 changes: 2 additions & 1 deletion core/network/p2p/protocol/provider/provider.go
Expand Up @@ -47,7 +47,8 @@ func (m *Manager) addPeerToSub(id cid.Cid, pi pstore.PeerInfo) error {
logger().Debug("registering", zap.String("id", id.String()))
ps, ok := m.subs[id]
if !ok {
return fmt.Errorf("not subscribed to %s", id)
// If no subscription(s), create one to cache peer
m.subs[id] = make(Peers, 0)
}

m.subs[id] = ps.add(pi)
Expand Down
20 changes: 14 additions & 6 deletions core/network/p2p/protocol/provider/pubsub/pubsub.go
Expand Up @@ -3,6 +3,7 @@ package pubsub
import (
"context"
fmt "fmt"
io "io"
"sync"
"time"

Expand Down Expand Up @@ -181,24 +182,30 @@ func (p *Provider) getProvider(pid peer.ID) (string, error) {
// @TODO: for the moment handleStream accept any id,
// improve to match published message
func (p *Provider) handleStream(s inet.Stream) {
defer inet.FullClose(s)

pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)

remoteProvider := &ProviderInfo{}
if err := pbr.ReadMsg(remoteProvider); err != nil {
logger().Error("invalid provider info", zap.Error(err))
switch err := pbr.ReadMsg(remoteProvider); err {
case io.EOF:
s.Close()
return
case nil:
default:
s.Reset()
logger().Error("Error unmarshaling provider info", zap.Error(err))
return
}

pinfo, err := p.getPeerInfo(remoteProvider)
if err != nil {
s.Reset()
logger().Error("malformed provider info", zap.Error(err))
return
}

id, err := cid.Decode(remoteProvider.GetId())
if err != nil {
s.Reset()
logger().Error("invalid provider id", zap.String("id", id.String()), zap.Error(err))
}

Expand Down Expand Up @@ -282,11 +289,12 @@ func (p *Provider) handleSubscription(ctx context.Context) error {

pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(self); err != nil {
s.Reset()
logger().Error("write stream", zap.Error(err))
} else {
p.handler(id, pinfo)
continue
}

p.handler(id, pinfo)
go inet.FullClose(s)
}
}
Expand Down

0 comments on commit 8765c48

Please sign in to comment.