Skip to content

Commit

Permalink
Merge branch 'more-tries-sync-fixes' into trie-sync-optimization
Browse files Browse the repository at this point in the history
# Conflicts:
#	data/syncer/userAccountsSyncer.go
#	data/trie/branchNode.go
#	data/trie/branchNode_test.go
#	data/trie/extensionNode.go
#	data/trie/extensionNode_test.go
#	data/trie/interface.go
#	data/trie/leafNode.go
#	data/trie/sync.go
  • Loading branch information
iulianpascalau committed Apr 8, 2021
2 parents 6bacf2f + 4547e8b commit 59cbcaa
Show file tree
Hide file tree
Showing 79 changed files with 1,675 additions and 403 deletions.
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@

[TrieNodesDataPool]
Name = "TrieNodesDataPool"
Capacity = 900000
Capacity = 300000
Type = "SizeLRU"
SizeInBytes = 314572800 #300MB
SizeInBytes = 104857600 #100MB

[SmartContractDataPool]
Name = "SmartContractDataPool"
Expand Down
1 change: 1 addition & 0 deletions cmd/node/config/systemSmartContractsConfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
MinStepValue = "100000000000000000000"
StakeEnableEpoch = 2
StakingV2Epoch = 4
CorrectLastUnjailedEpoch = 6
DoubleKeyProtectionEnableEpoch = 3
NumRoundsWithoutBleed = 100
MaximumPercentageToBleed = 0.5
Expand Down
4 changes: 4 additions & 0 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,7 @@ func newShardBlockProcessor(
stateComponents.AddressPubkeyConverter,
data.Store,
data.Datapool,
economics,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1924,6 +1925,7 @@ func newMetaBlockProcessor(
stateComponents.AddressPubkeyConverter,
data.Store,
data.Datapool,
economicsData,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2306,6 +2308,7 @@ func createShardTxSimulatorProcessor(
stateComponents.AddressPubkeyConverter,
disabled.NewChainStorer(),
data.Datapool,
&processDisabled.FeeHandler{},
)
if err != nil {
return err
Expand Down Expand Up @@ -2376,6 +2379,7 @@ func createMetaTxSimulatorProcessor(
stateComponents.AddressPubkeyConverter,
disabled.NewChainStorer(),
data.Datapool,
&processDisabled.FeeHandler{},
)
if err != nil {
return err
Expand Down
12 changes: 8 additions & 4 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
rater,
epochNotifier,
apiWorkingDir,
stateComponents.AccountsAdapterAPI,
)
if err != nil {
return err
Expand Down Expand Up @@ -2366,6 +2367,7 @@ func createNode(
node.WithTxVersionChecker(txVersionCheckerHandler),
node.WithImportMode(isInImportDbMode),
node.WithNodeRedundancyHandler(nodeRedundancyHandler),
node.WithAccountsAdapterAPI(stateComponents.AccountsAdapterAPI),
)
if err != nil {
return nil, errors.New("error creating node: " + err.Error())
Expand Down Expand Up @@ -2480,6 +2482,7 @@ func createApiResolver(
rater sharding.PeerAccountListAndRatingHandler,
epochNotifier process.EpochNotifier,
workingDir string,
accountsAPI state.AccountsAdapter,
) (facade.ApiResolver, error) {
scQueryService, err := createScQueryService(
generalConfig,
Expand Down Expand Up @@ -2532,10 +2535,11 @@ func createApiResolver(
}

args := &stakeValuesProcessor.ArgsTotalStakedValueHandler{
ShardID: shardCoordinator.SelfId(),
InternalMarshalizer: marshalizer,
Accounts: accnts,
NodePrice: systemSCConfig.StakingSystemSCConfig.GenesisNodePrice,
ShardID: shardCoordinator.SelfId(),
Accounts: accountsAPI,
PublicKeyConverter: pubkeyConv,
BlockChain: blockChain,
QueryService: scQueryService,
}
totalStakedValueHandler, err := stakeValuesProcessor.CreateTotalStakedValueHandler(args)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions config/systemSmartContractsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type StakingSystemSCConfig struct {
MaxNumberOfNodesForStake uint64
StakingV2Epoch uint32
StakeEnableEpoch uint32
CorrectLastUnjailedEpoch uint32
DoubleKeyProtectionEnableEpoch uint32
ActivateBLSPubKeyMessageVerification bool
}
Expand Down
14 changes: 12 additions & 2 deletions core/computers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math/big"
"strconv"
"strings"
"time"
)

// MaxInt32 returns the maximum of two given numbers
Expand Down Expand Up @@ -94,6 +95,15 @@ func MaxFloat64(a float64, b float64) float64 {
return b
}

// AbsDuration returns the absolute value of the provided time.Duration parameter
func AbsDuration(duration time.Duration) time.Duration {
if duration < 0 {
return -duration
}

return duration
}

// GetApproximatePercentageOfValue returns the approximate percentage of value
// the approximation comes from floating point operations, which in case of large numbers
// has some loss in accuracy and can cause the result to be slightly lower or higher than the actual value
Expand All @@ -115,14 +125,14 @@ func GetIntTrimmedPercentageOfValue(value *big.Int, percentage float64) *big.Int
percentageString := strconv.FormatFloat(percentage, 'f', -1, 64)
exp, fra := splitExponentFraction(percentageString)
concatExpFra := exp + fra
concatBigInt, _ := big.NewInt(0).SetString(string(concatExpFra), 10)
concatBigInt, _ := big.NewInt(0).SetString(concatExpFra, 10)
intMultiplier, _ := big.NewInt(0).SetString("1"+strings.Repeat("0", len(fra)), 10)
x.Mul(x, concatBigInt)
x.Div(x, intMultiplier)
return x
}

// IsInRangeExclusive returns true if the provided value is in the given range, including the provided min and max values
// IsInRangeInclusive returns true if the provided value is in the given range, including the provided min and max values
func IsInRangeInclusive(value, min, max *big.Int) bool {
return value.Cmp(min) >= 0 && value.Cmp(max) <= 0
}
Expand Down
10 changes: 10 additions & 0 deletions core/computers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"math/rand"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -386,6 +387,15 @@ func TestSplitExponentFraction(t *testing.T) {
require.Equal(t, expectedFra, fra)
}

func TestAbsDuration(t *testing.T) {
t.Parallel()

assert.Equal(t, time.Second, core.AbsDuration(time.Second))
assert.Equal(t, time.Nanosecond, core.AbsDuration(time.Nanosecond))
assert.Equal(t, time.Second, core.AbsDuration(-time.Second))
assert.Equal(t, time.Duration(0), core.AbsDuration(0))
}

func BenchmarkSplitExponentFraction(b *testing.B) {
nbPrepared := 100000
src := rand.NewSource(1122334455)
Expand Down
1 change: 0 additions & 1 deletion data/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ type DBRemoveCacher interface {
// TrieSyncer synchronizes the trie, asking on the network for the missing nodes
type TrieSyncer interface {
StartSyncing(rootHash []byte, ctx context.Context) error
Trie() Trie
IsInterfaceNil() bool
}

Expand Down
74 changes: 50 additions & 24 deletions data/syncer/baseAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package syncer

import (
"context"
"fmt"
"sync"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
Expand All @@ -17,8 +19,7 @@ import (
type baseAccountsSyncer struct {
hasher hashing.Hasher
marshalizer marshal.Marshalizer
trieSyncers map[string]data.TrieSyncer
dataTries map[string]data.Trie
dataTries map[string]struct{}
mutex sync.Mutex
trieStorageManager data.StorageManager
requestHandler trie.RequestHandler
Expand Down Expand Up @@ -70,19 +71,26 @@ func checkArgs(args ArgsNewBaseAccountsSyncer) error {
return trie.CheckTrieSyncerVersion(args.TrieSyncerVersion)
}

func (b *baseAccountsSyncer) syncMainTrie(rootHash []byte, trieTopic string, ssh data.SyncStatisticsHandler, ctx context.Context) error {
func (b *baseAccountsSyncer) syncMainTrie(
rootHash []byte,
trieTopic string,
ssh data.SyncStatisticsHandler,
ctx context.Context,
) (data.Trie, error) {
b.rootHash = rootHash

dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory)
if err != nil {
return err
return nil, err
}

b.dataTries[string(rootHash)] = dataTrie
b.dataTries[string(rootHash)] = struct{}{}
arg := trie.ArgTrieSyncer{
RequestHandler: b.requestHandler,
InterceptedNodes: b.cacher,
Trie: dataTrie,
DB: b.trieStorageManager.Database(),
Marshalizer: b.marshalizer,
Hasher: b.hasher,
ShardId: b.shardId,
Topic: trieTopic,
TrieSyncStatistics: ssh,
Expand All @@ -91,29 +99,15 @@ func (b *baseAccountsSyncer) syncMainTrie(rootHash []byte, trieTopic string, ssh
}
trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion)
if err != nil {
return err
return nil, err
}
b.trieSyncers[string(rootHash)] = trieSyncer

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
return nil, err
}

return nil
}

// GetSyncedTries returns the synced map of data trie
func (b *baseAccountsSyncer) GetSyncedTries() map[string]data.Trie {
b.mutex.Lock()
defer b.mutex.Unlock()

clonedMap := make(map[string]data.Trie, len(b.dataTries))
for key, value := range b.dataTries {
clonedMap[key] = value
}

return clonedMap
return dataTrie.Recreate(rootHash)
}

func (b *baseAccountsSyncer) printStatistics(ssh data.SyncStatisticsHandler, ctx context.Context) {
Expand All @@ -123,9 +117,41 @@ func (b *baseAccountsSyncer) printStatistics(ssh data.SyncStatisticsHandler, ctx
log.Info("finished trie sync", "name", b.name, "num received", ssh.NumReceived(), "num missing", ssh.NumMissing())
return
case <-time.After(timeBetweenStatisticsPrints):
log.Info("trie sync in progress", "name", b.name, "num received", ssh.NumReceived(), "num missing", ssh.NumMissing())
log.Info("trie sync in progress",
"name", b.name,
"num received", ssh.NumReceived(),
"num missing", ssh.NumMissing(),
"intercepted trie nodes cache", fmt.Sprintf("len: %d, size: %s", b.cacher.Len(), core.ConvertBytes(b.cacher.SizeInBytesContained())))
}
}
}

// Deprecated: GetSyncedTries returns the synced map of data tries. This is likely to cause OOM exceptions
//TODO remove this function after fixing the hardfork sync state mechanism
func (b *baseAccountsSyncer) GetSyncedTries() map[string]data.Trie {
b.mutex.Lock()
defer b.mutex.Unlock()

dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory)
if err != nil {
log.Warn("error creating a new trie in baseAccountsSyncer.GetSyncedTries", "error", err)
return make(map[string]data.Trie)
}

var recreatedTrie data.Trie
clonedMap := make(map[string]data.Trie, len(b.dataTries))
for key := range b.dataTries {
recreatedTrie, err = dataTrie.Recreate([]byte(key))
if err != nil {
log.Warn("error recreating trie in baseAccountsSyncer.GetSyncedTries",
"roothash", []byte(key), "error", err)
continue
}

clonedMap[key] = recreatedTrie
}

return clonedMap
}

// IsInterfaceNil returns true if underlying object is nil
Expand Down
29 changes: 11 additions & 18 deletions data/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
b := &baseAccountsSyncer{
hasher: args.Hasher,
marshalizer: args.Marshalizer,
trieSyncers: make(map[string]data.TrieSyncer),
dataTries: make(map[string]data.Trie),
dataTries: make(map[string]struct{}),
trieStorageManager: args.TrieStorageManager,
requestHandler: args.RequestHandler,
timeout: args.Timeout,
Expand Down Expand Up @@ -83,14 +82,13 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error {
tss := statistics.NewTrieSyncStatistics()
go u.printStatistics(tss, ctx)

err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, tss, ctx)
mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, tss, ctx)
if err != nil {
return err
}

log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries))

mainTrie := u.dataTries[string(rootHash)]
rootHashes, err := u.findAllAccountRootHashes(mainTrie, ctx)
if err != nil {
return err
Expand Down Expand Up @@ -146,25 +144,24 @@ func (u *userAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ssh data.

func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisticsHandler, ctx context.Context) error {
u.throttler.StartProcessing()
defer u.throttler.EndProcessing()

u.syncerMutex.Lock()
if _, ok := u.dataTries[string(rootHash)]; ok {
_, ok := u.dataTries[string(rootHash)]
if ok {
u.syncerMutex.Unlock()
u.throttler.EndProcessing()
return nil
}

dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
u.syncerMutex.Unlock()
return err
}
u.dataTries[string(rootHash)] = struct{}{}
u.syncerMutex.Unlock()

u.dataTries[string(rootHash)] = dataTrie
arg := trie.ArgTrieSyncer{
RequestHandler: u.requestHandler,
InterceptedNodes: u.cacher,
Trie: dataTrie,
DB: u.trieStorageManager.Database(),
Marshalizer: u.marshalizer,
Hasher: u.hasher,
ShardId: u.shardId,
Topic: factory.AccountTrieNodesTopic,
TrieSyncStatistics: ssh,
Expand All @@ -173,19 +170,15 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisti
}
trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion)
if err != nil {
u.syncerMutex.Unlock()

return err
}
u.trieSyncers[string(rootHash)] = trieSyncer
u.syncerMutex.Unlock()

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}

u.throttler.EndProcessing()

return nil
}

Expand Down
Loading

0 comments on commit 59cbcaa

Please sign in to comment.