Skip to content

Commit

Permalink
[FAB-8910] Add payload for non-filtered CC events
Browse files Browse the repository at this point in the history
The EventService now includes the payload for
non-filtered chaincode events. For filtered
events, the payload will be set to nil.

Change-Id: I624d1613c4c7a609af3ef034285bea95d3c9736b
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 15, 2018
1 parent 523348b commit e0e3413
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 26 deletions.
16 changes: 13 additions & 3 deletions pkg/context/api/fab/eventservice.go
Expand Up @@ -13,25 +13,35 @@ import (

// BlockEvent contains the data for the block event
type BlockEvent struct {
// Block is the block that was committed
Block *cb.Block
}

// FilteredBlockEvent contains the data for a filtered block event
type FilteredBlockEvent struct {
// FilteredBlock contains a filtered version of the block that was committed
FilteredBlock *pb.FilteredBlock
}

// TxStatusEvent contains the data for a transaction status event
type TxStatusEvent struct {
TxID string
// TxID is the ID of the transaction in which the event was set
TxID string
// TxValidationCode is the status code of the commit
TxValidationCode pb.TxValidationCode
}

// CCEvent contains the data for a chaincode event
type CCEvent struct {
TxID string
// TxID is the ID of the transaction in which the event was set
TxID string
// ChaincodeID is the ID of the chaincode that set the event
ChaincodeID string
EventName string
// EventName is the name of the chaincode event
EventName string
// Payload contains the payload of the chaincode event
// NOTE: Payload will be nil for filtered events
Payload []byte
}

// Registration is a handle that is returned from a successful RegisterXXXEvent.
Expand Down
18 changes: 12 additions & 6 deletions pkg/fab/events/client/client_test.go
Expand Up @@ -9,6 +9,7 @@ SPDX-License-Identifier: Apache-2.0
package client

import (
"bytes"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -595,14 +596,14 @@ func TestCCEvents(t *testing.T) {
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID1, event1)
checkCCEvent(t, event, ccID1, nil, event1)
numReceived++
}
case event, ok := <-eventch2:
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID2, event2, event3)
checkCCEvent(t, event, ccID2, nil, event2, event3)
numReceived++
}
case <-time.After(5 * time.Second):
Expand Down Expand Up @@ -908,6 +909,7 @@ func listenChaincodeEvents(channelID string, eventch <-chan *fab.CCEvent, expect
func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID string, expected int, errch chan<- error) {
ccID := "mycc1"
event1 := "event1"
payload1 := []byte("payload1")

var wg sync.WaitGroup
wg.Add(expected)
Expand All @@ -931,7 +933,7 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str
defer eventClient.Unregister(reg)

ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent(txID, pb.TxValidationCode_VALID, ccID, event1),
servicemocks.NewTransactionWithCCEvent(txID, pb.TxValidationCode_VALID, ccID, event1, payload1),
)

select {
Expand Down Expand Up @@ -1108,7 +1110,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo
numEvents++
numCCEvents++
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)

// Wait a while for the subscriber to receive the event
Expand All @@ -1124,7 +1126,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo
for ; numCCEvents < int(expectedCCEvents); numCCEvents++ {
numEvents++
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)
}
for ; numEvents < int(expectedBlockEvents); numEvents++ {
Expand Down Expand Up @@ -1316,10 +1318,14 @@ func checkTxStatusEvent(t *testing.T, event *fab.TxStatusEvent, expectedTxID str
}
}

func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expectedEventNames ...string) {
func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expectedPayload []byte, expectedEventNames ...string) {
if event.ChaincodeID != expectedCCID {
t.Fatalf("expecting event for CC [%s] but received event for CC [%s]", expectedCCID, event.ChaincodeID)
}
if bytes.Compare(event.Payload, expectedPayload) != 0 {
t.Fatalf("expecting payload [%s] but received payload [%s]", expectedPayload, event.Payload)
}

found := false
for _, eventName := range expectedEventNames {
if event.EventName == eventName {
Expand Down
6 changes: 3 additions & 3 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Expand Up @@ -287,7 +287,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA
servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG_UPDATE),
)
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)

cp := clientmocks.NewProviderFactory()
Expand Down Expand Up @@ -346,7 +346,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA

// Produce a chaincode event
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)

// Wait a while for the subscriber to receive the events
Expand All @@ -359,7 +359,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA

// Produce some more events while the client is disconnected
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)
ledger.NewBlock(channelID,
servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG_UPDATE),
Expand Down
4 changes: 2 additions & 2 deletions pkg/fab/events/eventhubclient/eventhubclient_test.go
Expand Up @@ -359,7 +359,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num
numEvents++
numCCEvents++
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)

// Wait a while for the subscriber to receive the event
Expand All @@ -375,7 +375,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num
for ; numCCEvents < int(expectedCCEvents); numCCEvents++ {
numEvents++
ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"),
servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1", nil),
)
}
for ; numEvents < int(expectedBlockEvents); numEvents++ {
Expand Down
6 changes: 3 additions & 3 deletions pkg/fab/events/service/dispatcher/dispatcher.go
Expand Up @@ -470,15 +470,15 @@ func (ed *Dispatcher) publishCCEvents(ccEvent *pb.ChaincodeEvent) {

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId):
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload):
default:
logger.Warnf("Unable to send to CC event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId)
reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload)
} else {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId):
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload):
case <-time.After(ed.eventConsumerTimeout):
logger.Warnf("Timed out sending CC event.")
}
Expand Down
123 changes: 119 additions & 4 deletions pkg/fab/events/service/dispatcher/dispatcher_test.go
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package dispatcher

import (
"bytes"
"testing"
"time"

Expand Down Expand Up @@ -425,7 +426,118 @@ func TestTxStatusEvents(t *testing.T) {
}
}

func TestCCEvents(t *testing.T) {
func TestCCEventsUnfiltered(t *testing.T) {
channelID := "testchannel"
dispatcher := New()
if err := dispatcher.Start(); err != nil {
t.Fatalf("Error starting dispatcher: %s", err)
}

dispatcherEventch, err := dispatcher.EventCh()
if err != nil {
t.Fatalf("Error getting event channel from dispatcher: %s", err)
}

ccID1 := "mycc1"
ccID2 := "mycc2"
ccFilter1 := "event1"
ccFilter2 := "event2"
event1 := "event1"
event2 := "event2"
payload1 := []byte("payload1")
payload2 := []byte("payload2")

errch := make(chan error)
fbrespch := make(chan fab.Registration)
eventch := make(chan *fab.CCEvent, 10)
dispatcherEventch <- NewRegisterChaincodeEvent(ccID1, ccFilter1, eventch, fbrespch, errch)

var reg1 fab.Registration
select {
case reg1 = <-fbrespch:
case err := <-errch:
t.Fatalf("error registering for chaincode events: %s", err)
}

eventch = make(chan *fab.CCEvent, 10)
dispatcherEventch <- NewRegisterChaincodeEvent(ccID1, ccFilter1, eventch, fbrespch, errch)

select {
case reg1 = <-fbrespch:
t.Fatalf("expecting error registering multiple times for chaincode events but got registration")
case err = <-errch:
}

if err == nil {
t.Fatalf("expecting error registering multiple times for chaincode events")
}

dispatcherEventch <- NewUnregisterEvent(reg1)

eventch1 := make(chan *fab.CCEvent, 10)
dispatcherEventch <- NewRegisterChaincodeEvent(ccID1, ccFilter1, eventch1, fbrespch, errch)

select {
case reg1 = <-fbrespch:
case err := <-errch:
t.Fatalf("error registering for chaincode events: %s", err)
}

eventch2 := make(chan *fab.CCEvent, 10)
dispatcherEventch <- NewRegisterChaincodeEvent(ccID2, ccFilter2, eventch2, fbrespch, errch)

var reg2 fab.Registration
select {
case reg2 = <-fbrespch:
case err := <-errch:
t.Fatalf("error registering for chaincode events: %s", err)
}

dispatcherEventch <- servicemocks.NewBlockProducer().NewBlock(
channelID,
servicemocks.NewTransactionWithCCEvent("txid1", pb.TxValidationCode_VALID, ccID1, event1, payload1),
servicemocks.NewTransactionWithCCEvent("txid2", pb.TxValidationCode_VALID, ccID2, event2, payload2),
)

numExpected := 2
numReceived := 0

for {
select {
case event, ok := <-eventch1:
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID1, payload1, event1)
numReceived++
}
case event, ok := <-eventch2:
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID2, payload2, event2)
numReceived++
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for [%d] CC events. Only received [%d]", numExpected, numReceived)
}

if numReceived == numExpected {
break
}
}

dispatcherEventch <- NewUnregisterEvent(reg1)
dispatcherEventch <- NewUnregisterEvent(reg2)

stopResp := make(chan error)
dispatcherEventch <- NewStopEvent(stopResp)
if err := <-stopResp; err != nil {
t.Fatalf("Error stopping dispatcher: %s", err)
}
}

func TestCCEventsFiltered(t *testing.T) {
channelID := "testchannel"
dispatcher := New()
if err := dispatcher.Start(); err != nil {
Expand Down Expand Up @@ -507,14 +619,14 @@ func TestCCEvents(t *testing.T) {
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID1, event1)
checkCCEvent(t, event, ccID1, nil, event1)
numReceived++
}
case event, ok := <-eventch2:
if !ok {
t.Fatalf("unexpected closed channel")
} else {
checkCCEvent(t, event, ccID2, event2, event3)
checkCCEvent(t, event, ccID2, nil, event2, event3)
numReceived++
}
case <-time.After(5 * time.Second):
Expand Down Expand Up @@ -646,10 +758,13 @@ func checkTxStatusEvent(t *testing.T, event *fab.TxStatusEvent, expectedTxID str
}
}

func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expectedEventNames ...string) {
func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expectedPayload []byte, expectedEventNames ...string) {
if event.ChaincodeID != expectedCCID {
t.Fatalf("expecting event for CC [%s] but received event for CC [%s]", expectedCCID, event.ChaincodeID)
}
if bytes.Compare(event.Payload, expectedPayload) != 0 {
t.Fatalf("expecting payload [%s] but received payload [%s]", expectedPayload, event.Payload)
}
found := false
for _, eventName := range expectedEventNames {
if event.EventName == eventName {
Expand Down
3 changes: 2 additions & 1 deletion pkg/fab/events/service/dispatcher/events.go
Expand Up @@ -121,11 +121,12 @@ func NewRegisterEvent(respch chan<- fab.Registration, errCh chan<- error) Regist
}

// NewChaincodeEvent creates a new ChaincodeEvent
func NewChaincodeEvent(chaincodeID, eventName, txID string) *fab.CCEvent {
func NewChaincodeEvent(chaincodeID, eventName, txID string, payload []byte) *fab.CCEvent {
return &fab.CCEvent{
ChaincodeID: chaincodeID,
EventName: eventName,
TxID: txID,
Payload: payload,
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/fab/events/service/mocks/mockevents.go
Expand Up @@ -43,6 +43,7 @@ type TxInfo struct {
HeaderType cb.HeaderType
ChaincodeID string
EventName string
Payload []byte
}

// NewTransaction creates a new transaction
Expand All @@ -55,12 +56,13 @@ func NewTransaction(txID string, txValidationCode pb.TxValidationCode, headerTyp
}

// NewTransactionWithCCEvent creates a new transaction with the given chaincode event
func NewTransactionWithCCEvent(txID string, txValidationCode pb.TxValidationCode, ccID string, eventName string) *TxInfo {
func NewTransactionWithCCEvent(txID string, txValidationCode pb.TxValidationCode, ccID string, eventName string, payload []byte) *TxInfo {
return &TxInfo{
TxID: txID,
TxValidationCode: txValidationCode,
ChaincodeID: ccID,
EventName: eventName,
Payload: payload,
HeaderType: cb.HeaderType_ENDORSER_TRANSACTION,
}
}
Expand Down Expand Up @@ -105,7 +107,7 @@ func NewFilteredTxWithCCEvent(txID, ccID, event string) *pb.FilteredTransaction

func newEnvelope(channelID string, txInfo *TxInfo) *cb.Envelope {
tx := &pb.Transaction{
Actions: []*pb.TransactionAction{newTxAction(txInfo.TxID, txInfo.ChaincodeID, txInfo.EventName)},
Actions: []*pb.TransactionAction{newTxAction(txInfo.TxID, txInfo.ChaincodeID, txInfo.EventName, txInfo.Payload)},
}
txBytes, err := proto.Marshal(tx)
if err != nil {
Expand All @@ -132,11 +134,12 @@ func newEnvelope(channelID string, txInfo *TxInfo) *cb.Envelope {
}
}

func newTxAction(txID string, ccID string, eventName string) *pb.TransactionAction {
func newTxAction(txID string, ccID string, eventName string, payload []byte) *pb.TransactionAction {
ccEvent := &pb.ChaincodeEvent{
TxId: string(txID),
ChaincodeId: ccID,
EventName: eventName,
Payload: payload,
}
eventBytes, err := proto.Marshal(ccEvent)
if err != nil {
Expand Down

0 comments on commit e0e3413

Please sign in to comment.