Skip to content

Commit

Permalink
add blockhashes stage (#862)
Browse files Browse the repository at this point in the history
* add blockhashes stage

* unwindOrder specified

* fixed typo

* added common.copybytes

* fixed little typo

* better progress

* better progress

* refactoring with etl

* added blockhashes to disabled stages in cmd/integration

* added migraations

* fixed go.mod

* better migrations

* rename unwind

* simplified migrations

* added onloadcommit
  • Loading branch information
Giulio2002 committed Aug 10, 2020
1 parent 81c4878 commit 0578949
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

var cmdResetState = &cobra.Command{
Use: "reset_state",
Short: "Reset StateStages (4,5,6,7,8,9) and buckets",
Short: "Reset StateStages (5,6,7,8,9,10) and buckets",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := rootContext()
err := resetState(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
bc, st, progress := newSync(ch, db, changeSetHook)
defer bc.Stop()

st.DisableStages(stages.Headers, stages.Bodies, stages.Senders, stages.TxPool)
st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders, stages.TxPool)

senderStageProgress := progress(stages.Senders).BlockNumber

Expand Down
8 changes: 7 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeight uint64) (uint6
end = check
break
}
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
// Independent of sync mode, header surely exists
var header *types.Header
if mode == StagedSync {
header = rawdb.ReadHeader(d.stateDB, h, n)
} else {
header = d.lightchain.GetHeaderByHash(h)
}
if header.Number.Uint64() != check {
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
Expand Down
49 changes: 49 additions & 0 deletions eth/stagedsync/stage_blockhashes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package stagedsync

import (
"encoding/binary"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
)

func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
// We only want to extract entries composed by Block Number + Header Hash
if len(k) != 40 {
return nil
}
return next(k, common.CopyBytes(k[8:]), common.CopyBytes(k[:8]))
}

func SpawnBlockHashStage(s *StageState, stateDB ethdb.Database, quit <-chan struct{}) error {
headHash := rawdb.ReadHeadHeaderHash(stateDB)
headNumber := rawdb.ReadHeaderNumber(stateDB, headHash)
if s.BlockNumber == *headNumber {
s.Done()
return nil
}

startKey := make([]byte, 8)
binary.BigEndian.PutUint64(startKey, s.BlockNumber)
endKey := dbutils.HeaderKey(*headNumber, headHash) // Make sure we stop at head

if err := etl.Transform(
stateDB,
dbutils.HeaderPrefix,
dbutils.HeaderNumberPrefix,
".",
extractHeaders,
etl.IdentityLoadFunc,
etl.TransformArgs{
ExtractStartKey: startKey,
ExtractEndKey: endKey,
Quit: quit,
},
); err != nil {
return err
}
return s.DoneAndUpdate(stateDB, *headNumber)
}
35 changes: 35 additions & 0 deletions eth/stagedsync/stage_blockhashes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package stagedsync

import (
"context"
"testing"

"github.com/ledgerwatch/turbo-geth/consensus/ethash"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/stretchr/testify/assert"
)

func TestBlockHashStage(t *testing.T) {
origin, headers := generateFakeBlocks(1, 4)

db := ethdb.NewMemDatabase()

// prepare db so it works with our test
rawdb.WriteHeaderNumber(db, origin.Hash(), 0)
rawdb.WriteTd(db, origin.Hash(), 0, origin.Difficulty)
rawdb.WriteHeader(context.TODO(), db, origin)
rawdb.WriteHeadHeaderHash(db, origin.Hash())
rawdb.WriteCanonicalHash(db, origin.Hash(), 0)

_, _, err := InsertHeaderChain(db, headers, params.AllEthashProtocolChanges, ethash.NewFaker(), 0)
assert.NoError(t, err)
err = SpawnBlockHashStage(&StageState{}, db, nil)
assert.NoError(t, err)
for _, h := range headers {
n := rawdb.ReadHeaderNumber(db, h.Hash())
assert.Equal(t, *n, h.Number.Uint64())
}

}
18 changes: 15 additions & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stagedsync

import (
"context"
"errors"
"fmt"
"math/big"
Expand All @@ -11,13 +10,15 @@ import (
"time"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/consensus"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/rlp"
)

func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, headersFetchers []func() error) error {
Expand Down Expand Up @@ -113,7 +114,7 @@ Error: %v
return false, 0, nil
}

if rawdb.ReadHeaderNumber(db, headers[0].ParentHash) == nil {
if rawdb.ReadHeader(db, headers[0].ParentHash, headers[0].Number.Uint64()-1) == nil {
return false, 0, errors.New("unknown parent")
}
parentTd := rawdb.ReadTd(db, headers[0].ParentHash, headers[0].Number.Uint64()-1)
Expand Down Expand Up @@ -196,8 +197,14 @@ Error: %v
if newCanonical {
rawdb.WriteCanonicalHash(batch, header.Hash(), header.Number.Uint64())
}
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
}
rawdb.WriteTd(batch, header.Hash(), header.Number.Uint64(), td)
rawdb.WriteHeader(context.Background(), batch, header)
if err := batch.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(number, header.Hash()), data); err != nil {
log.Crit("Failed to store header", "err", err)
}
}
if deepFork {
forkHeader := rawdb.ReadHeader(batch, headers[0].ParentHash, headers[0].Number.Uint64()-1)
Expand All @@ -219,6 +226,11 @@ Error: %v
}
}
if newCanonical {
encoded := dbutils.EncodeBlockNumber(lastHeader.Number.Uint64())

if err := batch.Put(dbutils.HeaderNumberPrefix, lastHeader.Hash().Bytes(), encoded); err != nil {
log.Crit("Failed to store hash to number mapping", "err", err)
}
rawdb.WriteHeadHeaderHash(batch, lastHeader.Hash())
}
if _, err := batch.Commit(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion eth/stagedsync/stagedsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ func PrepareStagedSync(
return u.Done(stateDB)
},
},
{
ID: stages.BlockHashes,
Description: "Write block hashes",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnBlockHashStage(s, stateDB, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return u.Done(stateDB)
},
},
{
ID: stages.Bodies,
Description: "Download block bodies",
Expand Down Expand Up @@ -157,7 +167,7 @@ func PrepareStagedSync(
state := NewState(stages)
state.unwindOrder = []*Stage{
// Unwinding of tx pool (reinjecting transactions into the pool needs to happen after unwinding execution)
stages[0], stages[1], stages[2], stages[9], stages[3], stages[4], stages[5], stages[6], stages[7], stages[8],
stages[0], stages[1], stages[2], stages[3], stages[10], stages[4], stages[5], stages[6], stages[7], stages[8], stages[9],
}
if err := state.LoadUnwindInfo(stateDB); err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions eth/stagedsync/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type SyncStage byte

const (
Headers SyncStage = iota // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
BlockHashes // Headers Number are written, fills blockHash => number bucket
Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified
Senders // "From" recovered from signatures, bodies re-written
Execution // Executing each block w/o buildinf a trie
Expand All @@ -45,6 +46,7 @@ const (

var DBKeys = map[SyncStage][]byte{
Headers: []byte("Headers"),
BlockHashes: []byte("BlockHashes"),
Bodies: []byte("Bodies"),
Senders: []byte("Senders"),
Execution: []byte("Execution"),
Expand Down
3 changes: 3 additions & 0 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package migrations
import (
"bytes"
"errors"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
Expand Down Expand Up @@ -53,6 +54,8 @@ import (
var migrations = []Migration{
stagesToUseNamedKeys,
unwindStagesToUseNamedKeys,
stagedsyncToUseStageBlockhashes,
unwindStagedsyncToUseStageBlockhashes,
}

type Migration struct {
Expand Down
51 changes: 51 additions & 0 deletions migrations/stagedsync_to_use_stage_blockhashes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package migrations

import (
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)

var stagedsyncToUseStageBlockhashes = Migration{
Name: "stagedsync_to_use_stage_blockhashes",
Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {

var progress uint64
var err error
if progress, _, err = stages.GetStageProgress(db, stages.Headers); err != nil {
return err
}

if err = stages.SaveStageProgress(db, stages.BlockHashes, progress, nil); err != nil {
return err
}

if err = OnLoadCommit(db, nil, true); err != nil {
return err
}

return nil
},
}

var unwindStagedsyncToUseStageBlockhashes = Migration{
Name: "unwind_stagedsync_to_use_stage_blockhashes",
Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {

var progress uint64
var err error
if progress, _, err = stages.GetStageUnwind(db, stages.Headers); err != nil {
return err
}

if err = stages.SaveStageUnwind(db, stages.BlockHashes, progress, nil); err != nil {
return err
}

if err = OnLoadCommit(db, nil, true); err != nil {
return err
}

return nil
},
}
46 changes: 46 additions & 0 deletions migrations/stagedsync_to_use_stage_blockhashes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package migrations

import (
"testing"

"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStagedsyncToUseStageBlockhashes(t *testing.T) {

require, db := require.New(t), ethdb.NewMemDatabase()
var expected uint64 = 12

err := stages.SaveStageProgress(db, stages.Headers, expected, nil)
require.NoError(err)

migrator := NewMigrator()
migrator.Migrations = []Migration{stagedsyncToUseStageBlockhashes}
err = migrator.Apply(db, "")
require.NoError(err)

actual, _, err := stages.GetStageProgress(db, stages.BlockHashes)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}

func TestUnwindStagedsyncToUseStageBlockhashes(t *testing.T) {

require, db := require.New(t), ethdb.NewMemDatabase()
var expected uint64 = 12

err := stages.SaveStageUnwind(db, stages.Headers, expected, nil)
require.NoError(err)

migrator := NewMigrator()
migrator.Migrations = []Migration{unwindStagedsyncToUseStageBlockhashes}
err = migrator.Apply(db, "")
require.NoError(err)

actual, _, err := stages.GetStageUnwind(db, stages.BlockHashes)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}

0 comments on commit 0578949

Please sign in to comment.