Skip to content

Commit

Permalink
[FAB-1241] Rename BatchSize configuration parameter
Browse files Browse the repository at this point in the history
Subtask of FAB-1171

  - In orderer.yaml, renamed General.BatchSize to
    General.BatchSize.MaxMessageCount.
  - In shared config, changed type of BatchSize from
    unit32 to *ab.BatchSize.

These previously checked-in generated proto go source
files did not match the output of `make protos`, so
I'm checking them in now:

   msp/identities.pb.go
   protos/msp/mspconfig.pb.go

This generated proto go source file was not previously
checked in:

   core/crypto/attributes/proto/attributes.pb.go

Change-Id: Ice3d398f191eaaeaaae8c3ce9d5612100a76b582
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Dec 20, 2016
1 parent 6e9073c commit 96de525
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 94 deletions.
85 changes: 85 additions & 0 deletions core/crypto/attributes/proto/attributes.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions msp/identities.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
r.curBatch = append(r.curBatch, msg)
r.batchComs = append(r.batchComs, committer)

if uint32(len(r.curBatch)) < r.sharedConfigManager.BatchSize() {
if uint32(len(r.curBatch)) < r.sharedConfigManager.BatchSize().MaxMessageCount {
return nil, nil, true
}

Expand Down
21 changes: 11 additions & 10 deletions orderer/common/blockcutter/blockcutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/filter"
mocksharedconfig "github.com/hyperledger/fabric/orderer/mocks/sharedconfig"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)

type isolatedCommitter struct{}
Expand Down Expand Up @@ -73,8 +74,8 @@ var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}

func TestNormalBatch(t *testing.T) {
filters := getFilters()
batchSize := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: batchSize}, filters)
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand All @@ -100,8 +101,8 @@ func TestNormalBatch(t *testing.T) {

func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
batchSize := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: batchSize}, filters)
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)

batches, committers, ok := r.Ordered(badTx)

Expand Down Expand Up @@ -136,8 +137,8 @@ func TestBadMessageInBatch(t *testing.T) {

func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
batchSize := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: batchSize}, filters)
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)

batches, committers, ok := r.Ordered(unmatchedTx)

Expand Down Expand Up @@ -172,8 +173,8 @@ func TestUnmatchedMessageInBatch(t *testing.T) {

func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
batchSize := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: batchSize}, filters)
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)

batches, committers, ok := r.Ordered(isolatedTx)

Expand All @@ -196,8 +197,8 @@ func TestIsolatedEmptyBatch(t *testing.T) {

func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
batchSize := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: batchSize}, filters)
maxMessageCount := uint32(2)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand Down
2 changes: 1 addition & 1 deletion orderer/common/bootstrap/provisional/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (cbs *commonBootstrapper) encodeConsensusType() *cb.SignedConfigurationItem

func (cbs *commonBootstrapper) encodeBatchSize() *cb.SignedConfigurationItem {
configItemKey := sharedconfig.BatchSizeKey
configItemValue := utils.MarshalOrPanic(&ab.BatchSize{Messages: cbs.batchSize})
configItemValue := utils.MarshalOrPanic(cbs.batchSize)
modPolicy := configtx.DefaultModificationPolicyID

configItemChainHeader := utils.MakeChainHeader(cb.HeaderType_CONFIGURATION_ITEM, msgVersion, cbs.chainID, epoch)
Expand Down
7 changes: 5 additions & 2 deletions orderer/common/bootstrap/provisional/provisional.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/bootstrap"
"github.com/hyperledger/fabric/orderer/localconfig"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ var DefaultChainCreators = []string{AcceptAllPolicyKey}
type commonBootstrapper struct {
chainID string
consensusType string
batchSize uint32
batchSize *ab.BatchSize
}

type soloBootstrapper struct {
Expand All @@ -72,7 +73,9 @@ func New(conf *config.TopLevel) bootstrap.Helper {
cbs := &commonBootstrapper{
chainID: TestChainID,
consensusType: conf.General.OrdererType,
batchSize: conf.General.BatchSize,
batchSize: &ab.BatchSize{
MaxMessageCount: conf.General.BatchSize.MaxMessageCount,
},
}

switch conf.General.OrdererType {
Expand Down
12 changes: 6 additions & 6 deletions orderer/common/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Manager interface {
ConsensusType() string

// BatchSize returns the maximum number of messages to include in a block
BatchSize() uint32
BatchSize() *ab.BatchSize

// ChainCreators returns the policy names which are allowed for chain creation
// This field is only set for the system ordering chain
Expand All @@ -72,7 +72,7 @@ type Manager interface {

type ordererConfig struct {
consensusType string
batchSize uint32
batchSize *ab.BatchSize
chainCreators []string
kafkaBrokers []string
}
Expand All @@ -97,7 +97,7 @@ func (pm *ManagerImpl) ConsensusType() string {
}

// BatchSize returns the maximum number of messages to include in a block
func (pm *ManagerImpl) BatchSize() uint32 {
func (pm *ManagerImpl) BatchSize() *ab.BatchSize {
return pm.config.batchSize
}

Expand Down Expand Up @@ -164,10 +164,10 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
return fmt.Errorf("Unmarshaling error for BatchSize: %s", err)
}

if batchSize.Messages <= 0 {
return fmt.Errorf("Attempted to set the batch size to %d which is less than or equal to 0", batchSize.Messages)
if batchSize.MaxMessageCount <= 0 {
return fmt.Errorf("Attempted to set the batch size max message count to %d which is less than or equal to 0", batchSize.MaxMessageCount)
}
pm.pendingConfig.batchSize = batchSize.Messages
pm.pendingConfig.batchSize = batchSize
case ChainCreatorsKey:
chainCreators := &ab.ChainCreators{}
err := proto.Unmarshal(configItem.Value, chainCreators)
Expand Down
16 changes: 11 additions & 5 deletions orderer/common/sharedconfig/sharedconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func TestConsensusType(t *testing.T) {
}

func TestBatchSize(t *testing.T) {
endBatchSize := uint32(10)
endBatchSize := struct{ MaxMessageCount uint32 }{
MaxMessageCount: uint32(10),
}
invalidMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Expand All @@ -134,12 +136,12 @@ func TestBatchSize(t *testing.T) {
zeroBatchSize := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{Messages: 0}),
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: 0}),
}
validMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{Messages: endBatchSize}),
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: endBatchSize.MaxMessageCount}),
}
m := NewManagerImpl()
m.BeginConfig()
Expand All @@ -161,8 +163,12 @@ func TestBatchSize(t *testing.T) {

m.CommitConfig()

if nowBatchSize := m.BatchSize(); nowBatchSize != endBatchSize {
t.Fatalf("Got batch size of %d when expecting batch size of %d", nowBatchSize, endBatchSize)
nowBatchSize := struct{ MaxMessageCount uint32 }{
MaxMessageCount: m.BatchSize().MaxMessageCount,
}

if nowBatchSize.MaxMessageCount != endBatchSize.MaxMessageCount {
t.Fatalf("Got batch size max message count of %d. Expected: %d", nowBatchSize.MaxMessageCount, endBatchSize.MaxMessageCount)
}
}

Expand Down
4 changes: 3 additions & 1 deletion orderer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ var testConf = &config.TopLevel{
OrdererType: "kafka",
LedgerType: "ram",
BatchTimeout: 500 * time.Millisecond,
BatchSize: 100,
QueueSize: 100,
MaxWindowSize: 100,
ListenAddress: "127.0.0.1",
ListenPort: 7050,
GenesisMethod: "provisional",
BatchSize: config.BatchSize{
MaxMessageCount: 100,
},
},
Kafka: config.Kafka{
Brokers: []string{"127.0.0.1:9092"},
Expand Down
17 changes: 12 additions & 5 deletions orderer/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ type General struct {
OrdererType string
LedgerType string
BatchTimeout time.Duration
BatchSize uint32
QueueSize uint32
MaxWindowSize uint32
ListenAddress string
ListenPort uint16
GenesisMethod string
BatchSize BatchSize
Profile Profile
}

// BatchSize contains configuration affecting the size of batches
type BatchSize struct {
MaxMessageCount uint32
}

// Profile contains configuration for Go pprof profiling
type Profile struct {
Enabled bool
Expand Down Expand Up @@ -99,12 +104,14 @@ var defaults = TopLevel{
OrdererType: "solo",
LedgerType: "ram",
BatchTimeout: 10 * time.Second,
BatchSize: 10,
QueueSize: 1000,
MaxWindowSize: 1000,
ListenAddress: "127.0.0.1",
ListenPort: 7050,
GenesisMethod: "provisional",
BatchSize: BatchSize{
MaxMessageCount: 10,
},
Profile: Profile{
Enabled: false,
Address: "0.0.0.0:6060",
Expand Down Expand Up @@ -142,9 +149,9 @@ func (c *TopLevel) completeInitialization() {
case c.General.BatchTimeout == 0:
logger.Infof("General.BatchTimeout unset, setting to %s", defaults.General.BatchTimeout)
c.General.BatchTimeout = defaults.General.BatchTimeout
case c.General.BatchSize == 0:
logger.Infof("General.BatchSize unset, setting to %s", defaults.General.BatchSize)
c.General.BatchSize = defaults.General.BatchSize
case c.General.BatchSize.MaxMessageCount == 0:
logger.Infof("General.BatchSize.MaxMessageCount unset, setting to %s", defaults.General.BatchSize.MaxMessageCount)
c.General.BatchSize.MaxMessageCount = defaults.General.BatchSize.MaxMessageCount
case c.General.QueueSize == 0:
logger.Infof("General.QueueSize unset, setting to %s", defaults.General.QueueSize)
c.General.QueueSize = defaults.General.QueueSize
Expand Down

0 comments on commit 96de525

Please sign in to comment.