Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: Use Go 1.19 atomic types #5792

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 15 additions & 15 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@

// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
type Service struct {
syncStartNS int64 // at top of struct to keep 64 bit aligned for atomic.* ops
// disableSyncRound, provided externally, is the first round we will _not_ fetch from the network
// any round >= disableSyncRound will not be fetched. If set to 0, it will be disregarded.
disableSyncRound uint64
disableSyncRound atomic.Uint64
syncStartNS atomic.Int64
cfg config.Local
ledger Ledger
ctx context.Context
Expand All @@ -94,7 +94,7 @@
// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
InitialSyncDone chan struct{}
initialSyncNotified uint32
initialSyncNotified atomic.Uint32
protocolErrorLogged bool
unmatchedPendingCertificates <-chan PendingUnmatchedCertificate
// This channel signals periodSync to attempt catchup immediately. This allows us to start fetching rounds from
Expand Down Expand Up @@ -140,7 +140,7 @@
// Start the catchup service
func (s *Service) Start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
atomic.StoreUint32(&s.initialSyncNotified, 0)
s.initialSyncNotified.Store(0)
s.InitialSyncDone = make(chan struct{})
s.workers.Add(1)
go s.periodicSync()
Expand All @@ -150,7 +150,7 @@
func (s *Service) Stop() {
s.cancel()
s.workers.Wait()
if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
if s.initialSyncNotified.CompareAndSwap(0, 1) {
close(s.InitialSyncDone)
}
}
Expand All @@ -159,8 +159,8 @@
// or attempting to catchup after too-long waiting for next block.
// Also returns a 2nd bool indicating if this is our initial sync
func (s *Service) IsSynchronizing() (synchronizing bool, initialSync bool) {
synchronizing = atomic.LoadInt64(&s.syncStartNS) != 0
initialSync = atomic.LoadUint32(&s.initialSyncNotified) == 0
synchronizing = s.syncStartNS.Load() != 0
initialSync = s.initialSyncNotified.Load() == 0
return
}

Expand All @@ -180,25 +180,25 @@
if basics.Round(rnd) < s.ledger.LastRound() {
return ErrSyncRoundInvalid
}
atomic.StoreUint64(&s.disableSyncRound, rnd)
s.disableSyncRound.Store(rnd)
s.triggerSync()
return nil
}

// UnsetDisableSyncRound removes any previously set disabled sync round
func (s *Service) UnsetDisableSyncRound() {
atomic.StoreUint64(&s.disableSyncRound, 0)
s.disableSyncRound.Store(0)
s.triggerSync()
}

// GetDisableSyncRound returns the disabled sync round
func (s *Service) GetDisableSyncRound() uint64 {
return atomic.LoadUint64(&s.disableSyncRound)
return s.disableSyncRound.Load()
}

// SynchronizingTime returns the time we've been performing a catchup operation (0 if not currently catching up)
func (s *Service) SynchronizingTime() time.Duration {
startNS := atomic.LoadInt64(&s.syncStartNS)
startNS := s.syncStartNS.Load()
if startNS == 0 {
return time.Duration(0)
}
Expand Down Expand Up @@ -608,8 +608,8 @@
start := time.Now()

timeInNS := start.UnixNano()
if !atomic.CompareAndSwapInt64(&s.syncStartNS, 0, timeInNS) {
s.log.Infof("resuming previous sync from %d (now=%d)", atomic.LoadInt64(&s.syncStartNS), timeInNS)
if !s.syncStartNS.CompareAndSwap(0, timeInNS) {
s.log.Infof("resuming previous sync from %d (now=%d)", s.syncStartNS.Load(), timeInNS)

Check warning on line 612 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L612

Added line #L612 was not covered by tests
}

pr := s.ledger.LastRound()
Expand All @@ -632,10 +632,10 @@
// if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
if !s.suspendForCatchpointWriting {
// in that case, don't change the timer so that the "timer" would keep running.
atomic.StoreInt64(&s.syncStartNS, 0)
s.syncStartNS.Store(0)

// close the initial sync channel if not already close
if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
if s.initialSyncNotified.CompareAndSwap(0, 1) {
close(s.InitialSyncDone)
initSync = true
}
Expand Down
3 changes: 1 addition & 2 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1085,7 +1084,7 @@ func TestSynchronizingTime(t *testing.T) {
s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, ledger, &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)

require.Equal(t, time.Duration(0), s.SynchronizingTime())
atomic.StoreInt64(&s.syncStartNS, 1000000)
s.syncStartNS.Store(1000000)
require.NotEqual(t, time.Duration(0), s.SynchronizingTime())
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/tealdbg/cdtSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ type cdtSession struct {
verbose bool
}

var contextCounter int32 = 0
var scriptCounter int32 = 0
var contextCounter atomic.Int32
var scriptCounter atomic.Int32

func makeCdtSession(uuid string, debugger Control, ch chan Notification) *cdtSession {
s := new(cdtSession)
s.uuid = uuid
s.debugger = debugger
s.notifications = ch
s.done = make(chan struct{})
s.contextID = int(atomic.AddInt32(&contextCounter, 1))
s.scriptID = strconv.Itoa(int(atomic.AddInt32(&scriptCounter, 1)))
s.contextID = int(contextCounter.Add(1))
s.scriptID = strconv.Itoa(int(scriptCounter.Add(1)))
return s
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/tealdbg/cdtSession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,7 @@ func TestCdtSessionGetObjects(t *testing.T) {
{Type: basics.TealUintType, Uint: 1},
{Type: basics.TealBytesType, Bytes: "\x01\x02"},
},
pc: atomicInt{1},
line: atomicInt{1},
err: e,
err: e,
AppState: AppState{
appIdx: basics.AppIndex(1),
schemas: basics.StateSchemas{
Expand Down Expand Up @@ -582,6 +580,8 @@ func TestCdtSessionGetObjects(t *testing.T) {
},
},
}
state.pc.Store(1)
state.line.Store(1)

req.Method = "Runtime.getProperties"
req.Params = map[string]interface{}{}
Expand Down
18 changes: 7 additions & 11 deletions cmd/tealdbg/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,31 @@
}

type atomicBool struct {
value uint32
value atomic.Bool
}

func (b *atomicBool) SetTo(other bool) {
var converted uint32 = 0
if other {
converted = 1
}
atomic.StoreUint32(&b.value, converted)
b.value.Store(other)
}

func (b *atomicBool) IsSet() bool {
return atomic.LoadUint32(&b.value) != 0
return b.value.Load()
}

type atomicInt struct {
value int32
value atomic.Int32
}

func (i *atomicInt) Store(other int) {
atomic.StoreInt32(&i.value, int32(other))
i.value.Store(int32(other))
}

func (i *atomicInt) Load() int {
return int(atomic.LoadInt32(&i.value))
return int(i.value.Load())
}

func (i *atomicInt) Add(other int) int {
return int(atomic.AddInt32(&i.value, int32(other)))
return int(i.value.Add(int32(other)))

Check warning on line 71 in cmd/tealdbg/util.go

View check run for this annotation

Codecov / codecov/patch

cmd/tealdbg/util.go#L71

Added line #L71 was not covered by tests
}

// IsText checks if the input has all printable characters with strconv.IsPrint
Expand Down
6 changes: 3 additions & 3 deletions crypto/merklearray/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type workerState struct {
// maxidx is the total number of elements to process, and nextidx
// is the next element that a worker should process.
maxidx uint64
nextidx uint64
nextidx atomic.Uint64

// nworkers is the number of workers that can be started.
// This field gets decremented once workers are launched,
Expand Down Expand Up @@ -65,7 +65,7 @@ func newWorkerState(max uint64) *workerState {
// by delta. This implicitly means that the worker that calls next
// is promising to process delta elements at the returned position.
func (ws *workerState) next(delta uint64) uint64 {
return atomic.AddUint64(&ws.nextidx, delta) - delta
return ws.nextidx.Add(delta) - delta
}

// wait waits for all of the workers to finish.
Expand All @@ -82,7 +82,7 @@ func (ws *workerState) nextWorker() bool {

_ = <-ws.starting

curidx := atomic.LoadUint64(&ws.nextidx)
curidx := ws.nextidx.Load()
if curidx >= ws.maxidx {
return false
}
Expand Down
8 changes: 3 additions & 5 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ import (
// TransactionPool.AssembleBlock constructs a valid block for
// proposal given a deadline.
type TransactionPool struct {
// feePerByte is stored at the beginning of this struct to ensure it has a 64 bit aligned address. This is needed as it's being used
// with atomic operations which require 64 bit alignment on arm.
feePerByte uint64

// const
logProcessBlockStats bool
Expand All @@ -65,6 +62,7 @@ type TransactionPool struct {
expiredTxCount map[basics.Round]int
pendingBlockEvaluator BlockEvaluator
numPendingWholeBlocks basics.Round
feePerByte atomic.Uint64
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved
feeThresholdMultiplier uint64
statusCache *statusCache

Expand Down Expand Up @@ -295,7 +293,7 @@ func (pool *TransactionPool) checkPendingQueueSize(txnGroup []transactions.Signe
// FeePerByte returns the current minimum microalgos per byte a transaction
// needs to pay in order to get into the pool.
func (pool *TransactionPool) FeePerByte() uint64 {
return atomic.LoadUint64(&pool.feePerByte)
return pool.feePerByte.Load()
}

// computeFeePerByte computes and returns the current minimum microalgos per byte a transaction
Expand Down Expand Up @@ -332,7 +330,7 @@ func (pool *TransactionPool) computeFeePerByte() uint64 {
}

// Update the counter for fast reads
atomic.StoreUint64(&pool.feePerByte, feePerByte)
pool.feePerByte.Store(feePerByte)

return feePerByte
}
Expand Down
6 changes: 3 additions & 3 deletions ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ import (
// notifier is a struct that encapsulates a single-shot channel; it will only be signaled once.
type notifier struct {
signal chan struct{}
notified uint32
notified *atomic.Uint32
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved
}

// makeNotifier constructs a notifier that has not been signaled.
func makeNotifier() notifier {
return notifier{signal: make(chan struct{}), notified: 0}
return notifier{signal: make(chan struct{}), notified: &atomic.Uint32{}}
}

// notify signals the channel if it hasn't already done so
func (notifier *notifier) notify() {
if atomic.CompareAndSwapUint32(&notifier.notified, 0, 1) {
if notifier.notified.CompareAndSwap(0, 1) {
close(notifier.signal)
}
}
Expand Down
14 changes: 7 additions & 7 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
// catchpointDataWriting helps to synchronize the (first stage) catchpoint data file
// writing. When this atomic variable is 0, no writing is going on.
// Any non-zero value indicates a catchpoint being written, or scheduled to be written.
catchpointDataWriting int32
catchpointDataWriting atomic.Int32

// The Trie tracking the current account balances. Always matches the balances that were
// written to the database.
Expand Down Expand Up @@ -233,7 +233,7 @@
catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds())
totalKVs, totalAccounts, totalChunks, biggestChunkLen, err = ct.generateCatchpointData(
ctx, dbRound, &catchpointGenerationStats, spVerificationEncodedData)
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
ct.catchpointDataWriting.Store(0)
if err != nil {
return err
}
Expand Down Expand Up @@ -347,7 +347,7 @@
}

ct.roundDigest = nil
ct.catchpointDataWriting = 0
ct.catchpointDataWriting.Store(0)
// keep these channel closed if we're not generating catchpoint
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
close(ct.catchpointDataSlowWriting)
Expand Down Expand Up @@ -500,7 +500,7 @@

if ct.enableGeneratingCatchpointFiles && dcc.catchpointFirstStage {
// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written
atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
ct.catchpointDataWriting.Store(int32(-1))
}

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
Expand All @@ -516,7 +516,7 @@

defer func() {
if err != nil && dcc.catchpointFirstStage && ct.enableGeneratingCatchpointFiles {
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
ct.catchpointDataWriting.Store(0)

Check warning on line 519 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L519

Added line #L519 was not covered by tests
}
}()

Expand Down Expand Up @@ -963,7 +963,7 @@
// determine if this was a catchpoint round
if dcc.catchpointFirstStage {
// it was a catchpoint round, so update the catchpointWriting to indicate that we're done.
atomic.StoreInt32(&ct.catchpointDataWriting, 0)
ct.catchpointDataWriting.Store(0)

Check warning on line 966 in ledger/catchpointtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointtracker.go#L966

Added line #L966 was not covered by tests
}
}
}
Expand Down Expand Up @@ -1117,7 +1117,7 @@
// isWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
// is being generated.
func (ct *catchpointTracker) isWritingCatchpointDataFile() bool {
return atomic.LoadInt32(&ct.catchpointDataWriting) != 0
return ct.catchpointDataWriting.Load() != 0
}

// Generates a (first stage) catchpoint data file.
Expand Down