Skip to content

Commit

Permalink
go 1.19 atomics (erigontech#7164)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored and calmbeing committed Apr 24, 2023
1 parent 219cc49 commit 9ffced1
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 66 deletions.
6 changes: 3 additions & 3 deletions cmd/integration/commands/refetence_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"runtime"
"strings"
"sync/atomic"
"time"

common2 "github.com/ledgerwatch/erigon-lib/common"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -168,7 +168,7 @@ func doWarmup(ctx context.Context, chaindata string, bucket string) error {
total, _ = c.Count()
return nil
})
progress := atomic.NewInt64(0)
progress := atomic.Int64{}

logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
Expand All @@ -190,7 +190,7 @@ func doWarmup(ctx context.Context, chaindata string, bucket string) error {
if len(v) > 0 {
_ = v[len(v)-1]
}
progress.Inc()
progress.Add(1)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/holiman/uint256"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
Expand Down
11 changes: 6 additions & 5 deletions consensus/aura/aura.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/secp256k1"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/common"
Expand Down Expand Up @@ -97,7 +97,7 @@ type EpochTransition struct {

type Step struct {
calibrate bool // whether calibration is enabled.
inner *atomic.Uint64
inner atomic.Uint64
// Planned durations of steps.
durations []StepDurationInfo
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *Step) optCalibrate() bool {

type PermissionedStep struct {
inner *Step
canPropose *atomic.Bool
canPropose atomic.Bool
}

type ReceivedStepHashes map[uint64]map[libcommon.Address]libcommon.Hash //BTreeMap<(u64, Address), H256>
Expand Down Expand Up @@ -420,10 +420,10 @@ func NewAuRa(config *chain.AuRaConfig, db kv.RwDB, ourSigningAddress libcommon.A
durations = append(durations, durInfo)
}
step := &Step{
inner: atomic.NewUint64(initialStep),
calibrate: auraParams.StartStep == nil,
durations: durations,
}
step.inner.Store(initialStep)
step.doCalibrate()

/*
Expand All @@ -449,12 +449,13 @@ func NewAuRa(config *chain.AuRaConfig, db kv.RwDB, ourSigningAddress libcommon.A
c := &AuRa{
db: db,
exitCh: exitCh,
step: PermissionedStep{inner: step, canPropose: atomic.NewBool(true)},
step: PermissionedStep{inner: step},
OurSigningAddress: ourSigningAddress,
cfg: auraParams,
receivedStepHashes: ReceivedStepHashes{},
EpochManager: NewEpochManager(),
}
c.step.canPropose.Store(true)
_ = config

return c, nil
Expand Down
2 changes: 1 addition & 1 deletion consensus/aura/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"

lru "github.com/hashicorp/golang-lru"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/accounts/abi/bind"
Expand Down
2 changes: 1 addition & 1 deletion consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/google/btree"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/chain"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon/consensus"
Expand Down Expand Up @@ -242,7 +242,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
if total < 10_000 {
return
}
progress := atomic.NewInt64(0)
progress := atomic.Int64{}

logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
Expand All @@ -264,7 +264,7 @@ func WarmupTable(ctx context.Context, db kv.RoDB, bucket string, lvl log.Lvl) {
if err != nil {
return err
}
progress.Inc()
progress.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
Expand Down
8 changes: 3 additions & 5 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -24,7 +25,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/log/v3"
btree2 "github.com/tidwall/btree"
atomic2 "go.uber.org/atomic"
)

const CodeSizeTable = "CodeSize"
Expand All @@ -47,8 +47,8 @@ type StateV3 struct {
queue exec22.TxTaskQueue
queueLock sync.Mutex

txsDone *atomic2.Uint64
finished atomic2.Bool
txsDone atomic.Uint64
finished atomic.Bool

tmpdir string
applyPrevAccountBuf []byte // buffer for ApplyState. Doesn't need mutex because Apply is single-threaded
Expand All @@ -66,8 +66,6 @@ func NewStateV3(tmpdir string) *StateV3 {
chIncs: map[string][]byte{},
chContractCode: map[string][]byte{},

txsDone: atomic2.NewUint64(0),

applyPrevAccountBuf: make([]byte, 256),
addrIncBuf: make([]byte, 20+8),
}
Expand Down
10 changes: 5 additions & 5 deletions eth/integrity/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"fmt"
"math/bits"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/common/length"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/trie"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
)

// AssertSubset a & b == a - checks whether a is subset of b
Expand All @@ -43,7 +43,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.TrieOfAccounts, nil, math.MaxInt32)
defer clear()

trieAcc2, err := tx.Cursor(kv.TrieOfAccounts)
Expand All @@ -57,7 +57,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer accC.Close()
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.HashedAccounts, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
Expand Down Expand Up @@ -159,7 +159,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.TrieOfStorage, nil, math.MaxInt32)
defer clear()

trieStorage, err := tx.Cursor(kv.TrieOfStorage)
Expand All @@ -173,7 +173,7 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer storageC.Close()
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, &atomic.Bool{}, kv.HashedStorage, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
Expand Down
30 changes: 15 additions & 15 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/VictoriaMetrics/metrics"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
atomic2 "go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon/cmd/state/exec22"
Expand Down Expand Up @@ -133,7 +133,7 @@ func ExecV3(ctx context.Context,

var block, stageProgress uint64
var maxTxNum uint64
var outputTxNum = atomic2.NewUint64(0)
outputTxNum := atomic.Uint64{}
var inputTxNum uint64
if execStage.BlockNumber > 0 {
stageProgress = execStage.BlockNumber
Expand All @@ -158,7 +158,7 @@ func ExecV3(ctx context.Context,
return err
}
outputTxNum.Store(_outputTxNum)
outputTxNum.Inc()
outputTxNum.Add(1)
inputTxNum = outputTxNum.Load()
}
} else {
Expand All @@ -174,7 +174,7 @@ func ExecV3(ctx context.Context,
return err
}
outputTxNum.Store(_outputTxNum)
outputTxNum.Inc()
outputTxNum.Add(1)
inputTxNum = outputTxNum.Load()
}
return nil
Expand All @@ -185,10 +185,10 @@ func ExecV3(ctx context.Context,
agg.SetTxNum(inputTxNum)

var outputBlockNum = syncMetrics[stages.Execution]
var inputBlockNum = atomic2.NewUint64(0)
inputBlockNum := &atomic.Uint64{}
var count uint64
var repeatCount, triggerCount = atomic2.NewUint64(0), atomic2.NewUint64(0)
var resultsSize = atomic2.NewInt64(0)
var repeatCount, triggerCount = &atomic.Uint64{}, &atomic.Uint64{}
resultsSize := &atomic.Int64{}
var lock sync.RWMutex

queueSize := workerCount // workerCount * 4 // when wait cond can be moved inside txs loop
Expand Down Expand Up @@ -658,7 +658,7 @@ Loop:
return fmt.Errorf("StateV3.Apply: %w", err)
}
triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
outputTxNum.Inc()
outputTxNum.Add(1)

if err := rs.ApplyHistory(txTask, agg); err != nil {
return fmt.Errorf("StateV3.Apply: %w", err)
Expand Down Expand Up @@ -768,7 +768,7 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl
return b, nil
}

func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic2.Uint64, rwsCond *sync.Cond, applyWorker *exec3.Worker) (resultSize int64, outputTxNum, conflicts, processedBlockNum uint64, err error) {
func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic.Uint64, rwsCond *sync.Cond, applyWorker *exec3.Worker) (resultSize int64, outputTxNum, conflicts, processedBlockNum uint64, err error) {
var i int
outputTxNum = outputTxNumIn
for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum {
Expand Down Expand Up @@ -1120,7 +1120,7 @@ func reconstituteStep(last bool,
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
clear := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
clear := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1129,7 +1129,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear2 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
clear2 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand All @@ -1138,7 +1138,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear3 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
clear3 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1147,7 +1147,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear4 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
clear4 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand All @@ -1156,7 +1156,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear5 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
clear5 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
Expand All @@ -1165,7 +1165,7 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
clear6 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
clear6 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
Expand Down
Loading

0 comments on commit 9ffced1

Please sign in to comment.