Skip to content

Commit

Permalink
[FAB-3421] SDK Go - Disconnect, RegisterBlockEvent
Browse files Browse the repository at this point in the history
Change-Id: I891e42625ce7a0501605753abfcbcd0bdc161f46
Signed-off-by: Sandra Vrtikapa <sandra.vrtikapa@securekey.com>
  • Loading branch information
sandrask committed Apr 26, 2017
1 parent ec9d2ef commit 797da09
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 14 deletions.
95 changes: 81 additions & 14 deletions fabric-client/events/eventhub.go
Expand Up @@ -22,6 +22,7 @@ package events
import (
"errors"
"fmt"
"reflect"
"regexp"
"sync"

Expand Down Expand Up @@ -49,6 +50,9 @@ type EventHub interface {
UnregisterChaincodeEvent(cbe *ChainCodeCBE)
RegisterTxEvent(txID string, callback func(string, error))
UnregisterTxEvent(txID string)
RegisterBlockEvent(callback func(*common.Block))
UnregisterBlockEvent(callback func(*common.Block))
Disconnect()
}

// The EventHubExt interface allows extensions of the SDK to add functionality to EventHub overloads.
Expand Down Expand Up @@ -130,8 +134,8 @@ func NewEventHub() EventHub {
eventsClientFactory: &consumerClientFactory{},
}

// default to listening for block events
eventHub.SetInterests(true)
// register default transaction callback
eventHub.RegisterBlockEvent(eventHub.txCallback)

return eventHub
}
Expand All @@ -141,12 +145,67 @@ func (eventHub *eventHub) SetInterests(block bool) {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.interestedEvents = nil
eventHub.interestedEvents = make([]*pb.Interest, 0)
eventHub.blockRegistrants = make([]func(*common.Block), 0)

if block {
eventHub.blockRegistrants = append(eventHub.blockRegistrants, eventHub.txCallback)
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK})
}
}

// Disconnect disconnects from peer event source
func (eventHub *eventHub) Disconnect() {
if !eventHub.connected {
return
}

eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

// Unregister interests with server and stop the stream
eventHub.client.UnregisterAsync(eventHub.interestedEvents)
eventHub.client.Stop()
eventHub.connected = false
}

// RegisterBlockEvent - register callback function for block events
func (eventHub *eventHub) RegisterBlockEvent(callback func(*common.Block)) {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.blockRegistrants = append(eventHub.blockRegistrants, callback)
if len(eventHub.blockRegistrants) == 1 {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK})
}
}

// UnregisterBlockEvent unregister callback for block event
func (eventHub *eventHub) UnregisterBlockEvent(callback func(*common.Block)) {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

f1 := reflect.ValueOf(callback)

for i := range eventHub.blockRegistrants {
f2 := reflect.ValueOf(eventHub.blockRegistrants[i])
if f1.Pointer() == f2.Pointer() {
eventHub.blockRegistrants = append(eventHub.blockRegistrants[:i], eventHub.blockRegistrants[i+1:]...)
break
}
}

if len(eventHub.blockRegistrants) < 1 {
blockEventInterest := pb.Interest{EventType: pb.EventType_BLOCK}
eventHub.client.UnregisterAsync([]*pb.Interest{&blockEventInterest})
for i, v := range eventHub.interestedEvents {
if *v == blockEventInterest {
eventHub.interestedEvents = append(eventHub.interestedEvents[:i], eventHub.interestedEvents[i+1:]...)
}
}
}
}

// AddChaincodeInterest adds interest for specific CHAINCODE events.
func (eventHub *eventHub) AddChaincodeInterest(ChaincodeID string, EventName string) {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{
Expand Down Expand Up @@ -191,24 +250,31 @@ func (eventHub *eventHub) IsConnected() bool {
* Establishes connection with peer event source<p>
*/
func (eventHub *eventHub) Connect() error {

if eventHub.connected {
logger.Debugf("Nothing to do - EventHub already connected")
return nil
}

if eventHub.peerAddr == "" {
return fmt.Errorf("eventHub.peerAddr is empty")
}

eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.blockRegistrants = make([]func(*common.Block), 0)
eventHub.blockRegistrants = append(eventHub.blockRegistrants, eventHub.txCallback)
if eventHub.client == nil {
eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride, 5, eventHub)
eventHub.client = eventsClient
}

eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride, 5, eventHub)
if err := eventsClient.Start(); err != nil {
eventsClient.Stop()
if err := eventHub.client.Start(); err != nil {
eventHub.client.Stop()
return fmt.Errorf("Error from eventsClient.Start (%s)", err.Error())

}

eventHub.connected = true
eventHub.client = eventsClient

return nil
}

Expand Down Expand Up @@ -248,14 +314,15 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) {
}
}

// Disconnect implements consumer.EventAdapter interface for receiving events
/**
* Disconnects peer event source<p>
*/
// Disconnected implements consumer.EventAdapter interface for receiving events
func (eventHub *eventHub) Disconnected(err error) {
if !eventHub.connected {
return
}

eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

eventHub.client.Stop()
eventHub.connected = false
}
Expand Down
53 changes: 53 additions & 0 deletions fabric-client/events/eventhub_test.go
Expand Up @@ -27,6 +27,9 @@ import (

"time"

"reflect"

"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)

Expand Down Expand Up @@ -175,3 +178,53 @@ func flood(invocationsPerThread int, threads int, f func()) {
}()
}
}

func TestRegisterBlockEvent(t *testing.T) {
eventHub, _ := createMockedEventHub(t)
if t.Failed() {
return
}

// Transaction callback is registered by default
if len(eventHub.interestedEvents) != 1 || len(eventHub.blockRegistrants) != 1 {
t.Fatalf("Transaction callback should be registered by default")
}

f1 := reflect.ValueOf(eventHub.txCallback)
f2 := reflect.ValueOf(eventHub.blockRegistrants[0])

if f1.Pointer() != f2.Pointer() {
t.Fatalf("Registered callback is not txCallback")
}

eventHub.RegisterBlockEvent(testCallback)

if len(eventHub.blockRegistrants) != 2 {
t.Fatalf("Failed to add test callback for block event")
}

f1 = reflect.ValueOf(testCallback)
f2 = reflect.ValueOf(eventHub.blockRegistrants[1])

if f1.Pointer() != f2.Pointer() {
t.Fatalf("Registered callback is not testCallback")
}

eventHub.UnregisterBlockEvent(testCallback)

if len(eventHub.interestedEvents) != 1 || len(eventHub.blockRegistrants) != 1 {
t.Fatalf("Failed to unregister testCallback")
}

eventHub.UnregisterBlockEvent(eventHub.txCallback)

if len(eventHub.interestedEvents) != 0 || len(eventHub.blockRegistrants) != 0 {
t.Fatalf("Failed to unregister txCallback")
}

}

// private test callback to be executed on block event
func testCallback(block *common.Block) {
fmt.Println("testCallback called on block")
}
56 changes: 56 additions & 0 deletions test/integration/events_test.go
Expand Up @@ -20,11 +20,13 @@ limitations under the License.
package integration

import (
"fmt"
"testing"
"time"

fabricClient "github.com/hyperledger/fabric-sdk-go/fabric-client"
fcUtil "github.com/hyperledger/fabric-sdk-go/fabric-client/helpers"
"github.com/hyperledger/fabric/protos/common"
)

func TestEvents(t *testing.T) {
Expand Down Expand Up @@ -53,6 +55,19 @@ func TestEvents(t *testing.T) {

testFailedTx(t, testSetup)

// Test disconnect event hub
testSetup.EventHub.Disconnect()
if testSetup.EventHub.IsConnected() {
t.Fatalf("Failed to disconnect event hub")
}

// Reconnect event hub
if err := testSetup.EventHub.Connect(); err != nil {
t.Fatalf("Failed to connect event hub")
}

testMultipleBlockEventCallbacks(t, testSetup)

}

func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {
Expand Down Expand Up @@ -106,3 +121,44 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {
}

}

func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {

// Arguments for events CC
var args []string
args = append(args, "invoke")
args = append(args, "invoke")
args = append(args, "SEVERE")

// Create and register test callback that will be invoked upon block event
test := make(chan bool)
testSetup.EventHub.RegisterBlockEvent(func(block *common.Block) {
fmt.Println("Invoked test callback on block event")
test <- true
})

tpResponses, tx, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()})
if err != nil {
t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err)
}

// Register tx for commit/block event(s)
done, fail := fcUtil.RegisterTxEvent(tx, testSetup.EventHub)
defer testSetup.EventHub.UnregisterTxEvent(tx)

_, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses)
if err != nil {
t.Fatalf("First invoke failed err: %v", err)
}

for i := 0; i < 2; i++ {
select {
case <-done:
case <-fail:
case <-test:
case <-time.After(time.Second * 30):
t.Fatalf("invoke Didn't receive test callback event")
}
}

}

0 comments on commit 797da09

Please sign in to comment.