Skip to content

Commit

Permalink
Merge branch 'development' into feat-genesis-spec-props-and-output
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed Apr 16, 2021
2 parents c4663fd + 02b53b6 commit bcbe1bd
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 102 deletions.
15 changes: 3 additions & 12 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,6 @@ var _ NotificationsMessage = &ConsensusMessage{}

// ConsensusMessage is mostly opaque to us
type ConsensusMessage struct {
// Identifies consensus engine.
ConsensusEngineID types.ConsensusEngineID
// Message payload.
Data []byte
}

Expand All @@ -377,23 +374,17 @@ func (cm *ConsensusMessage) Type() byte {

// String is the string
func (cm *ConsensusMessage) String() string {
return fmt.Sprintf("ConsensusMessage ConsensusEngineID=%d, DATA=%x", cm.ConsensusEngineID, cm.Data)
return fmt.Sprintf("ConsensusMessage Data=%x", cm.Data)
}

// Encode encodes a block response message using SCALE
func (cm *ConsensusMessage) Encode() ([]byte, error) {
encMsg := cm.ConsensusEngineID.ToBytes()
return append(encMsg, cm.Data...), nil
return cm.Data, nil
}

// Decode the message into a ConsensusMessage
func (cm *ConsensusMessage) Decode(in []byte) error {
if len(in) < 5 {
return errors.New("cannot decode ConsensusMessage: encoding is too short")
}

cm.ConsensusEngineID = types.NewConsensusEngineID(in[:4])
cm.Data = in[4:]
cm.Data = in
return nil
}

Expand Down
9 changes: 2 additions & 7 deletions dot/network/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,8 @@ func TestDecodeTransactionMessageTwoExtrinsics(t *testing.T) {
}

func TestDecodeConsensusMessage(t *testing.T) {
ConsensusEngineID := types.BabeEngineID

testID := hex.EncodeToString(types.BabeEngineID.ToBytes())
testData := "03100405"

msg := "0x" + testID + testData // 0x4241424503100405
msg := "0x" + testData

encMsg, err := common.HexToBytes(msg)
require.Nil(t, err)
Expand All @@ -358,8 +354,7 @@ func TestDecodeConsensusMessage(t *testing.T) {
require.Nil(t, err)

expected := &ConsensusMessage{
ConsensusEngineID: ConsensusEngineID,
Data: out,
Data: out,
}

require.Equal(t, expected, m)
Expand Down
16 changes: 13 additions & 3 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/common/variadic"
Expand Down Expand Up @@ -163,7 +164,6 @@ func newSyncQueue(s *Service) *syncQueue {
func (q *syncQueue) start() {
go q.handleResponseQueue()
go q.syncAtHead()
go q.finalizeAtHead()

go q.processBlockRequests()
go q.processBlockResponses()
Expand Down Expand Up @@ -693,7 +693,7 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
func (q *syncQueue) handleBlockData(data []*types.BlockData) {
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling
panic(err) // this should never happen
}

end := data[len(data)-1].Number().Int64()
Expand Down Expand Up @@ -738,13 +738,23 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {
func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) {
logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err)

if errors.Is(err, chaindb.ErrKeyNotFound) {
if errors.Is(err, chaindb.ErrKeyNotFound) || errors.Is(err, blocktree.ErrParentNotFound) {
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err)
}

header, err := types.NewHeaderFromOptional(data[idx].Header)
if err != nil {
logger.Debug("failed to get header from BlockData", "idx", idx, "error", err)
return
}

// don't request a chain that's been dropped
if header.Number.Int64() <= finalized.Number.Int64() {
return
}

parentHash := header.ParentHash
req := createBlockRequestWithHash(parentHash, 0)

Expand Down
80 changes: 63 additions & 17 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ const pruneKeyBufferSize = 1000

// BlockState defines fields for manipulating the state of blocks, such as BlockTree, BlockDB and Header
type BlockState struct {
bt *blocktree.BlockTree
baseDB chaindb.Database
db chaindb.Database
lock sync.RWMutex
bt *blocktree.BlockTree
baseDB chaindb.Database
db chaindb.Database
sync.RWMutex
genesisHash common.Hash

// block notifiers
Expand Down Expand Up @@ -268,7 +268,7 @@ func (bs *BlockState) GetHeader(hash common.Hash) (*types.Header, error) {
func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return common.Hash{}, fmt.Errorf("cannot get block %d: %s", num, err)
return common.Hash{}, fmt.Errorf("cannot get block %d: %w", num, err)
}

return common.NewHash(bh), nil
Expand All @@ -278,7 +278,7 @@ func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
func (bs *BlockState) GetHeaderByNumber(num *big.Int) (*types.Header, error) {
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
}

hash := common.NewHash(bh)
Expand All @@ -304,7 +304,7 @@ func (bs *BlockState) GetBlockByNumber(num *big.Int) (*types.Block, error) {
// First retrieve the block hash in a byte array based on the block number from the database
byteHash, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
}

// Then find the block based on the hash
Expand All @@ -322,17 +322,14 @@ func (bs *BlockState) GetBlockHash(blockNumber *big.Int) (*common.Hash, error) {
// First retrieve the block hash in a byte array based on the block number from the database
byteHash, err := bs.db.Get(headerHashKey(blockNumber.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", blockNumber, err)
return nil, fmt.Errorf("cannot get block %d: %w", blockNumber, err)
}
hash := common.NewHash(byteHash)
return &hash, nil
}

// SetHeader will set the header into DB
func (bs *BlockState) SetHeader(header *types.Header) error {
bs.lock.Lock()
defer bs.lock.Unlock()

hash := header.Hash()

// Write the encoded header
Expand Down Expand Up @@ -366,11 +363,7 @@ func (bs *BlockState) GetBlockBody(hash common.Hash) (*types.Body, error) {

// SetBlockBody will add a block body to the db
func (bs *BlockState) SetBlockBody(hash common.Hash, body *types.Body) error {
bs.lock.Lock()
defer bs.lock.Unlock()

err := bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
return err
return bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
}

// HasFinalizedBlock returns true if there is a finalized block for a given round and setID, false otherwise
Expand Down Expand Up @@ -427,6 +420,9 @@ func (bs *BlockState) GetFinalizedHash(round, setID uint64) (common.Hash, error)

// SetFinalizedHash sets the latest finalized block header
func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) error {
bs.Lock()
defer bs.Unlock()

go bs.notifyFinalized(hash)
if round > 0 {
err := bs.SetRound(round)
Expand Down Expand Up @@ -496,6 +492,8 @@ func (bs *BlockState) CompareAndSetBlockData(bd *types.BlockData) error {

// AddBlock adds a block to the blocktree and the DB with arrival time as current unix time
func (bs *BlockState) AddBlock(block *types.Block) error {
bs.Lock()
defer bs.Unlock()
return bs.AddBlockWithArrivalTime(block, time.Now())
}

Expand All @@ -506,6 +504,8 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
return err
}

prevHead := bs.bt.DeepestBlockHash()

// add block to blocktree
err = bs.bt.AddBlock(block.Header, uint64(arrivalTime.UnixNano()))
if err != nil {
Expand Down Expand Up @@ -541,12 +541,58 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
return err
}

// check if there was a re-org, if so, re-set the canonical number->hash mapping
err = bs.handleAddedBlock(prevHead, bs.bt.DeepestBlockHash())
if err != nil {
return err
}

go bs.notifyImported(block)
return bs.baseDB.Flush()
}

// handleAddedBlock re-sets the canonical number->hash mapping if there was a chain re-org.
// prev is the previous best block hash before the new block was added to the blocktree.
// curr is the current best blogetck hash.
func (bs *BlockState) handleAddedBlock(prev, curr common.Hash) error {
ancestor, err := bs.HighestCommonAncestor(prev, curr)
if err != nil {
return err
}

// if the highest common ancestor of the previous chain head and current chain head is the previous chain head,
// then the current chain head is the descendant of the previous and thus are on the same chain
if ancestor == prev {
return nil
}

subchain, err := bs.SubChain(ancestor, curr)
if err != nil {
return err
}

batch := bs.db.NewBatch()
for _, hash := range subchain {
// TODO: set number from ancestor.Number + i ?
header, err := bs.GetHeader(hash)
if err != nil {
return fmt.Errorf("failed to get header in subchain: %w", err)
}

err = batch.Put(headerHashKey(header.Number.Uint64()), hash.ToBytes())
if err != nil {
return err
}
}

return batch.Flush()
}

// AddBlockToBlockTree adds the given block to the blocktree. It does not write it to the database.
func (bs *BlockState) AddBlockToBlockTree(header *types.Header) error {
bs.Lock()
defer bs.Unlock()

arrivalTime, err := bs.GetArrivalTime(header.Hash())
if err != nil {
arrivalTime = time.Now()
Expand All @@ -567,7 +613,7 @@ func (bs *BlockState) isBlockOnCurrentChain(header *types.Header) (bool, error)
}

// if the new block is ahead of our best block, then it is on our current chain.
if header.Number.Cmp(bestBlock.Number) == 1 {
if header.Number.Cmp(bestBlock.Number) > 0 {
return true, nil
}

Expand Down
12 changes: 6 additions & 6 deletions dot/state/block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func (bs *BlockState) HasReceipt(hash common.Hash) (bool, error) {

// SetReceipt sets a Receipt in the database
func (bs *BlockState) SetReceipt(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, receiptPrefix), data)
if err != nil {
Expand All @@ -60,8 +60,8 @@ func (bs *BlockState) HasMessageQueue(hash common.Hash) (bool, error) {

// SetMessageQueue sets a MessageQueue in the database
func (bs *BlockState) SetMessageQueue(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, messageQueuePrefix), data)
if err != nil {
Expand All @@ -88,8 +88,8 @@ func (bs *BlockState) HasJustification(hash common.Hash) (bool, error) {

// SetJustification sets a Justification in the database
func (bs *BlockState) SetJustification(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, justificationPrefix), data)
if err != nil {
Expand Down
Loading

0 comments on commit bcbe1bd

Please sign in to comment.