diff --git a/bddtests/features/bootstrap.feature b/bddtests/features/bootstrap.feature index 6ddce48cd6a..b2fa0b0ec69 100644 --- a/bddtests/features/bootstrap.feature +++ b/bddtests/features/bootstrap.feature @@ -254,14 +254,14 @@ Feature: Bootstrap | ChainId | Start | End | | com.acme.blockchain.jdoe.channel1 | 0 | 0 | - Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "0" blocks with "0" messages within "1" seconds + Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "1" blocks with "1" messages within "1" seconds When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "peer2" using port "7051" And user "dev0Org0" sends deliver a seek request on orderer "peer2" with properties: | ChainId | Start | End | | com.acme.blockchain.jdoe.channel1 | 0 | 0 | - Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "0" blocks with "0" messages within "1" seconds + Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "1" blocks with "1" messages within "1" seconds # Entry point for invoking on an existing channel When user "peer0Admin" creates a chaincode spec "ccSpec" with name "example02" of type "GOLANG" for chaincode "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" with args diff --git a/common/config/api.go b/common/config/api.go index 0c281671974..c3bfaa7ec7f 100644 --- a/common/config/api.go +++ b/common/config/api.go @@ -6,7 +6,6 @@ SPDX-License-Identifier: Apache-2.0 package config import ( - "github.com/hyperledger/fabric/common/resourcesconfig" cb "github.com/hyperledger/fabric/protos/common" ) @@ -26,7 +25,4 @@ type Manager interface { // GetResourceConfig defines methods that are related to resource configuration GetResourceConfig(channel string) Config - - // GetPolicyMapper returns API to the policy mapper - GetPolicyMapper(channel string) resourcesconfig.PolicyMapper } diff --git a/common/deliver/deliver.go b/common/deliver/deliver.go index e62404e07a5..2271077d9a5 100644 --- a/common/deliver/deliver.go +++ b/common/deliver/deliver.go @@ -68,18 +68,19 @@ type Support interface { Errored() <-chan struct{} } -// PolicyNameProvider provides a policy name given the channel id -type PolicyNameProvider func(chainID string) (string, error) +// PolicyChecker checks the envelope against the policy logic supplied by the +// function +type PolicyChecker func(envelope *cb.Envelope, channelID string) error type deliverServer struct { sm SupportManager - policyProvider PolicyNameProvider + policyChecker PolicyChecker timeWindow time.Duration bindingInspector comm.BindingInspector } // NewHandlerImpl creates an implementation of the Handler interface -func NewHandlerImpl(sm SupportManager, policyProvider PolicyNameProvider, timeWindow time.Duration, mutualTLS bool) Handler { +func NewHandlerImpl(sm SupportManager, policyChecker PolicyChecker, timeWindow time.Duration, mutualTLS bool) Handler { // function to extract the TLS cert hash from a channel header extract := func(msg proto.Message) []byte { chdr, isChannelHeader := msg.(*cb.ChannelHeader) @@ -92,7 +93,7 @@ func NewHandlerImpl(sm SupportManager, policyProvider PolicyNameProvider, timeWi return &deliverServer{ sm: sm, - policyProvider: policyProvider, + policyChecker: policyChecker, timeWindow: timeWindow, bindingInspector: bindingInspector, } @@ -166,13 +167,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env lastConfigSequence := chain.Sequence() - policyName, err := ds.policyProvider(chdr.ChannelId) - if err != nil { - logger.Warningf("[channel: %s] failed to obtain policy name due to %s", chdr.ChannelId, err) - return sendStatusReply(srv, cb.Status_BAD_REQUEST) - } - sf := NewSigFilter(policyName, chain) - if err := sf.Apply(envelope); err != nil { + if err := ds.policyChecker(envelope, chdr.ChannelId); err != nil { logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err) return sendStatusReply(srv, cb.Status_FORBIDDEN) } @@ -225,7 +220,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env currentConfigSequence := chain.Sequence() if currentConfigSequence > lastConfigSequence { lastConfigSequence = currentConfigSequence - if err := sf.Apply(envelope); err != nil { + if err := ds.policyChecker(envelope, chdr.ChannelId); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return sendStatusReply(srv, cb.Status_FORBIDDEN) } diff --git a/common/deliver/deliver_test.go b/common/deliver/deliver_test.go index 6e899a3406b..e41d7ee93e1 100644 --- a/common/deliver/deliver_test.go +++ b/common/deliver/deliver_test.go @@ -45,10 +45,6 @@ import ( var genesisBlock = cb.NewBlock(0, nil) var systemChainID = "systemChain" -var policyNameProvider = func(_ string) (string, error) { - return policies.ChannelReaders, nil -} - var timeWindow = time.Duration(15 * time.Minute) var testCert = &x509.Certificate{ Raw: []byte("test"), @@ -172,14 +168,24 @@ func NewRAMLedger() blockledger.ReadWriter { return rl } -func initializeDeliverHandler() Handler { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) +func initializeDeliverHandler(mm *mockSupportManager, mutualTLS bool) Handler { + if mm == nil { + mm = newMockMultichainManager() + ms := mm.chains[systemChainID] + l := ms.ledger + for i := 1; i < ledgerSize; i++ { + l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) + } } - - return NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + policyChecker := func(env *cb.Envelope, channelID string) error { + chain, ok := mm.GetChain(channelID) + if !ok { + return fmt.Errorf("channel %s not found", channelID) + } + sf := NewSigFilter(policies.ChannelReaders, chain) + return sf.Apply(env) + } + return NewHandlerImpl(mm, policyChecker, timeWindow, mutualTLS) } func newMockMultichainManager() *mockSupportManager { @@ -237,7 +243,7 @@ func TestWholeChainSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -269,7 +275,7 @@ func TestNewestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -291,7 +297,7 @@ func TestSpecificSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) specifiedStart := uint64(3) @@ -328,7 +334,7 @@ func TestUnauthorizedSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -337,7 +343,7 @@ func TestUnauthorizedSeek(t *testing.T) { select { case deliverReply := <-m.sendChan: if deliverReply.GetStatus() != cb.Status_FORBIDDEN { - t.Fatalf("Received wrong error on the reply channel") + t.Fatalf("Received wrong error on the reply channel: %s", deliverReply.GetStatus()) } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") @@ -353,7 +359,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -384,7 +390,7 @@ func TestOutOfBoundSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -403,7 +409,7 @@ func TestFailFastSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY}) @@ -436,7 +442,7 @@ func TestBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -490,7 +496,7 @@ func TestErroredSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -514,7 +520,7 @@ func TestErroredBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -539,7 +545,7 @@ func TestErroredBlockingSeek(t *testing.T) { func TestSGracefulShutdown(t *testing.T) { m := newMockD() - ds := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(nil, !mutualTLS) close(m.recvChan) assert.NoError(t, ds.Handle(m), "Expected no error for hangup") @@ -549,7 +555,7 @@ func TestReversedSeqSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) specifiedStart := uint64(7) @@ -567,13 +573,13 @@ func TestReversedSeqSeek(t *testing.T) { } func TestBadStreamRecv(t *testing.T) { - bh := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS) + bh := initializeDeliverHandler(nil, !mutualTLS) assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error") } func TestBadStreamSend(t *testing.T) { m := &erroneousSendMockD{recvVal: makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})} - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) assert.Error(t, ds.Handle(m), "Should catch unexpected stream error") } @@ -581,7 +587,7 @@ func TestOldestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekOldest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -599,7 +605,7 @@ func TestNoPayloadSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: []byte("Foo")} @@ -616,7 +622,7 @@ func TestNilPayloadHeaderSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})} @@ -633,7 +639,7 @@ func TestBadChannelHeader(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{ @@ -656,7 +662,7 @@ func TestChainNotFound(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -673,7 +679,7 @@ func TestBadSeekInfoPayload(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -701,7 +707,7 @@ func TestMissingSeekPosition(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -729,7 +735,7 @@ func TestNilTimestamp(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -756,7 +762,7 @@ func TestTimestampOutOfTimeWindow(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -781,17 +787,10 @@ func TestTimestampOutOfTimeWindow(t *testing.T) { } func TestSeekWithMutualTLS(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) m.recvChan <- makeSeekWithTLSCertHash(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, testCert) @@ -810,17 +809,10 @@ func TestSeekWithMutualTLS(t *testing.T) { } func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) wrongCert := &x509.Certificate{ Raw: []byte("wrong"), @@ -836,17 +828,10 @@ func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) { } func TestSeekWithMutualTLS_noTLSCert(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) diff --git a/core/peer/atomicbroadcast.go b/core/peer/atomicbroadcast.go index f10a50a03c1..83e9741ff10 100644 --- a/core/peer/atomicbroadcast.go +++ b/core/peer/atomicbroadcast.go @@ -16,17 +16,14 @@ limitations under the License. package peer import ( - "fmt" "runtime/debug" "time" "github.com/hyperledger/fabric/common/deliver" "github.com/hyperledger/fabric/common/flogging" - "github.com/hyperledger/fabric/core/aclmgmt/resources" "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" - "github.com/pkg/errors" ) const pkgLogID = "common/peer" @@ -62,16 +59,9 @@ func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { // NewAtomicBroadcastServer creates an ab.AtomicBroadcastServer based on the // ledger Reader. Broadcast is not implemented/supported on the peer. -func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { - configSupport := NewConfigSupport() +func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool, policyChecker deliver.PolicyChecker) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(DeliverSupportManager{}, func(chainID string) (string, error) { - policyMapper := configSupport.GetPolicyMapper(chainID) - if policyMapper == nil { - return "", errors.New(fmt.Sprintf("cannot find policy mapper for channel %s", chainID)) - } - return policyMapper.PolicyRefForAPI(resources.BLOCKEVENT), nil - }, timeWindow, mutualTLS), + dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policyChecker, timeWindow, mutualTLS), } return s } diff --git a/core/peer/peer.go b/core/peer/peer.go index 6f278974be4..f16c4a5d9eb 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -809,17 +809,3 @@ func (*configSupport) GetResourceConfig(channel string) cc.Config { } return chain.cs.bundleSource.ConfigtxValidator() } - -// GetPolicyMapper returns an instance of a object that represents -// an API policy mapper which provides a mapping from specific API -// function to its policy -func (*configSupport) GetPolicyMapper(channel string) resourcesconfig.PolicyMapper { - chains.RLock() - defer chains.RUnlock() - chain := chains.list[channel] - if chain == nil { - peerLogger.Error("GetPolicyMapper: channel", channel, "not found in the list of channels associated with this peer") - return nil - } - return chain.cs.bundleSource.APIPolicyMapper() -} diff --git a/orderer/common/server/server.go b/orderer/common/server/server.go index 4de03c36e7f..5b2d2ca59ad 100644 --- a/orderer/common/server/server.go +++ b/orderer/common/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/multichannel" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/pkg/errors" ) type broadcastSupport struct { @@ -49,9 +50,15 @@ type server struct { // NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, func(_ string) (string, error) { - return policies.ChannelReaders, nil - }, timeWindow, mutualTLS), + dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, + func(env *cb.Envelope, channelID string) error { + chain, ok := r.GetChain(channelID) + if !ok { + return errors.Errorf("channel %s not found", channelID) + } + sf := deliver.NewSigFilter(policies.ChannelReaders, chain) + return sf.Apply(env) + }, timeWindow, mutualTLS), bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}), debug: debug, } diff --git a/peer/node/start.go b/peer/node/start.go index d9254b33560..7c2d38e3f9e 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -23,6 +23,8 @@ import ( "github.com/hyperledger/fabric/common/localmsp" "github.com/hyperledger/fabric/common/viperutil" "github.com/hyperledger/fabric/core" + "github.com/hyperledger/fabric/core/aclmgmt" + "github.com/hyperledger/fabric/core/aclmgmt/resources" "github.com/hyperledger/fabric/core/chaincode" "github.com/hyperledger/fabric/core/chaincode/accesscontrol" "github.com/hyperledger/fabric/core/comm" @@ -42,6 +44,7 @@ import ( "github.com/hyperledger/fabric/peer/common" peergossip "github.com/hyperledger/fabric/peer/gossip" "github.com/hyperledger/fabric/peer/version" + cb "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/ledger/rwset" ab "github.com/hyperledger/fabric/protos/orderer" pb "github.com/hyperledger/fabric/protos/peer" @@ -175,7 +178,10 @@ func serve(args []string) error { // broadcast mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert timeWindow := viper.GetDuration("peer.authentication.timewindow") - abServer := peer.NewAtomicBroadcastServer(timeWindow, mutualTLS) + policyChecker := func(env *cb.Envelope, channelID string) error { + return aclmgmt.GetACLProvider().CheckACL(resources.BLOCKEVENT, channelID, env) + } + abServer := peer.NewAtomicBroadcastServer(timeWindow, mutualTLS, policyChecker) ab.RegisterAtomicBroadcastServer(peerServer.Server(), abServer) // enable the cache of chaincode info