Skip to content

Commit

Permalink
[FAB-1242] Limit batch size to AbsoluteMaxBytes
Browse files Browse the repository at this point in the history
 - Receiver.Ordered() should not produce a batch of
   messages larger BatchSize.AbsoluteMaxBytes.

Change-Id: I3fcdc49c5756ee2215c8a7837ac3e0e49073aa13
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jan 14, 2017
1 parent 012f0b5 commit 67455b3
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 40 deletions.
125 changes: 90 additions & 35 deletions orderer/common/blockcutter/blockcutter.go
Expand Up @@ -29,22 +29,34 @@ var logger = logging.MustGetLogger("orderer/common/blockcutter")
// Receiver defines a sink for the ordered broadcast messages
type Receiver interface {
// Ordered should be invoked sequentially as messages are ordered
// If the message is a valid normal message and does not fill the batch, nil, nil, true is returned
// If the message is a valid normal message and fills a batch, the batch, committers, true is returned
// If the message is a valid special message (like a config message) it terminates the current batch
// and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true
// If the ordered message is determined to be invalid, then nil, nil, false is returned
// If the current message valid, and no batches need to be cut:
// - Ordered will return nil, nil, and true (indicating ok).
// If the current message valid, and batches need to be cut:
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok).
// If the current message is invalid:
// - Ordered will return nil, nil, and false (to indicate not ok).
//
// Given a valid message, if the current message needs to be isolated (as determined during filtering).
// - Ordered will return:
// * The pending batch of (if not empty), and a second batch containing only the isolated message.
// * The corresponding batches of committers.
// * true (indicating ok).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool)

// Cut returns the current batch and starts a new one
Cut() ([]*cb.Envelope, []filter.Committer)
}

type receiver struct {
sharedConfigManager sharedconfig.Manager
filters *filter.RuleSet
curBatch []*cb.Envelope
batchComs []filter.Committer
sharedConfigManager sharedconfig.Manager
filters *filter.RuleSet
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
pendingCommitters []filter.Committer
}

// NewReceiverImpl creates a Receiver implementation based on the given sharedconfig manager and filters
Expand All @@ -56,11 +68,22 @@ func NewReceiverImpl(sharedConfigManager sharedconfig.Manager, filters *filter.R
}

// Ordered should be invoked sequentially as messages are ordered
// If the message is a valid normal message and does not fill the batch, nil, nil, true is returned
// If the message is a valid normal message and fills a batch, the batch, committers, true is returned
// If the message is a valid special message (like a config message) it terminates the current batch
// and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true
// If the ordered message is determined to be invalid, then nil, nil, false is returned
// If the current message valid, and no batches need to be cut:
// - Ordered will return nil, nil, and true (indicating ok).
// If the current message valid, and batches need to be cut:
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok).
// If the current message is invalid:
// - Ordered will return nil, nil, and false (to indicate not ok).
//
// Given a valid message, if the current message needs to be isolated (as determined during filtering).
// - Ordered will return:
// * The pending batch of (if not empty), and a second batch containing only the isolated message.
// * The corresponding batches of committers.
// * true (indicating ok).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
committer, err := r.filters.Apply(msg)
Expand All @@ -70,38 +93,70 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
}

if committer.Isolated() {
logger.Debugf("Found message which requested to be isolated, cutting into its own block")
firstBatch := r.curBatch
r.curBatch = nil
firstComs := r.batchComs
r.batchComs = nil
secondBatch := []*cb.Envelope{msg}
if firstBatch == nil {
return [][]*cb.Envelope{secondBatch}, [][]filter.Committer{[]filter.Committer{committer}}, true
logger.Debugf("Found message which requested to be isolated, cutting into its own batch")

messageBatches := [][]*cb.Envelope{}
committerBatches := [][]filter.Committer{}

// cut pending batch, if it has any messages
if len(r.pendingBatch) > 0 {
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
}
return [][]*cb.Envelope{firstBatch, secondBatch}, [][]filter.Committer{firstComs, []filter.Committer{committer}}, true

// create new batch with single message
messageBatches = append(messageBatches, []*cb.Envelope{msg})
committerBatches = append(committerBatches, []filter.Committer{committer})

return messageBatches, committerBatches, true
}

messageBatches := [][]*cb.Envelope{}
committerBatches := [][]filter.Committer{}

messageSizeBytes := messageSizeBytes(msg)
messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().AbsoluteMaxBytes

if messageWillOverflowBatchSizeBytes {
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
}

logger.Debugf("Enqueuing message into batch")
r.curBatch = append(r.curBatch, msg)
r.batchComs = append(r.batchComs, committer)
r.pendingBatch = append(r.pendingBatch, msg)
r.pendingBatchSizeBytes += messageSizeBytes
r.pendingCommitters = append(r.pendingCommitters, committer)

if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount {
logger.Debugf("Batch size met, cutting batch")
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
}

if uint32(len(r.curBatch)) < r.sharedConfigManager.BatchSize().MaxMessageCount {
// return nils instead of empty slices
if len(messageBatches) == 0 {
return nil, nil, true
}

logger.Debugf("Batch size met, creating block")
newBatch := r.curBatch
newComs := r.batchComs
r.curBatch = nil
return [][]*cb.Envelope{newBatch}, [][]filter.Committer{newComs}, true
return messageBatches, committerBatches, true

}

// Cut returns the current batch and starts a new one
func (r *receiver) Cut() ([]*cb.Envelope, []filter.Committer) {
batch := r.curBatch
r.curBatch = nil
committers := r.batchComs
r.batchComs = nil
batch := r.pendingBatch
r.pendingBatch = nil
committers := r.pendingCommitters
r.pendingCommitters = nil
r.pendingBatchSizeBytes = 0
return batch, committers
}

func messageSizeBytes(message *cb.Envelope) uint32 {
return uint32(len(message.Payload) + len(message.Signature))
}
70 changes: 65 additions & 5 deletions orderer/common/blockcutter/blockcutter_test.go
Expand Up @@ -80,7 +80,8 @@ var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}
func TestNormalBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand All @@ -107,7 +108,8 @@ func TestNormalBatch(t *testing.T) {
func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

batches, committers, ok := r.Ordered(badTx)

Expand Down Expand Up @@ -143,7 +145,8 @@ func TestBadMessageInBatch(t *testing.T) {
func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

batches, committers, ok := r.Ordered(unmatchedTx)

Expand Down Expand Up @@ -179,7 +182,8 @@ func TestUnmatchedMessageInBatch(t *testing.T) {
func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

batches, committers, ok := r.Ordered(isolatedTx)

Expand All @@ -203,7 +207,8 @@ func TestIsolatedEmptyBatch(t *testing.T) {
func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand Down Expand Up @@ -241,3 +246,58 @@ func TestIsolatedPartialBatch(t *testing.T) {
t.Fatalf("Should have had the isolated tx in the second batch")
}
}

func TestBatchSizeAbsoluteMaxBytesOverflow(t *testing.T) {
filters := getFilters()

goodTxBytes := messageSizeBytes(goodTx)

// set absolute max bytes such that 10 goodTx will not fit
absoluteMaxBytes := goodTxBytes*10 - 1

// set message count > 9
maxMessageCount := uint32(20)

r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)

// enqueue 9 messages
for i := 0; i < 9; i++ {
batches, committers, ok := r.Ordered(goodTx)
if batches != nil || committers != nil {
t.Fatalf("Should not have created batch")
}
if !ok {
t.Fatalf("Should have enqueued message into batch")
}
}

// next message should create batch
batches, committers, ok := r.Ordered(goodTx)

if batches == nil || committers == nil {
t.Fatalf("Should have created batch")
}

if len(batches) != 1 || len(committers) != 1 {
t.Fatalf("Should have created one batch, got %d and %d", len(batches), len(committers))
}

if len(batches[0]) != 9 || len(committers[0]) != 9 {
t.Fatalf("Should have had nine normal tx in the batch got %d and %d committers", len(batches[0]), len(committers[0]))
}
if !ok {
t.Fatalf("Should have enqueued the tenth message into batch")
}

// force a batch cut
messageBatch, committerBatch := r.Cut()

if messageBatch == nil || committerBatch == nil {
t.Fatalf("Should have created batch")
}

if len(messageBatch) != 1 || len(committerBatch) != 1 {
t.Fatalf("Should have had one tx in the batch, got %d and %d", len(batches), len(committers))
}

}

0 comments on commit 67455b3

Please sign in to comment.