Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go 1.19 atomics #7164

Merged
merged 11 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/integration/commands/refetence_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"runtime"
"strings"
"sync/atomic"
"time"

common2 "github.com/ledgerwatch/erigon-lib/common"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -168,7 +168,7 @@ func doWarmup(ctx context.Context, chaindata string, bucket string) error {
total, _ = c.Count()
return nil
})
progress := atomic.NewInt64(0)
progress := atomic.Int64{}

logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
Expand All @@ -190,7 +190,7 @@ func doWarmup(ctx context.Context, chaindata string, bucket string) error {
if len(v) > 0 {
_ = v[len(v)-1]
}
progress.Inc()
progress.Add(1)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/holiman/uint256"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
Expand Down
11 changes: 6 additions & 5 deletions consensus/aura/aura.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/secp256k1"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/common"
Expand Down Expand Up @@ -97,7 +97,7 @@ type EpochTransition struct {

type Step struct {
calibrate bool // whether calibration is enabled.
inner *atomic.Uint64
inner atomic.Uint64
// Planned durations of steps.
durations []StepDurationInfo
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *Step) optCalibrate() bool {

type PermissionedStep struct {
inner *Step
canPropose *atomic.Bool
canPropose atomic.Bool
}

type ReceivedStepHashes map[uint64]map[libcommon.Address]libcommon.Hash //BTreeMap<(u64, Address), H256>
Expand Down Expand Up @@ -420,10 +420,10 @@ func NewAuRa(config *chain.AuRaConfig, db kv.RwDB, ourSigningAddress libcommon.A
durations = append(durations, durInfo)
}
step := &Step{
inner: atomic.NewUint64(initialStep),
calibrate: auraParams.StartStep == nil,
durations: durations,
}
step.inner.Store(initialStep)
step.doCalibrate()

/*
Expand All @@ -449,12 +449,13 @@ func NewAuRa(config *chain.AuRaConfig, db kv.RwDB, ourSigningAddress libcommon.A
c := &AuRa{
db: db,
exitCh: exitCh,
step: PermissionedStep{inner: step, canPropose: atomic.NewBool(true)},
step: PermissionedStep{inner: step},
OurSigningAddress: ourSigningAddress,
cfg: auraParams,
receivedStepHashes: ReceivedStepHashes{},
EpochManager: NewEpochManager(),
}
c.step.canPropose.Store(true)
_ = config

return c, nil
Expand Down
2 changes: 1 addition & 1 deletion consensus/aura/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"

lru "github.com/hashicorp/golang-lru"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/accounts/abi/bind"
Expand Down
2 changes: 1 addition & 1 deletion consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/google/btree"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/chain"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon/consensus"
Expand Down Expand Up @@ -242,7 +242,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
if total < 10_000 {
return
}
progress := atomic.NewInt64(0)
progress := atomic.Int64{}

logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
Expand All @@ -264,7 +264,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
if err != nil {
return err
}
progress.Inc()
progress.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
Expand Down
8 changes: 3 additions & 5 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -24,7 +25,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/log/v3"
btree2 "github.com/tidwall/btree"
atomic2 "go.uber.org/atomic"
)

const CodeSizeTable = "CodeSize"
Expand All @@ -47,8 +47,8 @@ type StateV3 struct {
queue exec22.TxTaskQueue
queueLock sync.Mutex

txsDone *atomic2.Uint64
finished atomic2.Bool
txsDone atomic.Uint64
finished atomic.Bool

tmpdir string
applyPrevAccountBuf []byte // buffer for ApplyState. Doesn't need mutex because Apply is single-threaded
Expand All @@ -66,8 +66,6 @@ func NewStateV3(tmpdir string) *StateV3 {
chIncs: map[string][]byte{},
chContractCode: map[string][]byte{},

txsDone: atomic2.NewUint64(0),

applyPrevAccountBuf: make([]byte, 256),
addrIncBuf: make([]byte, 20+8),
}
Expand Down
10 changes: 5 additions & 5 deletions eth/integrity/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"fmt"
"math/bits"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/common/length"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/trie"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
)

// AssertSubset a & b == a - checks whether a is subset of b
Expand All @@ -43,7 +43,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.TrieOfAccounts, nil, math.MaxInt32)
defer clear()

trieAcc2, err := tx.Cursor(kv.TrieOfAccounts)
Expand All @@ -57,7 +57,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer accC.Close()
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.HashedAccounts, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
Expand Down Expand Up @@ -159,7 +159,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.TrieOfStorage, nil, math.MaxInt32)
defer clear()

trieStorage, err := tx.Cursor(kv.TrieOfStorage)
Expand All @@ -173,7 +173,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer storageC.Close()
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.HashedStorage, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
Expand Down
30 changes: 15 additions & 15 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/VictoriaMetrics/metrics"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
atomic2 "go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon/cmd/state/exec22"
Expand Down Expand Up @@ -133,7 +133,7 @@ func ExecV3(ctx context.Context,

var block, stageProgress uint64
var maxTxNum uint64
var outputTxNum = atomic2.NewUint64(0)
outputTxNum := atomic.Uint64{}
var inputTxNum uint64
if execStage.BlockNumber > 0 {
stageProgress = execStage.BlockNumber
Expand All @@ -158,7 +158,7 @@ func ExecV3(ctx context.Context,
return err
}
outputTxNum.Store(_outputTxNum)
outputTxNum.Inc()
outputTxNum.Add(1)
inputTxNum = outputTxNum.Load()
}
} else {
Expand All @@ -174,7 +174,7 @@ func ExecV3(ctx context.Context,
return err
}
outputTxNum.Store(_outputTxNum)
outputTxNum.Inc()
outputTxNum.Add(1)
inputTxNum = outputTxNum.Load()
}
return nil
Expand All @@ -185,10 +185,10 @@ func ExecV3(ctx context.Context,
agg.SetTxNum(inputTxNum)

var outputBlockNum = syncMetrics[stages.Execution]
var inputBlockNum = atomic2.NewUint64(0)
inputBlockNum := &atomic.Uint64{}
var count uint64
var repeatCount, triggerCount = atomic2.NewUint64(0), atomic2.NewUint64(0)
var resultsSize = atomic2.NewInt64(0)
var repeatCount, triggerCount = &atomic.Uint64{}, &atomic.Uint64{}
resultsSize := &atomic.Int64{}
var lock sync.RWMutex

queueSize := workerCount // workerCount * 4 // when wait cond can be moved inside txs loop
Expand Down Expand Up @@ -658,7 +658,7 @@ Loop:
return fmt.Errorf("StateV3.Apply: %w", err)
}
triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
outputTxNum.Inc()
outputTxNum.Add(1)

if err := rs.ApplyHistory(txTask, agg); err != nil {
return fmt.Errorf("StateV3.Apply: %w", err)
Expand Down Expand Up @@ -768,7 +768,7 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl
return b, nil
}

func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic2.Uint64, rwsCond *sync.Cond, applyWorker *exec3.Worker) (resultSize int64, outputTxNum, conflicts, processedBlockNum uint64, err error) {
func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic.Uint64, rwsCond *sync.Cond, applyWorker *exec3.Worker) (resultSize int64, outputTxNum, conflicts, processedBlockNum uint64, err error) {
var i int
outputTxNum = outputTxNumIn
for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum {
Expand Down Expand Up @@ -1120,7 +1120,7 @@ func reconstituteStep(last bool,
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
clear := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
clear := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1129,7 +1129,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear2 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
clear2 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand All @@ -1138,7 +1138,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear3 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
clear3 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1147,7 +1147,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear4 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
clear4 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand All @@ -1156,7 +1156,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear5 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
clear5 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1165,7 +1165,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear6 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
clear6 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand Down
Loading