From 67455b333c40a036a1da83f9ba6587d1b977dddd Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Thu, 12 Jan 2017 12:22:56 -0500 Subject: [PATCH] [FAB-1242] Limit batch size to AbsoluteMaxBytes - Receiver.Ordered() should not produce a batch of messages larger BatchSize.AbsoluteMaxBytes. Change-Id: I3fcdc49c5756ee2215c8a7837ac3e0e49073aa13 Signed-off-by: Luis Sanchez --- orderer/common/blockcutter/blockcutter.go | 125 +++++++++++++----- .../common/blockcutter/blockcutter_test.go | 70 +++++++++- 2 files changed, 155 insertions(+), 40 deletions(-) diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index 1751c13d306..f1dcb742a83 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -29,11 +29,22 @@ var logger = logging.MustGetLogger("orderer/common/blockcutter") // Receiver defines a sink for the ordered broadcast messages type Receiver interface { // Ordered should be invoked sequentially as messages are ordered - // If the message is a valid normal message and does not fill the batch, nil, nil, true is returned - // If the message is a valid normal message and fills a batch, the batch, committers, true is returned - // If the message is a valid special message (like a config message) it terminates the current batch - // and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true - // If the ordered message is determined to be invalid, then nil, nil, false is returned + // If the current message valid, and no batches need to be cut: + // - Ordered will return nil, nil, and true (indicating ok). + // If the current message valid, and batches need to be cut: + // - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok). + // If the current message is invalid: + // - Ordered will return nil, nil, and false (to indicate not ok). + // + // Given a valid message, if the current message needs to be isolated (as determined during filtering). + // - Ordered will return: + // * The pending batch of (if not empty), and a second batch containing only the isolated message. + // * The corresponding batches of committers. + // * true (indicating ok). + // Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if: + // - The current message needs to be isolated (as determined during filtering). + // - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes. + // - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount. Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) // Cut returns the current batch and starts a new one @@ -41,10 +52,11 @@ type Receiver interface { } type receiver struct { - sharedConfigManager sharedconfig.Manager - filters *filter.RuleSet - curBatch []*cb.Envelope - batchComs []filter.Committer + sharedConfigManager sharedconfig.Manager + filters *filter.RuleSet + pendingBatch []*cb.Envelope + pendingBatchSizeBytes uint32 + pendingCommitters []filter.Committer } // NewReceiverImpl creates a Receiver implementation based on the given sharedconfig manager and filters @@ -56,11 +68,22 @@ func NewReceiverImpl(sharedConfigManager sharedconfig.Manager, filters *filter.R } // Ordered should be invoked sequentially as messages are ordered -// If the message is a valid normal message and does not fill the batch, nil, nil, true is returned -// If the message is a valid normal message and fills a batch, the batch, committers, true is returned -// If the message is a valid special message (like a config message) it terminates the current batch -// and returns the current batch and committers (if it is not empty), plus a second batch containing the special transaction and commiter, and true -// If the ordered message is determined to be invalid, then nil, nil, false is returned +// If the current message valid, and no batches need to be cut: +// - Ordered will return nil, nil, and true (indicating ok). +// If the current message valid, and batches need to be cut: +// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok). +// If the current message is invalid: +// - Ordered will return nil, nil, and false (to indicate not ok). +// +// Given a valid message, if the current message needs to be isolated (as determined during filtering). +// - Ordered will return: +// * The pending batch of (if not empty), and a second batch containing only the isolated message. +// * The corresponding batches of committers. +// * true (indicating ok). +// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if: +// - The current message needs to be isolated (as determined during filtering). +// - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes. +// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount. func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) { // The messages must be filtered a second time in case configuration has changed since the message was received committer, err := r.filters.Apply(msg) @@ -70,38 +93,70 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi } if committer.Isolated() { - logger.Debugf("Found message which requested to be isolated, cutting into its own block") - firstBatch := r.curBatch - r.curBatch = nil - firstComs := r.batchComs - r.batchComs = nil - secondBatch := []*cb.Envelope{msg} - if firstBatch == nil { - return [][]*cb.Envelope{secondBatch}, [][]filter.Committer{[]filter.Committer{committer}}, true + logger.Debugf("Found message which requested to be isolated, cutting into its own batch") + + messageBatches := [][]*cb.Envelope{} + committerBatches := [][]filter.Committer{} + + // cut pending batch, if it has any messages + if len(r.pendingBatch) > 0 { + messageBatch, committerBatch := r.Cut() + messageBatches = append(messageBatches, messageBatch) + committerBatches = append(committerBatches, committerBatch) } - return [][]*cb.Envelope{firstBatch, secondBatch}, [][]filter.Committer{firstComs, []filter.Committer{committer}}, true + + // create new batch with single message + messageBatches = append(messageBatches, []*cb.Envelope{msg}) + committerBatches = append(committerBatches, []filter.Committer{committer}) + + return messageBatches, committerBatches, true + } + + messageBatches := [][]*cb.Envelope{} + committerBatches := [][]filter.Committer{} + + messageSizeBytes := messageSizeBytes(msg) + messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().AbsoluteMaxBytes + + if messageWillOverflowBatchSizeBytes { + logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes) + logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.") + messageBatch, committerBatch := r.Cut() + messageBatches = append(messageBatches, messageBatch) + committerBatches = append(committerBatches, committerBatch) } logger.Debugf("Enqueuing message into batch") - r.curBatch = append(r.curBatch, msg) - r.batchComs = append(r.batchComs, committer) + r.pendingBatch = append(r.pendingBatch, msg) + r.pendingBatchSizeBytes += messageSizeBytes + r.pendingCommitters = append(r.pendingCommitters, committer) + + if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount { + logger.Debugf("Batch size met, cutting batch") + messageBatch, committerBatch := r.Cut() + messageBatches = append(messageBatches, messageBatch) + committerBatches = append(committerBatches, committerBatch) + } - if uint32(len(r.curBatch)) < r.sharedConfigManager.BatchSize().MaxMessageCount { + // return nils instead of empty slices + if len(messageBatches) == 0 { return nil, nil, true } - logger.Debugf("Batch size met, creating block") - newBatch := r.curBatch - newComs := r.batchComs - r.curBatch = nil - return [][]*cb.Envelope{newBatch}, [][]filter.Committer{newComs}, true + return messageBatches, committerBatches, true + } // Cut returns the current batch and starts a new one func (r *receiver) Cut() ([]*cb.Envelope, []filter.Committer) { - batch := r.curBatch - r.curBatch = nil - committers := r.batchComs - r.batchComs = nil + batch := r.pendingBatch + r.pendingBatch = nil + committers := r.pendingCommitters + r.pendingCommitters = nil + r.pendingBatchSizeBytes = 0 return batch, committers } + +func messageSizeBytes(message *cb.Envelope) uint32 { + return uint32(len(message.Payload) + len(message.Signature)) +} diff --git a/orderer/common/blockcutter/blockcutter_test.go b/orderer/common/blockcutter/blockcutter_test.go index 8e816bc4867..85acca03ef1 100644 --- a/orderer/common/blockcutter/blockcutter_test.go +++ b/orderer/common/blockcutter/blockcutter_test.go @@ -80,7 +80,8 @@ var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")} func TestNormalBatch(t *testing.T) { filters := getFilters() maxMessageCount := uint32(2) - r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters) + absoluteMaxBytes := uint32(100) + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) batches, committers, ok := r.Ordered(goodTx) @@ -107,7 +108,8 @@ func TestNormalBatch(t *testing.T) { func TestBadMessageInBatch(t *testing.T) { filters := getFilters() maxMessageCount := uint32(2) - r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters) + absoluteMaxBytes := uint32(100) + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) batches, committers, ok := r.Ordered(badTx) @@ -143,7 +145,8 @@ func TestBadMessageInBatch(t *testing.T) { func TestUnmatchedMessageInBatch(t *testing.T) { filters := getFilters() maxMessageCount := uint32(2) - r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters) + absoluteMaxBytes := uint32(100) + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) batches, committers, ok := r.Ordered(unmatchedTx) @@ -179,7 +182,8 @@ func TestUnmatchedMessageInBatch(t *testing.T) { func TestIsolatedEmptyBatch(t *testing.T) { filters := getFilters() maxMessageCount := uint32(2) - r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters) + absoluteMaxBytes := uint32(100) + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) batches, committers, ok := r.Ordered(isolatedTx) @@ -203,7 +207,8 @@ func TestIsolatedEmptyBatch(t *testing.T) { func TestIsolatedPartialBatch(t *testing.T) { filters := getFilters() maxMessageCount := uint32(2) - r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters) + absoluteMaxBytes := uint32(100) + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) batches, committers, ok := r.Ordered(goodTx) @@ -241,3 +246,58 @@ func TestIsolatedPartialBatch(t *testing.T) { t.Fatalf("Should have had the isolated tx in the second batch") } } + +func TestBatchSizeAbsoluteMaxBytesOverflow(t *testing.T) { + filters := getFilters() + + goodTxBytes := messageSizeBytes(goodTx) + + // set absolute max bytes such that 10 goodTx will not fit + absoluteMaxBytes := goodTxBytes*10 - 1 + + // set message count > 9 + maxMessageCount := uint32(20) + + r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters) + + // enqueue 9 messages + for i := 0; i < 9; i++ { + batches, committers, ok := r.Ordered(goodTx) + if batches != nil || committers != nil { + t.Fatalf("Should not have created batch") + } + if !ok { + t.Fatalf("Should have enqueued message into batch") + } + } + + // next message should create batch + batches, committers, ok := r.Ordered(goodTx) + + if batches == nil || committers == nil { + t.Fatalf("Should have created batch") + } + + if len(batches) != 1 || len(committers) != 1 { + t.Fatalf("Should have created one batch, got %d and %d", len(batches), len(committers)) + } + + if len(batches[0]) != 9 || len(committers[0]) != 9 { + t.Fatalf("Should have had nine normal tx in the batch got %d and %d committers", len(batches[0]), len(committers[0])) + } + if !ok { + t.Fatalf("Should have enqueued the tenth message into batch") + } + + // force a batch cut + messageBatch, committerBatch := r.Cut() + + if messageBatch == nil || committerBatch == nil { + t.Fatalf("Should have created batch") + } + + if len(messageBatch) != 1 || len(committerBatch) != 1 { + t.Fatalf("Should have had one tx in the batch, got %d and %d", len(batches), len(committers)) + } + +}