Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon,console,core,eth,p2p,rpc: keep wg.Add and wg.Done closer #29818

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions beacon/blsync/engineclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ func startEngineClient(config *lightClientConfig, rpc *rpc.Client, headCh <-chan
cancelRoot: cancel,
}
ec.wg.Add(1)
go ec.updateLoop(headCh)
go func() {
defer ec.wg.Done()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see the difference between two approaches. They should be equivalent, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjl493456442 Yes sir, in most cases they are equivalent.

But keeping wg.Done close to wg.Add is clearer for us to ensure we call wg.Done correctly.

And if we just wanna run ec.updateLoop in the main process for a test, wrapping wg.Done in ec.updateLoop will force us to call wg.Add even though I don't wanna run in a goroutine.

IMO letting the caller decide the way to call ec.updateLoop is more flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too, I think if we follow the previous behavior, we must run this function in a goroutine

ec.updateLoop(headCh)
}()
return ec
}

Expand All @@ -57,8 +60,6 @@ func (ec *engineClient) stop() {
}

func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
defer ec.wg.Done()

for {
select {
case <-ec.rootCtx.Done():
Expand Down
7 changes: 4 additions & 3 deletions console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ func New(config Config) (*Console, error) {
}

console.wg.Add(1)
go console.interruptHandler()
go func() {
defer console.wg.Done()
console.interruptHandler()
}()

return console, nil
}
Expand Down Expand Up @@ -363,8 +366,6 @@ func (c *Console) Evaluate(statement string) {
// interruptHandler runs in its own goroutine and waits for signals.
// When a signal is received, it interrupts the JS interpreter.
func (c *Console) interruptHandler() {
defer c.wg.Done()

// During Interactive, liner inhibits the signal while it is prompting for
// input. However, the signal will be received while evaluating JS.
//
Expand Down
16 changes: 10 additions & 6 deletions core/bloombits/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,14 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by

// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
go func() {
defer wg.Done()
s.scheduleRequests(sections, dist, pend, quit)
}()
go func() {
defer wg.Done()
s.scheduleDeliveries(pend, done, quit)
}()
}

// reset cleans up any leftovers from previous runs. This is required before a
Expand All @@ -84,9 +90,8 @@ func (s *scheduler) reset() {
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(pend)

// Keep reading and scheduling section requests
Expand Down Expand Up @@ -131,9 +136,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend

// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(done)

// Keep reading notifications and scheduling deliveries
Expand Down
20 changes: 12 additions & 8 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,20 @@ func TestCopy(t *testing.T) {
}

// Finalise the changes on all concurrently
finalise := func(wg *sync.WaitGroup, db *StateDB) {
defer wg.Done()
db.Finalise(true)
}

var wg sync.WaitGroup
wg.Add(3)
go finalise(&wg, orig)
go finalise(&wg, copy)
go finalise(&wg, ccopy)
go func() {
defer wg.Done()
orig.Finalise(true)
}()
go func() {
defer wg.Done()
copy.Finalise(true)
}()
go func() {
defer wg.Done()
ccopy.Finalise(true)
}()
wg.Wait()

// Verify that the three states have been updated independently
Expand Down
12 changes: 8 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func newHandler(config *handlerConfig) (*handler, error) {

// protoTracker tracks the number of active protocol handlers.
func (h *handler) protoTracker() {
defer h.wg.Done()
var active int
for {
select {
Expand Down Expand Up @@ -426,14 +425,20 @@ func (h *handler) Start(maxPeers int) {
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()
go func() {
defer h.wg.Done()
h.txBroadcastLoop()
}()

// start sync handlers
h.txFetcher.Start()

// start peer handler tracker
h.wg.Add(1)
go h.protoTracker()
go func() {
defer h.wg.Done()
h.protoTracker()
}()
}

func (h *handler) Stop() {
Expand Down Expand Up @@ -535,7 +540,6 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {

// txBroadcastLoop announces new transactions to connected peers.
func (h *handler) txBroadcastLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.txsCh:
Expand Down
13 changes: 8 additions & 5 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,14 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF
d.lastStatsLog = d.clock.Now()
d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
go d.loop(it)
go func() {
defer d.wg.Done()
d.readNodes(it)
}()
go func() {
defer d.wg.Done()
d.loop(it)
}()
return d
}

Expand Down Expand Up @@ -311,14 +317,11 @@ loop:
for range d.dialing {
<-d.doneCh
}
d.wg.Done()
}

// readNodes runs in its own goroutine and delivers nodes from
// the input iterator to the nodesIn channel.
func (d *dialScheduler) readNodes(it enode.Iterator) {
defer d.wg.Done()

for it.Next() {
select {
case d.nodesIn <- it.Node():
Expand Down
13 changes: 8 additions & 5 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,14 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
go tab.loop()

t.wg.Add(2)
go t.loop()
go t.readLoop(cfg.Unhandled)
go func() {
defer t.wg.Done()
t.loop()
}()
go func() {
defer t.wg.Done()
t.readLoop(cfg.Unhandled)
}()
return t, nil
}

Expand Down Expand Up @@ -405,8 +411,6 @@ func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) boo
// loop runs in its own goroutine. it keeps track of
// the refresh timer and the pending reply queue.
func (t *UDPv4) loop() {
defer t.wg.Done()

var (
plist = list.New()
timeout = time.NewTimer(0)
Expand Down Expand Up @@ -512,7 +516,6 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []

// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) {
defer t.wg.Done()
if unhandled != nil {
defer close(unhandled)
}
Expand Down
14 changes: 8 additions & 6 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,14 @@ func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
}
go t.tab.loop()
t.wg.Add(2)
go t.readLoop()
go t.dispatch()
go func() {
defer t.wg.Done()
t.readLoop()
}()
go func() {
defer t.wg.Done()
t.dispatch()
}()
return t, nil
}

Expand Down Expand Up @@ -513,8 +519,6 @@ func (t *UDPv5) callDone(c *callV5) {
// When that happens the call is simply re-sent to complete the handshake. We allow one
// handshake attempt per call.
func (t *UDPv5) dispatch() {
defer t.wg.Done()

// Arm first read.
t.readNextCh <- struct{}{}

Expand Down Expand Up @@ -651,8 +655,6 @@ func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c

// readLoop runs in its own goroutine and reads packets from the network.
func (t *UDPv5) readLoop() {
defer t.wg.Done()

buf := make([]byte, maxPacketSize)
for range t.readNextCh {
nbytes, from, err := t.conn.ReadFromUDP(buf)
Expand Down
8 changes: 5 additions & 3 deletions p2p/enode/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,13 @@ func (m *FairMix) AddSource(it Iterator) {
if m.closed == nil {
return
}
m.wg.Add(1)
source := &mixSource{it, make(chan *Node), m.timeout}
m.sources = append(m.sources, source)
go m.runSource(m.closed, source)
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.runSource(m.closed, source)
}()
}

// Close shuts down the mixer and all current sources.
Expand Down Expand Up @@ -281,7 +284,6 @@ func (m *FairMix) deleteSource(s *mixSource) {

// runSource reads a single source in a loop.
func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
defer m.wg.Done()
defer close(s.next)
for s.it.Next() {
n := s.it.Node()
Expand Down
13 changes: 8 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,14 @@ func (p *Peer) run() (remoteRequested bool, err error) {
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
go func() {
defer p.wg.Done()
p.readLoop(readErr)
}()
go func() {
defer p.wg.Done()
p.pingLoop()
}()

// Start all protocol handlers.
writeStart <- struct{}{}
Expand Down Expand Up @@ -295,8 +301,6 @@ loop:
}

func (p *Peer) pingLoop() {
defer p.wg.Done()

ping := time.NewTimer(pingInterval)
defer ping.Stop()

Expand All @@ -319,7 +323,6 @@ func (p *Peer) pingLoop() {
}

func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,12 @@ func (srv *Server) setupListening() error {
}

srv.loopWG.Add(1)
go srv.listenLoop()
go func() {
// Wait for slots to be returned on exit. This ensures all connection goroutines
// are down before listenLoop returns.
defer srv.loopWG.Done()
srv.listenLoop()
}()
return nil
}

Expand Down Expand Up @@ -859,9 +864,6 @@ func (srv *Server) listenLoop() {
slots <- struct{}{}
}

// Wait for slots to be returned on exit. This ensures all connection goroutines
// are down before listenLoop returns.
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
Expand Down
13 changes: 9 additions & 4 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,20 @@ func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
}
var wg sync.WaitGroup
wg.Add(2)
go wsCopy(&wg, conn, clientConn)
go wsCopy(&wg, clientConn, conn)
go func() {
defer wg.Done()
wsCopy(conn, clientConn)
}()
go func() {
defer wg.Done()
wsCopy(clientConn, conn)
}()
wg.Wait()
conn.Close()
return nil
}

func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
defer wg.Done()
func wsCopy(src, dst *websocket.Conn) {
for {
msgType, r, err := src.NextReader()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ func testClientCancel(transport string, t *testing.T) {
ncallers = 10
)
caller := func(index int) {
defer wg.Done()
for i := 0; i < nreqs; i++ {
var (
ctx context.Context
Expand Down Expand Up @@ -386,7 +385,10 @@ func testClientCancel(transport string, t *testing.T) {
}
wg.Add(ncallers)
for i := 0; i < ncallers; i++ {
go caller(i)
go func(idx int) {
defer wg.Done()
caller(idx)
}(i)
}
wg.Wait()
}
Expand Down
6 changes: 4 additions & 2 deletions rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readL
return nil
})
wc.wg.Add(1)
go wc.pingLoop()
go func() {
defer wc.wg.Done()
wc.pingLoop()
}()
return wc
}

Expand Down Expand Up @@ -347,7 +350,6 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError
// pingLoop sends periodic ping frames when the connection is idle.
func (wc *websocketCodec) pingLoop() {
var pingTimer = time.NewTimer(wsPingInterval)
defer wc.wg.Done()
defer pingTimer.Stop()

for {
Expand Down