From 39bd8e20f6c9149290db3c5a1b5c5bdc3364233b Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Mon, 9 Jan 2017 13:45:59 -0500 Subject: [PATCH] [FAB-1568] Add orderer shared config ingresspolicy https://jira.hyperledger.org/browse/FAB-1568 This is the third a series meant to enable broadcast filtering based on policy. This changeset adds an IngressPolicy orderer configuration item, which references a policy by name. It is not currently used, but will eventually be used by the signature filter. Change-Id: I52598fa395d95112accfd8baa69c9be3d9cd6bdd Signed-off-by: Jason Yellick --- orderer/common/sharedconfig/sharedconfig.go | 18 +++++++ .../common/sharedconfig/sharedconfig_test.go | 41 ++++++++++++++ orderer/mocks/sharedconfig/sharedconfig.go | 7 +++ orderer/multichain/systemchain_test.go | 36 +++---------- protos/orderer/ab.pb.go | 1 + protos/orderer/configuration.pb.go | 54 +++++++++++-------- protos/orderer/configuration.proto | 5 ++ 7 files changed, 111 insertions(+), 51 deletions(-) diff --git a/orderer/common/sharedconfig/sharedconfig.go b/orderer/common/sharedconfig/sharedconfig.go index 876eebf3236..de4917fec51 100644 --- a/orderer/common/sharedconfig/sharedconfig.go +++ b/orderer/common/sharedconfig/sharedconfig.go @@ -45,6 +45,9 @@ const ( // KafkaBrokersKey is the cb.ConfigurationItem type key name for the KafkaBrokers message KafkaBrokersKey = "KafkaBrokers" + + // IngressPolicyKey is the cb.ConfigurationItem type key name for the IngressPolicy message + IngressPolicyKey = "IngressPolicy" ) var logger = logging.MustGetLogger("orderer/common/sharedconfig") @@ -71,6 +74,9 @@ type Manager interface { // Kafka brokers, i.e. this is not necessarily the entire set of Kafka brokers // used for ordering KafkaBrokers() []string + + // IngressPolicy returns the name of the policy to validate incoming broadcast messages against + IngressPolicy() string } type ordererConfig struct { @@ -79,6 +85,7 @@ type ordererConfig struct { batchTimeout time.Duration chainCreators []string kafkaBrokers []string + ingressPolicy string } // ManagerImpl is an implementation of Manager and configtx.ConfigHandler @@ -123,6 +130,11 @@ func (pm *ManagerImpl) KafkaBrokers() []string { return pm.config.kafkaBrokers } +// IngressPolicy returns the name of the policy to validate incoming broadcast messages against +func (pm *ManagerImpl) IngressPolicy() string { + return pm.config.ingressPolicy +} + // BeginConfig is used to start a new configuration proposal func (pm *ManagerImpl) BeginConfig() { if pm.pendingConfig != nil { @@ -194,6 +206,12 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error { return fmt.Errorf("Unmarshaling error for ChainCreator: %s", err) } pm.pendingConfig.chainCreators = chainCreators.Policies + case IngressPolicyKey: + ingressPolicy := &ab.IngressPolicy{} + if err := proto.Unmarshal(configItem.Value, ingressPolicy); err != nil { + return fmt.Errorf("Unmarshaling error for IngressPolicy: %s", err) + } + pm.pendingConfig.ingressPolicy = ingressPolicy.Name case KafkaBrokersKey: kafkaBrokers := &ab.KafkaBrokers{} if err := proto.Unmarshal(configItem.Value, kafkaBrokers); err != nil { diff --git a/orderer/common/sharedconfig/sharedconfig_test.go b/orderer/common/sharedconfig/sharedconfig_test.go index 5a9792f469b..a365f2c8cfb 100644 --- a/orderer/common/sharedconfig/sharedconfig_test.go +++ b/orderer/common/sharedconfig/sharedconfig_test.go @@ -298,3 +298,44 @@ func TestKafkaBrokers(t *testing.T) { return } } + +func TestIngressPolicy(t *testing.T) { + endPolicy := "foo" + invalidMessage := + &cb.ConfigurationItem{ + Type: cb.ConfigurationItem_Orderer, + Key: IngressPolicyKey, + Value: []byte("Garbage Data"), + } + validMessage := &cb.ConfigurationItem{ + Type: cb.ConfigurationItem_Orderer, + Key: IngressPolicyKey, + Value: utils.MarshalOrPanic(&ab.IngressPolicy{Name: endPolicy}), + } + m := NewManagerImpl() + m.BeginConfig() + + err := m.ProposeConfig(validMessage) + if err != nil { + t.Fatalf("Error applying valid config: %s", err) + } + + m.CommitConfig() + m.BeginConfig() + + err = m.ProposeConfig(invalidMessage) + if err == nil { + t.Fatalf("Should have failed on invalid message") + } + + err = m.ProposeConfig(validMessage) + if err != nil { + t.Fatalf("Error re-applying valid config: %s", err) + } + + m.CommitConfig() + + if nowPolicy := m.IngressPolicy(); nowPolicy != endPolicy { + t.Fatalf("IngressPolicy should have ended as %s but was %s", endPolicy, nowPolicy) + } +} diff --git a/orderer/mocks/sharedconfig/sharedconfig.go b/orderer/mocks/sharedconfig/sharedconfig.go index af40d77d005..4508b4b0a7b 100644 --- a/orderer/mocks/sharedconfig/sharedconfig.go +++ b/orderer/mocks/sharedconfig/sharedconfig.go @@ -31,6 +31,8 @@ type Manager struct { ChainCreatorsVal []string // KafkaBrokersVal is returned as the result of KafkaBrokers() KafkaBrokersVal []string + // IngressPolicyVal is returned as the result of IngressPolicy() + IngressPolicyVal string } // ConsensusType returns the ConsensusTypeVal @@ -57,3 +59,8 @@ func (scm *Manager) ChainCreators() []string { func (scm *Manager) KafkaBrokers() []string { return scm.KafkaBrokersVal } + +// IngressPolicy returns the IngressPolicyVal +func (scm *Manager) IngressPolicy() string { + return scm.IngressPolicyVal +} diff --git a/orderer/multichain/systemchain_test.go b/orderer/multichain/systemchain_test.go index 23ddf5ccf4e..2bd72a9ed61 100644 --- a/orderer/multichain/systemchain_test.go +++ b/orderer/multichain/systemchain_test.go @@ -19,13 +19,13 @@ package multichain import ( "reflect" "testing" - "time" "github.com/hyperledger/fabric/common/policies" coreutil "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sharedconfig" + mocksharedconfig "github.com/hyperledger/fabric/orderer/mocks/sharedconfig" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" @@ -47,33 +47,9 @@ func (mpm *mockPolicyManager) GetPolicy(id string) (policies.Policy, bool) { return mpm.mp, mpm.mp != nil } -type mockSharedConfig struct { - chainCreators []string -} - -func (msc *mockSharedConfig) ConsensusType() string { - panic("Unimplemented") -} - -func (msc *mockSharedConfig) BatchSize() *ab.BatchSize { - panic("Unimplemented") -} - -func (msc *mockSharedConfig) BatchTimeout() time.Duration { - panic("Unimplemented") -} - -func (msc *mockSharedConfig) ChainCreators() []string { - return msc.chainCreators -} - -func (msc *mockSharedConfig) KafkaBrokers() []string { - panic("Unimplemented") -} - type mockSupport struct { mpm *mockPolicyManager - msc *mockSharedConfig + msc *mocksharedconfig.Manager chainID string queue []*cb.Envelope } @@ -81,7 +57,7 @@ type mockSupport struct { func newMockSupport(chainID string) *mockSupport { return &mockSupport{ mpm: &mockPolicyManager{}, - msc: &mockSharedConfig{}, + msc: &mocksharedconfig.Manager{}, chainID: chainID, } } @@ -129,7 +105,7 @@ func TestGoodProposal(t *testing.T) { newChainID := "NewChainID" mcc := newMockChainCreator() - mcc.ms.msc.chainCreators = []string{provisional.AcceptAllPolicyKey} + mcc.ms.msc.ChainCreatorsVal = []string{provisional.AcceptAllPolicyKey} mcc.ms.mpm.mp = &mockPolicy{} chainCreateTx := &cb.ConfigurationItem{ @@ -214,7 +190,7 @@ func TestProposalWithMissingPolicy(t *testing.T) { newChainID := "NewChainID" mcc := newMockChainCreator() - mcc.ms.msc.chainCreators = []string{provisional.AcceptAllPolicyKey} + mcc.ms.msc.ChainCreatorsVal = []string{provisional.AcceptAllPolicyKey} chainCreateTx := &cb.ConfigurationItem{ Key: utils.CreationPolicyKey, @@ -238,7 +214,7 @@ func TestProposalWithBadDigest(t *testing.T) { mcc := newMockChainCreator() mcc.ms.mpm.mp = &mockPolicy{} - mcc.ms.msc.chainCreators = []string{provisional.AcceptAllPolicyKey} + mcc.ms.msc.ChainCreatorsVal = []string{provisional.AcceptAllPolicyKey} chainCreateTx := &cb.ConfigurationItem{ Key: utils.CreationPolicyKey, diff --git a/protos/orderer/ab.pb.go b/protos/orderer/ab.pb.go index 5bf48646e59..3a59777c5c7 100644 --- a/protos/orderer/ab.pb.go +++ b/protos/orderer/ab.pb.go @@ -22,6 +22,7 @@ It has these top-level messages: BatchSize BatchTimeout CreationPolicy + IngressPolicy ChainCreators KafkaBrokers KafkaMessage diff --git a/protos/orderer/configuration.pb.go b/protos/orderer/configuration.pb.go index f2359dc2c53..42a31b8a350 100644 --- a/protos/orderer/configuration.pb.go +++ b/protos/orderer/configuration.pb.go @@ -65,6 +65,16 @@ func (m *CreationPolicy) String() string { return proto.CompactTextSt func (*CreationPolicy) ProtoMessage() {} func (*CreationPolicy) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } +// IngressPolicy is the name of the policy which incoming Broadcast messages are filtered against +type IngressPolicy struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` +} + +func (m *IngressPolicy) Reset() { *m = IngressPolicy{} } +func (m *IngressPolicy) String() string { return proto.CompactTextString(m) } +func (*IngressPolicy) ProtoMessage() {} +func (*IngressPolicy) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} } + type ChainCreators struct { // A list of policies, any of which may be specified as the chain creation // policy in a chain creation request @@ -74,7 +84,7 @@ type ChainCreators struct { func (m *ChainCreators) Reset() { *m = ChainCreators{} } func (m *ChainCreators) String() string { return proto.CompactTextString(m) } func (*ChainCreators) ProtoMessage() {} -func (*ChainCreators) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} } +func (*ChainCreators) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} } // Carries a list of bootstrap brokers, i.e. this is not the exclusive set of // brokers an ordering service @@ -87,13 +97,14 @@ type KafkaBrokers struct { func (m *KafkaBrokers) Reset() { *m = KafkaBrokers{} } func (m *KafkaBrokers) String() string { return proto.CompactTextString(m) } func (*KafkaBrokers) ProtoMessage() {} -func (*KafkaBrokers) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} } +func (*KafkaBrokers) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} } func init() { proto.RegisterType((*ConsensusType)(nil), "orderer.ConsensusType") proto.RegisterType((*BatchSize)(nil), "orderer.BatchSize") proto.RegisterType((*BatchTimeout)(nil), "orderer.BatchTimeout") proto.RegisterType((*CreationPolicy)(nil), "orderer.CreationPolicy") + proto.RegisterType((*IngressPolicy)(nil), "orderer.IngressPolicy") proto.RegisterType((*ChainCreators)(nil), "orderer.ChainCreators") proto.RegisterType((*KafkaBrokers)(nil), "orderer.KafkaBrokers") } @@ -101,23 +112,24 @@ func init() { func init() { proto.RegisterFile("orderer/configuration.proto", fileDescriptor1) } var fileDescriptor1 = []byte{ - // 284 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x90, 0xc1, 0x4b, 0xf3, 0x40, - 0x10, 0xc5, 0xc9, 0xf7, 0x49, 0x6b, 0x97, 0x56, 0x61, 0x05, 0x09, 0xf5, 0x52, 0xe2, 0x25, 0xa0, - 0x74, 0x0f, 0xe2, 0x5d, 0x92, 0xa3, 0x08, 0x12, 0x7b, 0xf2, 0xb6, 0xd9, 0x4c, 0x92, 0xa5, 0xcd, - 0x4e, 0x98, 0xdd, 0x80, 0xf1, 0xaf, 0x97, 0x6c, 0xb6, 0x1e, 0x3c, 0xcd, 0xfb, 0xcd, 0xbc, 0x19, - 0x1e, 0xc3, 0xee, 0x90, 0x2a, 0x20, 0x20, 0xa1, 0xd0, 0xd4, 0xba, 0x19, 0x48, 0x3a, 0x8d, 0x66, - 0xdf, 0x13, 0x3a, 0xe4, 0xcb, 0x30, 0xdc, 0xde, 0x28, 0xec, 0x3a, 0x34, 0x62, 0x2e, 0xf3, 0x34, - 0xb9, 0x67, 0x9b, 0x1c, 0x8d, 0x05, 0x63, 0x07, 0x7b, 0x18, 0x7b, 0xe0, 0x9c, 0x5d, 0xb8, 0xb1, - 0x87, 0x38, 0xda, 0x45, 0xe9, 0xaa, 0xf0, 0x3a, 0x79, 0x66, 0xab, 0x4c, 0x3a, 0xd5, 0x7e, 0xe8, - 0x6f, 0xe0, 0x29, 0xbb, 0xee, 0xe4, 0xd7, 0x1b, 0x58, 0x2b, 0x1b, 0xc8, 0x71, 0x30, 0xce, 0x7b, - 0x37, 0xc5, 0xdf, 0x76, 0x92, 0xb2, 0xb5, 0x5f, 0x3b, 0xe8, 0x0e, 0x70, 0x70, 0x3c, 0x66, 0x4b, - 0x37, 0xcb, 0x70, 0xfd, 0x8c, 0xc9, 0x0b, 0xbb, 0xca, 0x09, 0x7c, 0xea, 0x77, 0x3c, 0x69, 0x35, - 0xf2, 0x5b, 0xb6, 0xe8, 0xbd, 0x0a, 0xd6, 0x40, 0x53, 0xbf, 0xd2, 0x0d, 0x58, 0x17, 0xff, 0xdb, - 0x45, 0xe9, 0xba, 0x08, 0x94, 0x3c, 0xb0, 0x4d, 0xde, 0x4a, 0x6d, 0xfc, 0x19, 0x24, 0xcb, 0xb7, - 0xec, 0xd2, 0xaf, 0x68, 0xb0, 0x71, 0xb4, 0xfb, 0x9f, 0xae, 0x8a, 0x5f, 0x9e, 0x82, 0xbd, 0xca, - 0xfa, 0x28, 0x33, 0xc2, 0x23, 0x90, 0x9d, 0x82, 0x95, 0xb3, 0x0c, 0xd6, 0x33, 0x66, 0xfb, 0xcf, - 0xc7, 0x46, 0xbb, 0x76, 0x28, 0xf7, 0x0a, 0x3b, 0xd1, 0x8e, 0x3d, 0xd0, 0x09, 0xaa, 0x06, 0x48, - 0xd4, 0xb2, 0x24, 0xad, 0x84, 0xff, 0xa2, 0x15, 0xe1, 0xc7, 0xe5, 0xc2, 0xf3, 0xd3, 0x4f, 0x00, - 0x00, 0x00, 0xff, 0xff, 0x71, 0x0c, 0x1d, 0x71, 0x92, 0x01, 0x00, 0x00, + // 301 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0x41, 0x4b, 0xc3, 0x40, + 0x10, 0x85, 0x89, 0x96, 0xd6, 0x2e, 0xad, 0xc2, 0x0a, 0x12, 0xea, 0xa5, 0xc4, 0x4b, 0x40, 0x69, + 0x0e, 0xe2, 0x5d, 0x9a, 0x93, 0x88, 0x20, 0xb1, 0x27, 0x6f, 0x9b, 0x74, 0x92, 0x2c, 0x6d, 0x76, + 0xc2, 0xcc, 0x06, 0x8c, 0xbf, 0x5e, 0xb2, 0xd9, 0xf6, 0xe0, 0x29, 0xef, 0x9b, 0x99, 0xbc, 0x7d, + 0xb3, 0x2b, 0xee, 0x91, 0xf6, 0x40, 0x40, 0x49, 0x81, 0xa6, 0xd4, 0x55, 0x47, 0xca, 0x6a, 0x34, + 0x9b, 0x96, 0xd0, 0xa2, 0x9c, 0xf9, 0xe6, 0xea, 0xb6, 0xc0, 0xa6, 0x41, 0x93, 0x8c, 0x9f, 0xb1, + 0x1b, 0x3d, 0x88, 0x65, 0x8a, 0x86, 0xc1, 0x70, 0xc7, 0xbb, 0xbe, 0x05, 0x29, 0xc5, 0xc4, 0xf6, + 0x2d, 0x84, 0xc1, 0x3a, 0x88, 0xe7, 0x99, 0xd3, 0xd1, 0x8b, 0x98, 0x6f, 0x95, 0x2d, 0xea, 0x2f, + 0xfd, 0x0b, 0x32, 0x16, 0x37, 0x8d, 0xfa, 0xf9, 0x00, 0x66, 0x55, 0x41, 0x8a, 0x9d, 0xb1, 0x6e, + 0x76, 0x99, 0xfd, 0x2f, 0x47, 0xb1, 0x58, 0xb8, 0xdf, 0x76, 0xba, 0x01, 0xec, 0xac, 0x0c, 0xc5, + 0xcc, 0x8e, 0xd2, 0xbb, 0x9f, 0x30, 0x7a, 0x15, 0xd7, 0x29, 0x81, 0x4b, 0xfd, 0x89, 0x47, 0x5d, + 0xf4, 0xf2, 0x4e, 0x4c, 0x5b, 0xa7, 0xfc, 0xa8, 0xa7, 0xa1, 0xbe, 0xd7, 0x15, 0xb0, 0x0d, 0x2f, + 0xd6, 0x41, 0xbc, 0xc8, 0x3c, 0x0d, 0x7b, 0xbc, 0x99, 0x8a, 0x80, 0xd9, 0x1b, 0x48, 0x31, 0x31, + 0xaa, 0x39, 0xef, 0x31, 0xe8, 0xe8, 0x51, 0x2c, 0xd3, 0x5a, 0x69, 0xe3, 0xce, 0x42, 0x62, 0xb9, + 0x12, 0x57, 0xce, 0x57, 0x03, 0x87, 0xc1, 0xfa, 0x32, 0x9e, 0x67, 0x67, 0x1e, 0xd2, 0xbf, 0xab, + 0xf2, 0xa0, 0xb6, 0x84, 0x07, 0x20, 0x1e, 0xd2, 0xe7, 0xa3, 0xf4, 0xa3, 0x27, 0xdc, 0x6e, 0xbe, + 0x9f, 0x2a, 0x6d, 0xeb, 0x2e, 0xdf, 0x14, 0xd8, 0x24, 0x75, 0xdf, 0x02, 0x1d, 0x61, 0x5f, 0x01, + 0x25, 0xa5, 0xca, 0x49, 0x17, 0x89, 0xbb, 0x6a, 0x4e, 0xfc, 0x43, 0xe4, 0x53, 0xc7, 0xcf, 0x7f, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x3f, 0xf2, 0x2b, 0x29, 0xb7, 0x01, 0x00, 0x00, } diff --git a/protos/orderer/configuration.proto b/protos/orderer/configuration.proto index 82e43300c7e..4462a18e2ef 100644 --- a/protos/orderer/configuration.proto +++ b/protos/orderer/configuration.proto @@ -61,6 +61,11 @@ message CreationPolicy { bytes digest = 2; } +// IngressPolicy is the name of the policy which incoming Broadcast messages are filtered against +message IngressPolicy { + string name = 1; +} + message ChainCreators { // A list of policies, any of which may be specified as the chain creation // policy in a chain creation request