Skip to content

Commit

Permalink
[FAB-5143] [FAB-5557] Fix eventhub race condition
Browse files Browse the repository at this point in the history
Change-Id: I1abb724605ea4d6fab60cd0c131495c53d9e5cce
Signed-off-by: Divyank Katira <Divyank.Katira@securekey.com>
  • Loading branch information
d1vyank committed Aug 15, 2017
1 parent 306ba0a commit 7fb8ad9
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 77 deletions.
2 changes: 1 addition & 1 deletion api/apifabclient/event.go
Expand Up @@ -18,7 +18,7 @@ type EventHub interface {
SetPeerAddr(peerURL string, certificate string, serverHostOverride string)
IsConnected() bool
Connect() error
Disconnect()
Disconnect() error
RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE
UnregisterChaincodeEvent(cbe *ChainCodeCBE)
RegisterTxEvent(txnID apitxn.TransactionID, callback func(string, pb.TxValidationCode, error))
Expand Down
54 changes: 43 additions & 11 deletions pkg/fabric-client/events/consumer/consumer.go
Expand Up @@ -32,14 +32,15 @@ const defaultTimeout = time.Second * 3

type eventsClient struct {
sync.RWMutex
peerAddress string
regTimeout time.Duration
stream ehpb.Events_ChatClient
adapter consumer.EventAdapter
TLSCertificate string
TLSServerHostOverride string
clientConn *grpc.ClientConn
client fab.FabricClient
peerAddress string
regTimeout time.Duration
stream ehpb.Events_ChatClient
adapter consumer.EventAdapter
TLSCertificate string
TLSServerHostOverride string
clientConn *grpc.ClientConn
client fab.FabricClient
processEventsCompleted chan struct{}
}

//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
Expand All @@ -52,7 +53,8 @@ func NewEventsClient(client fab.FabricClient, peerAddress string, certificate st
regTimeout = 60 * time.Second
err = fmt.Errorf("regTimeout > 60, setting to 60 sec")
}
return &eventsClient{sync.RWMutex{}, peerAddress, regTimeout, nil, adapter, certificate, serverhostoverride, nil, client}, err
return &eventsClient{sync.RWMutex{}, peerAddress, regTimeout, nil, adapter,
certificate, serverhostoverride, nil, client, nil}, err
}

//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
Expand Down Expand Up @@ -151,8 +153,19 @@ func (ec *eventsClient) register(ies []*ehpb.Interest) error {

// UnregisterAsync - Unregisters interest in a event and doesn't wait for a response
func (ec *eventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
emsg := &ehpb.Event{Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}}
var err error
if ec.client.UserContext() == nil {
return fmt.Errorf("User context needs to be set")
}
creator, err := ec.client.UserContext().Identity()
if err != nil {
return fmt.Errorf("Error getting creator: %v", err)
}

emsg := &ehpb.Event{
Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}},
Creator: creator,
}

if err = ec.send(emsg); err != nil {
err = fmt.Errorf("error on unregister send %s", err)
}
Expand Down Expand Up @@ -211,6 +224,8 @@ func (ec *eventsClient) Recv() (*ehpb.Event, error) {
}
func (ec *eventsClient) processEvents() error {
defer ec.stream.CloseSend()
defer close(ec.processEventsCompleted)

for {
in, err := ec.stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -262,13 +277,16 @@ func (ec *eventsClient) Start() error {
return err
}

ec.processEventsCompleted = make(chan struct{})
go ec.processEvents()

return nil
}

//Stop terminates connection with event hub
func (ec *eventsClient) Stop() error {
var timeoutErr error

if ec.stream == nil {
// in case the stream/chat server has not been established earlier, we assume that it's closed, successfully
return nil
Expand All @@ -279,12 +297,26 @@ func (ec *eventsClient) Stop() error {
if err != nil {
return err
}

select {
// Server ended its send stream in response to CloseSend()
case <-ec.processEventsCompleted:
// Timeout waiting for server to end stream
case <-time.After(ec.client.Config().TimeoutOrDefault(apiconfig.EventHub)):
timeoutErr = fmt.Errorf("Timed out waiting for server to close event stream")
}

//close client connection
if ec.clientConn != nil {
err := ec.clientConn.Close()
if err != nil {
return err
}
}

if timeoutErr != nil {
return timeoutErr
}

return nil
}
79 changes: 40 additions & 39 deletions pkg/fabric-client/events/eventhub.go
Expand Up @@ -110,19 +110,26 @@ func (eventHub *EventHub) SetInterests(block bool) {
}

// Disconnect disconnects from peer event source
func (eventHub *EventHub) Disconnect() {
func (eventHub *EventHub) Disconnect() error {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

if !eventHub.connected {
return
return nil
}

// Unregister interests with server and stop the stream
eventHub.grpcClient.UnregisterAsync(eventHub.interestedEvents)
eventHub.grpcClient.Stop()
err := eventHub.grpcClient.UnregisterAsync(eventHub.interestedEvents)
if err != nil {
return fmt.Errorf("Error unregistering events: %s", err)
}
err = eventHub.grpcClient.Stop()
if err != nil {
return fmt.Errorf("Error stopping event client: %s", err)
}

eventHub.connected = false
return nil
}

// RegisterBlockEvent - register callback function for block events
Expand Down Expand Up @@ -258,7 +265,6 @@ func (eventHub *EventHub) Connect() error {
}

eventHub.connected = true

return nil
}

Expand All @@ -269,46 +275,43 @@ func (eventHub *EventHub) GetInterestedEvents() ([]*pb.Interest, error) {

//Recv implements consumer.EventAdapter interface for receiving events
func (eventHub *EventHub) Recv(msg *pb.Event) (bool, error) {
switch msg.Event.(type) {
case *pb.Event_Block:
blockEvent := msg.Event.(*pb.Event_Block)
logger.Debugf("Recv blockEvent:%v\n", blockEvent)
for _, v := range eventHub.getBlockRegistrants() {
v(blockEvent.Block)
}

for _, tdata := range blockEvent.Block.Data.Data {
if ccEvent, channelID, err := getChainCodeEvent(tdata); err != nil {
logger.Warningf("getChainCodeEvent return error: %v\n", err)
} else if ccEvent != nil {
eventHub.notifyChaincodeRegistrants(channelID, ccEvent, true)
// Deliver events asynchronously so that we can continue receiving events
go func() {
switch msg.Event.(type) {
case *pb.Event_Block:
blockEvent := msg.Event.(*pb.Event_Block)
logger.Debugf("Recv blockEvent:%v\n", blockEvent)
for _, v := range eventHub.getBlockRegistrants() {
v(blockEvent.Block)
}
for _, tdata := range blockEvent.Block.Data.Data {
if ccEvent, channelID, err := getChainCodeEvent(tdata); err != nil {
logger.Warningf("getChainCodeEvent return error: %v\n", err)
} else if ccEvent != nil {
eventHub.notifyChaincodeRegistrants(channelID, ccEvent, true)
}
}
return
case *pb.Event_ChaincodeEvent:
ccEvent := msg.Event.(*pb.Event_ChaincodeEvent)
logger.Debugf("Recv ccEvent:%v\n", ccEvent)
if ccEvent != nil {
eventHub.notifyChaincodeRegistrants("", ccEvent.ChaincodeEvent, false)
}
return
default:
return
}
return true, nil
case *pb.Event_ChaincodeEvent:
ccEvent := msg.Event.(*pb.Event_ChaincodeEvent)
logger.Debugf("Recv ccEvent:%v\n", ccEvent)
}()

if ccEvent != nil {
eventHub.notifyChaincodeRegistrants("", ccEvent.ChaincodeEvent, false)
}
return true, nil
default:
return true, nil
}
return true, nil
}

// Disconnected implements consumer.EventAdapter interface for receiving events
func (eventHub *EventHub) Disconnected(err error) {
eventHub.mtx.Lock()
defer eventHub.mtx.Unlock()

if !eventHub.connected {
return
if err != nil {
logger.Warningf("EventHub was disconnected unexpectedly: %s", err)
}

eventHub.grpcClient.Stop()
eventHub.connected = false
}

// RegisterChaincodeEvent registers a callback function to receive chaincode events.
Expand Down Expand Up @@ -518,12 +521,10 @@ func getChainCodeEvent(tdata []byte) (event *pb.ChaincodeEvent, channelID string

// Utility function to fire callbacks for chaincode registrants
func (eventHub *EventHub) notifyChaincodeRegistrants(channelID string, ccEvent *pb.ChaincodeEvent, patternMatch bool) {

cbeArray := eventHub.getChaincodeRegistrants(ccEvent.ChaincodeId)
if len(cbeArray) <= 0 {
logger.Debugf("No event registration for ccid %s \n", ccEvent.ChaincodeId)
}

for _, v := range cbeArray {
match := v.EventNameFilter == ccEvent.EventName
if !match && patternMatch {
Expand Down
21 changes: 0 additions & 21 deletions pkg/fabric-client/events/eventhub_test.go
Expand Up @@ -402,27 +402,6 @@ func TestDisconnectWhenDisconnected(t *testing.T) {
verifyDisconnectedEventHub(eventHub, t)
}

func TestDiconnected(t *testing.T) {
eventHub, _ := createMockedEventHub(t)
if t.Failed() {
return
}

eventHub.Disconnected(nil)
verifyDisconnectedEventHub(eventHub, t)

}
func TestDiconnectedWhenDisconnected(t *testing.T) {
eventHub, _ := createMockedEventHub(t)
if t.Failed() {
return
}
eventHub.connected = false
eventHub.Disconnected(nil)
verifyDisconnectedEventHub(eventHub, t)

}

func verifyDisconnectedEventHub(eventHub *EventHub, t *testing.T) {
if eventHub.connected == true {
t.Fatalf("EventHub is not disconnected after Disconnect call")
Expand Down
10 changes: 5 additions & 5 deletions test/integration/events_test.go
Expand Up @@ -24,12 +24,10 @@ const (
func TestEvents(t *testing.T) {
testSetup := initializeTests(t)

testReconnectEventHub(t, testSetup)
testFailedTx(t, testSetup)
testFailedTxErrorCode(t, testSetup)
testMultipleBlockEventCallbacks(t, testSetup)

// TODO: The ordering of the reconnect test can affect the result - needs investigation.
testReconnectEventHub(t, testSetup)
}

func initializeTests(t *testing.T) BaseSetupImpl {
Expand Down Expand Up @@ -232,11 +230,13 @@ Loop:

func testReconnectEventHub(t *testing.T, testSetup BaseSetupImpl) {
// Test disconnect event hub
testSetup.EventHub.Disconnect()
err := testSetup.EventHub.Disconnect()
if err != nil {
t.Fatalf("Error disconnecting event hub: %s", err)
}
if testSetup.EventHub.IsConnected() {
t.Fatalf("Failed to disconnect event hub")
}

// Reconnect event hub
if err := testSetup.EventHub.Connect(); err != nil {
t.Fatalf("Failed to connect event hub")
Expand Down

0 comments on commit 7fb8ad9

Please sign in to comment.