@@ -27,6 +27,7 @@ import (
27
27
"github.com/hyperledger/fabric/protos/transientstore"
28
28
"github.com/hyperledger/fabric/protoutil"
29
29
"github.com/pkg/errors"
30
+ "github.com/spf13/viper"
30
31
)
31
32
32
33
// GossipStateProvider is the interface to acquire sequences of the ledger blocks
@@ -55,6 +56,17 @@ const (
55
56
enqueueRetryInterval = time .Millisecond * 100
56
57
)
57
58
59
+ // Configuration keeps state transfer configuration parameters
60
+ type Configuration struct {
61
+ AntiEntropyInterval time.Duration
62
+ AntiEntropyStateResponseTimeout time.Duration
63
+ AntiEntropyBatchSize uint64
64
+ MaxBlockDistance int
65
+ AntiEntropyMaxRetries int
66
+ ChannelBufferSize int
67
+ EnableStateTransfer bool
68
+ }
69
+
58
70
// GossipAdapter defines gossip/communication required interface for state provider
59
71
type GossipAdapter interface {
60
72
// Send sends a message to remote peers
@@ -155,6 +167,8 @@ type GossipStateProviderImpl struct {
155
167
requestValidator * stateRequestValidator
156
168
157
169
blockingMode bool
170
+
171
+ config * Configuration
158
172
}
159
173
160
174
var logger = util .GetLogger (util .StateLogger , "" )
@@ -164,18 +178,61 @@ type stateRequestValidator struct {
164
178
}
165
179
166
180
// validate checks for RemoteStateRequest message validity
167
- func (v * stateRequestValidator ) validate (request * proto.RemoteStateRequest ) error {
181
+ func (v * stateRequestValidator ) validate (request * proto.RemoteStateRequest , batchSize uint64 ) error {
168
182
if request .StartSeqNum > request .EndSeqNum {
169
183
return errors .Errorf ("Invalid sequence interval [%d...%d)." , request .StartSeqNum , request .EndSeqNum )
170
184
}
171
185
172
- if request .EndSeqNum > defAntiEntropyBatchSize + request .StartSeqNum {
186
+ if request .EndSeqNum > batchSize + request .StartSeqNum {
173
187
return errors .Errorf ("Requesting blocks range [%d-%d) greater than configured allowed" +
174
- " (%d) batching size for anti-entropy." , request .StartSeqNum , request .EndSeqNum , defAntiEntropyBatchSize )
188
+ " (%d) batching size for anti-entropy." , request .StartSeqNum , request .EndSeqNum , batchSize )
175
189
}
176
190
return nil
177
191
}
178
192
193
+ // readConfiguration reading state configuration
194
+ func readConfiguration () * Configuration {
195
+ config := & Configuration {
196
+ AntiEntropyInterval : defAntiEntropyInterval ,
197
+ AntiEntropyStateResponseTimeout : defAntiEntropyStateResponseTimeout ,
198
+ AntiEntropyBatchSize : defAntiEntropyBatchSize ,
199
+ MaxBlockDistance : defMaxBlockDistance ,
200
+ AntiEntropyMaxRetries : defAntiEntropyMaxRetries ,
201
+ ChannelBufferSize : defChannelBufferSize ,
202
+ EnableStateTransfer : true ,
203
+ }
204
+
205
+ if viper .IsSet ("peer.gossip.state.checkInterval" ) {
206
+ config .AntiEntropyInterval = viper .GetDuration ("peer.gossip.state.checkInterval" )
207
+ }
208
+
209
+ if viper .IsSet ("peer.gossip.state.responseTimeout" ) {
210
+ config .AntiEntropyStateResponseTimeout = viper .GetDuration ("peer.gossip.state.responseTimeout" )
211
+ }
212
+
213
+ if viper .IsSet ("peer.gossip.state.batchSize" ) {
214
+ config .AntiEntropyBatchSize = uint64 (viper .GetInt ("peer.gossip.state.batchSize" ))
215
+ }
216
+
217
+ if viper .IsSet ("peer.gossip.state.blockBufferSize" ) {
218
+ config .MaxBlockDistance = viper .GetInt ("peer.gossip.state.blockBufferSize" )
219
+ }
220
+
221
+ if viper .IsSet ("peer.gossip.state.maxRetries" ) {
222
+ config .AntiEntropyMaxRetries = viper .GetInt ("peer.gossip.state.maxRetries" )
223
+ }
224
+
225
+ if viper .IsSet ("peer.gossip.state.channelSize" ) {
226
+ config .ChannelBufferSize = viper .GetInt ("peer.gossip.state.channelSize" )
227
+ }
228
+
229
+ if viper .IsSet ("peer.gossip.state.enabled" ) {
230
+ config .EnableStateTransfer = viper .GetBool ("peer.gossip.state.enabled" )
231
+ }
232
+
233
+ return config
234
+ }
235
+
179
236
// NewGossipStateProvider creates state provider with coordinator instance
180
237
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
181
238
func NewGossipStateProvider (chainID string , services * ServicesMediator , ledger ledgerResources ,
@@ -223,6 +280,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
223
280
return nil
224
281
}
225
282
283
+ // Reading state configuration
284
+ config := readConfiguration ()
285
+
226
286
s := & GossipStateProviderImpl {
227
287
// MessageCryptoService
228
288
mediator : services ,
@@ -245,9 +305,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
245
305
246
306
ledger : ledger ,
247
307
248
- stateResponseCh : make (chan protoext.ReceivedMessage , defChannelBufferSize ),
308
+ stateResponseCh : make (chan protoext.ReceivedMessage , config . ChannelBufferSize ),
249
309
250
- stateRequestCh : make (chan protoext.ReceivedMessage , defChannelBufferSize ),
310
+ stateRequestCh : make (chan protoext.ReceivedMessage , config . ChannelBufferSize ),
251
311
252
312
stopCh : make (chan struct {}, 1 ),
253
313
@@ -260,6 +320,8 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
260
320
requestValidator : & stateRequestValidator {},
261
321
262
322
blockingMode : blockingMode ,
323
+
324
+ config : config ,
263
325
}
264
326
265
327
logger .Infof ("Updating metadata information, " +
@@ -273,8 +335,10 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
273
335
go s .listen ()
274
336
// Deliver in order messages into the incoming channel
275
337
go s .deliverPayloads ()
276
- // Execute anti entropy to fill missing gaps
277
- go s .antiEntropy ()
338
+ if s .config .EnableStateTransfer {
339
+ // Execute anti entropy to fill missing gaps
340
+ go s .antiEntropy ()
341
+ }
278
342
// Taking care of state request messages
279
343
go s .processStateRequests ()
280
344
@@ -381,7 +445,7 @@ func (s *GossipStateProviderImpl) directMessage(msg protoext.ReceivedMessage) {
381
445
incoming := msg .GetGossipMessage ()
382
446
383
447
if incoming .GetStateRequest () != nil {
384
- if len (s .stateRequestCh ) < defChannelBufferSize {
448
+ if len (s .stateRequestCh ) < s . config . ChannelBufferSize {
385
449
// Forward state request to the channel, if there are too
386
450
// many message of state request ignore to avoid flooding.
387
451
s .stateRequestCh <- msg
@@ -418,7 +482,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg protoext.ReceivedMessag
418
482
}
419
483
request := msg .GetGossipMessage ().GetStateRequest ()
420
484
421
- if err := s .requestValidator .validate (request ); err != nil {
485
+ if err := s .requestValidator .validate (request , s . config . AntiEntropyBatchSize ); err != nil {
422
486
logger .Errorf ("State request validation failed, %s. Ignoring request..." , err )
423
487
return
424
488
}
@@ -609,7 +673,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
609
673
case <- s .stopCh :
610
674
s .stopCh <- struct {}{}
611
675
return
612
- case <- time .After (defAntiEntropyInterval ):
676
+ case <- time .After (s . config . AntiEntropyInterval ):
613
677
ourHeight , err := s .ledger .LedgerHeight ()
614
678
if err != nil {
615
679
// Unable to read from ledger continue to the next round
@@ -654,15 +718,15 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
654
718
defer atomic .StoreInt32 (& s .stateTransferActive , 0 )
655
719
656
720
for prev := start ; prev <= end ; {
657
- next := min (end , prev + defAntiEntropyBatchSize )
721
+ next := min (end , prev + s . config . AntiEntropyBatchSize )
658
722
659
723
gossipMsg := s .stateRequestMessage (prev , next )
660
724
661
725
responseReceived := false
662
726
tryCounts := 0
663
727
664
728
for ! responseReceived {
665
- if tryCounts > defAntiEntropyMaxRetries {
729
+ if tryCounts > s . config . AntiEntropyMaxRetries {
666
730
logger .Warningf ("Wasn't able to get blocks in range [%d...%d), after %d retries" ,
667
731
prev , next , tryCounts )
668
732
return
@@ -696,7 +760,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
696
760
}
697
761
prev = index + 1
698
762
responseReceived = true
699
- case <- time .After (defAntiEntropyStateResponseTimeout ):
763
+ case <- time .After (s . config . AntiEntropyStateResponseTimeout ):
700
764
case <- s .stopCh :
701
765
s .stopCh <- struct {}{}
702
766
return
@@ -778,15 +842,16 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
778
842
return errors .Wrap (err , "Failed obtaining ledger height" )
779
843
}
780
844
781
- if ! blockingMode && payload .SeqNum - height >= defMaxBlockDistance {
845
+ if ! blockingMode && payload .SeqNum - height >= uint64 ( s . config . MaxBlockDistance ) {
782
846
return errors .Errorf ("Ledger height is at %d, cannot enqueue block with sequence of %d" , height , payload .SeqNum )
783
847
}
784
848
785
- for blockingMode && s .payloads .Size () > defMaxBlockDistance * 2 {
849
+ for blockingMode && s .payloads .Size () > s . config . MaxBlockDistance * 2 {
786
850
time .Sleep (enqueueRetryInterval )
787
851
}
788
852
789
853
s .payloads .Push (payload )
854
+ logger .Debugf ("Blocks payloads buffer size for channel [%s] is %d blocks" , s .chainID , s .payloads .Size ())
790
855
return nil
791
856
}
792
857
0 commit comments