Skip to content

Commit

Permalink
[FABG-699] Add SDK config for reconnecting event client
Browse files Browse the repository at this point in the history
The options for automatically reconnecting to peers based
on block height are now exposed in the SDK config.

Change-Id: Idfd4a3a1bc733e086beef3d7b66a75cb9cc625a0
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Aug 3, 2018
1 parent bf173b4 commit fa73e44
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 66 deletions.
26 changes: 25 additions & 1 deletion pkg/common/providers/fab/provider.go
Expand Up @@ -88,11 +88,35 @@ type EndpointConfig interface {
ChannelPeers(name string) ([]ChannelPeer, bool)
ChannelOrderers(name string) ([]OrdererConfig, bool)
TLSCACertPool() CertPool
EventServiceType() EventServiceType
EventServiceConfig() EventServiceConfig
TLSClientCerts() []tls.Certificate
CryptoConfigPath() string
}

// EventServiceConfig specifies configuration options for the event service
type EventServiceConfig interface {
// Type returns the type of event service to use
Type() EventServiceType

// BlockHeightLagThreshold returns the block height lag threshold. This value is used for choosing a peer
// to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
// blocks then it will be excluded from selection.
// If set to 0 then only the most up-to-date peers are considered.
// If set to -1 then all peers (regardless of block height) are considered for selection.
BlockHeightLagThreshold() int

// ReconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
// block height falls behind the specified number of blocks and will reconnect to a better performing peer.
// If set to 0 (default) then the peer will not disconnect based on block height.
// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
// affecting performance.
ReconnectBlockHeightLagThreshold() int

// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
// value is only relevant if reconnectBlockHeightLagThreshold >0.
BlockHeightMonitorPeriod() time.Duration
}

// TimeoutType enumerates the different types of outgoing connections
type TimeoutType int

Expand Down
14 changes: 7 additions & 7 deletions pkg/common/providers/test/mockfab/mockfab.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/core/config/testdata/template/config.yaml
Expand Up @@ -49,6 +49,30 @@ client:
# # Event service type (optional). If not specified then the type is automatically
# # determined from channel capabilities.
# type: (deliver|eventhub)
#
# # blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer
# # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
# # blocks then it will be excluded from selection.
# # If set to 0 then only the most up-to-date peers are considered.
# # If set to -1 then all peers (regardless of block height) are considered for selection.
# # Default: 5
# blockHeightLagThreshold: 5
#
# # reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
# # block height falls behind the specified number of blocks and will reconnect to a better performing peer.
# # If set to 0 then this feature is disabled.
# # Default: 0 (disabled)
# # NOTES:
# # - This feature should only be enabled when using deliver events, otherwise events may be lost
# # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
# # affecting performance.
# reconnectBlockHeightLagThreshold: 0
#
# # blockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
# # value is only relevant if reconnectBlockHeightLagThreshold >0.
# # Default: 5s
# blockHeightMonitorPeriod: 5s

# the below timeouts are commented out to use the default values that are found in
# "pkg/fab/endpointconfig.go"
# the client is free to override the default values by uncommenting and resetting
Expand Down
75 changes: 63 additions & 12 deletions pkg/fab/endpointconfig.go
Expand Up @@ -11,6 +11,7 @@ import (
"crypto/x509"
"reflect"
"regexp"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -50,9 +51,12 @@ const (
defaultChannelConfigRefreshInterval = time.Second * 90
defaultChannelMemshpRefreshInterval = time.Second * 60
defaultDiscoveryRefreshInterval = time.Second * 5
defaultSelectionRefreshInterval = time.Minute * 1
defaultSelectionRefreshInterval = time.Second * 5
defaultCacheSweepInterval = time.Second * 15

defaultBlockHeightLagThreshold = 5
defaultBlockHeightMonitorPeriod = 5 * time.Second

//default grpc opts
defaultKeepAliveTime = 0
defaultKeepAliveTimeout = time.Second * 20
Expand Down Expand Up @@ -232,17 +236,9 @@ func (c *EndpointConfig) TLSCACertPool() fab.CertPool {
return c.tlsCertPool
}

// EventServiceType returns the type of event service client to use
func (c *EndpointConfig) EventServiceType() fab.EventServiceType {
etype := c.backend.GetString("client.eventService.type")
switch etype {
case "eventhub":
return fab.EventHubEventServiceType
case "deliver":
return fab.DeliverEventServiceType
default:
return fab.AutoDetectEventServiceType
}
// EventServiceConfig returns the event service config
func (c *EndpointConfig) EventServiceConfig() fab.EventServiceConfig {
return &EventServiceConfig{backend: c.backend}
}

// TLSClientCerts loads the client's certs for mutual TLS
Expand Down Expand Up @@ -1446,6 +1442,61 @@ func (c *EndpointConfig) regexMatchAndReplace(regex *regexp.Regexp, src, repl st
return repl
}

// EventServiceConfig contains config options for the event service
type EventServiceConfig struct {
backend *lookup.ConfigLookup
}

// Type returns the type of event service to use
func (c *EventServiceConfig) Type() fab.EventServiceType {
etype := c.backend.GetString("client.eventService.type")
switch etype {
case "eventhub":
return fab.EventHubEventServiceType
case "deliver":
return fab.DeliverEventServiceType
default:
return fab.AutoDetectEventServiceType
}
}

// BlockHeightLagThreshold returns the block height lag threshold. This value is used for choosing a peer
// to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of
// blocks then it will be excluded from selection.
// If set to 0 then only the most up-to-date peers are considered.
// If set to -1 then all peers (regardless of block height) are considered for selection.
func (c *EventServiceConfig) BlockHeightLagThreshold() int {
lagThresholdStr := c.backend.GetString("client.eventService.blockHeightLagThreshold")
if lagThresholdStr == "" {
return defaultBlockHeightLagThreshold
}
lagThreshold, err := strconv.Atoi(lagThresholdStr)
if err != nil {
logger.Warnf("Invalid numeric value for client.eventService.blockHeightLagThreshold. Setting to default value of %d", defaultBlockHeightLagThreshold)
return defaultBlockHeightLagThreshold
}
return lagThreshold
}

// ReconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
// block height falls behind the specified number of blocks and will reconnect to a better performing peer.
// If set to 0 then this feature is disabled.
// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
// affecting performance.
func (c *EventServiceConfig) ReconnectBlockHeightLagThreshold() int {
return c.backend.GetInt("client.eventService.reconnectBlockHeightLagThreshold")
}

// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
// value is only relevant if reconnectBlockHeightLagThreshold >0.
func (c *EventServiceConfig) BlockHeightMonitorPeriod() time.Duration {
period := c.backend.GetDuration("client.eventService.blockHeightMonitorPeriod")
if period == 0 {
return defaultBlockHeightMonitorPeriod
}
return period
}

//peerChannelConfigHookFunc returns hook function for unmarshalling 'fab.PeerChannelConfig'
// Rule : default set to 'true' if not provided in config
func peerChannelConfigHookFunc() mapstructure.DecodeHookFunc {
Expand Down
19 changes: 19 additions & 0 deletions pkg/fab/endpointconfig_test.go
Expand Up @@ -11,6 +11,8 @@ import (
"crypto/x509"
"testing"

"github.com/stretchr/testify/require"

"os"

"fmt"
Expand Down Expand Up @@ -200,6 +202,23 @@ func TestTimeouts(t *testing.T) {
checkTimeouts(endpointConfig, t, errStr)
}

func TestEventServiceConfig(t *testing.T) {
customBackend := getCustomBackend()
customBackend.KeyValueMap["client.eventService.type"] = "deliver"
customBackend.KeyValueMap["client.eventService.blockHeightLagThreshold"] = "4"
customBackend.KeyValueMap["client.eventService.reconnectBlockHeightLagThreshold"] = "7"
customBackend.KeyValueMap["client.eventService.blockHeightMonitorPeriod"] = "7s"

endpointConfig, err := ConfigFromBackend(customBackend)
require.NoError(t, err)

eventServiceConfig := endpointConfig.EventServiceConfig()
assert.Equalf(t, fab.DeliverEventServiceType, eventServiceConfig.Type(), "invalid value for type")
assert.Equalf(t, 4, eventServiceConfig.BlockHeightLagThreshold(), "invalid value for blockHeightLagThreshold")
assert.Equalf(t, 7, eventServiceConfig.ReconnectBlockHeightLagThreshold(), "invalid value for reconnectBlockHeightLagThreshold")
assert.Equalf(t, 7*time.Second, eventServiceConfig.BlockHeightMonitorPeriod(), "invalid value for blockHeightMonitorPeriod")
}

func checkTimeouts(endpointConfig fab.EndpointConfig, t *testing.T, errStr string) {
t1 := endpointConfig.Timeout(fab.OrdererResponse)
assert.Equal(t, time.Second*6, t1, "OrdererResponse")
Expand Down
2 changes: 1 addition & 1 deletion pkg/fab/events/client/dispatcher/dispatcher.go
Expand Up @@ -41,7 +41,7 @@ type Dispatcher struct {

// New creates a new dispatcher
func New(context context.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher {
params := defaultParams()
params := defaultParams(context.EndpointConfig().EventServiceConfig())
options.Apply(params, opts)

return &Dispatcher{
Expand Down
9 changes: 5 additions & 4 deletions pkg/fab/events/client/dispatcher/opts.go
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"
)

Expand All @@ -20,12 +21,12 @@ type params struct {
reconnectBlockHeightLagThreshold int
}

func defaultParams() *params {
func defaultParams(config fab.EventServiceConfig) *params {
return &params{
loadBalancePolicy: lbp.NewRoundRobin(),
blockHeightLagThreshold: 5, // TODO: Use defaults from SDK config
reconnectBlockHeightLagThreshold: 0, // TODO: Use defaults from SDK config
blockHeightMonitorPeriod: 5 * time.Second, // TODO: Use defaults from SDK config
blockHeightMonitorPeriod: config.BlockHeightMonitorPeriod(),
blockHeightLagThreshold: config.BlockHeightLagThreshold(),
reconnectBlockHeightLagThreshold: config.ReconnectBlockHeightLagThreshold(),
}
}

Expand Down
40 changes: 36 additions & 4 deletions pkg/fab/mocks/mockconfig.go
Expand Up @@ -30,7 +30,7 @@ type MockConfig struct {
customPeerCfg *fab.PeerConfig
customOrdererCfg *fab.OrdererConfig
customRandomOrdererCfg *fab.OrdererConfig
EvtServiceType fab.EventServiceType
EvtServiceConfig fab.EventServiceConfig
}

// NewMockCryptoConfig ...
Expand Down Expand Up @@ -310,9 +310,12 @@ func (c *MockConfig) TLSClientCerts() []tls.Certificate {
return nil
}

// EventServiceType returns the type of event service client to use
func (c *MockConfig) EventServiceType() fab.EventServiceType {
return c.EvtServiceType
// EventServiceConfig returns the type of event service client to use
func (c *MockConfig) EventServiceConfig() fab.EventServiceConfig {
if c.EvtServiceConfig != nil {
return c.EvtServiceConfig
}
return &MockEventServiceConfig{}
}

// Lookup gets the Value from config file by Key
Expand All @@ -326,3 +329,32 @@ func (c *MockConfig) Lookup(key string) (interface{}, bool) {
}
return value, true
}

// MockEventServiceConfig contains configuration options for the event service
type MockEventServiceConfig struct {
EvtType fab.EventServiceType
LagThreshold int
ReconnectLagThreshold int
HeightMonitorPeriod time.Duration
}

// Type returns the type of event service to use
func (c *MockEventServiceConfig) Type() fab.EventServiceType {
return c.EvtType
}

// BlockHeightLagThreshold returns the block height lag threshold.
func (c *MockEventServiceConfig) BlockHeightLagThreshold() int {
return c.LagThreshold
}

// ReconnectBlockHeightLagThreshold sets the ReconnectBlockHeightLagThreshold.
func (c *MockEventServiceConfig) ReconnectBlockHeightLagThreshold() int {
return c.ReconnectLagThreshold
}

// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
// value is only relevant if reconnectBlockHeightLagThreshold >0.
func (c *MockEventServiceConfig) BlockHeightMonitorPeriod() time.Duration {
return c.HeightMonitorPeriod
}
14 changes: 7 additions & 7 deletions pkg/fab/opts.go
Expand Up @@ -28,7 +28,7 @@ type EndpointConfigOptions struct {
channelPeers
channelOrderers
tlsCACertPool
eventServiceType
eventServiceConfig
tlsClientCerts
cryptoConfigPath
}
Expand Down Expand Up @@ -92,9 +92,9 @@ type tlsCACertPool interface {
TLSCACertPool() fab.CertPool
}

// eventServiceType interface allows to uniquely override EndpointConfig interface's EventServiceType() function
type eventServiceType interface {
EventServiceType() fab.EventServiceType
// eventServiceType interface allows to uniquely override EndpointConfig interface's EventServiceConfig() function
type eventServiceConfig interface {
EventServiceConfig() fab.EventServiceConfig
}

// tlsClientCerts interface allows to uniquely override EndpointConfig interface's TLSClientCerts() function
Expand Down Expand Up @@ -139,7 +139,7 @@ func UpdateMissingOptsWithDefaultConfig(c *EndpointConfigOptions, d fab.Endpoint
s.set(c.channelPeers, nil, func() { c.channelPeers = d })
s.set(c.channelOrderers, nil, func() { c.channelOrderers = d })
s.set(c.tlsCACertPool, nil, func() { c.tlsCACertPool = d })
s.set(c.eventServiceType, nil, func() { c.eventServiceType = d })
s.set(c.eventServiceConfig, nil, func() { c.eventServiceConfig = d })
s.set(c.tlsClientCerts, nil, func() { c.tlsClientCerts = d })
s.set(c.cryptoConfigPath, nil, func() { c.cryptoConfigPath = d })

Expand All @@ -150,7 +150,7 @@ func UpdateMissingOptsWithDefaultConfig(c *EndpointConfigOptions, d fab.Endpoint
// (ie EndpointConfig interface not fully overridden)
func IsEndpointConfigFullyOverridden(c *EndpointConfigOptions) bool {
return !anyNil(c.timeout, c.orderersConfig, c.ordererConfig, c.peersConfig, c.peerConfig, c.networkConfig,
c.networkPeers, c.channelConfig, c.channelPeers, c.channelOrderers, c.tlsCACertPool, c.eventServiceType, c.tlsClientCerts, c.cryptoConfigPath)
c.networkPeers, c.channelConfig, c.channelPeers, c.channelOrderers, c.tlsCACertPool, c.eventServiceConfig, c.tlsClientCerts, c.cryptoConfigPath)
}

// will override EndpointConfig interface with functions provided by o (option)
Expand All @@ -168,7 +168,7 @@ func setEndpointConfigWithOptionInterface(c *EndpointConfigOptions, o interface{
s.set(c.channelPeers, func() bool { _, ok := o.(channelPeers); return ok }, func() { c.channelPeers = o.(channelPeers) })
s.set(c.channelOrderers, func() bool { _, ok := o.(channelOrderers); return ok }, func() { c.channelOrderers = o.(channelOrderers) })
s.set(c.tlsCACertPool, func() bool { _, ok := o.(tlsCACertPool); return ok }, func() { c.tlsCACertPool = o.(tlsCACertPool) })
s.set(c.eventServiceType, func() bool { _, ok := o.(eventServiceType); return ok }, func() { c.eventServiceType = o.(eventServiceType) })
s.set(c.eventServiceConfig, func() bool { _, ok := o.(eventServiceConfig); return ok }, func() { c.eventServiceConfig = o.(eventServiceConfig) })
s.set(c.tlsClientCerts, func() bool { _, ok := o.(tlsClientCerts); return ok }, func() { c.tlsClientCerts = o.(tlsClientCerts) })
s.set(c.cryptoConfigPath, func() bool { _, ok := o.(cryptoConfigPath); return ok }, func() { c.cryptoConfigPath = o.(cryptoConfigPath) })

Expand Down

0 comments on commit fa73e44

Please sign in to comment.