Skip to content

Commit

Permalink
[FAB-8395] Abstract Event Service
Browse files Browse the repository at this point in the history
Defined an Event Service API and provided
an abstract implementation which may be
reused by protocol-specific implementations
(e.g. Event Hub and Deliver Service)

Change-Id: Ie8b901dfe08df20b01a38ba26c85a3a87f667a41
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn authored and troyronda committed Feb 23, 2018
1 parent 05b5ab9 commit 948f2fd
Show file tree
Hide file tree
Showing 19 changed files with 3,152 additions and 0 deletions.
93 changes: 93 additions & 0 deletions api/apifabclient/eventservice.go
@@ -0,0 +1,93 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package apifabclient

import (
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)

// BlockEvent contains the data for the block event
type BlockEvent struct {
Block *cb.Block
}

// FilteredBlockEvent contains the data for a filtered block event
type FilteredBlockEvent struct {
FilteredBlock *pb.FilteredBlock
}

// TxStatusEvent contains the data for a transaction status event
type TxStatusEvent struct {
TxID string
TxValidationCode pb.TxValidationCode
}

// CCEvent contains the data for a chaincode event
type CCEvent struct {
TxID string
ChaincodeID string
EventName string
}

// Registration is a handle that is returned from a successful RegisterXXXEvent.
// This handle should be used in Unregister in order to unregister the event.
type Registration interface{}

// BlockFilter is a function that determines whether a Block event
// should be ignored
type BlockFilter func(block *cb.Block) bool

// EventService is a service that receives events such as block, filtered block,
// chaincode, and transaction status events.
type EventService interface {
// RegisterBlockEvent registers for block events. If the caller does not have permission
// to register for block events then an error is returned.
// Note that Unregister must be called when the registration is no longer needed.
// - filter is an optional filter that filters out unwanted events. (Note: Only one filter may be specified.)
// - Returns the registration and a channel that is used to receive events. The channel
// is closed when Unregister is called.
RegisterBlockEvent(filter ...BlockFilter) (Registration, <-chan *BlockEvent, error)

// RegisterFilteredBlockEvent registers for filtered block events.
// Note that Unregister must be called when the registration is no longer needed.
// - Returns the registration and a channel that is used to receive events. The channel
// is closed when Unregister is called.
RegisterFilteredBlockEvent() (Registration, <-chan *FilteredBlockEvent, error)

// RegisterChaincodeEvent registers for chaincode events.
// Note that Unregister must be called when the registration is no longer needed.
// - ccID is the chaincode ID for which events are to be received
// - eventFilter is the chaincode event filter (regular expression) for which events are to be received
// - Returns the registration and a channel that is used to receive events. The channel
// is closed when Unregister is called.
RegisterChaincodeEvent(ccID, eventFilter string) (Registration, <-chan *CCEvent, error)

// RegisterTxStatusEvent registers for transaction status events.
// Note that Unregister must be called when the registration is no longer needed.
// - txID is the transaction ID for which events are to be received
// - Returns the registration and a channel that is used to receive events. The channel
// is closed when Unregister is called.
RegisterTxStatusEvent(txID string) (Registration, <-chan *TxStatusEvent, error)

// Unregister removes the given registration and closes the event channel.
// - reg is the registration handle that was returned from one of the Register functions
Unregister(reg Registration)
}

// EventClient is a client that connects to a peer and receives channel events
// such as block, filtered block, chaincode, and transaction status events.
type EventClient interface {
EventService

// Connect connects to the event server.
Connect() error

// Close closes the connection to the event server and releases all resources.
// Once this function is invoked the client may no longer be used.
Close()
}
17 changes: 17 additions & 0 deletions pkg/fabric-client/events/service/blockfilter/acceptanyfilter.go
@@ -0,0 +1,17 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package blockfilter

import (
"github.com/hyperledger/fabric-sdk-go/api/apifabclient"
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
)

// AcceptAny returns a block filter that accepts any block
var AcceptAny apifabclient.BlockFilter = func(block *cb.Block) bool {
return true
}
@@ -0,0 +1,51 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package headertypefilter

import (
"github.com/hyperledger/fabric-sdk-go/api/apifabclient"
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/utils"
)

var logger = logging.NewLogger("eventservice/blockfilter")

// New returns a block filter that filters out blocks that
// don't contain envelopes of the given type(s)
func New(headerTypes ...cb.HeaderType) apifabclient.BlockFilter {
return func(block *cb.Block) bool {
return hasType(block, headerTypes...)
}
}

func hasType(block *cb.Block, headerTypes ...cb.HeaderType) bool {
for i := 0; i < len(block.Data.Data); i++ {
env, err := utils.ExtractEnvelope(block, i)
if err != nil {
logger.Errorf("error extracting envelope from block: %s", err)
continue
}
payload, err := utils.ExtractPayload(env)
if err != nil {
logger.Errorf("error extracting payload from block: %s", err)
continue
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Errorf("error extracting channel header: %s", err)
continue
}
htype := cb.HeaderType(chdr.Type)
for _, headerType := range headerTypes {
if htype == headerType {
return true
}
}
}
return false
}
@@ -0,0 +1,29 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package headertypefilter

import (
"testing"

servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/events/service/mocks"
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)

func TestHeaderTypeBlockFilter(t *testing.T) {
filter := New(cb.HeaderType_CONFIG, cb.HeaderType_CONFIG_UPDATE)

if !filter(servicemocks.NewBlock("somechannel", servicemocks.NewTransaction("txid", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG))) {
t.Fatalf("expecting block filter to accept block with header type %s", cb.HeaderType_CONFIG)
}
if !filter(servicemocks.NewBlock("somechannel", servicemocks.NewTransaction("txid", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG_UPDATE))) {
t.Fatalf("expecting block filter to accept block with header type %s", cb.HeaderType_CONFIG_UPDATE)
}
if filter(servicemocks.NewBlock("somechannel", servicemocks.NewTransaction("txid", pb.TxValidationCode_VALID, cb.HeaderType_MESSAGE))) {
t.Fatalf("expecting block filter to reject block with header type %s", cb.HeaderType_MESSAGE)
}
}

0 comments on commit 948f2fd

Please sign in to comment.