diff --git a/protocolproxy/forwardinghost.go b/protocolproxy/forwardinghost.go index a456860e0..662a3f550 100644 --- a/protocolproxy/forwardinghost.go +++ b/protocolproxy/forwardinghost.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/filecoin-project/boost/protocolproxy/messages" + "github.com/filecoin-project/boost/safe" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -30,7 +31,7 @@ func NewForwardingHost(h host.Host, proxy peer.AddrInfo) host.Host { proxy: proxy.ID, handlers: make(map[protocol.ID]network.StreamHandler), } - fh.Host.SetStreamHandler(ForwardingProtocolID, fh.handleForwarding) + fh.Host.SetStreamHandler(ForwardingProtocolID, safe.Handle(fh.handleForwarding)) return fh } @@ -45,7 +46,7 @@ func (fh *ForwardingHost) Close() error { // protocol will go through the forwarding handshake with the proxy, then the native // handler will be called func (fh *ForwardingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { - fh.Host.SetStreamHandler(pid, handler) + fh.Host.SetStreamHandler(pid, safe.Handle(handler)) // Save the handler so it can be invoked from the forwarding protocol's handler // only set the handler if we are successful in registering the route diff --git a/protocolproxy/protocolproxy.go b/protocolproxy/protocolproxy.go index 7d419276a..4476af210 100644 --- a/protocolproxy/protocolproxy.go +++ b/protocolproxy/protocolproxy.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/filecoin-project/boost/protocolproxy/messages" + "github.com/filecoin-project/boost/safe" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -50,10 +51,10 @@ func NewProtocolProxy(h host.Host, peerConfig map[peer.ID][]protocol.ID) (*Proto func (pp *ProtocolProxy) Start(ctx context.Context) { pp.ctx = ctx - pp.h.SetStreamHandler(ForwardingProtocolID, pp.handleForwarding) + pp.h.SetStreamHandler(ForwardingProtocolID, safe.Handle(pp.handleForwarding)) msg := "" for id, pid := range pp.supportedProtocols { - pp.h.SetStreamHandler(id, pp.handleIncoming) + pp.h.SetStreamHandler(id, safe.Handle(pp.handleIncoming)) msg += " " + pid.String() + ": " + string(id) + "\n" } pp.h.Network().Notify(pp) diff --git a/retrievalmarket/lp2pimpl/transports.go b/retrievalmarket/lp2pimpl/transports.go index 623e32e9d..6fc0828b1 100644 --- a/retrievalmarket/lp2pimpl/transports.go +++ b/retrievalmarket/lp2pimpl/transports.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/boost-gfm/shared" "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/filecoin-project/boost/safe" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/libp2p/go-libp2p/core/host" @@ -92,7 +93,7 @@ func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsList } func (p *TransportsListener) Start() { - p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream) + p.host.SetStreamHandler(TransportsProtocolID, safe.Handle(p.handleNewQueryStream)) } func (p *TransportsListener) Stop() { diff --git a/retrievalmarket/server/queryask.go b/retrievalmarket/server/queryask.go index 728d16a99..8fa79c94a 100644 --- a/retrievalmarket/server/queryask.go +++ b/retrievalmarket/server/queryask.go @@ -4,8 +4,11 @@ import ( "context" "errors" "fmt" + "time" + "github.com/filecoin-project/boost-gfm/retrievalmarket" "github.com/filecoin-project/boost/piecedirectory" + "github.com/filecoin-project/boost/safe" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/big" @@ -13,7 +16,6 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" - "time" ) // The time limit to read a message from the client when the client opens a stream @@ -46,7 +48,7 @@ func NewQueryAskHandler(host host.Host, maddr address.Address, pd *piecedirector } func (qa *QueryAskHandler) Start() { - qa.host.SetStreamHandler(retrievalmarket.QueryProtocolID, qa.HandleQueryStream) + qa.host.SetStreamHandler(retrievalmarket.QueryProtocolID, safe.Handle(qa.HandleQueryStream)) } func (qa *QueryAskHandler) Stop() { diff --git a/safe/safe.go b/safe/safe.go new file mode 100644 index 000000000..b78ceeaa1 --- /dev/null +++ b/safe/safe.go @@ -0,0 +1,20 @@ +package safe + +import ( + "runtime/debug" + + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/network" +) + +var log = logging.Logger("safe") + +func Handle(h network.StreamHandler) network.StreamHandler { + defer func() { + if r := recover(); r != nil { + log.Error("panic occurred", "stack", debug.Stack()) + } + }() + + return h +} diff --git a/storagemarket/lp2pimpl/net.go b/storagemarket/lp2pimpl/net.go index b764498b6..c00532d57 100644 --- a/storagemarket/lp2pimpl/net.go +++ b/storagemarket/lp2pimpl/net.go @@ -12,6 +12,7 @@ import ( gfm_network "github.com/filecoin-project/boost-gfm/storagemarket/network" "github.com/filecoin-project/boost/api" "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/safe" "github.com/filecoin-project/boost/storagemarket" "github.com/filecoin-project/boost/storagemarket/sealingpipeline" "github.com/filecoin-project/boost/storagemarket/types" @@ -199,16 +200,16 @@ func (p *DealProvider) Start(ctx context.Context) { // set to false, which maintains the previous behaviour: // - SkipIPNIAnnounce=false: announce deal to IPNI // - RemoveUnsealedCopy=false: keep unsealed copy of deal data - p.host.SetStreamHandler(DealProtocolv121ID, p.handleNewDealStream) - p.host.SetStreamHandler(DealProtocolv120ID, p.handleNewDealStream) + p.host.SetStreamHandler(DealProtocolv121ID, safe.Handle(p.handleNewDealStream)) + p.host.SetStreamHandler(DealProtocolv120ID, safe.Handle(p.handleNewDealStream)) - p.host.SetStreamHandler(DealStatusV12ProtocolID, p.handleNewDealStatusStream) + p.host.SetStreamHandler(DealStatusV12ProtocolID, safe.Handle(p.handleNewDealStatusStream)) // Handle legacy deal stream here and reject all legacy deals if !p.enableLegacyDeals { - p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID101, p.handleLegacyDealStream) - p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID110, p.handleLegacyDealStream) - p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID111, p.handleLegacyDealStream) + p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID101, safe.Handle(p.handleLegacyDealStream)) + p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID110, safe.Handle(p.handleLegacyDealStream)) + p.host.SetStreamHandler(gfm_storagemarket.DealProtocolID111, safe.Handle(p.handleLegacyDealStream)) } }