ticker-based logs #954

Merged · 10 commits · Aug 22, 2020
7 changes: 5 additions & 2 deletions cmd/integration/commands/flags.go
@@ -1,6 +1,9 @@
 package commands
 
-import "github.com/spf13/cobra"
+import (
+	"github.com/ledgerwatch/turbo-geth/node"
+	"github.com/spf13/cobra"
+)
 
 var (
 	chaindata string
@@ -56,5 +59,5 @@ func withBucket(cmd *cobra.Command) {
 }
 
 func withDatadir(cmd *cobra.Command) {
-	cmd.Flags().StringVar(&datadir, "datadir", "", "data directory for temporary ELT files")
+	cmd.Flags().StringVar(&datadir, "datadir", node.DefaultDataDir(), "data directory for temporary ELT files")
 }
32 changes: 18 additions & 14 deletions common/etl/collector.go
@@ -114,27 +114,31 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
 	}
 	var canUseAppend bool
 
-	putTimer := time.Now()
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
 
 	i := 0
 	loadNextFunc := func(originalK, k, v []byte) error {
 		if i == 0 {
 			isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
 			canUseAppend = haveSortingGuaranties && isEndOfBucket
 		}
 
 		i++
-		i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {
+		select {
+		default:
+		case <-logEvery.C:
+			logArs := []interface{}{"into", bucket}
+			if args.LogDetailsLoad != nil {
+				logArs = append(logArs, args.LogDetailsLoad(k, v)...)
+			} else {
+				logArs = append(logArs, "current key", makeCurrentKeyStr(k))
+			}
 
 			runtime.ReadMemStats(&m)
-			log.Info(
-				"ETL [2/2] Loading",
-				"into", bucket,
-				"size", common.StorageSize(batch.BatchSize()),
-				"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
-				"progress", progress+50, // loading is the second stage, from 50..100
-				"use append", canUseAppend,
-				"current key", makeCurrentKeyStr(originalK),
-				"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
-		})
+			logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
+			log.Info("ETL [2/2] Loading", logArs...)
+		}
 
 		if canUseAppend && len(v) == 0 {
 			return nil // nothing to delete after end of bucket
@@ -191,7 +195,7 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
"Committed batch",
"bucket", bucket,
"commit", commitTook,
"size", common.StorageSize(batch.BatchSize()),
"records", i,
"current key", makeCurrentKeyStr(nil),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))

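The hunk above is the core pattern of this PR: the old per-key counter check (printProgressIfNeeded) is replaced with a time.Ticker polled via a non-blocking select. The empty default arm falls through on every key, and the logging arm runs at most once per tick. A minimal standalone sketch of the idiom (package, function, and variable names are illustrative, not from the diff):

package main

import (
	"log"
	"time"
)

func processKeys(keys [][]byte) {
	// Fires every 30s; Stop releases the ticker's resources when done.
	logEvery := time.NewTicker(30 * time.Second)
	defer logEvery.Stop()

	for i, k := range keys {
		// ... per-key work ...

		select {
		default: // ticker not due: skip logging without blocking
		case <-logEvery.C: // at most once per 30s
			log.Printf("progress: %d keys done, current key %x", i, k)
		}
	}
}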
1 change: 0 additions & 1 deletion common/etl/dataprovider.go
@@ -46,7 +46,6 @@ func FlushToDisk(encoder Encoder, currentKey []byte, b Buffer, datadir string) (
 		runtime.ReadMemStats(&m)
 		log.Info(
 			"Flushed buffer file",
-			"current key", makeCurrentKeyStr(currentKey),
 			"name", bufferFile.Name(),
 			"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
 	}()
37 changes: 22 additions & 15 deletions common/etl/etl.go
@@ -57,6 +57,7 @@ func NextKey(key []byte) ([]byte, error) {
 // * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey)
 // * `isDone`: true, if everything is processed
 type LoadCommitHandler func(db ethdb.Putter, key []byte, isDone bool) error
+type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})
 
 type TransformArgs struct {
 	ExtractStartKey []byte
@@ -67,6 +68,9 @@ type TransformArgs struct {
 	Quit <-chan struct{}
 	OnLoadCommit LoadCommitHandler
 	loadBatchSize int // used in testing
+
+	LogDetailsExtract AdditionalLogArguments
+	LogDetailsLoad    AdditionalLogArguments
 }
 
 func Transform(
@@ -86,7 +90,7 @@ func Transform(
 	collector := NewCollector(datadir, buffer)
 
 	t := time.Now()
-	if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit); err != nil {
+	if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit, args.LogDetailsExtract); err != nil {
 		disposeProviders(collector.dataProviders)
 		return err
 	}
@@ -105,28 +109,31 @@ func extractBucketIntoFiles(
 	collector *Collector,
 	extractFunc ExtractFunc,
 	quit <-chan struct{},
+	additionalLogArguments AdditionalLogArguments,
 ) error {
 
 	i := 0
-	putTimer := time.Now()
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
 	var m runtime.MemStats
 
 	if err := db.Walk(bucket, startkey, fixedBits, func(k, v []byte) (bool, error) {
 		if err := common.Stopped(quit); err != nil {
 			return false, err
 		}
 		i++
-		i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {
+
+		select {
+		default:
+		case <-logEvery.C:
+			logArs := []interface{}{"from", bucket}
+			if additionalLogArguments != nil {
+				logArs = append(logArs, additionalLogArguments(k, v)...)
+			} else {
+				logArs = append(logArs, "current key", makeCurrentKeyStr(k))
+			}
 
 			runtime.ReadMemStats(&m)
-			log.Info(
-				"ETL [1/2] Extracting",
-				"from", bucket,
-				"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
-				"progress", progress, // extracting is the first stage, from 0..50
-				"current key", makeCurrentKeyStr(k),
-				"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC),
-			)
-		})
+			logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
+			log.Info("ETL [1/2] Extracting", logArs...)
+		}
 		if endkey != nil && bytes.Compare(k, endkey) > 0 {
 			return false, nil
 		}
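The two new TransformArgs hooks let each caller swap the generic "current key" log field for stage-specific context; when a hook is nil, the ticker branch falls back to makeCurrentKeyStr. A sketch of how a caller wires them (quitCh is assumed to exist; the values mirror the call sites later in this diff):

args := etl.TransformArgs{
	Quit: quitCh,
	// Extraction is the first stage and reports progress in 0..50.
	LogDetailsExtract: func(k, v []byte) []interface{} {
		return []interface{}{"progress", etl.ProgressFromKey(k)}
	},
	// Loading is the second stage and reports progress in 50..100.
	LogDetailsLoad: func(k, v []byte) []interface{} {
		return []interface{}{"progress", etl.ProgressFromKey(k) + 50}
	},
}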
4 changes: 2 additions & 2 deletions common/etl/etl_test.go
@@ -87,7 +87,7 @@ func TestFileDataProviders(t *testing.T) {

 	collector := NewCollector("", NewSortableBuffer(1))
 
-	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
+	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
 	assert.NoError(t, err)
 
 	assert.Equal(t, 10, len(collector.dataProviders))
@@ -116,7 +116,7 @@ func TestRAMDataProviders(t *testing.T) {
 	generateTestData(t, db, sourceBucket, 10)
 
 	collector := NewCollector("", NewSortableBuffer(BufferOptimalSize))
-	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
+	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
 	assert.NoError(t, err)
 
 	assert.Equal(t, 1, len(collector.dataProviders))
12 changes: 1 addition & 11 deletions common/etl/progress.go
@@ -1,18 +1,8 @@
 package etl
 
-import "time"
-
-func progressFromKey(k []byte) int {
+func ProgressFromKey(k []byte) int {
 	if len(k) < 1 {
 		return 0
 	}
 	return int(float64(k[0]>>4) * 3.3)
 }
-
-func printProgressIfNeeded(i int, t time.Time, k []byte, printFunc func(int)) (int, time.Time) {
-	if i%1_000_000 == 0 && time.Since(t) > 30*time.Second {
-		printFunc(progressFromKey(k))
-		return i + 1, time.Now()
-	}
-	return i + 1, t
-}
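ProgressFromKey (now exported for the callbacks above) estimates percentage from the high nibble of the first key byte, which works because these buckets are walked in lexicographic order: a nibble of 0..15 scales to roughly 0..49, one stage's half of a 0..100 range. A standalone check of the arithmetic:

package main

import "fmt"

// Mirrors etl.ProgressFromKey: scale the top nibble of k[0] (0..15) to ~0..49.
func progressFromKey(k []byte) int {
	if len(k) < 1 {
		return 0
	}
	return int(float64(k[0]>>4) * 3.3)
}

func main() {
	fmt.Println(progressFromKey(nil))          // 0
	fmt.Println(progressFromKey([]byte{0x80})) // 26 (8 * 3.3 = 26.4, truncated)
	fmt.Println(progressFromKey([]byte{0xff})) // 49 (15 * 3.3 = 49.5, truncated)
}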
6 changes: 6 additions & 0 deletions core/generate_index.go
@@ -53,6 +53,12 @@ func (ig *IndexGenerator) GenerateIndex(startBlock, endBlock uint64, changeSetBu
 			BufferType: etl.SortableAppendBuffer,
 			BufferSize: ig.ChangeSetBufSize,
 			Quit:       ig.quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
 		},
 	)
 	if err != nil {
38 changes: 28 additions & 10 deletions eth/stagedsync/stage_execute.go
@@ -22,7 +22,7 @@ import (
 )
 
 const (
-	logInterval = 30 // seconds
+	logInterval = 30 * time.Second
 )
 
 type HasChangeSetWriter interface {
@@ -68,7 +68,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	engine := chainContext.Engine()
 
 	stageProgress := s.BlockNumber
-	logTime, logBlock := time.Now(), stageProgress
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+	logBlock := stageProgress
 
 	for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
 		if err := common.Stopped(quit); err != nil {
@@ -123,7 +125,11 @@
 			}
 		}
 
-		logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch)
+		select {
+		default:
+		case <-logEvery.C:
+			logBlock = logProgress(logBlock, blockNum, batch)
+		}
 	}
 
 	if err := s.Update(batch, stageProgress); err != nil {
@@ -137,12 +143,8 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	return nil
 }
 
-func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) {
-	if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval {
-		return lastLogTime, prev // return old values because no logging happened
-	}
-
-	speed := float64(now-prev) / float64(logInterval)
+func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
+	speed := float64(now-prev) / float64(logInterval/time.Second)
 	var m runtime.MemStats
 	runtime.ReadMemStats(&m)
 	log.Info("Executed blocks:",
@@ -153,7 +155,7 @@ func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPend
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))

return time.Now(), now
return now
}

func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database, writeReceipts bool) error {
@@ -210,6 +212,9 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
 		}
 	}
 
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+
 	for i := s.BlockNumber; i > u.UnwindPoint; i-- {
 		if err = deleteChangeSets(batch, i, accountChangeSetBucket, storageChangeSetBucket); err != nil {
 			return err
@@ -218,6 +223,19 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
 			blockHash := rawdb.ReadCanonicalHash(batch, i)
 			rawdb.DeleteReceipts(batch, blockHash, i)
 		}
+
+		select {
+		default:
+		case <-logEvery.C:
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			log.Info("Executed blocks:",
+				"currentBlock", i,
+				"batch", common.StorageSize(batch.BatchSize()),
+				"alloc", common.StorageSize(m.Alloc),
+				"sys", common.StorageSize(m.Sys),
+				"numGC", int(m.NumGC))
+		}
 	}
 
 	if err = u.Done(batch); err != nil {
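With logInterval now a time.Duration instead of a bare 30, logProgress derives blocks-per-second by dividing the block delta by logInterval/time.Second (Duration/Duration division, yielding 30). A quick standalone check of that arithmetic:

package main

import (
	"fmt"
	"time"
)

const logInterval = 30 * time.Second

func main() {
	prev, now := uint64(1_000_000), uint64(1_003_600) // block heights at successive ticks
	speed := float64(now-prev) / float64(logInterval/time.Second)
	fmt.Printf("%.1f blocks/s\n", speed) // 120.0 blocks/s
}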
6 changes: 6 additions & 0 deletions eth/stagedsync/stage_hashstate.go
@@ -393,6 +393,12 @@ func (p *Promoter) Unwind(s *StageState, u *UnwindState, storage bool, codes boo
 			BufferType:      etl.SortableOldestAppearedBuffer,
 			ExtractStartKey: startkey,
 			Quit:            p.quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
 		},
 	)
 }
28 changes: 26 additions & 2 deletions eth/stagedsync/stage_interhashes.go
@@ -234,7 +234,19 @@ func incrementIntermediateHashes(s *StageState, db ethdb.Database, to uint64, da
"gen IH", generationIHTook,
)

if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collector.Load(db,
dbutils.IntermediateTrieHashBucket,
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: quit,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
); err != nil {
return err
}
return nil
@@ -301,7 +313,19 @@ func unwindIntermediateHashesStageImpl(u *UnwindState, s *StageState, db ethdb.D
"root hash", subTries.Hashes[0].Hex(),
"gen IH", generationIHTook,
)
if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collector.Load(db,
dbutils.IntermediateTrieHashBucket,
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: quit,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
); err != nil {
return err
}
return nil
21 changes: 20 additions & 1 deletion eth/stagedsync/stage_senders.go
@@ -167,6 +167,8 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 	}()
 
 	collector := etl.NewCollector(datadir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
 	for j := range out {
 		if j.err != nil {
 			return j.err
@@ -175,6 +177,11 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 			return err
 		}
 		k := make([]byte, 4)
+		select {
+		default:
+		case <-logEvery.C:
+			log.Info("Senders recovery", "block", j.index)
+		}
 		binary.BigEndian.PutUint32(k, uint32(j.index))
 		if err := collector.Collect(k, j.senders); err != nil {
 			return err
@@ -184,7 +191,19 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 		index := int(binary.BigEndian.Uint32(k))
 		return next(k, dbutils.BlockBodyKey(s.BlockNumber+uint64(index)+1, canonical[index]), value)
 	}
-	if err := collector.Load(db, dbutils.Senders, loadFunc, etl.TransformArgs{Quit: quitCh}); err != nil {
+	if err := collector.Load(db,
+		dbutils.Senders,
+		loadFunc,
+		etl.TransformArgs{
+			Quit: quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"block", binary.BigEndian.Uint64(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"block", binary.BigEndian.Uint64(k)}
+			},
+		},
+	); err != nil {
 		return err
 	}
 	return s.DoneAndUpdate(db, to)
3 changes: 3 additions & 0 deletions eth/stagedsync/stage_txlookup.go
@@ -59,6 +59,9 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh <-chan
 		Quit:            quitCh,
 		ExtractStartKey: startKey,
 		ExtractEndKey:   endKey,
+		LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+			return []interface{}{"block", binary.BigEndian.Uint64(k)}
+		},
 	})
 }
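The senders and txlookup hooks decode the block number from the first eight bytes of the key with binary.BigEndian.Uint64; this assumes the keys carry an 8-byte big-endian block-number prefix (as dbutils-style block keys do — note that Uint64 panics on keys shorter than eight bytes). A standalone sketch of that decoding:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// dbutils-style block key layout: 8-byte big-endian block number + 32-byte hash.
	key := make([]byte, 40)
	binary.BigEndian.PutUint64(key[:8], 11_500_000)
	fmt.Println(binary.BigEndian.Uint64(key)) // 11500000; only the first 8 bytes are read
}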
