From 11fea99767f549bebd1355b4c9065115af25ea91 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 27 Apr 2023 16:40:13 +0200 Subject: [PATCH] fix(nodebuilder/header): Cancel subscription when done (#2134) Found while debugging swamp. We never cancel subscription so when node attempts to shut down, there are outstanding subs that need to be closed. --------- Co-authored-by: Hlib Kanunnikov --- nodebuilder/header/service.go | 2 ++ nodebuilder/tests/api_test.go | 57 +++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 nodebuilder/tests/api_test.go diff --git a/nodebuilder/header/service.go b/nodebuilder/header/service.go index a4c6149f49..7947b8975f 100644 --- a/nodebuilder/header/service.go +++ b/nodebuilder/header/service.go @@ -79,6 +79,8 @@ func (s *Service) Subscribe(ctx context.Context) (<-chan *header.ExtendedHeader, headerCh := make(chan *header.ExtendedHeader) go func() { defer close(headerCh) + defer subscription.Cancel() + for { h, err := subscription.NextHeader(ctx) if err != nil { diff --git a/nodebuilder/tests/api_test.go b/nodebuilder/tests/api_test.go new file mode 100644 index 0000000000..27a10a5592 --- /dev/null +++ b/nodebuilder/tests/api_test.go @@ -0,0 +1,57 @@ +package tests + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/nodebuilder" + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp" +) + +// TestHeaderSubscription ensures that the header subscription over RPC works +// as intended and gets canceled successfully after rpc context cancellation. +func TestHeaderSubscription(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout) + t.Cleanup(cancel) + + sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime)) + + // start a bridge node + bridge := sw.NewBridgeNode() + err := bridge.Start(ctx) + require.NoError(t, err) + + cfg := nodebuilder.DefaultConfig(node.Light) + addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host)) + require.NoError(t, err) + cfg.Header.TrustedPeers = append(cfg.Header.TrustedPeers, addrs[0].String()) + + // start a light node that's connected to the bridge node + light := sw.NewNodeWithConfig(node.Light, cfg) + err = light.Start(ctx) + require.NoError(t, err) + + // subscribe to headers via the light node's RPC header subscription + subctx, subcancel := context.WithCancel(ctx) + sub, err := light.HeaderServ.Subscribe(subctx) + require.NoError(t, err) + // listen for 5 headers + for i := 0; i < 5; i++ { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case <-sub: + } + } + // cancel subscription via context + subcancel() + + // stop the light node and expect no outstanding subscription errors + err = light.Stop(ctx) + require.NoError(t, err) +}