diff --git a/beacon/blsync/engineclient.go b/beacon/blsync/engineclient.go
index 97ef6f5cb88e..2af424e6f0d6 100644
--- a/beacon/blsync/engineclient.go
+++ b/beacon/blsync/engineclient.go
@@ -47,7 +47,10 @@ func startEngineClient(config *lightClientConfig, rpc *rpc.Client, headCh <-chan
 		cancelRoot: cancel,
 	}
 	ec.wg.Add(1)
-	go ec.updateLoop(headCh)
+	go func() {
+		defer ec.wg.Done()
+		ec.updateLoop(headCh)
+	}()
 	return ec
 }
 
@@ -57,8 +60,6 @@ func (ec *engineClient) stop() {
 }
 
 func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
-	defer ec.wg.Done()
-
 	for {
 		select {
 		case <-ec.rootCtx.Done():
diff --git a/console/console.go b/console/console.go
index 5acb4cdccb5b..22c8d478fb6a 100644
--- a/console/console.go
+++ b/console/console.go
@@ -120,7 +120,10 @@ func New(config Config) (*Console, error) {
 	}
 
 	console.wg.Add(1)
-	go console.interruptHandler()
+	go func() {
+		defer console.wg.Done()
+		console.interruptHandler()
+	}()
 
 	return console, nil
 }
@@ -363,8 +366,6 @@ func (c *Console) Evaluate(statement string) {
 // interruptHandler runs in its own goroutine and waits for signals.
 // When a signal is received, it interrupts the JS interpreter.
 func (c *Console) interruptHandler() {
-	defer c.wg.Done()
-
 	// During Interactive, liner inhibits the signal while it is prompting for
 	// input. However, the signal will be received while evaluating JS.
 	//
diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go
index 6449c7465a17..688573b8ed2f 100644
--- a/core/bloombits/scheduler.go
+++ b/core/bloombits/scheduler.go
@@ -63,8 +63,14 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by
 
 	// Start the pipeline schedulers to forward between user -> distributor -> user
 	wg.Add(2)
-	go s.scheduleRequests(sections, dist, pend, quit, wg)
-	go s.scheduleDeliveries(pend, done, quit, wg)
+	go func() {
+		defer wg.Done()
+		s.scheduleRequests(sections, dist, pend, quit)
+	}()
+	go func() {
+		defer wg.Done()
+		s.scheduleDeliveries(pend, done, quit)
+	}()
 }
 
 // reset cleans up any leftovers from previous runs. This is required before a
@@ -84,9 +90,8 @@ func (s *scheduler) reset() {
 // scheduleRequests reads section retrieval requests from the input channel,
 // deduplicates the stream and pushes unique retrieval tasks into the distribution
 // channel for a database or network layer to honour.
-func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
+func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) {
 	// Clean up the goroutine and pipeline when done
-	defer wg.Done()
 	defer close(pend)
 
 	// Keep reading and scheduling section requests
@@ -131,9 +136,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend
 
 // scheduleDeliveries reads section acceptance notifications and waits for them
 // to be delivered, pushing them into the output data buffer.
-func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
+func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) {
 	// Clean up the goroutine and pipeline when done
-	defer wg.Done()
 	defer close(done)
 
 	// Keep reading notifications and scheduling deliveries
diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go
index 2ce2b868faa9..91b1b379ed1c 100644
--- a/core/state/statedb_test.go
+++ b/core/state/statedb_test.go
@@ -197,16 +197,20 @@ func TestCopy(t *testing.T) {
 	}
 
 	// Finalise the changes on all concurrently
-	finalise := func(wg *sync.WaitGroup, db *StateDB) {
-		defer wg.Done()
-		db.Finalise(true)
-	}
-
 	var wg sync.WaitGroup
 	wg.Add(3)
-	go finalise(&wg, orig)
-	go finalise(&wg, copy)
-	go finalise(&wg, ccopy)
+	go func() {
+		defer wg.Done()
+		orig.Finalise(true)
+	}()
+	go func() {
+		defer wg.Done()
+		copy.Finalise(true)
+	}()
+	go func() {
+		defer wg.Done()
+		ccopy.Finalise(true)
+	}()
 	wg.Wait()
 
 	// Verify that the three states have been updated independently
diff --git a/eth/handler.go b/eth/handler.go
index 143ac2a8a57b..723cf5ff11d2 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -198,7 +198,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
 
 // protoTracker tracks the number of active protocol handlers.
 func (h *handler) protoTracker() {
-	defer h.wg.Done()
 	var active int
 	for {
 		select {
@@ -426,14 +425,20 @@ func (h *handler) Start(maxPeers int) {
 	h.wg.Add(1)
 	h.txsCh = make(chan core.NewTxsEvent, txChanSize)
 	h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
-	go h.txBroadcastLoop()
+	go func() {
+		defer h.wg.Done()
+		h.txBroadcastLoop()
+	}()
 
 	// start sync handlers
 	h.txFetcher.Start()
 
 	// start peer handler tracker
 	h.wg.Add(1)
-	go h.protoTracker()
+	go func() {
+		defer h.wg.Done()
+		h.protoTracker()
+	}()
 }
 
 func (h *handler) Stop() {
@@ -535,7 +540,6 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
 
 // txBroadcastLoop announces new transactions to connected peers.
 func (h *handler) txBroadcastLoop() {
-	defer h.wg.Done()
 	for {
 		select {
 		case event := <-h.txsCh:
diff --git a/p2p/dial.go b/p2p/dial.go
index 08e1db28771e..42ccfa562621 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -178,8 +178,14 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF
 	d.lastStatsLog = d.clock.Now()
 	d.ctx, d.cancel = context.WithCancel(context.Background())
 	d.wg.Add(2)
-	go d.readNodes(it)
-	go d.loop(it)
+	go func() {
+		defer d.wg.Done()
+		d.readNodes(it)
+	}()
+	go func() {
+		defer d.wg.Done()
+		d.loop(it)
+	}()
 	return d
 }
 
@@ -311,14 +317,11 @@ loop:
 	for range d.dialing {
 		<-d.doneCh
 	}
-	d.wg.Done()
 }
 
 // readNodes runs in its own goroutine and delivers nodes from
 // the input iterator to the nodesIn channel.
 func (d *dialScheduler) readNodes(it enode.Iterator) {
-	defer d.wg.Done()
-
 	for it.Next() {
 		select {
 		case d.nodesIn <- it.Node():
diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go
index 7a0a0f1c7779..5bc2a129587e 100644
--- a/p2p/discover/v4_udp.go
+++ b/p2p/discover/v4_udp.go
@@ -150,8 +150,14 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
 	go tab.loop()
 
 	t.wg.Add(2)
-	go t.loop()
-	go t.readLoop(cfg.Unhandled)
+	go func() {
+		defer t.wg.Done()
+		t.loop()
+	}()
+	go func() {
+		defer t.wg.Done()
+		t.readLoop(cfg.Unhandled)
+	}()
 	return t, nil
 }
 
@@ -405,8 +411,6 @@ func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) boo
 // loop runs in its own goroutine. it keeps track of
 // the refresh timer and the pending reply queue.
 func (t *UDPv4) loop() {
-	defer t.wg.Done()
-
 	var (
 		plist        = list.New()
 		timeout      = time.NewTimer(0)
@@ -512,7 +516,6 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []
 
 // readLoop runs in its own goroutine. it handles incoming UDP packets.
 func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) {
-	defer t.wg.Done()
 	if unhandled != nil {
 		defer close(unhandled)
 	}
diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go
index 20a8bccd058e..c8e5f8a23593 100644
--- a/p2p/discover/v5_udp.go
+++ b/p2p/discover/v5_udp.go
@@ -138,8 +138,14 @@ func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
 	}
 	go t.tab.loop()
 	t.wg.Add(2)
-	go t.readLoop()
-	go t.dispatch()
+	go func() {
+		defer t.wg.Done()
+		t.readLoop()
+	}()
+	go func() {
+		defer t.wg.Done()
+		t.dispatch()
+	}()
 	return t, nil
 }
 
@@ -513,8 +519,6 @@ func (t *UDPv5) callDone(c *callV5) {
 // When that happens the call is simply re-sent to complete the handshake. We allow one
 // handshake attempt per call.
 func (t *UDPv5) dispatch() {
-	defer t.wg.Done()
-
 	// Arm first read.
 	t.readNextCh <- struct{}{}
 
@@ -651,8 +655,6 @@ func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c
 
 // readLoop runs in its own goroutine and reads packets from the network.
 func (t *UDPv5) readLoop() {
-	defer t.wg.Done()
-
 	buf := make([]byte, maxPacketSize)
 	for range t.readNextCh {
 		nbytes, from, err := t.conn.ReadFromUDP(buf)
diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go
index b8ab4a758aee..d6df2f98119e 100644
--- a/p2p/enode/iter.go
+++ b/p2p/enode/iter.go
@@ -174,10 +174,13 @@ func (m *FairMix) AddSource(it Iterator) {
 	if m.closed == nil {
 		return
 	}
-	m.wg.Add(1)
 	source := &mixSource{it, make(chan *Node), m.timeout}
 	m.sources = append(m.sources, source)
-	go m.runSource(m.closed, source)
+	m.wg.Add(1)
+	go func() {
+		defer m.wg.Done()
+		m.runSource(m.closed, source)
+	}()
 }
 
 // Close shuts down the mixer and all current sources.
@@ -281,7 +284,6 @@ func (m *FairMix) deleteSource(s *mixSource) {
 
 // runSource reads a single source in a loop.
 func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
-	defer m.wg.Done()
 	defer close(s.next)
 	for s.it.Next() {
 		n := s.it.Node()
diff --git a/p2p/peer.go b/p2p/peer.go
index e4482deae9f7..b56d3e70c53b 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -252,8 +252,14 @@ func (p *Peer) run() (remoteRequested bool, err error) {
 		reason     DiscReason // sent to the peer
 	)
 	p.wg.Add(2)
-	go p.readLoop(readErr)
-	go p.pingLoop()
+	go func() {
+		defer p.wg.Done()
+		p.readLoop(readErr)
+	}()
+	go func() {
+		defer p.wg.Done()
+		p.pingLoop()
+	}()
 
 	// Start all protocol handlers.
 	writeStart <- struct{}{}
@@ -295,8 +301,6 @@ loop:
 }
 
 func (p *Peer) pingLoop() {
-	defer p.wg.Done()
-
 	ping := time.NewTimer(pingInterval)
 	defer ping.Stop()
 
@@ -319,7 +323,6 @@ func (p *Peer) pingLoop() {
 }
 
 func (p *Peer) readLoop(errc chan<- error) {
-	defer p.wg.Done()
 	for {
 		msg, err := p.rw.ReadMsg()
 		if err != nil {
diff --git a/p2p/server.go b/p2p/server.go
index 5b9a4aa71fdc..15a702e15d7c 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -656,7 +656,12 @@ func (srv *Server) setupListening() error {
 	}
 
 	srv.loopWG.Add(1)
-	go srv.listenLoop()
+	go func() {
+		// Wait for slots to be returned on exit. This ensures all connection goroutines
+		// are down before listenLoop returns.
+		defer srv.loopWG.Done()
+		srv.listenLoop()
+	}()
 	return nil
 }
 
@@ -859,9 +864,6 @@ func (srv *Server) listenLoop() {
 		slots <- struct{}{}
 	}
 
-	// Wait for slots to be returned on exit. This ensures all connection goroutines
-	// are down before listenLoop returns.
-	defer srv.loopWG.Done()
 	defer func() {
 		for i := 0; i < cap(slots); i++ {
 			<-slots
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index 6307b90bf81c..39754988b0b1 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -334,15 +334,20 @@ func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
 	}
 	var wg sync.WaitGroup
 	wg.Add(2)
-	go wsCopy(&wg, conn, clientConn)
-	go wsCopy(&wg, clientConn, conn)
+	go func() {
+		defer wg.Done()
+		wsCopy(conn, clientConn)
+	}()
+	go func() {
+		defer wg.Done()
+		wsCopy(clientConn, conn)
+	}()
 	wg.Wait()
 	conn.Close()
 	return nil
 }
 
-func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
-	defer wg.Done()
+func wsCopy(src, dst *websocket.Conn) {
 	for {
 		msgType, r, err := src.NextReader()
 		if err != nil {
diff --git a/rpc/client_test.go b/rpc/client_test.go
index 01c326afb017..dd45dcaa74ee 100644
--- a/rpc/client_test.go
+++ b/rpc/client_test.go
@@ -352,7 +352,6 @@ func testClientCancel(transport string, t *testing.T) {
 		ncallers = 10
 	)
 	caller := func(index int) {
-		defer wg.Done()
 		for i := 0; i < nreqs; i++ {
 			var (
 				ctx     context.Context
@@ -386,7 +385,10 @@ func testClientCancel(transport string, t *testing.T) {
 	}
 	wg.Add(ncallers)
 	for i := 0; i < ncallers; i++ {
-		go caller(i)
+		go func(idx int) {
+			defer wg.Done()
+			caller(idx)
+		}(i)
 	}
 	wg.Wait()
}
diff --git a/rpc/websocket.go b/rpc/websocket.go
index 9f67caf859f1..0c905ed17107 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -319,7 +319,10 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readL
 		return nil
 	})
 	wc.wg.Add(1)
-	go wc.pingLoop()
+	go func() {
+		defer wc.wg.Done()
+		wc.pingLoop()
+	}()
 	return wc
 }
 
@@ -347,7 +350,6 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError
 // pingLoop sends periodic ping frames when the connection is idle.
 func (wc *websocketCodec) pingLoop() {
 	var pingTimer = time.NewTimer(wsPingInterval)
-	defer wc.wg.Done()
 	defer pingTimer.Stop()
 
 	for {
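
Every hunk above applies the same mechanical refactoring: the defer wg.Done() moves out of the long-running worker function and into an anonymous wrapper at the go statement, so the Add/Done pair sits together at the launch site and the worker no longer needs to know about the WaitGroup at all. A minimal sketch of the resulting shape follows; the service, loop, start, and stop names are hypothetical stand-ins, not taken from the diff:

package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for the types touched in the diff
// (engineClient, handler, dialScheduler, and so on).
type service struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

// loop is a plain worker: it carries no WaitGroup bookkeeping, so it can
// also be called synchronously (e.g. from a test) without unbalancing
// the counter.
func (s *service) loop() {
	<-s.quit
	fmt.Println("loop exited")
}

// start pairs wg.Add with wg.Done at the launch site, mirroring the diff.
func (s *service) start() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.loop()
	}()
}

// stop signals the worker and blocks until it has fully unwound.
func (s *service) stop() {
	close(s.quit)
	s.wg.Wait()
}

func main() {
	s := &service{quit: make(chan struct{})}
	s.start()
	s.stop()
}

Besides making the Add/Done pairing auditable at a glance, this shape removes a footgun: a worker that calls wg.Done() internally panics with "sync: negative WaitGroup counter" if it is ever invoked without a matching Add, as the statedb_test.go and client_test.go hunks illustrate by dropping the WaitGroup parameter from the helper entirely.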