Skip to content

Commit

Permalink
pool: Consolidate waitgroup logic.
Browse files Browse the repository at this point in the history
This switches client, hub, endpoint over to use the new pattern that
consolidates the waitgroup logic in a single location.

This pattern is easier to reason about and less error prone since it's
trivial to see at a glance that the calls to Done are happening as
intended versus having to chase them down all over the code.
  • Loading branch information
davecgh committed Sep 16, 2023
1 parent 97c3a47 commit 69839c5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 32 deletions.
32 changes: 20 additions & 12 deletions pool/client.go
Expand Up @@ -147,7 +147,6 @@ type Client struct {

hashRate *big.Rat
hashRateMtx sync.RWMutex
wg sync.WaitGroup
}

// NewClient creates client connection instance.
Expand Down Expand Up @@ -673,10 +672,9 @@ func (c *Client) rollWork() {
for {
select {
case <-c.ctx.Done():
c.wg.Done()
return
case <-ticker.C:

case <-ticker.C:
// Send a timetamp-rolled work to the client if it fails to
// generate a work submission in twice the time it is estimated
// to according to its pool target.
Expand Down Expand Up @@ -824,7 +822,6 @@ func (c *Client) process() {
c.mtx.RUnlock()
log.Errorf("%s: unable to close connection: %v", id, err)
}
c.wg.Done()
return

case payload := <-c.readCh:
Expand Down Expand Up @@ -1126,7 +1123,6 @@ func (c *Client) hashMonitor() {
for {
select {
case <-c.ctx.Done():
c.wg.Done()
return

case <-ticker.C:
Expand Down Expand Up @@ -1210,7 +1206,6 @@ func (c *Client) send() {
for {
select {
case <-c.ctx.Done():
c.wg.Done()
return

case msg := <-c.ch:
Expand Down Expand Up @@ -1290,12 +1285,25 @@ func (c *Client) send() {
func (c *Client) run() {
go c.read()

c.wg.Add(4)
go c.process()
go c.send()
go c.hashMonitor()
go c.rollWork()
c.wg.Wait()
var wg sync.WaitGroup
wg.Add(4)
go func() {
c.process()
wg.Done()
}()
go func() {
c.send()
wg.Done()
}()
go func() {
c.hashMonitor()
wg.Done()
}()
go func() {
c.rollWork()
wg.Done()
}()
wg.Wait()

c.shutdown()
c.cfg.Disconnect()
Expand Down
33 changes: 21 additions & 12 deletions pool/endpoint.go
Expand Up @@ -77,7 +77,6 @@ type Endpoint struct {
cfg *EndpointConfig
clients map[string]*Client
clientsMtx sync.Mutex
wg sync.WaitGroup
}

// NewEndpoint creates an new miner endpoint.
Expand Down Expand Up @@ -108,8 +107,6 @@ func (e *Endpoint) removeClient(c *Client) {
// listen accepts incoming client connections on the endpoint.
// It must be run as a goroutine.
func (e *Endpoint) listen(ctx context.Context) {
defer e.wg.Done()

log.Infof("listening on %s", e.listenAddr)
for {
conn, err := e.listener.Accept()
Expand Down Expand Up @@ -137,7 +134,10 @@ func (e *Endpoint) listen(ctx context.Context) {
// connect creates new pool clients from established connections.
// It must be run as a goroutine.
func (e *Endpoint) connect(ctx context.Context) {
defer e.wg.Done()
// Separate waitgroup for all client connections to ensure all clients are
// disconnected prior to terminating the goroutine.
var clientWg sync.WaitGroup
defer clientWg.Wait()

for {
select {
Expand Down Expand Up @@ -168,7 +168,7 @@ func (e *Endpoint) connect(ctx context.Context) {
Blake256Pad: e.cfg.Blake256Pad,
NonceIterations: e.cfg.NonceIterations,
FetchMinerDifficulty: e.cfg.FetchMinerDifficulty,
Disconnect: func() { e.wg.Done() },
Disconnect: func() { clientWg.Done() },
RemoveClient: e.removeClient,
SubmitWork: e.cfg.SubmitWork,
FetchCurrentWork: e.cfg.FetchCurrentWork,
Expand All @@ -192,7 +192,7 @@ func (e *Endpoint) connect(ctx context.Context) {
e.clients[client.extraNonce1] = client
e.clientsMtx.Unlock()
e.cfg.AddConnection(host)
e.wg.Add(1)
clientWg.Add(1)
go client.run()

log.Infof("Mining client connected. extranonce1=%s, addr=%s",
Expand All @@ -212,7 +212,6 @@ func (e *Endpoint) disconnect(ctx context.Context) {
client.cancel()
}
e.clientsMtx.Unlock()
e.wg.Done()
}

// generateHashIDs generates hash ids of all client connections to the pool.
Expand All @@ -232,9 +231,19 @@ func (e *Endpoint) generateHashIDs() map[string]struct{} {
// run handles the lifecycle of all endpoint related processes.
// This should be run as a goroutine.
func (e *Endpoint) run(ctx context.Context) {
e.wg.Add(3)
go e.listen(ctx)
go e.connect(ctx)
go e.disconnect(ctx)
e.wg.Wait()
var wg sync.WaitGroup
wg.Add(3)
go func() {
e.listen(ctx)
wg.Done()
}()
go func() {
e.connect(ctx)
wg.Done()
}()
go func() {
e.disconnect(ctx)
wg.Done()
}()
wg.Wait()
}
12 changes: 6 additions & 6 deletions pool/hub.go
Expand Up @@ -194,7 +194,6 @@ type Hub struct {
connectionsMtx sync.RWMutex
endpoint *Endpoint
blake256Pad []byte
wg sync.WaitGroup
cacheCh chan CacheUpdateEvent
}

Expand Down Expand Up @@ -623,22 +622,23 @@ func (h *Hub) shutdown() {

// Run handles the process lifecycles of the pool hub.
func (h *Hub) Run(ctx context.Context) {
h.wg.Add(3)
var wg sync.WaitGroup
wg.Add(3)
go func() {
h.endpoint.run(ctx)
h.wg.Done()
wg.Done()
}()
go func() {
h.chainState.handleChainUpdates(ctx)
h.wg.Done()
wg.Done()
}()
go func() {
h.paymentMgr.handlePayments(ctx)
h.wg.Done()
wg.Done()
}()

// Wait until all hub processes have terminated, and then shutdown.
h.wg.Wait()
wg.Wait()
h.shutdown()
}

Expand Down
10 changes: 8 additions & 2 deletions pool/hub_test.go
Expand Up @@ -13,6 +13,7 @@ import (
"math/big"
"net"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -397,7 +398,13 @@ func testHub(t *testing.T) {
" than account x's work quota")
}

go hub.Run(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
hub.Run(ctx)
wg.Done()
}()
defer wg.Wait()

// Create the mined work to be confirmed.
work := NewAcceptedWork(
Expand Down Expand Up @@ -534,5 +541,4 @@ func testHub(t *testing.T) {
}

cancel()
hub.wg.Wait()
}

0 comments on commit 69839c5

Please sign in to comment.