Skip to content

Commit

Permalink
multi: Use atomic types in unexported modules.
Browse files Browse the repository at this point in the history
  • Loading branch information
jholdstock authored and davecgh committed Feb 13, 2023
1 parent 152aa8b commit 1c326a9
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 54 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/decred/dcrd

go 1.17
go 1.19

require (
github.com/davecgh/go-spew v1.1.1
Expand Down
10 changes: 5 additions & 5 deletions internal/blockchain/indexers/indexsubscriber.go
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 The Decred developers
// Copyright (c) 2021-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *IndexSubscription) stop() error {

// IndexSubscriber subscribes clients for index updates.
type IndexSubscriber struct {
subscribers uint32 // update atomically.
subscribers atomic.Uint32

c chan IndexNtfn
subscriptions map[string]*IndexSubscription
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS
}

prereq.dependent = sub
atomic.AddUint32(&s.subscribers, 1)
s.subscribers.Add(1)

return sub, nil
}
Expand All @@ -183,14 +183,14 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS
s.subscriptions[sub.id] = sub
s.mtx.Unlock()

atomic.AddUint32(&s.subscribers, 1)
s.subscribers.Add(1)

return sub, nil
}

// Notify relays an index notification to subscribed indexes for processing.
func (s *IndexSubscriber) Notify(ntfn *IndexNtfn) {
subscribers := atomic.LoadUint32(&s.subscribers)
subscribers := s.subscribers.Load()

// Only relay notifications when there are subscribed indexes
// to be notified.
Expand Down
9 changes: 4 additions & 5 deletions internal/mempool/mempool.go
Expand Up @@ -239,8 +239,7 @@ type orphanTx struct {
// and relayed to other peers. It is safe for concurrent access from multiple
// peers.
type TxPool struct {
// The following variables must only be used atomically.
lastUpdated int64 // last time pool was updated.
lastUpdated atomic.Int64 // last time pool was updated.

mtx sync.RWMutex
cfg Config
Expand Down Expand Up @@ -821,7 +820,7 @@ func (mp *TxPool) removeTransaction(tx *dcrutil.Tx, removeRedeemers bool) {

delete(mp.pool, *txHash)

atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
mp.lastUpdated.Store(time.Now().Unix())

// Inform associated fee estimator that the transaction has been removed
// from the mempool
Expand Down Expand Up @@ -909,7 +908,7 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, txDesc *TxD
for _, txIn := range msgTx.TxIn {
mp.outpoints[txIn.PreviousOutPoint] = txDesc
}
atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
mp.lastUpdated.Store(time.Now().Unix())

// Add unconfirmed exists address index entries associated with the
// transaction if enabled.
Expand Down Expand Up @@ -2283,7 +2282,7 @@ func (mp *TxPool) miningDescs() []*mining.TxDesc {
//
// This function is safe for concurrent access.
func (mp *TxPool) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&mp.lastUpdated), 0)
return time.Unix(mp.lastUpdated.Load(), 0)
}

// MiningView returns a slice of mining descriptors for all the transactions
Expand Down
27 changes: 14 additions & 13 deletions internal/mining/cpuminer/cpuminer.go
@@ -1,5 +1,5 @@
// Copyright (c) 2014-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -55,8 +55,8 @@ var (
// speedStats houses tracking information used to monitor the hashing speed of
// the CPU miner.
type speedStats struct {
totalHashes uint64 // atomic
elapsedMicros uint64 // atomic
totalHashes atomic.Uint64
elapsedMicros atomic.Uint64
}

// Config is a descriptor containing the CPU miner configuration.
Expand Down Expand Up @@ -112,7 +112,7 @@ type Config struct {
// workers which means it will be idle. The number of worker goroutines for the
// normal mining mode can be set via the SetNumWorkers method.
type CPUMiner struct {
numWorkers uint32 // update atomically
numWorkers atomic.Uint32

sync.Mutex
g *mining.BgBlkTmplGenerator
Expand Down Expand Up @@ -165,8 +165,8 @@ out:
hashesPerSec = 0
m.Lock()
for _, stats := range m.speedStats {
totalHashes := atomic.SwapUint64(&stats.totalHashes, 0)
elapsedMicros := atomic.SwapUint64(&stats.elapsedMicros, 0)
totalHashes := stats.totalHashes.Swap(0)
elapsedMicros := stats.elapsedMicros.Swap(0)
elapsedSecs := (elapsedMicros / 1000000)
if totalHashes == 0 || elapsedSecs == 0 {
continue
Expand Down Expand Up @@ -285,9 +285,9 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader, sta
hashesCompleted := uint64(0)
start := time.Now()
updateSpeedStats := func() {
atomic.AddUint64(&stats.totalHashes, hashesCompleted)
stats.totalHashes.Add(hashesCompleted)
elapsedMicros := time.Since(start).Microseconds()
atomic.AddUint64(&stats.elapsedMicros, uint64(elapsedMicros))
stats.elapsedMicros.Add(uint64(elapsedMicros))

hashesCompleted = 0
start = time.Now()
Expand Down Expand Up @@ -552,7 +552,7 @@ out:
// Update the number of running workers.
case <-m.updateNumWorkers:
numRunning := uint32(len(runningWorkers))
numWorkers := atomic.LoadUint32(&m.numWorkers)
numWorkers := m.numWorkers.Load()

// No change.
if numWorkers == numRunning {
Expand Down Expand Up @@ -678,7 +678,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) {
} else if targetNumWorkers > MaxNumWorkers {
targetNumWorkers = MaxNumWorkers
}
atomic.StoreUint32(&m.numWorkers, targetNumWorkers)
m.numWorkers.Store(targetNumWorkers)

// Set the normal mining state accordingly.
if targetNumWorkers != 0 {
Expand All @@ -699,7 +699,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) {
//
// This function is safe for concurrent access.
func (m *CPUMiner) NumWorkers() int32 {
return int32(atomic.LoadUint32(&m.numWorkers))
return int32(m.numWorkers.Load())
}

// GenerateNBlocks generates the requested number of blocks in the discrete
Expand Down Expand Up @@ -826,14 +826,15 @@ out:
//
// See the documentation for CPUMiner type for more details.
func New(cfg *Config) *CPUMiner {
return &CPUMiner{
miner := &CPUMiner{
g: cfg.BgBlkTmplGenerator,
cfg: cfg,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),
queryHashesPerSec: make(chan float64),
speedStats: make(map[uint64]*speedStats),
minedOnParents: make(map[chainhash.Hash]uint8),
quit: make(chan struct{}),
}
miner.numWorkers.Store(defaultNumWorkers)
return miner
}
10 changes: 5 additions & 5 deletions internal/mining/mining_harness_test.go
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 The Decred developers
// Copyright (c) 2020-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -263,7 +263,7 @@ type fakeTxSource struct {
votes map[chainhash.Hash][]VoteDesc
tspends map[chainhash.Hash]*dcrutil.Tx
miningView *TxMiningView
lastUpdated int64
lastUpdated atomic.Int64
}

// isTransactionInTxSource returns whether or not the passed transaction exists
Expand All @@ -289,7 +289,7 @@ func (p *fakeTxSource) isTransactionStaged(hash *chainhash.Hash) bool {
// LastUpdated returns the last time a transaction was added to or removed from
// the fake tx source.
func (p *fakeTxSource) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&p.lastUpdated), 0)
return time.Unix(p.lastUpdated.Load(), 0)
}

// HaveTransaction returns whether or not the passed transaction hash exists in
Expand Down Expand Up @@ -508,7 +508,7 @@ func (p *fakeTxSource) addTransaction(tx *dcrutil.Tx, txType stake.TxType, heigh
for _, txIn := range msgTx.TxIn {
p.outpoints[txIn.PreviousOutPoint] = tx
}
atomic.StoreInt64(&p.lastUpdated, time.Now().Unix())
p.lastUpdated.Store(time.Now().Unix())
}

// insertVote inserts a vote into the map of block votes.
Expand Down Expand Up @@ -607,7 +607,7 @@ func (p *fakeTxSource) RemoveTransaction(tx *dcrutil.Tx, removeRedeemers,

delete(p.pool, *txHash)

atomic.StoreInt64(&p.lastUpdated, time.Now().Unix())
p.lastUpdated.Store(time.Now().Unix())

// Stop tracking if it's a tspend.
delete(p.tspends, *txHash)
Expand Down
11 changes: 5 additions & 6 deletions internal/rpcserver/rpcserver.go
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -4913,8 +4913,7 @@ func handleVersion(_ context.Context, _ *Server, _ interface{}) (interface{}, er

// Server provides a concurrent safe RPC server to a chain server.
type Server struct {
// atomic
numClients int32
numClients atomic.Int32

cfg Config
hmac hash.Hash
Expand Down Expand Up @@ -5097,7 +5096,7 @@ func (s *Server) NotifyWinningTickets(wtnd *WinningTicketsNtfnData) {
//
// This function is safe for concurrent access.
func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool {
if int(atomic.LoadInt32(&s.numClients)+1) > s.cfg.RPCMaxClients {
if int(s.numClients.Load()+1) > s.cfg.RPCMaxClients {
log.Infof("Max RPC clients exceeded [%d] - "+
"disconnecting client %s", s.cfg.RPCMaxClients,
remoteAddr)
Expand All @@ -5114,7 +5113,7 @@ func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool
//
// This function is safe for concurrent access.
func (s *Server) incrementClients() {
atomic.AddInt32(&s.numClients, 1)
s.numClients.Add(1)
}

// decrementClients subtracts one from the number of connected RPC clients.
Expand All @@ -5123,7 +5122,7 @@ func (s *Server) incrementClients() {
//
// This function is safe for concurrent access.
func (s *Server) decrementClients() {
atomic.AddInt32(&s.numClients, -1)
s.numClients.Add(-1)
}

// authMAC calculates the MAC (currently HMAC-SHA256) of an Authorization
Expand Down
13 changes: 6 additions & 7 deletions internal/rpcserver/rpcwebsocket.go
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -1262,8 +1262,7 @@ type wsResponse struct {
// subsystems can't block. Ultimately, all messages are sent via the
// outHandler.
type wsClient struct {
// The following variables must only be used atomically.
disconnected int32 // Websocket client disconnected?
disconnected atomic.Bool // Websocket client disconnected?

sync.Mutex

Expand Down Expand Up @@ -1309,7 +1308,7 @@ type wsClient struct {
func (c *wsClient) shouldLogReadError(err error) bool {
// No logging when the client is being forcibly disconnected from the server
// side.
if atomic.LoadInt32(&c.disconnected) != 0 {
if c.disconnected.Load() {
return false
}

Expand All @@ -1327,7 +1326,7 @@ func (c *wsClient) shouldLogReadError(err error) bool {
// must be run as a goroutine.
func (c *wsClient) inHandler(ctx context.Context) {
out:
for atomic.LoadInt32(&c.disconnected) == 0 {
for !c.disconnected.Load() {
_, msg, err := c.conn.ReadMessage()
if err != nil {
// Log the error if it's not due to disconnecting.
Expand Down Expand Up @@ -1916,13 +1915,13 @@ func (c *wsClient) QueueNotification(marshalledJSON []byte) error {

// Disconnected returns whether or not the websocket client is disconnected.
func (c *wsClient) Disconnected() bool {
return atomic.LoadInt32(&c.disconnected) > 0
return c.disconnected.Load()
}

// Disconnect disconnects the websocket client.
func (c *wsClient) Disconnect() {
// Nothing to do if already disconnected.
if atomic.AddInt32(&c.disconnected, 1) != 1 {
if !c.disconnected.CompareAndSwap(false, true) {
return
}

Expand Down
21 changes: 9 additions & 12 deletions server.go
@@ -1,5 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2022 The Decred developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -466,11 +466,9 @@ func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr
// server provides a Decred server for handling communications to and from
// Decred peers.
type server struct {
// The following variables must only be used atomically.
// Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
shutdown int32
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool

// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
Expand Down Expand Up @@ -1693,7 +1691,7 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
}

// Ignore new peers if we're shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
if s.shutdown.Load() {
srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
sp.Disconnect()
return false
Expand Down Expand Up @@ -2472,20 +2470,19 @@ func (s *server) AddedNodeInfo() []*serverPeer {
// AddBytesSent adds the passed number of bytes to the total bytes sent counter
// for the server. It is safe for concurrent access.
func (s *server) AddBytesSent(bytesSent uint64) {
atomic.AddUint64(&s.bytesSent, bytesSent)
s.bytesSent.Add(bytesSent)
}

// AddBytesReceived adds the passed number of bytes to the total bytes received
// counter for the server. It is safe for concurrent access.
func (s *server) AddBytesReceived(bytesReceived uint64) {
atomic.AddUint64(&s.bytesReceived, bytesReceived)
s.bytesReceived.Add(bytesReceived)
}

// NetTotals returns the sum of all bytes received and sent across the network
// for all peers. It is safe for concurrent access.
func (s *server) NetTotals() (uint64, uint64) {
return atomic.LoadUint64(&s.bytesReceived),
atomic.LoadUint64(&s.bytesSent)
return s.bytesReceived.Load(), s.bytesSent.Load()
}

// notifiedWinningTickets returns whether or not the winning tickets
Expand Down Expand Up @@ -3111,7 +3108,7 @@ func (s *server) Run(ctx context.Context) {

// Wait until the server is signalled to shutdown.
<-ctx.Done()
atomic.AddInt32(&s.shutdown, 1)
s.shutdown.Store(true)

srvrLog.Warnf("Server shutting down")

Expand Down

0 comments on commit 1c326a9

Please sign in to comment.