From 1c326a9076eb801d85826254cb3fff08d26263f4 Mon Sep 17 00:00:00 2001 From: jholdstock Date: Mon, 13 Feb 2023 15:56:36 +0000 Subject: [PATCH] multi: Use atomic types in unexported modules. --- go.mod | 2 +- .../blockchain/indexers/indexsubscriber.go | 10 +++---- internal/mempool/mempool.go | 9 +++---- internal/mining/cpuminer/cpuminer.go | 27 ++++++++++--------- internal/mining/mining_harness_test.go | 10 +++---- internal/rpcserver/rpcserver.go | 11 ++++---- internal/rpcserver/rpcwebsocket.go | 13 +++++---- server.go | 21 +++++++-------- 8 files changed, 49 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index 7e16e9b73a..2e47384449 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/decred/dcrd -go 1.17 +go 1.19 require ( github.com/davecgh/go-spew v1.1.1 diff --git a/internal/blockchain/indexers/indexsubscriber.go b/internal/blockchain/indexers/indexsubscriber.go index d306378b38..7aeb0f99ce 100644 --- a/internal/blockchain/indexers/indexsubscriber.go +++ b/internal/blockchain/indexers/indexsubscriber.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 The Decred developers +// Copyright (c) 2021-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -122,7 +122,7 @@ func (s *IndexSubscription) stop() error { // IndexSubscriber subscribes clients for index updates. type IndexSubscriber struct { - subscribers uint32 // update atomically. + subscribers atomic.Uint32 c chan IndexNtfn subscriptions map[string]*IndexSubscription @@ -172,7 +172,7 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS } prereq.dependent = sub - atomic.AddUint32(&s.subscribers, 1) + s.subscribers.Add(1) return sub, nil } @@ -183,14 +183,14 @@ func (s *IndexSubscriber) Subscribe(index Indexer, prerequisite string) (*IndexS s.subscriptions[sub.id] = sub s.mtx.Unlock() - atomic.AddUint32(&s.subscribers, 1) + s.subscribers.Add(1) return sub, nil } // Notify relays an index notification to subscribed indexes for processing. func (s *IndexSubscriber) Notify(ntfn *IndexNtfn) { - subscribers := atomic.LoadUint32(&s.subscribers) + subscribers := s.subscribers.Load() // Only relay notifications when there are subscribed indexes // to be notified. diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index e23a807ceb..580d1f6e33 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -239,8 +239,7 @@ type orphanTx struct { // and relayed to other peers. It is safe for concurrent access from multiple // peers. type TxPool struct { - // The following variables must only be used atomically. - lastUpdated int64 // last time pool was updated. + lastUpdated atomic.Int64 // last time pool was updated. mtx sync.RWMutex cfg Config @@ -821,7 +820,7 @@ func (mp *TxPool) removeTransaction(tx *dcrutil.Tx, removeRedeemers bool) { delete(mp.pool, *txHash) - atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) + mp.lastUpdated.Store(time.Now().Unix()) // Inform associated fee estimator that the transaction has been removed // from the mempool @@ -909,7 +908,7 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, txDesc *TxD for _, txIn := range msgTx.TxIn { mp.outpoints[txIn.PreviousOutPoint] = txDesc } - atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) + mp.lastUpdated.Store(time.Now().Unix()) // Add unconfirmed exists address index entries associated with the // transaction if enabled. @@ -2283,7 +2282,7 @@ func (mp *TxPool) miningDescs() []*mining.TxDesc { // // This function is safe for concurrent access. func (mp *TxPool) LastUpdated() time.Time { - return time.Unix(atomic.LoadInt64(&mp.lastUpdated), 0) + return time.Unix(mp.lastUpdated.Load(), 0) } // MiningView returns a slice of mining descriptors for all the transactions diff --git a/internal/mining/cpuminer/cpuminer.go b/internal/mining/cpuminer/cpuminer.go index 98a1b42db5..bafba1a967 100644 --- a/internal/mining/cpuminer/cpuminer.go +++ b/internal/mining/cpuminer/cpuminer.go @@ -1,5 +1,5 @@ // Copyright (c) 2014-2016 The btcsuite developers -// Copyright (c) 2015-2022 The Decred developers +// Copyright (c) 2015-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -55,8 +55,8 @@ var ( // speedStats houses tracking information used to monitor the hashing speed of // the CPU miner. type speedStats struct { - totalHashes uint64 // atomic - elapsedMicros uint64 // atomic + totalHashes atomic.Uint64 + elapsedMicros atomic.Uint64 } // Config is a descriptor containing the CPU miner configuration. @@ -112,7 +112,7 @@ type Config struct { // workers which means it will be idle. The number of worker goroutines for the // normal mining mode can be set via the SetNumWorkers method. type CPUMiner struct { - numWorkers uint32 // update atomically + numWorkers atomic.Uint32 sync.Mutex g *mining.BgBlkTmplGenerator @@ -165,8 +165,8 @@ out: hashesPerSec = 0 m.Lock() for _, stats := range m.speedStats { - totalHashes := atomic.SwapUint64(&stats.totalHashes, 0) - elapsedMicros := atomic.SwapUint64(&stats.elapsedMicros, 0) + totalHashes := stats.totalHashes.Swap(0) + elapsedMicros := stats.elapsedMicros.Swap(0) elapsedSecs := (elapsedMicros / 1000000) if totalHashes == 0 || elapsedSecs == 0 { continue @@ -285,9 +285,9 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader, sta hashesCompleted := uint64(0) start := time.Now() updateSpeedStats := func() { - atomic.AddUint64(&stats.totalHashes, hashesCompleted) + stats.totalHashes.Add(hashesCompleted) elapsedMicros := time.Since(start).Microseconds() - atomic.AddUint64(&stats.elapsedMicros, uint64(elapsedMicros)) + stats.elapsedMicros.Add(uint64(elapsedMicros)) hashesCompleted = 0 start = time.Now() @@ -552,7 +552,7 @@ out: // Update the number of running workers. case <-m.updateNumWorkers: numRunning := uint32(len(runningWorkers)) - numWorkers := atomic.LoadUint32(&m.numWorkers) + numWorkers := m.numWorkers.Load() // No change. if numWorkers == numRunning { @@ -678,7 +678,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) { } else if targetNumWorkers > MaxNumWorkers { targetNumWorkers = MaxNumWorkers } - atomic.StoreUint32(&m.numWorkers, targetNumWorkers) + m.numWorkers.Store(targetNumWorkers) // Set the normal mining state accordingly. if targetNumWorkers != 0 { @@ -699,7 +699,7 @@ func (m *CPUMiner) SetNumWorkers(numWorkers int32) { // // This function is safe for concurrent access. func (m *CPUMiner) NumWorkers() int32 { - return int32(atomic.LoadUint32(&m.numWorkers)) + return int32(m.numWorkers.Load()) } // GenerateNBlocks generates the requested number of blocks in the discrete @@ -826,14 +826,15 @@ out: // // See the documentation for CPUMiner type for more details. func New(cfg *Config) *CPUMiner { - return &CPUMiner{ + miner := &CPUMiner{ g: cfg.BgBlkTmplGenerator, cfg: cfg, - numWorkers: defaultNumWorkers, updateNumWorkers: make(chan struct{}), queryHashesPerSec: make(chan float64), speedStats: make(map[uint64]*speedStats), minedOnParents: make(map[chainhash.Hash]uint8), quit: make(chan struct{}), } + miner.numWorkers.Store(defaultNumWorkers) + return miner } diff --git a/internal/mining/mining_harness_test.go b/internal/mining/mining_harness_test.go index 4ef693cfd6..7e41e12673 100644 --- a/internal/mining/mining_harness_test.go +++ b/internal/mining/mining_harness_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 The Decred developers +// Copyright (c) 2020-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -263,7 +263,7 @@ type fakeTxSource struct { votes map[chainhash.Hash][]VoteDesc tspends map[chainhash.Hash]*dcrutil.Tx miningView *TxMiningView - lastUpdated int64 + lastUpdated atomic.Int64 } // isTransactionInTxSource returns whether or not the passed transaction exists @@ -289,7 +289,7 @@ func (p *fakeTxSource) isTransactionStaged(hash *chainhash.Hash) bool { // LastUpdated returns the last time a transaction was added to or removed from // the fake tx source. func (p *fakeTxSource) LastUpdated() time.Time { - return time.Unix(atomic.LoadInt64(&p.lastUpdated), 0) + return time.Unix(p.lastUpdated.Load(), 0) } // HaveTransaction returns whether or not the passed transaction hash exists in @@ -508,7 +508,7 @@ func (p *fakeTxSource) addTransaction(tx *dcrutil.Tx, txType stake.TxType, heigh for _, txIn := range msgTx.TxIn { p.outpoints[txIn.PreviousOutPoint] = tx } - atomic.StoreInt64(&p.lastUpdated, time.Now().Unix()) + p.lastUpdated.Store(time.Now().Unix()) } // insertVote inserts a vote into the map of block votes. @@ -607,7 +607,7 @@ func (p *fakeTxSource) RemoveTransaction(tx *dcrutil.Tx, removeRedeemers, delete(p.pool, *txHash) - atomic.StoreInt64(&p.lastUpdated, time.Now().Unix()) + p.lastUpdated.Store(time.Now().Unix()) // Stop tracking if it's a tspend. delete(p.tspends, *txHash) diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index de3a13c876..39ff1a8ea9 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2022 The Decred developers +// Copyright (c) 2015-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -4913,8 +4913,7 @@ func handleVersion(_ context.Context, _ *Server, _ interface{}) (interface{}, er // Server provides a concurrent safe RPC server to a chain server. type Server struct { - // atomic - numClients int32 + numClients atomic.Int32 cfg Config hmac hash.Hash @@ -5097,7 +5096,7 @@ func (s *Server) NotifyWinningTickets(wtnd *WinningTicketsNtfnData) { // // This function is safe for concurrent access. func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool { - if int(atomic.LoadInt32(&s.numClients)+1) > s.cfg.RPCMaxClients { + if int(s.numClients.Load()+1) > s.cfg.RPCMaxClients { log.Infof("Max RPC clients exceeded [%d] - "+ "disconnecting client %s", s.cfg.RPCMaxClients, remoteAddr) @@ -5114,7 +5113,7 @@ func (s *Server) limitConnections(w http.ResponseWriter, remoteAddr string) bool // // This function is safe for concurrent access. func (s *Server) incrementClients() { - atomic.AddInt32(&s.numClients, 1) + s.numClients.Add(1) } // decrementClients subtracts one from the number of connected RPC clients. @@ -5123,7 +5122,7 @@ func (s *Server) incrementClients() { // // This function is safe for concurrent access. func (s *Server) decrementClients() { - atomic.AddInt32(&s.numClients, -1) + s.numClients.Add(-1) } // authMAC calculates the MAC (currently HMAC-SHA256) of an Authorization diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index b4477bbee4..b2b4c7048b 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2022 The Decred developers +// Copyright (c) 2015-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -1262,8 +1262,7 @@ type wsResponse struct { // subsystems can't block. Ultimately, all messages are sent via the // outHandler. type wsClient struct { - // The following variables must only be used atomically. - disconnected int32 // Websocket client disconnected? + disconnected atomic.Bool // Websocket client disconnected? sync.Mutex @@ -1309,7 +1308,7 @@ type wsClient struct { func (c *wsClient) shouldLogReadError(err error) bool { // No logging when the client is being forcibly disconnected from the server // side. - if atomic.LoadInt32(&c.disconnected) != 0 { + if c.disconnected.Load() { return false } @@ -1327,7 +1326,7 @@ func (c *wsClient) shouldLogReadError(err error) bool { // must be run as a goroutine. func (c *wsClient) inHandler(ctx context.Context) { out: - for atomic.LoadInt32(&c.disconnected) == 0 { + for !c.disconnected.Load() { _, msg, err := c.conn.ReadMessage() if err != nil { // Log the error if it's not due to disconnecting. @@ -1916,13 +1915,13 @@ func (c *wsClient) QueueNotification(marshalledJSON []byte) error { // Disconnected returns whether or not the websocket client is disconnected. func (c *wsClient) Disconnected() bool { - return atomic.LoadInt32(&c.disconnected) > 0 + return c.disconnected.Load() } // Disconnect disconnects the websocket client. func (c *wsClient) Disconnect() { // Nothing to do if already disconnected. - if atomic.AddInt32(&c.disconnected, 1) != 1 { + if !c.disconnected.CompareAndSwap(false, true) { return } diff --git a/server.go b/server.go index 419bcb55ad..53ff47a98a 100644 --- a/server.go +++ b/server.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2022 The Decred developers +// Copyright (c) 2015-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -466,11 +466,9 @@ func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr // server provides a Decred server for handling communications to and from // Decred peers. type server struct { - // The following variables must only be used atomically. - // Putting the uint64s first makes them 64-bit aligned for 32-bit systems. - bytesReceived uint64 // Total bytes received from all peers since start. - bytesSent uint64 // Total bytes sent by all peers since start. - shutdown int32 + bytesReceived atomic.Uint64 // Total bytes received from all peers since start. + bytesSent atomic.Uint64 // Total bytes sent by all peers since start. + shutdown atomic.Bool // minKnownWork houses the minimum known work from the associated network // params converted to a uint256 so the conversion only needs to be @@ -1693,7 +1691,7 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { } // Ignore new peers if we're shutting down. - if atomic.LoadInt32(&s.shutdown) != 0 { + if s.shutdown.Load() { srvrLog.Infof("New peer %s ignored - server is shutting down", sp) sp.Disconnect() return false @@ -2472,20 +2470,19 @@ func (s *server) AddedNodeInfo() []*serverPeer { // AddBytesSent adds the passed number of bytes to the total bytes sent counter // for the server. It is safe for concurrent access. func (s *server) AddBytesSent(bytesSent uint64) { - atomic.AddUint64(&s.bytesSent, bytesSent) + s.bytesSent.Add(bytesSent) } // AddBytesReceived adds the passed number of bytes to the total bytes received // counter for the server. It is safe for concurrent access. func (s *server) AddBytesReceived(bytesReceived uint64) { - atomic.AddUint64(&s.bytesReceived, bytesReceived) + s.bytesReceived.Add(bytesReceived) } // NetTotals returns the sum of all bytes received and sent across the network // for all peers. It is safe for concurrent access. func (s *server) NetTotals() (uint64, uint64) { - return atomic.LoadUint64(&s.bytesReceived), - atomic.LoadUint64(&s.bytesSent) + return s.bytesReceived.Load(), s.bytesSent.Load() } // notifiedWinningTickets returns whether or not the winning tickets @@ -3111,7 +3108,7 @@ func (s *server) Run(ctx context.Context) { // Wait until the server is signalled to shutdown. <-ctx.Done() - atomic.AddInt32(&s.shutdown, 1) + s.shutdown.Store(true) srvrLog.Warnf("Server shutting down")