Skip to content

Commit

Permalink
safe handlers for all request handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Jan 10, 2024
1 parent 2a3b2f3 commit e4b7508
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 13 deletions.
5 changes: 3 additions & 2 deletions protocolproxy/forwardinghost.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/filecoin-project/boost/protocolproxy/messages"
"github.com/filecoin-project/boost/safe"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -30,7 +31,7 @@ func NewForwardingHost(h host.Host, proxy peer.AddrInfo) host.Host {
proxy: proxy.ID,
handlers: make(map[protocol.ID]network.StreamHandler),
}
fh.Host.SetStreamHandler(ForwardingProtocolID, fh.handleForwarding)
fh.Host.SetStreamHandler(ForwardingProtocolID, safe.Handle(fh.handleForwarding))
return fh
}

Expand All @@ -45,7 +46,7 @@ func (fh *ForwardingHost) Close() error {
// protocol will go through the forwarding handshake with the proxy, then the native
// handler will be called
func (fh *ForwardingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
fh.Host.SetStreamHandler(pid, handler)
fh.Host.SetStreamHandler(pid, safe.Handle(handler))

// Save the handler so it can be invoked from the forwarding protocol's handler
// only set the handler if we are successful in registering the route
Expand Down
5 changes: 3 additions & 2 deletions protocolproxy/protocolproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/filecoin-project/boost/protocolproxy/messages"
"github.com/filecoin-project/boost/safe"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -50,10 +51,10 @@ func NewProtocolProxy(h host.Host, peerConfig map[peer.ID][]protocol.ID) (*Proto

func (pp *ProtocolProxy) Start(ctx context.Context) {
pp.ctx = ctx
pp.h.SetStreamHandler(ForwardingProtocolID, pp.handleForwarding)
pp.h.SetStreamHandler(ForwardingProtocolID, safe.Handle(pp.handleForwarding))
msg := ""
for id, pid := range pp.supportedProtocols {
pp.h.SetStreamHandler(id, pp.handleIncoming)
pp.h.SetStreamHandler(id, safe.Handle(pp.handleIncoming))
msg += " " + pid.String() + ": " + string(id) + "\n"
}
pp.h.Network().Notify(pp)
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/lp2pimpl/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/boost-gfm/shared"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/filecoin-project/boost/safe"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -92,7 +93,7 @@ func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsList
}

func (p *TransportsListener) Start() {
p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream)
p.host.SetStreamHandler(TransportsProtocolID, safe.Handle(p.handleNewQueryStream))
}

func (p *TransportsListener) Stop() {
Expand Down
6 changes: 4 additions & 2 deletions retrievalmarket/server/queryask.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/safe"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"time"
)

// The time limit to read a message from the client when the client opens a stream
Expand Down Expand Up @@ -46,7 +48,7 @@ func NewQueryAskHandler(host host.Host, maddr address.Address, pd *piecedirector
}

func (qa *QueryAskHandler) Start() {
qa.host.SetStreamHandler(retrievalmarket.QueryProtocolID, qa.HandleQueryStream)
qa.host.SetStreamHandler(retrievalmarket.QueryProtocolID, safe.Handle(qa.HandleQueryStream))
}

func (qa *QueryAskHandler) Stop() {
Expand Down
20 changes: 20 additions & 0 deletions safe/safe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package safe

import (
"runtime/debug"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/network"
)

var log = logging.Logger("safe")

func Handle(h network.StreamHandler) network.StreamHandler {
defer func() {
if r := recover(); r != nil {
log.Error("panic occurred", "stack", debug.Stack())
}
}()

return h
}
13 changes: 7 additions & 6 deletions storagemarket/lp2pimpl/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
gfm_network "github.com/filecoin-project/boost-gfm/storagemarket/network"
"github.com/filecoin-project/boost/api"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/safe"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -199,16 +200,16 @@ func (p *DealProvider) Start(ctx context.Context) {
// set to false, which maintains the previous behaviour:
// - SkipIPNIAnnounce=false: announce deal to IPNI
// - RemoveUnsealedCopy=false: keep unsealed copy of deal data
p.host.SetStreamHandler(DealProtocolv121ID, p.handleNewDealStream)
p.host.SetStreamHandler(DealProtocolv120ID, p.handleNewDealStream)
p.host.SetStreamHandler(DealProtocolv121ID, safe.Handle(p.handleNewDealStream))
p.host.SetStreamHandler(DealProtocolv120ID, safe.Handle(p.handleNewDealStream))

p.host.SetStreamHandler(DealStatusV12ProtocolID, p.handleNewDealStatusStream)
p.host.SetStreamHandler(DealStatusV12ProtocolID, safe.Handle(p.handleNewDealStatusStream))

// Handle legacy deal stream here and reject all legacy deals
if !p.enableLegacyDeals {
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID101, p.handleLegacyDealStream)
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID110, p.handleLegacyDealStream)
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID111, p.handleLegacyDealStream)
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID101, safe.Handle(p.handleLegacyDealStream))
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID110, safe.Handle(p.handleLegacyDealStream))
p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID111, safe.Handle(p.handleLegacyDealStream))
}
}

Expand Down

0 comments on commit e4b7508

Please sign in to comment.