Skip to content

Commit

Permalink
[FABG-746] Allow transfer of event registrations
Browse files Browse the repository at this point in the history
A new function was added to the EventClient API that allows
you to transfer event registrations from an existing client
and import them into a new client. The new client will
start listening to blocks from where the old client left off.
This allows you to update the configuration of an SDK without
losing events.

Change-Id: I5f8035b2c29a0a4e10a4f85c8ffa09d4153e04ec
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Aug 27, 2018
1 parent c5ec8d9 commit cdd0b4d
Show file tree
Hide file tree
Showing 16 changed files with 1,021 additions and 71 deletions.
28 changes: 28 additions & 0 deletions pkg/common/providers/fab/eventservice.go
Expand Up @@ -113,6 +113,29 @@ type ConnectionEvent struct {
Err error
}

// EventSnapshot contains a snapshot of the event client before it was stopped.
// The snapshot includes all of the event registrations and the last block received.
type EventSnapshot interface {
// LastBlockReceived returns the block number of the last block received at the time
// that the snapshot was taken.
LastBlockReceived() uint64

// BlockRegistrations returns the block registrations.
BlockRegistrations() []Registration

// FilteredBlockRegistrations returns the filtered block registrations.
FilteredBlockRegistrations() []Registration

// CCRegistrations returns the chaincode registrations.
CCRegistrations() []Registration

// TxStatusRegistrations returns the transaction status registrations.
TxStatusRegistrations() []Registration

// Closes all registrations
Close()
}

// EventClient is a client that connects to a peer and receives channel events
// such as block, filtered block, chaincode, and transaction status events.
type EventClient interface {
Expand All @@ -131,4 +154,9 @@ type EventClient interface {
// A return value of false indicates that the client could not be closed since
// there was at least one registration.
CloseIfIdle() bool

// TransferRegistrations transfers all registrations into an EventSnapshot.
// The registrations are not closed and may be transferred to a new event client.
// - close: If true then the client will also be closed
TransferRegistrations(close bool) (EventSnapshot, error)
}
68 changes: 46 additions & 22 deletions pkg/fab/events/client/client.go
Expand Up @@ -108,40 +108,66 @@ func (c *Client) Connect() error {
// A return value of false indicates that the client could not be closed since
// there was at least one registration.
func (c *Client) CloseIfIdle() bool {
return c.close(false)
logger.Debug("Attempting to close event client...")

// Check if there are any outstanding registrations
regInfoCh := make(chan *esdispatcher.RegistrationInfo)
err := c.Submit(esdispatcher.NewRegistrationInfoEvent(regInfoCh))
if err != nil {
logger.Debugf("Submit failed %s", err)
return false
}
regInfo := <-regInfoCh

logger.Debugf("Outstanding registrations: %d", regInfo.TotalRegistrations)

if regInfo.TotalRegistrations > 0 {
logger.Debugf("Cannot stop client since there are %d outstanding registrations", regInfo.TotalRegistrations)
return false
}

c.Close()

return true
}

// Close closes the connection to the event server and releases all resources.
// Once this function is invoked the client may no longer be used.
func (c *Client) Close() {
c.close(true)
c.close(func() {
c.Stop()
})
}

func (c *Client) close(force bool) bool {
logger.Debug("Attempting to close event client...")
// TransferRegistrations transfers all registrations into an EventSnapshot.
// The registrations are not closed and may susequently be transferred to a
// new event client.
// - close - if true then the client will also be closed
func (c *Client) TransferRegistrations(close bool) (fab.EventSnapshot, error) {
if !close {
return c.Transfer()
}

if !force {
// Check if there are any outstanding registrations
regInfoCh := make(chan *esdispatcher.RegistrationInfo)
err := c.Submit(esdispatcher.NewRegistrationInfoEvent(regInfoCh))
var snapshot fab.EventSnapshot
var err error
c.close(func() {
logger.Debug("Stopping dispatcher and taking snapshot of all registrations...")
snapshot, err = c.StopAndTransfer()
if err != nil {
logger.Debugf("Submit failed %s", err)
return false
logger.Errorf("An error occurred while stopping dispatcher and taking snapshot: %s", err)
}
regInfo := <-regInfoCh
})

logger.Debugf("Outstanding registrations: %d", regInfo.TotalRegistrations)
return snapshot, err
}

if regInfo.TotalRegistrations > 0 {
logger.Debugf("Cannot stop client since there are %d outstanding registrations", regInfo.TotalRegistrations)
return false
}
}
func (c *Client) close(stopHandler func()) {
logger.Debug("Attempting to close event client...")

if !c.setStoppped() {
// Already stopped
logger.Debug("Client already stopped")
return true
return
}

logger.Debug("Stopping client...")
Expand All @@ -154,7 +180,7 @@ func (c *Client) close(force bool) bool {
err1 := c.Submit(dispatcher.NewDisconnectEvent(errch))
if err1 != nil {
logger.Debugf("Submit failed %s", err1)
return false
return
}
err := <-errch

Expand All @@ -166,13 +192,11 @@ func (c *Client) close(force bool) bool {

logger.Debug("Stopping dispatcher...")

c.Stop()
stopHandler()

c.mustSetConnectionState(Disconnected)

logger.Debug("... event client is stopped")

return true
}

func (c *Client) connect() error {
Expand Down
85 changes: 85 additions & 0 deletions pkg/fab/events/client/client_test.go
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
Expand Down Expand Up @@ -1457,3 +1459,86 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
t.Fatal("Timed out waiting for reconnect")
}
}

func TestTransferRegistrations(t *testing.T) {
// Tests the scenario where all event registrations are transferred to another event client.
t.Run("Transfer", func(t *testing.T) {
testTransferRegistrations(t, func(client *Client) (fab.EventSnapshot, error) {
return client.TransferRegistrations(false)
})
})

// Tests the scenario where one event client is stopped and all
// of the event registrations are transferred to another event client.
t.Run("TransferAndClose", func(t *testing.T) {
testTransferRegistrations(t, func(client *Client) (fab.EventSnapshot, error) {
return client.TransferRegistrations(true)
})
})
}

type transferFunc func(client *Client) (fab.EventSnapshot, error)

// TestTransferRegistrations tests the scenario where one event client is stopped and all
// of the event registrations are transferred to another event client.
func testTransferRegistrations(t *testing.T, transferFunc transferFunc) {
channelID := "mychannel"
eventClient1, conn1, err := newClientWithMockConn(
fabmocks.NewMockContext(
mspmocks.NewMockSigningIdentity("user1", "Org1MSP"),
),
fabmocks.NewMockChannelCfg(channelID),
clientmocks.NewDiscoveryService(peer1, peer2),
clientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
require.NoErrorf(t, err, "error creating channel event client")

err = eventClient1.Connect()
require.NoErrorf(t, err, "error connecting channel event client")

breg, beventch, err := eventClient1.RegisterBlockEvent()
require.NoErrorf(t, err, "error registering for block events")

conn1.Ledger().NewBlock(channelID,
servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_ENDORSER_TRANSACTION),
)

select {
case <-beventch:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for block event")
}

snapshot, err := transferFunc(eventClient1)
require.NoErrorf(t, err, "error transferring snapshot")

eventClient2, conn2, err := newClientWithMockConnAndOpts(
fabmocks.NewMockContext(
mspmocks.NewMockSigningIdentity("user1", "Org1MSP"),
),
fabmocks.NewMockChannelCfg("mychannel"),
clientmocks.NewDiscoveryService(peer1, peer2),
nil, clientProvider,
[]options.Opt{
esdispatcher.WithSnapshot(snapshot),
},
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
require.NoErrorf(t, err, "error creating channel event client")

err = eventClient2.Connect()
require.NoErrorf(t, err, "error connecting channel event client")

conn2.Ledger().NewBlock(channelID,
servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_ENDORSER_TRANSACTION),
)

select {
case <-beventch:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for block event")
}

eventClient2.Unregister(breg)
}
62 changes: 51 additions & 11 deletions pkg/fab/events/client/dispatcher/dispatcher.go
Expand Up @@ -8,6 +8,7 @@ package dispatcher

import (
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -238,22 +239,61 @@ func (ed *Dispatcher) clearConnectionRegistration() {
}

func (ed *Dispatcher) filterByBlockHeght(peers []fab.Peer) []fab.Peer {
if ed.blockHeightLagThreshold < 0 || len(peers) == 1 {
logger.Debugf("Returning all peers")
return peers
var minBlockHeight uint64
if ed.minBlockHeight > 0 {
if ed.LastBlockNum() != math.MaxUint64 {
// No blocks received yet
logger.Debugf("Min block height was specified: %d", ed.minBlockHeight)
minBlockHeight = ed.minBlockHeight
} else {
// Make sure minBlockHeight is greater than the last block received
if ed.minBlockHeight > ed.LastBlockNum() {
minBlockHeight = ed.minBlockHeight
} else {
minBlockHeight = ed.LastBlockNum() + 1
logger.Debugf("Min block height was specified as %d but the last block received was %d. Setting min height to %d", ed.minBlockHeight, ed.LastBlockNum(), minBlockHeight)
}
}
}

maxHeight := getMaxBlockHeight(peers)
logger.Debugf("Max block height of peers: %d", maxHeight)

if maxHeight <= uint64(ed.blockHeightLagThreshold) {
logger.Debugf("Max block height of peers is %d and lag threshold is %d so returning all peers", maxHeight, ed.blockHeightLagThreshold)
return peers
retPeers := ed.doFilterByBlockHeght(minBlockHeight, peers)
if len(retPeers) == 0 && minBlockHeight > 0 {
// The last block that was received may have been the last block in the channel. Try again with lastBlock-1.
logger.Infof("No peers at the minimum height %d. Trying again with min height %d ...", minBlockHeight, minBlockHeight-1)
minBlockHeight--
retPeers = ed.doFilterByBlockHeght(minBlockHeight, peers)
if len(retPeers) == 0 {
// No peers at the given height. Try again without min height
logger.Infof("No peers at the minimum height %d. Trying again without min height ...", minBlockHeight)
retPeers = ed.doFilterByBlockHeght(0, peers)
}
}

cutoffHeight := maxHeight - uint64(ed.blockHeightLagThreshold)
return retPeers
}

func (ed *Dispatcher) doFilterByBlockHeght(minBlockHeight uint64, peers []fab.Peer) []fab.Peer {
var cutoffHeight uint64
if minBlockHeight > 0 {
logger.Debugf("Setting cutoff height to be min block height: %d ...", minBlockHeight)
cutoffHeight = minBlockHeight
} else {
if ed.blockHeightLagThreshold < 0 || len(peers) == 1 {
logger.Debugf("Returning all peers")
return peers
}

maxHeight := getMaxBlockHeight(peers)
logger.Debugf("Max block height of peers: %d", maxHeight)

if maxHeight <= uint64(ed.blockHeightLagThreshold) {
logger.Debugf("Max block height of peers is %d and lag threshold is %d so returning all peers", maxHeight, ed.blockHeightLagThreshold)
return peers
}
cutoffHeight = maxHeight - uint64(ed.blockHeightLagThreshold)
}

logger.Debugf("Choosing peers whose block heights are greater than the cutoff height %d ...", cutoffHeight)
logger.Debugf("Choosing peers whose block heights are at least the cutoff height %d ...", cutoffHeight)

var retPeers []fab.Peer
for _, p := range peers {
Expand Down
1 change: 1 addition & 0 deletions pkg/fab/events/client/dispatcher/dispatcher_test.go
Expand Up @@ -213,6 +213,7 @@ func TestConnectionEvent(t *testing.T) {
t.Fatalf("Error stopping dispatcher: %s", err1)
}

// Wait for event that test is done
err = <-errch
if err != nil {
t.Fatal(err.Error())
Expand Down
12 changes: 12 additions & 0 deletions pkg/fab/events/client/dispatcher/opts.go
Expand Up @@ -16,6 +16,7 @@ import (

type params struct {
loadBalancePolicy lbp.LoadBalancePolicy
minBlockHeight uint64
blockHeightMonitorPeriod time.Duration
blockHeightLagThreshold int
reconnectBlockHeightLagThreshold int
Expand Down Expand Up @@ -110,3 +111,14 @@ func (p *params) SetBlockHeightMonitorPeriod(value time.Duration) {
logger.Debugf("BlockHeightMonitorPeriod: %s", value)
p.blockHeightMonitorPeriod = value
}

func (p *params) SetFromBlock(value uint64) {
logger.Debugf("FromBlock: %d", value)
p.minBlockHeight = value + 1
}

func (p *params) SetSnapshot(value fab.EventSnapshot) error {
logger.Debugf("SetSnapshot.FromBlock: %d", value)
p.minBlockHeight = value.LastBlockReceived() + 1
return nil
}
8 changes: 8 additions & 0 deletions pkg/fab/events/deliverclient/deliverclient.go
Expand Up @@ -28,6 +28,10 @@ var logger = logging.NewLogger("fabsdk/fab")

// deliverProvider is the connection provider used for connecting to the Deliver service
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
if peer == nil {
return nil, errors.New("Peer is nil")
}

eventEndpoint, ok := peer.(api.EventEndpoint)
if !ok {
panic("peer is not an EventEndpoint")
Expand All @@ -37,6 +41,10 @@ var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, p

// deliverFilteredProvider is the connection provider used for connecting to the DeliverFiltered service
var deliverFilteredProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
if peer == nil {
return nil, errors.New("Peer is nil")
}

eventEndpoint, ok := peer.(api.EventEndpoint)
if !ok {
panic("peer is not an EventEndpoint")
Expand Down

0 comments on commit cdd0b4d

Please sign in to comment.