add blockhashes stage #862

Merged · 20 commits · Aug 10, 2020
2 changes: 1 addition & 1 deletion cmd/integration/commands/reset_state.go
@@ -22,7 +22,7 @@ import (

var cmdResetState = &cobra.Command{
Use: "reset_state",
Short: "Reset StateStages (4,5,6,7,8,9) and buckets",
Short: "Reset StateStages (5,6,7,8,9,10) and buckets",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := rootContext()
err := resetState(ctx)
2 changes: 1 addition & 1 deletion cmd/integration/commands/state_stages.go
@@ -87,7 +87,7 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
bc, st, progress := newSync(ch, db, changeSetHook)
defer bc.Stop()

st.DisableStages(stages.Headers, stages.Bodies, stages.Senders, stages.TxPool)
st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders, stages.TxPool)

senderStageProgress := progress(stages.Senders).BlockNumber

8 changes: 7 additions & 1 deletion eth/downloader/downloader.go
@@ -954,7 +954,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeight uint64) (uint6
end = check
break
}
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
// Independent of sync mode, header surely exists
var header *types.Header
if mode == StagedSync {
header = rawdb.ReadHeader(d.stateDB, h, n)
} else {
header = d.lightchain.GetHeaderByHash(h)
}
if header.Number.Uint64() != check {
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
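Note: under StagedSync the blockHash => number mapping may not have been written yet (with this PR it gets its own stage), so the ancestor header is read directly by (hash, number) via rawdb.ReadHeader rather than through the hash-only lightchain.GetHeaderByHash lookup.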
49 changes: 49 additions & 0 deletions eth/stagedsync/stage_blockhashes.go
@@ -0,0 +1,49 @@
package stagedsync

import (
"encoding/binary"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
)

func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
// We only want to extract entries composed of Block Number + Header Hash
if len(k) != 40 {
Review comment (Contributor): Put a comment here why you are comparing with 40

return nil
}
return next(k, common.CopyBytes(k[8:]), common.CopyBytes(k[:8]))
}
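
On the review question above: the 40-byte check matches the key layout of the HeaderPrefix bucket, where a header entry is keyed by an 8-byte big-endian block number followed by a 32-byte header hash (hence the k[:8] / k[8:] split). A minimal sketch of that assumed layout:

// Sketch, not part of the diff: the assumed 40-byte header key layout,
// 8-byte big-endian block number ++ 32-byte header hash = 40 bytes.
// Keys of other lengths in the same bucket (e.g. total-difficulty records)
// are skipped by the len(k) != 40 check above.
// Uses "encoding/binary" and turbo-geth's common.Hash (a [32]byte).
func headerKey(number uint64, hash common.Hash) []byte {
	k := make([]byte, 8+len(hash)) // 8 + 32 = 40
	binary.BigEndian.PutUint64(k[:8], number)
	copy(k[8:], hash[:])
	return k
}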

func SpawnBlockHashStage(s *StageState, stateDB ethdb.Database, quit <-chan struct{}) error {
headHash := rawdb.ReadHeadHeaderHash(stateDB)
headNumber := rawdb.ReadHeaderNumber(stateDB, headHash)
if s.BlockNumber == *headNumber {
s.Done()
return nil
}

startKey := make([]byte, 8)
binary.BigEndian.PutUint64(startKey, s.BlockNumber)
endKey := dbutils.HeaderKey(*headNumber, headHash) // Make sure we stop at head

if err := etl.Transform(
stateDB,
dbutils.HeaderPrefix,
dbutils.HeaderNumberPrefix,
".",
extractHeaders,
etl.IdentityLoadFunc,
etl.TransformArgs{
ExtractStartKey: startKey,
ExtractEndKey: endKey,
Quit: quit,
},
); err != nil {
return err
}
return s.DoneAndUpdate(stateDB, *headNumber)
}
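
For intuition, the transform above amounts to the naive walk below over the header bucket; the real stage goes through ETL so the hash-keyed writes arrive sorted and batched. A sketch only, assuming a db.Walk(bucket, startkey, fixedbits, walker) helper of the shape turbo-geth's ethdb exposed at the time (endKey handling omitted):

// Sketch, not part of the diff: the net effect of the etl.Transform call.
err := stateDB.Walk(dbutils.HeaderPrefix, startKey, 0, func(k, v []byte) (bool, error) {
	if len(k) != 40 { // same filter as extractHeaders
		return true, nil
	}
	// write blockHash => big-endian block number
	if err := stateDB.Put(dbutils.HeaderNumberPrefix, common.CopyBytes(k[8:]), common.CopyBytes(k[:8])); err != nil {
		return false, err
	}
	return true, nil
})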
35 changes: 35 additions & 0 deletions eth/stagedsync/stage_blockhashes_test.go
@@ -0,0 +1,35 @@
package stagedsync

import (
"context"
"testing"

"github.com/ledgerwatch/turbo-geth/consensus/ethash"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/stretchr/testify/assert"
)

func TestBlockHashStage(t *testing.T) {
origin, headers := generateFakeBlocks(1, 4)

db := ethdb.NewMemDatabase()

// prepare db so it works with our test
rawdb.WriteHeaderNumber(db, origin.Hash(), 0)
rawdb.WriteTd(db, origin.Hash(), 0, origin.Difficulty)
rawdb.WriteHeader(context.TODO(), db, origin)
rawdb.WriteHeadHeaderHash(db, origin.Hash())
rawdb.WriteCanonicalHash(db, origin.Hash(), 0)

_, _, err := InsertHeaderChain(db, headers, params.AllEthashProtocolChanges, ethash.NewFaker(), 0)
assert.NoError(t, err)
err = SpawnBlockHashStage(&StageState{}, db, nil)
assert.NoError(t, err)
for _, h := range headers {
n := rawdb.ReadHeaderNumber(db, h.Hash())
assert.Equal(t, *n, h.Number.Uint64())
}

}
18 changes: 15 additions & 3 deletions eth/stagedsync/stage_headers.go
@@ -1,7 +1,6 @@
package stagedsync

import (
"context"
"errors"
"fmt"
"math/big"
@@ -11,13 +10,15 @@ import (
"time"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/consensus"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/rlp"
)

func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, headersFetchers []func() error) error {
@@ -113,7 +114,7 @@ Error: %v
return false, 0, nil
}

if rawdb.ReadHeaderNumber(db, headers[0].ParentHash) == nil {
if rawdb.ReadHeader(db, headers[0].ParentHash, headers[0].Number.Uint64()-1) == nil {
return false, 0, errors.New("unknown parent")
}
parentTd := rawdb.ReadTd(db, headers[0].ParentHash, headers[0].Number.Uint64()-1)
@@ -196,8 +197,14 @@ Error: %v
if newCanonical {
rawdb.WriteCanonicalHash(batch, header.Hash(), header.Number.Uint64())
}
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
}
rawdb.WriteTd(batch, header.Hash(), header.Number.Uint64(), td)
rawdb.WriteHeader(context.Background(), batch, header)
if err := batch.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(number, header.Hash()), data); err != nil {
log.Crit("Failed to store header", "err", err)
}
}
if deepFork {
forkHeader := rawdb.ReadHeader(batch, headers[0].ParentHash, headers[0].Number.Uint64()-1)
@@ -219,6 +226,11 @@ Error: %v
}
}
if newCanonical {
encoded := dbutils.EncodeBlockNumber(lastHeader.Number.Uint64())

if err := batch.Put(dbutils.HeaderNumberPrefix, lastHeader.Hash().Bytes(), encoded); err != nil {
log.Crit("Failed to store hash to number mapping", "err", err)
}
rawdb.WriteHeadHeaderHash(batch, lastHeader.Hash())
}
if _, err := batch.Commit(); err != nil {
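Note: the Headers stage now RLP-encodes and writes the header record itself via batch.Put instead of calling rawdb.WriteHeader, which would also populate the blockHash => number bucket; that mapping is deferred to the new BlockHashes stage. The head header is the exception: its mapping is written immediately so head lookups keep working between stages.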
12 changes: 11 additions & 1 deletion eth/stagedsync/stagedsync.go
@@ -42,6 +42,16 @@ func PrepareStagedSync(
return u.Done(stateDB)
},
},
{
ID: stages.BlockHashes,
Description: "Write block hashes",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnBlockHashStage(s, stateDB, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return u.Done(stateDB)
},
},
{
ID: stages.Bodies,
Description: "Download block bodies",
@@ -157,7 +167,7 @@ func PrepareStagedSync(
state := NewState(stages)
state.unwindOrder = []*Stage{
// Unwinding of tx pool (reinjecting transactions into the pool needs to happen after unwinding execution)
stages[0], stages[1], stages[2], stages[9], stages[3], stages[4], stages[5], stages[6], stages[7], stages[8],
stages[0], stages[1], stages[2], stages[3], stages[10], stages[4], stages[5], stages[6], stages[7], stages[8], stages[9],
}
if err := state.LoadUnwindInfo(stateDB); err != nil {
return nil, err
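Note: inserting BlockHashes at index 1 shifts every later stage up by one, so TxPool moves from stages[9] to stages[10]; the relative unwind order is unchanged.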
2 changes: 2 additions & 0 deletions eth/stagedsync/stages/stages.go
@@ -31,6 +31,7 @@ type SyncStage byte

const (
Headers SyncStage = iota // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
BlockHashes // Header numbers are written, filling the blockHash => number bucket
Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified
Senders // "From" recovered from signatures, bodies re-written
Execution // Executing each block w/o building a trie
@@ -45,6 +46,7 @@ const (

var DBKeys = map[SyncStage][]byte{
Headers: []byte("Headers"),
BlockHashes: []byte("BlockHashes"),
Bodies: []byte("Bodies"),
Senders: []byte("Senders"),
Execution: []byte("Execution"),
3 changes: 3 additions & 0 deletions migrations/migrations.go
@@ -3,6 +3,7 @@ package migrations
import (
"bytes"
"errors"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
@@ -53,6 +54,8 @@ import (
var migrations = []Migration{
stagesToUseNamedKeys,
unwindStagesToUseNamedKeys,
stagedsyncToUseStageBlockhashes,
unwindStagedsyncToUseStageBlockhashes,
}

type Migration struct {
51 changes: 51 additions & 0 deletions migrations/stagedsync_to_use_stage_blockhashes.go
@@ -0,0 +1,51 @@
package migrations

import (
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)

var stagedsyncToUseStageBlockhashes = Migration{
Name: "stagedsync_to_use_stage_blockhashes",
Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {

var progress uint64
var err error
if progress, _, err = stages.GetStageProgress(db, stages.Headers); err != nil {
return err
}

if err = stages.SaveStageProgress(db, stages.BlockHashes, progress, nil); err != nil {
return err
}

if err = OnLoadCommit(db, nil, true); err != nil {
return err
}

return nil
},
}

var unwindStagedsyncToUseStageBlockhashes = Migration{
Name: "unwind_stagedsync_to_use_stage_blockhashes",
Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {

var progress uint64
var err error
if progress, _, err = stages.GetStageUnwind(db, stages.Headers); err != nil {
return err
}

if err = stages.SaveStageUnwind(db, stages.BlockHashes, progress, nil); err != nil {
return err
}

if err = OnLoadCommit(db, nil, true); err != nil {
return err
}

return nil
},
}
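
Note: block hashes were previously written as part of the Headers stage, so these migrations seed the new BlockHashes stage's progress (and unwind point) from the Headers stage, sparing existing databases a re-run of the stage from genesis.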
46 changes: 46 additions & 0 deletions migrations/stagedsync_to_use_stage_blockhashes_test.go
@@ -0,0 +1,46 @@
package migrations

import (
"testing"

"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStagedsyncToUseStageBlockhashes(t *testing.T) {

require, db := require.New(t), ethdb.NewMemDatabase()
var expected uint64 = 12

err := stages.SaveStageProgress(db, stages.Headers, expected, nil)
require.NoError(err)

migrator := NewMigrator()
migrator.Migrations = []Migration{stagedsyncToUseStageBlockhashes}
err = migrator.Apply(db, "")
require.NoError(err)

actual, _, err := stages.GetStageProgress(db, stages.BlockHashes)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}

func TestUnwindStagedsyncToUseStageBlockhashes(t *testing.T) {

require, db := require.New(t), ethdb.NewMemDatabase()
var expected uint64 = 12

err := stages.SaveStageUnwind(db, stages.Headers, expected, nil)
require.NoError(err)

migrator := NewMigrator()
migrator.Migrations = []Migration{unwindStagedsyncToUseStageBlockhashes}
err = migrator.Apply(db, "")
require.NoError(err)

actual, _, err := stages.GetStageUnwind(db, stages.BlockHashes)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}