Skip to content

Commit

Permalink
timer-based logs
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Aug 22, 2020
1 parent 7c16160 commit fc85e3a
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 53 deletions.
19 changes: 11 additions & 8 deletions common/etl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,30 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
}
var canUseAppend bool

putTimer := time.Now()
i := 0
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

var initialized bool
loadNextFunc := func(originalK, k, v []byte) error {
if i == 0 {
if !initialized {
isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
canUseAppend = haveSortingGuaranties && isEndOfBucket
initialized = true
}

i++
i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {
select {
default:
case <-logEvery.C:
runtime.ReadMemStats(&m)
log.Info(
"ETL [2/2] Loading",
"into", bucket,
"size", common.StorageSize(batch.BatchSize()),
"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
"progress", progress+50, // loading is the second stage, from 50..100
"progress", progressFromKey(k)+50, // loading is the second stage, from 50..100
"use append", canUseAppend,
"current key", makeCurrentKeyStr(originalK),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
})
}

if canUseAppend && len(v) == 0 {
return nil // nothing to delete after end of bucket
Expand Down
16 changes: 8 additions & 8 deletions common/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,27 @@ func extractBucketIntoFiles(
extractFunc ExtractFunc,
quit <-chan struct{},
) error {

i := 0
putTimer := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var m runtime.MemStats

if err := db.Walk(bucket, startkey, fixedBits, func(k, v []byte) (bool, error) {
if err := common.Stopped(quit); err != nil {
return false, err
}
i++
i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {

select {
default:
case <-logEvery.C:
runtime.ReadMemStats(&m)
log.Info(
"ETL [1/2] Extracting",
"from", bucket,
"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
"progress", progress, // extracting is the first stage, from 0..50
"progress", progressFromKey(k), // extracting is the first stage, from 0..50
"current key", makeCurrentKeyStr(k),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC),
)
})
}
if endkey != nil && bytes.Compare(k, endkey) > 0 {
return false, nil
}
Expand Down
10 changes: 0 additions & 10 deletions common/etl/progress.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
package etl

import "time"

// progressFromKey estimates scan progress through a lexicographically
// ordered key space, mapping the high nibble of the first key byte
// (0..15) onto an approximate 0..49 scale.
func progressFromKey(k []byte) int {
	if len(k) == 0 {
		return 0
	}
	highNibble := k[0] >> 4 // 0..15
	return int(float64(highNibble) * 3.3)
}

// printProgressIfNeeded invokes printFunc with a progress estimate derived
// from the current key, throttled to every millionth item and at most once
// per 30 seconds. It returns the incremented item counter and the timestamp
// of the most recent report (unchanged when no report was made).
func printProgressIfNeeded(i int, t time.Time, k []byte, printFunc func(int)) (int, time.Time) {
	next := i + 1
	shouldReport := i%1_000_000 == 0 && time.Since(t) > 30*time.Second
	if !shouldReport {
		return next, t
	}
	printFunc(progressFromKey(k))
	return next, time.Now()
}
36 changes: 27 additions & 9 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
logInterval = 30 // seconds
logInterval = 30 * time.Second
)

type HasChangeSetWriter interface {
Expand Down Expand Up @@ -68,7 +68,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
engine := chainContext.Engine()

stageProgress := s.BlockNumber
logTime, logBlock := time.Now(), stageProgress
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
logBlock := stageProgress

for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
if err := common.Stopped(quit); err != nil {
Expand Down Expand Up @@ -123,7 +125,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
}
}

logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch)
select {
default:
case <-logEvery.C:
logBlock = logProgress(logBlock, blockNum, batch)
}
}

if err := s.Update(batch, stageProgress); err != nil {
Expand All @@ -137,11 +143,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
return nil
}

func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) {
if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval {
return lastLogTime, prev // return old values because no logging happened
}

func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
speed := float64(now-prev) / float64(logInterval)
var m runtime.MemStats
runtime.ReadMemStats(&m)
Expand All @@ -153,7 +155,7 @@ func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPend
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))

return time.Now(), now
return now
}

func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database, writeReceipts bool) error {
Expand Down Expand Up @@ -210,6 +212,9 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
}
}

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

for i := s.BlockNumber; i > u.UnwindPoint; i-- {
if err = deleteChangeSets(batch, i, accountChangeSetBucket, storageChangeSetBucket); err != nil {
return err
Expand All @@ -218,6 +223,19 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
blockHash := rawdb.ReadCanonicalHash(batch, i)
rawdb.DeleteReceipts(batch, blockHash, i)
}

select {
default:
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
"currentBlock", i,
"batch", common.StorageSize(batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
}
}

if err = u.Done(batch); err != nil {
Expand Down
10 changes: 7 additions & 3 deletions ethdb/tx_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (m *TxDb) MultiPut(tuples ...[]byte) (uint64, error) {
}

func MultiPut(tx Tx, tuples ...[]byte) error {
putTimer := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

count := 0
total := float64(len(tuples)) / 3
for bucketStart := 0; bucketStart < len(tuples); {
Expand Down Expand Up @@ -174,10 +176,12 @@ func MultiPut(tx Tx, tuples ...[]byte) error {
}

count++
if count%100_000 == 0 && time.Since(putTimer) > 30*time.Second {

select {
default:
case <-logEvery.C:
progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000)
log.Info("Write to db", "progress", progress)
putTimer = time.Now()
}
}

Expand Down
32 changes: 17 additions & 15 deletions trie/flatdb_sub_trie_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,21 +611,27 @@ func (fstl *FlatDbSubTrieLoader) LoadSubTries() (SubTries, error) {
if err := fstl.iteration(c, ih, true /* first */); err != nil {
return err
}
var counter uint64
t := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

for fstl.rangeIdx < len(fstl.dbPrefixes) {
for !fstl.itemPresent {
if err := fstl.iteration(c, ih, false /* first */); err != nil {
return err
}
counter++
t = fstl.logProgress(t, counter)

}
if fstl.itemPresent {
if err := fstl.receiver.Receive(fstl.itemType, fstl.accountKey, fstl.storageKey, &fstl.accountValue, fstl.storageValue, fstl.hashValue, fstl.streamCutoff); err != nil {
return err
}
fstl.itemPresent = false

select {
default:
case <-logEvery.C:
fstl.logProgress()
}
}
}
return nil
Expand All @@ -635,18 +641,14 @@ func (fstl *FlatDbSubTrieLoader) LoadSubTries() (SubTries, error) {
return fstl.receiver.Result(), nil
}

func (fstl *FlatDbSubTrieLoader) logProgress(lastLogTime time.Time, counter uint64) time.Time {
if counter%100_000 == 0 && time.Since(lastLogTime) > 30*time.Second {
var k string
if fstl.accountKey != nil {
k = makeCurrentKeyStr(fstl.accountKey)
} else {
k = makeCurrentKeyStr(fstl.ihK)
}
log.Info("Calculating Merkle root", "current key", k)
return time.Now()
func (fstl *FlatDbSubTrieLoader) logProgress() {
var k string
if fstl.accountKey != nil {
k = makeCurrentKeyStr(fstl.accountKey)
} else {
k = makeCurrentKeyStr(fstl.ihK)
}
return lastLogTime
log.Info("Calculating Merkle root", "current key", k)
}

func makeCurrentKeyStr(k []byte) string {
Expand Down

0 comments on commit fc85e3a

Please sign in to comment.