Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
Assert continuity when reassmbling blocks, transactions, and events from
Browse files Browse the repository at this point in the history
StreamEvents.

When serialising BlockExecutions into StreamEvents it is possible that
we do not detect dropped SteamEvents when deserialising (consuming) the SteamEvents later.

This PR makes the StreamEvent consumers, namely: ConsumeBlockExecutions, BlockAccumulator, and TxStack throw errors if there is an incorrect number of events streamed within each container (BlockExecution or TxExecution). To do this a checksum NumTxs and NumEvents is added to BeginBlock and BeginTx respectively.

This allows Vent to crash if it sees an invalid stream (for example due
to messages being dropped due to load on Burrow's event emitter) or a
transport issue. Vent can then restart from its previous good offset.

Annoyance: I had to add in Predecessor to the BlockExecution and
BeginBlock messages in order to track the previous _expected_ block
since we do not store empty blocks in state to avoid changing the AppHash (which itself is just a
workaround for an outstanding issue in tendermint
tendermint/tendermint#1909)

This PR was in response to suspected event drops in Vent in production.

Signed-off-by: Silas Davis <silas@monax.io>
  • Loading branch information
silasdavis committed Apr 19, 2020
1 parent 69b2160 commit 1395263
Show file tree
Hide file tree
Showing 34 changed files with 834 additions and 314 deletions.
11 changes: 9 additions & 2 deletions core/kernel.go
Expand Up @@ -142,8 +142,15 @@ func (kern *Kernel) LoadState(genesisDoc *genesis.GenesisDoc) (err error) {
kern.Logger.InfoMsg("State loading successful")

params := execution.ParamsFromGenesis(genesisDoc)
kern.checker = execution.NewBatchChecker(kern.State, params, kern.Blockchain, kern.Logger)
kern.committer = execution.NewBatchCommitter(kern.State, params, kern.Blockchain, kern.Emitter, kern.Logger, kern.exeOptions...)
kern.checker, err = execution.NewBatchChecker(kern.State, params, kern.Blockchain, kern.Logger)
if err != nil {
return fmt.Errorf("could not create BatchChecker: %w", err)
}
kern.committer, err = execution.NewBatchCommitter(kern.State, params, kern.Blockchain, kern.Emitter, kern.Logger,
kern.exeOptions...)
if err != nil {
return fmt.Errorf("could not create BatchCommitter: %w", err)
}
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion dump/dump.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hyperledger/burrow/execution/names"
"github.com/hyperledger/burrow/execution/state"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/storage"
)

const (
Expand Down Expand Up @@ -164,7 +165,7 @@ func (ds *Dumper) Transmit(sink Sink, startHeight, endHeight uint64, options Opt
var origin *exec.Origin

// Only return events from specified start height - allows for resume
err = ds.state.IterateStreamEvents(&startHeight, &endHeight,
err = ds.state.IterateStreamEvents(&startHeight, &endHeight, storage.AscendingSort,
func(ev *exec.StreamEvent) error {
switch {
case ev.BeginBlock != nil:
Expand Down
3 changes: 2 additions & 1 deletion dump/load_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/hyperledger/burrow/bcm"
"github.com/hyperledger/burrow/storage"

"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/execution/state"
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestLoadAndDump(t *testing.T) {
require.NoError(t, err)
}

err = st.IterateStreamEvents(nil, nil, func(se *exec.StreamEvent) error {
err = st.IterateStreamEvents(nil, nil, storage.AscendingSort, func(se *exec.StreamEvent) error {
if se.BeginTx != nil {
require.Equal(t, se.BeginTx.TxHeader.Origin.Height, uint64(5))
require.Equal(t, se.BeginTx.TxHeader.Origin.Index, uint64(2))
Expand Down
6 changes: 4 additions & 2 deletions execution/exec/block_execution.go
Expand Up @@ -18,8 +18,10 @@ func (be *BlockExecution) StreamEvents() []*StreamEvent {
var ses []*StreamEvent
ses = append(ses, &StreamEvent{
BeginBlock: &BeginBlock{
Height: be.Height,
Header: be.Header,
Height: be.Height,
PredecessorHeight: be.PredecessorHeight,
NumTxs: uint64(len(be.TxExecutions)),
Header: be.Header,
},
})
for _, txe := range be.TxExecutions {
Expand Down
2 changes: 0 additions & 2 deletions execution/exec/event_test.go
Expand Up @@ -56,8 +56,6 @@ func TestEventTagQueries(t *testing.T) {
require.NoError(t, err)
assert.True(t, qry.Matches(ev))
require.NoError(t, qry.MatchError())

t.Logf("Query: %v", qry)
}

func BenchmarkMatching(b *testing.B) {
Expand Down
315 changes: 231 additions & 84 deletions execution/exec/exec.pb.go

Large diffs are not rendered by default.

145 changes: 115 additions & 30 deletions execution/exec/stream_event.go
Expand Up @@ -56,11 +56,55 @@ func (ev *StreamEvent) Get(key string) (interface{}, bool) {
ev.EndBlock).Get(key)
}

func ConsumeBlockExecution(stream EventStream) (block *BlockExecution, err error) {
type ContinuityOpt byte

func (so ContinuityOpt) Allows(opt ContinuityOpt) bool {
return so&opt > 0
}

// ContinuityOpt encodes the following possible relaxations in continuity
const (
// Default - continuous blocks, txs, and events are always permitted
Continuous ContinuityOpt = iota
// Allows consumption of blocks where the next block has a different predecessor block to that which was last consumed
NonConsecutiveBlocks
// Allows consumption of transactions with non-monotonic index (within block) or a different number of transactions
// to that which is expected
NonConsecutiveTxs
// Allows consumption of events with non-monotonic index (within transaction) or a different number of events
// to that which is expected
NonConsecutiveEvents
)

type BlockAccumulator struct {
block *BlockExecution
// Number of txs expected in current block
numTxs uint64
// Height of last block consumed that contained transactions
previousNonEmptyBlockHeight uint64
// Accumulator for Txs
stack TxStack
// Continuity requirements for the stream
continuity ContinuityOpt
}

func NewBlockAccumulator(continuityOptions ...ContinuityOpt) *BlockAccumulator {
continuity := Continuous
for _, opt := range continuityOptions {
continuity |= opt
}
return &BlockAccumulator{
continuity: continuity,
stack: TxStack{
continuity: continuity,
},
}
}

func (ba *BlockAccumulator) ConsumeBlockExecution(stream EventStream) (block *BlockExecution, err error) {
var ev *StreamEvent
accum := new(BlockAccumulator)
for ev, err = stream.Recv(); err == nil; ev, err = stream.Recv() {
block, err = accum.Consume(ev)
block, err = ba.Consume(ev)
if err != nil {
return nil, err
}
Expand All @@ -72,65 +116,106 @@ func ConsumeBlockExecution(stream EventStream) (block *BlockExecution, err error
return nil, err
}

type BlockAccumulator struct {
block *BlockExecution
stack TxStack
}

// Consume will add the StreamEvent passed to the block accumulator and if the block complete is complete return the
// BlockExecution, otherwise will return nil
func (ba *BlockAccumulator) Consume(ev *StreamEvent) (*BlockExecution, error) {
switch {
case ev.BeginBlock != nil:
if !ba.continuity.Allows(NonConsecutiveBlocks) &&
(ba.previousNonEmptyBlockHeight > 0 && ba.previousNonEmptyBlockHeight != ev.BeginBlock.PredecessorHeight) {
return nil, fmt.Errorf("BlockAccumulator.Consume: received non-consecutive block at height %d: "+
"predecessor height %d, but previous (non-empty) block height was %d",
ev.BeginBlock.Height, ev.BeginBlock.PredecessorHeight, ba.previousNonEmptyBlockHeight)
}
// If we are consuming blocks over the event stream (rather than from state) we may see empty blocks
// by definition empty blocks will not be a predecessor
if ev.BeginBlock.NumTxs > 0 {
ba.previousNonEmptyBlockHeight = ev.BeginBlock.Height
}
ba.numTxs = ev.BeginBlock.NumTxs
ba.block = &BlockExecution{
Height: ev.BeginBlock.Height,
Header: ev.BeginBlock.Header,
Height: ev.BeginBlock.Height,
PredecessorHeight: ev.BeginBlock.PredecessorHeight,
Header: ev.BeginBlock.Header,
TxExecutions: make([]*TxExecution, 0, ba.numTxs),
}
case ev.BeginTx != nil, ev.Envelope != nil, ev.Event != nil, ev.EndTx != nil:
txe, err := ba.stack.Consume(ev)
if err != nil {
return nil, err
}
if txe != nil {
if !ba.continuity.Allows(NonConsecutiveTxs) && uint64(len(ba.block.TxExecutions)) != txe.Index {
return nil, fmt.Errorf("BlockAccumulator.Consume recieved transaction with index %d at "+
"position %d in the event stream", txe.Index, len(ba.block.TxExecutions))
}
ba.block.TxExecutions = append(ba.block.TxExecutions, txe)
}
case ev.EndBlock != nil:
if !ba.continuity.Allows(NonConsecutiveTxs) && uint64(len(ba.block.TxExecutions)) != ba.numTxs {
return nil, fmt.Errorf("BlockAccumulator.Consume did not receive the expected number of "+
"transactions for block %d, expected: %d, received: %d",
ba.block.Height, ba.numTxs, len(ba.block.TxExecutions))
}
return ba.block, nil
}
return nil, nil
}

// TxStack is able to consume potentially nested txs
type TxStack []*TxExecution
type TxStack struct {
// Stack of TxExecutions, top of stack is TxExecution receiving innermost events
txes []*TxExecution
// Track the expected number events from the BeginTx event (also a stack)
numEvents []uint64

continuity ContinuityOpt
}

func (stack *TxStack) Push(txe *TxExecution) {
func (stack *TxStack) Push(beginTx *BeginTx) {
// Put this txe in the parent position
*stack = append(*stack, txe)
stack.txes = append(stack.txes, &TxExecution{
TxHeader: beginTx.TxHeader,
Result: beginTx.Result,
Events: make([]*Event, 0, beginTx.NumEvents),
Exception: beginTx.Exception,
})
stack.numEvents = append(stack.numEvents, beginTx.NumEvents)
}

func (stack TxStack) Peek() (*TxExecution, error) {
if len(stack) < 1 {
func (stack *TxStack) Peek() (*TxExecution, error) {
if len(stack.txes) < 1 {
return nil, fmt.Errorf("tried to peek from an empty TxStack - might be missing essential StreamEvents")
}
return stack[len(stack)-1], nil
return stack.txes[len(stack.txes)-1], nil
}

func (stack *TxStack) Pop() (*TxExecution, error) {
s := *stack
txc, err := s.Peek()
txe, err := stack.Peek()
if err != nil {
return nil, err
}
*stack = s[:len(s)-1]
return txc, nil
newLength := len(stack.txes) - 1
stack.txes = stack.txes[:newLength]
numEvents := stack.numEvents[newLength]
if !stack.continuity.Allows(NonConsecutiveEvents) && uint64(len(txe.Events)) != numEvents {
return nil, fmt.Errorf("TxStack.Pop emitted transaction %s with wrong number of events, "+
"expected: %d, received: %d", txe.TxHash, numEvents, len(txe.Events))
}
stack.numEvents = stack.numEvents[:newLength]
return txe, nil
}

func (stack *TxStack) Length() int {
return len(stack.txes)
}

// Consume will add the StreamEvent to the transaction stack and if that completes a single outermost transaction
// returns the TxExecution otherwise will return nil
func (stack *TxStack) Consume(ev *StreamEvent) (*TxExecution, error) {
switch {
case ev.BeginTx != nil:
stack.Push(initTx(ev.BeginTx))
stack.Push(ev.BeginTx)
case ev.Envelope != nil:
txe, err := stack.Peek()
if err != nil {
Expand All @@ -143,13 +228,21 @@ func (stack *TxStack) Consume(ev *StreamEvent) (*TxExecution, error) {
if err != nil {
return nil, err
}
if !stack.continuity.Allows(NonConsecutiveEvents) && uint64(len(txe.Events)) != ev.Event.Header.Index {
return nil, fmt.Errorf("TxStack.Consume recieved event with index %d at "+
"position %d in the event stream", ev.Event.GetHeader().GetIndex(), len(txe.Events))
}
txe.Events = append(txe.Events, ev.Event)
case ev.EndTx != nil:
txe, err := stack.Pop()
if err != nil {
return nil, err
}
if len(*stack) == 0 {
if txe.Envelope == nil || txe.Receipt == nil {
return nil, fmt.Errorf("TxStack.Consume did not receive transaction envelope for transaction %s",
txe.TxHash)
}
if stack.Length() == 0 {
// This terminates the outermost transaction
return txe, nil
}
Expand All @@ -162,11 +255,3 @@ func (stack *TxStack) Consume(ev *StreamEvent) (*TxExecution, error) {
}
return nil, nil
}

func initTx(beginTx *BeginTx) *TxExecution {
return &TxExecution{
TxHeader: beginTx.TxHeader,
Result: beginTx.Result,
Exception: beginTx.Exception,
}
}
23 changes: 15 additions & 8 deletions execution/exec/stream_event_test.go
@@ -1,6 +1,7 @@
package exec

import (
"encoding/json"
"testing"
"time"

Expand All @@ -26,7 +27,7 @@ func TestTxExecution(t *testing.T) {
txeOut, err = stack.Consume(ev)
require.NoError(t, err)
if txeOut != nil {
require.Equal(t, txe, txeOut)
require.Equal(t, jsonString(t, txe), jsonString(t, txeOut))
}
}

Expand All @@ -43,21 +44,21 @@ func TestConsumeBlockExecution(t *testing.T) {
Height: height,
},
Height: uint64(height),
TxExecutions: []*TxExecution{
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(0, 3))),
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(0, 2))),
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(2, 1))),
},
}
be.AppendTxs(
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(0, 3))),
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(0, 2))),
NewTxExecution(txs.Enclose(genesisDoc.ChainID(), newCallTx(2, 1))),
)

stack := new(BlockAccumulator)
stack := NewBlockAccumulator()
var beOut *BlockExecution
var err error
for _, ev := range be.StreamEvents() {
beOut, err = stack.Consume(ev)
require.NoError(t, err)
if beOut != nil {
require.Equal(t, be, beOut)
require.Equal(t, jsonString(t, be), jsonString(t, beOut))
}
}
assert.NotNil(t, beOut, "should have consumed input BlockExecution")
Expand All @@ -68,3 +69,9 @@ func newCallTx(fromIndex, toIndex int) *payload.CallTx {
to := accounts[toIndex].GetAddress()
return payload.NewCallTxWithSequence(from.GetPublicKey(), &to, []byte{1, 2, 3}, 324, 34534534, 23, 1)
}

func jsonString(t testing.TB, conf interface{}) string {
bs, err := json.MarshalIndent(conf, "", " ")
require.NoError(t, err, "must be able to convert interface to string for comparison")
return string(bs)
}
1 change: 1 addition & 0 deletions execution/exec/tx_execution.go
Expand Up @@ -40,6 +40,7 @@ func (txe *TxExecution) StreamEvents() []*StreamEvent {
&StreamEvent{
BeginTx: &BeginTx{
TxHeader: txe.TxHeader,
NumEvents: uint64(len(txe.Events)),
Exception: txe.Exception,
Result: txe.Result,
},
Expand Down

0 comments on commit 1395263

Please sign in to comment.