Skip to content

Commit b0e6bbb

Browse files
committed
[FAB-15488] Refactor viper in gossip/state
Refactor viper usage in gossip/state package. Added stateConfig structure to hold configuration values. Configuration variables names changed accordinly. FAB-15488 #done Change-Id: I0f241c90492a858e39eab710e5b8bc040661af43 Signed-off-by: Chongxin Luo <Chongxin.Luo@ibm.com>
1 parent 591af34 commit b0e6bbb

File tree

6 files changed

+198
-89
lines changed

6 files changed

+198
-89
lines changed

gossip/privdata/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
const (
16-
reconcileSleepIntervalDefault = time.Minute * 1
16+
reconcileSleepIntervalDefault = time.Minute
1717
reconcileBatchSizeDefault = 10
1818
)
1919

gossip/service/gossip_service.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,14 @@ func (g *GossipService) InitializeChannel(channelID string, endpoints []string,
343343
g.privateHandlers[channelID].reconciler.Start()
344344

345345
blockingMode := !g.serviceConfig.NonBlockingCommitMode
346-
g.chains[channelID] = state.NewGossipStateProvider(channelID, servicesAdapter, coordinator,
347-
g.metrics.StateMetrics, blockingMode)
346+
stateConfig := state.GlobalConfig()
347+
g.chains[channelID] = state.NewGossipStateProvider(
348+
channelID,
349+
servicesAdapter,
350+
coordinator,
351+
g.metrics.StateMetrics,
352+
blockingMode,
353+
stateConfig)
348354
if g.deliveryService[channelID] == nil {
349355
var err error
350356
g.deliveryService[channelID], err = g.deliveryFactory.Service(g, endpoints, g.mcs)

gossip/state/config.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package state
8+
9+
import (
10+
"time"
11+
12+
"github.com/spf13/viper"
13+
)
14+
15+
const (
16+
DefStateCheckInterval = 10 * time.Second
17+
DefStateResponseTimeout = 3 * time.Second
18+
DefStateBatchSize = 10
19+
DefStateMaxRetries = 3
20+
DefStateBlockBufferSize = 100
21+
DefStateChannelSize = 100
22+
DefStateEnabled = true
23+
)
24+
25+
type StateConfig struct {
26+
StateCheckInterval time.Duration
27+
StateResponseTimeout time.Duration
28+
StateBatchSize uint64
29+
StateMaxRetries int
30+
StateBlockBufferSize int
31+
StateChannelSize int
32+
StateEnabled bool
33+
}
34+
35+
func GlobalConfig() *StateConfig {
36+
c := &StateConfig{}
37+
c.loadStateConfig()
38+
return c
39+
}
40+
41+
func (c *StateConfig) loadStateConfig() {
42+
43+
c.StateCheckInterval = DefStateCheckInterval
44+
if viper.IsSet("peer.gossip.state.checkInterval") {
45+
c.StateCheckInterval = viper.GetDuration("peer.gossip.state.checkInterval")
46+
}
47+
c.StateResponseTimeout = DefStateResponseTimeout
48+
if viper.IsSet("peer.gossip.state.responseTimeout") {
49+
c.StateResponseTimeout = viper.GetDuration("peer.gossip.state.responseTimeout")
50+
}
51+
c.StateBatchSize = DefStateBatchSize
52+
if viper.IsSet("peer.gossip.state.batchSize") {
53+
c.StateBatchSize = uint64(viper.GetInt("peer.gossip.state.batchSize"))
54+
}
55+
c.StateMaxRetries = DefStateMaxRetries
56+
if viper.IsSet("peer.gossip.state.maxRetries") {
57+
c.StateMaxRetries = viper.GetInt("peer.gossip.state.maxRetries")
58+
}
59+
c.StateBlockBufferSize = DefStateBlockBufferSize
60+
if viper.IsSet("peer.gossip.state.blockBufferSize") {
61+
c.StateBlockBufferSize = viper.GetInt("peer.gossip.state.blockBufferSize")
62+
}
63+
c.StateChannelSize = DefStateChannelSize
64+
if viper.IsSet("peer.gossip.state.channelSize") {
65+
c.StateChannelSize = viper.GetInt("peer.gossip.state.channelSize")
66+
}
67+
c.StateEnabled = DefStateEnabled
68+
if viper.IsSet("peer.gossip.state.enabled") {
69+
c.StateEnabled = viper.GetBool("peer.gossip.state.enabled")
70+
}
71+
}

gossip/state/config_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package state_test
8+
9+
import (
10+
"testing"
11+
"time"
12+
13+
"github.com/hyperledger/fabric/gossip/state"
14+
"github.com/stretchr/testify/assert"
15+
16+
"github.com/spf13/viper"
17+
)
18+
19+
func TestGlobalConfig(t *testing.T) {
20+
viper.Reset()
21+
viper.Set("peer.gossip.state.checkInterval", "1s")
22+
viper.Set("peer.gossip.state.responseTimeout", "2s")
23+
viper.Set("peer.gossip.state.batchSize", 3)
24+
viper.Set("peer.gossip.state.maxRetries", 4)
25+
viper.Set("peer.gossip.state.blockBufferSize", 5)
26+
viper.Set("peer.gossip.state.channelSize", 6)
27+
viper.Set("peer.gossip.state.enabled", false)
28+
29+
coreConfig := state.GlobalConfig()
30+
31+
expectedConfig := &state.StateConfig{
32+
StateCheckInterval: time.Second,
33+
StateResponseTimeout: 2 * time.Second,
34+
StateBatchSize: uint64(3),
35+
StateMaxRetries: 4,
36+
StateBlockBufferSize: 5,
37+
StateChannelSize: 6,
38+
StateEnabled: false,
39+
}
40+
41+
assert.Equal(t, expectedConfig, coreConfig)
42+
}
43+
44+
func TestGlobalConfigDefaults(t *testing.T) {
45+
viper.Reset()
46+
47+
coreConfig := state.GlobalConfig()
48+
49+
expectedConfig := &state.StateConfig{
50+
StateCheckInterval: 10 * time.Second,
51+
StateResponseTimeout: 3 * time.Second,
52+
StateBatchSize: uint64(10),
53+
StateMaxRetries: 3,
54+
StateBlockBufferSize: 100,
55+
StateChannelSize: 100,
56+
StateEnabled: true,
57+
}
58+
59+
assert.Equal(t, expectedConfig, coreConfig)
60+
}

gossip/state/state.go

Lines changed: 27 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/hyperledger/fabric/protos/transientstore"
2828
"github.com/hyperledger/fabric/protoutil"
2929
"github.com/pkg/errors"
30-
"github.com/spf13/viper"
3130
)
3231

3332
// GossipStateProvider is the interface to acquire sequences of the ledger blocks
@@ -166,7 +165,7 @@ type GossipStateProviderImpl struct {
166165

167166
blockingMode bool
168167

169-
config *Configuration
168+
config *StateConfig
170169
}
171170

172171
var logger = util.GetLogger(util.StateLogger, "")
@@ -188,53 +187,16 @@ func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batc
188187
return nil
189188
}
190189

191-
// readConfiguration reading state configuration
192-
func readConfiguration() *Configuration {
193-
config := &Configuration{
194-
AntiEntropyInterval: defAntiEntropyInterval,
195-
AntiEntropyStateResponseTimeout: defAntiEntropyStateResponseTimeout,
196-
AntiEntropyBatchSize: defAntiEntropyBatchSize,
197-
MaxBlockDistance: defMaxBlockDistance,
198-
AntiEntropyMaxRetries: defAntiEntropyMaxRetries,
199-
ChannelBufferSize: defChannelBufferSize,
200-
EnableStateTransfer: true,
201-
}
202-
203-
if viper.IsSet("peer.gossip.state.checkInterval") {
204-
config.AntiEntropyInterval = viper.GetDuration("peer.gossip.state.checkInterval")
205-
}
206-
207-
if viper.IsSet("peer.gossip.state.responseTimeout") {
208-
config.AntiEntropyStateResponseTimeout = viper.GetDuration("peer.gossip.state.responseTimeout")
209-
}
210-
211-
if viper.IsSet("peer.gossip.state.batchSize") {
212-
config.AntiEntropyBatchSize = uint64(viper.GetInt("peer.gossip.state.batchSize"))
213-
}
214-
215-
if viper.IsSet("peer.gossip.state.blockBufferSize") {
216-
config.MaxBlockDistance = viper.GetInt("peer.gossip.state.blockBufferSize")
217-
}
218-
219-
if viper.IsSet("peer.gossip.state.maxRetries") {
220-
config.AntiEntropyMaxRetries = viper.GetInt("peer.gossip.state.maxRetries")
221-
}
222-
223-
if viper.IsSet("peer.gossip.state.channelSize") {
224-
config.ChannelBufferSize = viper.GetInt("peer.gossip.state.channelSize")
225-
}
226-
227-
if viper.IsSet("peer.gossip.state.enabled") {
228-
config.EnableStateTransfer = viper.GetBool("peer.gossip.state.enabled")
229-
}
230-
231-
return config
232-
}
233-
234190
// NewGossipStateProvider creates state provider with coordinator instance
235191
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
236-
func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources,
237-
stateMetrics *metrics.StateMetrics, blockingMode bool) GossipStateProvider {
192+
func NewGossipStateProvider(
193+
chainID string,
194+
services *ServicesMediator,
195+
ledger ledgerResources,
196+
stateMetrics *metrics.StateMetrics,
197+
blockingMode bool,
198+
config *StateConfig,
199+
) GossipStateProvider {
238200

239201
gossipChan, _ := services.Accept(func(message interface{}) bool {
240202
// Get only data messages
@@ -278,48 +240,31 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
278240
return nil
279241
}
280242

281-
// Reading state configuration
282-
config := readConfiguration()
283-
284243
s := &GossipStateProviderImpl{
285244
// MessageCryptoService
286245
mediator: services,
287-
288246
// Chain ID
289247
chainID: chainID,
290-
291248
// Channel to read new messages from
292249
gossipChan: gossipChan,
293-
294250
// Channel to read direct messages from other peers
295251
commChan: commChan,
296-
297252
// Create a queue for payloads, wrapped in a metrics buffer
298253
payloads: &metricsBuffer{
299254
PayloadsBuffer: NewPayloadsBuffer(height),
300255
sizeMetrics: stateMetrics.PayloadBufferSize,
301256
chainID: chainID,
302257
},
303-
304-
ledger: ledger,
305-
306-
stateResponseCh: make(chan protoext.ReceivedMessage, config.ChannelBufferSize),
307-
308-
stateRequestCh: make(chan protoext.ReceivedMessage, config.ChannelBufferSize),
309-
310-
stopCh: make(chan struct{}),
311-
258+
ledger: ledger,
259+
stateResponseCh: make(chan protoext.ReceivedMessage, config.StateChannelSize),
260+
stateRequestCh: make(chan protoext.ReceivedMessage, config.StateChannelSize),
261+
stopCh: make(chan struct{}),
312262
stateTransferActive: 0,
313-
314-
once: sync.Once{},
315-
316-
stateMetrics: stateMetrics,
317-
318-
requestValidator: &stateRequestValidator{},
319-
320-
blockingMode: blockingMode,
321-
322-
config: config,
263+
once: sync.Once{},
264+
stateMetrics: stateMetrics,
265+
requestValidator: &stateRequestValidator{},
266+
blockingMode: blockingMode,
267+
config: config,
323268
}
324269

325270
logger.Infof("Updating metadata information, "+
@@ -331,7 +276,7 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
331276
go s.listen()
332277
// Deliver in order messages into the incoming channel
333278
go s.deliverPayloads()
334-
if s.config.EnableStateTransfer {
279+
if s.config.StateEnabled {
335280
// Execute anti entropy to fill missing gaps
336281
go s.antiEntropy()
337282
}
@@ -438,7 +383,7 @@ func (s *GossipStateProviderImpl) directMessage(msg protoext.ReceivedMessage) {
438383
incoming := msg.GetGossipMessage()
439384

440385
if incoming.GetStateRequest() != nil {
441-
if len(s.stateRequestCh) < s.config.ChannelBufferSize {
386+
if len(s.stateRequestCh) < s.config.StateChannelSize {
442387
// Forward state request to the channel, if there are too
443388
// many message of state request ignore to avoid flooding.
444389
s.stateRequestCh <- msg
@@ -471,7 +416,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg protoext.ReceivedMessag
471416
}
472417
request := msg.GetGossipMessage().GetStateRequest()
473418

474-
if err := s.requestValidator.validate(request, s.config.AntiEntropyBatchSize); err != nil {
419+
if err := s.requestValidator.validate(request, s.config.StateBatchSize); err != nil {
475420
logger.Errorf("State request validation failed, %s. Ignoring request...", err)
476421
return
477422
}
@@ -654,7 +599,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
654599
select {
655600
case <-s.stopCh:
656601
return
657-
case <-time.After(s.config.AntiEntropyInterval):
602+
case <-time.After(s.config.StateCheckInterval):
658603
ourHeight, err := s.ledger.LedgerHeight()
659604
if err != nil {
660605
// Unable to read from ledger continue to the next round
@@ -699,15 +644,15 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
699644
defer atomic.StoreInt32(&s.stateTransferActive, 0)
700645

701646
for prev := start; prev <= end; {
702-
next := min(end, prev+s.config.AntiEntropyBatchSize)
647+
next := min(end, prev+s.config.StateBatchSize)
703648

704649
gossipMsg := s.stateRequestMessage(prev, next)
705650

706651
responseReceived := false
707652
tryCounts := 0
708653

709654
for !responseReceived {
710-
if tryCounts > s.config.AntiEntropyMaxRetries {
655+
if tryCounts > s.config.StateMaxRetries {
711656
logger.Warningf("Wasn't able to get blocks in range [%d...%d), after %d retries",
712657
prev, next, tryCounts)
713658
return
@@ -745,7 +690,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
745690
}
746691
prev = index + 1
747692
responseReceived = true
748-
case <-time.After(s.config.AntiEntropyStateResponseTimeout):
693+
case <-time.After(s.config.StateResponseTimeout):
749694
}
750695
}
751696
}
@@ -824,11 +769,11 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
824769
return errors.Wrap(err, "Failed obtaining ledger height")
825770
}
826771

827-
if !blockingMode && payload.SeqNum-height >= uint64(s.config.MaxBlockDistance) {
772+
if !blockingMode && payload.SeqNum-height >= uint64(s.config.StateBlockBufferSize) {
828773
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
829774
}
830775

831-
for blockingMode && s.payloads.Size() > s.config.MaxBlockDistance*2 {
776+
for blockingMode && s.payloads.Size() > s.config.StateBlockBufferSize*2 {
832777
time.Sleep(enqueueRetryInterval)
833778
}
834779

0 commit comments

Comments
 (0)