Skip to content

Commit b271113

Browse files
committed
[FAB-14295] de-vipererize gossip privdata
Inject configuration of privdata via struct, and not through viper getter invocations. Have the viper config at higher levels. Change-Id: Idff9c205e71cec0300734d7a5b93718afe24a8c6 Signed-off-by: Hagar Meir <hagar.meir@ibm.com>
1 parent 3d956b9 commit b271113

File tree

11 files changed

+123
-99
lines changed

11 files changed

+123
-99
lines changed

gossip/privdata/coordinator.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,9 @@ import (
3131
transientstore2 "github.com/hyperledger/fabric/protos/transientstore"
3232
"github.com/hyperledger/fabric/protos/utils"
3333
"github.com/pkg/errors"
34-
"github.com/spf13/viper"
3534
)
3635

37-
const (
38-
pullRetrySleepInterval = time.Second
39-
transientBlockRetentionConfigKey = "peer.gossip.pvtData.transientstoreMaxBlockRetention"
40-
transientBlockRetentionDefault = 1000
41-
)
36+
const pullRetrySleepInterval = time.Second
4237

4338
var logger = util.GetLogger(util.PrivateDataLogger, "")
4439

@@ -126,17 +121,20 @@ type coordinator struct {
126121
Support
127122
transientBlockRetention uint64
128123
metrics *metrics.PrivdataMetrics
124+
pullRetryThreshold time.Duration
125+
}
126+
127+
type CoordinatorConfig struct {
128+
TransientBlockRetention uint64
129+
PullRetryThreshold time.Duration
129130
}
130131

131132
// NewCoordinator creates a new instance of coordinator
132-
func NewCoordinator(support Support, selfSignedData common.SignedData, metrics *metrics.PrivdataMetrics) Coordinator {
133-
transientBlockRetention := uint64(viper.GetInt(transientBlockRetentionConfigKey))
134-
if transientBlockRetention == 0 {
135-
logger.Warning("Configuration key", transientBlockRetentionConfigKey, "isn't set, defaulting to", transientBlockRetentionDefault)
136-
transientBlockRetention = transientBlockRetentionDefault
137-
}
133+
func NewCoordinator(support Support, selfSignedData common.SignedData, metrics *metrics.PrivdataMetrics,
134+
config CoordinatorConfig) Coordinator {
138135
return &coordinator{Support: support, selfSignedData: selfSignedData,
139-
transientBlockRetention: transientBlockRetention, metrics: metrics}
136+
transientBlockRetention: config.TransientBlockRetention, metrics: metrics,
137+
pullRetryThreshold: config.PullRetryThreshold}
140138
}
141139

142140
// StorePvtData used to persist private date into transient store
@@ -186,7 +184,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
186184

187185
c.reportListMissingPrivateDataDuration(time.Since(listMissingStart))
188186

189-
retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold")
187+
retryThresh := c.pullRetryThreshold
190188
var bFetchFromPeers bool // defaults to false
191189
if len(privateInfo.missingKeys) == 0 {
192190
logger.Debugf("[%s] No missing collection private write sets to fetch from remote peers", c.ChainID)

gossip/privdata/coordinator_test.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,19 @@ import (
3535
"github.com/hyperledger/fabric/protos/msp"
3636
"github.com/hyperledger/fabric/protos/peer"
3737
transientstore2 "github.com/hyperledger/fabric/protos/transientstore"
38-
"github.com/spf13/viper"
3938
"github.com/stretchr/testify/assert"
4039
"github.com/stretchr/testify/mock"
4140
)
4241

4342
func init() {
44-
viper.Set("peer.gossip.pvtData.pullRetryThreshold", time.Second*3)
4543
factory.InitFactories(nil)
4644
}
4745

46+
var testConfig = CoordinatorConfig{
47+
PullRetryThreshold: time.Second * 3,
48+
TransientBlockRetention: TransientBlockRetentionDefault,
49+
}
50+
4851
// CollectionCriteria aggregates criteria of
4952
// a collection
5053
type CollectionCriteria struct {
@@ -670,7 +673,7 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
670673
Fetcher: fetcher,
671674
TransientStore: store,
672675
Validator: &validatorMock{},
673-
}, peerSelfSignedData, metrics)
676+
}, peerSelfSignedData, metrics, testConfig)
674677
err := coordinator.StoreBlock(block, pvtData)
675678
assert.Error(t, err)
676679
assert.Contains(t, err.Error(), "Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
@@ -684,7 +687,7 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
684687
Fetcher: fetcher,
685688
TransientStore: store,
686689
Validator: &validatorMock{fmt.Errorf("failed validating block")},
687-
}, peerSelfSignedData, metrics)
690+
}, peerSelfSignedData, metrics, testConfig)
688691
err = coordinator.StoreBlock(block, pvtData)
689692
assert.Error(t, err)
690693
assert.Contains(t, err.Error(), "failed validating block")
@@ -698,7 +701,7 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
698701
Fetcher: fetcher,
699702
TransientStore: store,
700703
Validator: &validatorMock{},
701-
}, peerSelfSignedData, metrics)
704+
}, peerSelfSignedData, metrics, testConfig)
702705
err = coordinator.StoreBlock(block, pvtData)
703706
assert.Error(t, err)
704707
assert.Contains(t, err.Error(), "Block data size")
@@ -732,7 +735,7 @@ func TestCoordinatorStoreInvalidBlock(t *testing.T) {
732735
Fetcher: fetcher,
733736
TransientStore: store,
734737
Validator: &validatorMock{},
735-
}, peerSelfSignedData, metrics)
738+
}, peerSelfSignedData, metrics, testConfig)
736739
err = coordinator.StoreBlock(block, pvtData)
737740
assert.NoError(t, err)
738741
assertCommitHappened()
@@ -814,7 +817,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
814817
Fetcher: fetcher,
815818
TransientStore: store,
816819
Validator: &validatorMock{},
817-
}, peerSelfSignedData, metrics)
820+
}, peerSelfSignedData, metrics, testConfig)
818821

819822
fetcher.On("fetch", mock.Anything).expectingDigests([]privdatacommon.DigKey{
820823
{
@@ -913,7 +916,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
913916
Fetcher: fetcher,
914917
TransientStore: store,
915918
Validator: &validatorMock{},
916-
}, peerSelfSignedData, metrics)
919+
}, peerSelfSignedData, metrics, testConfig)
917920
err := coordinator.StoreBlock(block, pvtData)
918921
assert.NoError(t, err)
919922
assertCommitHappened()
@@ -1002,7 +1005,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
10021005
Fetcher: fetcher,
10031006
TransientStore: store,
10041007
Validator: &validatorMock{},
1005-
}, peerSelfSignedData, metrics)
1008+
}, peerSelfSignedData, metrics, testConfig)
10061009
err = coordinator.StoreBlock(block, nil)
10071010
assert.Error(t, err)
10081011
assert.Equal(t, "test error", err.Error())
@@ -1051,7 +1054,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
10511054
Fetcher: fetcher,
10521055
TransientStore: store,
10531056
Validator: &validatorMock{},
1054-
}, peerSelfSignedData, metrics)
1057+
}, peerSelfSignedData, metrics, testConfig)
10551058
err = coordinator.StoreBlock(block, nil)
10561059
assertPurged("tx3")
10571060
assert.NoError(t, err)
@@ -1089,7 +1092,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
10891092
Fetcher: fetcher,
10901093
TransientStore: store,
10911094
Validator: &validatorMock{},
1092-
}, peerSelfSignedData, metrics)
1095+
}, peerSelfSignedData, metrics, testConfig)
10931096

10941097
pvtData = pdFactory.addRWSet().addNSRWSet("ns3", "c3").create()
10951098
err = coordinator.StoreBlock(block, pvtData)
@@ -1178,7 +1181,7 @@ func TestProceedWithoutPrivateData(t *testing.T) {
11781181
Fetcher: fetcher,
11791182
TransientStore: store,
11801183
Validator: &validatorMock{},
1181-
}, peerSelfSignedData, metrics)
1184+
}, peerSelfSignedData, metrics, testConfig)
11821185
err := coordinator.StoreBlock(block, pvtData)
11831186
assert.NoError(t, err)
11841187
assertCommitHappened()
@@ -1229,7 +1232,7 @@ func TestProceedWithInEligiblePrivateData(t *testing.T) {
12291232
Fetcher: nil,
12301233
TransientStore: nil,
12311234
Validator: &validatorMock{},
1232-
}, peerSelfSignedData, metrics)
1235+
}, peerSelfSignedData, metrics, testConfig)
12331236
err := coordinator.StoreBlock(block, nil)
12341237
assert.NoError(t, err)
12351238
assertCommitHappened()
@@ -1252,7 +1255,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {
12521255
Fetcher: fetcher,
12531256
TransientStore: store,
12541257
Validator: &validatorMock{},
1255-
}, sd, metrics)
1258+
}, sd, metrics, testConfig)
12561259

12571260
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
12581261
bf := &blockFactory{
@@ -1279,7 +1282,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {
12791282
Fetcher: fetcher,
12801283
TransientStore: store,
12811284
Validator: &validatorMock{},
1282-
}, sd, metrics)
1285+
}, sd, metrics, testConfig)
12831286
expectedPrivData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c2").create()
12841287
block2, returnedPrivateData, err := coordinator.GetPvtDataAndBlockByNum(1, sd)
12851288
assert.NoError(t, err)
@@ -1330,7 +1333,7 @@ func TestPurgeByHeight(t *testing.T) {
13301333
Fetcher: fetcher,
13311334
TransientStore: store,
13321335
Validator: &validatorMock{},
1333-
}, peerSelfSignedData, metrics)
1336+
}, peerSelfSignedData, metrics, testConfig)
13341337

13351338
for i := 0; i <= 3000; i++ {
13361339
block := bf.create()
@@ -1359,7 +1362,7 @@ func TestCoordinatorStorePvtData(t *testing.T) {
13591362
Fetcher: fetcher,
13601363
TransientStore: store,
13611364
Validator: &validatorMock{},
1362-
}, common.SignedData{}, metrics)
1365+
}, common.SignedData{}, metrics, testConfig)
13631366
pvtData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c1").create()
13641367
// Green path: ledger height can be retrieved from ledger/committer
13651368
err := coordinator.StorePvtData("tx1", &transientstore2.TxPvtReadWriteSetWithConfigInfo{
@@ -1430,7 +1433,7 @@ func TestIgnoreReadOnlyColRWSets(t *testing.T) {
14301433
Fetcher: fetcher,
14311434
TransientStore: store,
14321435
Validator: &validatorMock{},
1433-
}, peerSelfSignedData, metrics)
1436+
}, peerSelfSignedData, metrics, testConfig)
14341437
// We pass a nil private data slice to indicate no pre-images though the block contains
14351438
// private data reads.
14361439
err := coordinator.StoreBlock(block, nil)
@@ -1473,7 +1476,7 @@ func TestCoordinatorMetrics(t *testing.T) {
14731476
TransientStore: store,
14741477
Validator: &validatorMock{},
14751478
ChainID: "test",
1476-
}, peerSelfSignedData, metrics)
1479+
}, peerSelfSignedData, metrics, testConfig)
14771480
err := coordinator.StoreBlock(block, pvtData)
14781481
assert.NoError(t, err)
14791482

gossip/privdata/distributor.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/hyperledger/fabric/protos/ledger/rwset"
3030
"github.com/hyperledger/fabric/protos/transientstore"
3131
"github.com/pkg/errors"
32-
"github.com/spf13/viper"
3332
)
3433

3534
// gossipAdapter an adapter for API's required from gossip module
@@ -107,12 +106,12 @@ func NewCollectionAccessFactory(factory IdentityDeserializerFactory) CollectionA
107106
// NewDistributor a constructor for private data distributor capable to send
108107
// private read write sets for underlying collection
109108
func NewDistributor(chainID string, gossip gossipAdapter, factory CollectionAccessFactory,
110-
metrics *metrics.PrivdataMetrics) PvtDataDistributor {
109+
metrics *metrics.PrivdataMetrics, pushAckTimeout time.Duration) PvtDataDistributor {
111110
return &distributorImpl{
112111
chainID: chainID,
113112
gossipAdapter: gossip,
114113
CollectionAccessFactory: factory,
115-
pushAckTimeout: viper.GetDuration("peer.gossip.pvtData.pushAckTimeout"),
114+
pushAckTimeout: pushAckTimeout,
116115
metrics: metrics,
117116
}
118117
}

gossip/privdata/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func TestDistributor(t *testing.T) {
184184
testMetricProvider := mocks.TestUtilConstructMetricProvider()
185185
metrics := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).PrivdataMetrics
186186

187-
d := NewDistributor(channelID, g, accessFactoryMock, metrics)
187+
d := NewDistributor(channelID, g, accessFactoryMock, metrics, 0)
188188
pdFactory := &pvtDataFactory{}
189189
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1", "c2").create()
190190
err := d.Distribute("tx1", &transientstore.TxPvtReadWriteSetWithConfigInfo{

gossip/privdata/pull.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ import (
2626
fcommon "github.com/hyperledger/fabric/protos/common"
2727
proto "github.com/hyperledger/fabric/protos/gossip"
2828
"github.com/pkg/errors"
29-
"github.com/spf13/viper"
3029
"go.uber.org/zap/zapcore"
3130
)
3231

3332
const (
3433
membershipPollingBackoff = time.Second
3534
responseWaitTime = time.Second * 5
3635
maxMembershipPollIterations = 5
37-
btlPullMarginDefault = 10
3836
)
3937

4038
// Dig2PvtRWSetWithConfig
@@ -82,14 +80,14 @@ type puller struct {
8280

8381
// NewPuller creates new private data puller
8482
func NewPuller(metrics *metrics.PrivdataMetrics, cs privdata.CollectionStore, g gossip,
85-
dataRetriever PrivateDataRetriever, factory CollectionAccessFactory, channel string) *puller {
83+
dataRetriever PrivateDataRetriever, factory CollectionAccessFactory, channel string, btlPullMargin uint64) *puller {
8684
p := &puller{
8785
metrics: metrics,
8886
pubSub: util.NewPubSub(),
8987
stopChan: make(chan struct{}),
9088
channel: channel,
9189
cs: cs,
92-
btlPullMargin: getBtlPullMargin(),
90+
btlPullMargin: btlPullMargin,
9391
gossip: g,
9492
PrivateDataRetriever: dataRetriever,
9593
CollectionAccessFactory: factory,
@@ -699,21 +697,6 @@ func (rp remotePeer) AsRemotePeer() *comm.RemotePeer {
699697
}
700698
}
701699

702-
func getBtlPullMargin() uint64 {
703-
var result uint64
704-
if viper.IsSet("peer.gossip.pvtData.btlPullMargin") {
705-
btlMarginVal := viper.GetInt("peer.gossip.pvtData.btlPullMargin")
706-
if btlMarginVal < 0 {
707-
result = btlPullMarginDefault
708-
} else {
709-
result = uint64(btlMarginVal)
710-
}
711-
} else {
712-
result = btlPullMarginDefault
713-
}
714-
return result
715-
}
716-
717700
func addWithOverflow(a uint64, b uint64) uint64 {
718701
res := a + b
719702
if res < a {

gossip/privdata/pull_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (gn *gossipNetwork) newPullerWithMetrics(metrics *metrics.PrivdataMetrics,
283283
g.network = gn
284284
g.On("PeersOfChannel", mock.Anything).Return(knownMembers)
285285

286-
p := NewPuller(metrics, ps, g, &dataRetrieverMock{}, factory, "A")
286+
p := NewPuller(metrics, ps, g, &dataRetrieverMock{}, factory, "A", btlPullMarginDefault)
287287
gn.peers = append(gn.peers, g)
288288
return p
289289
}

gossip/privdata/reconcile.go

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,6 @@ import (
2121
"github.com/hyperledger/fabric/protos/common"
2222
gossip2 "github.com/hyperledger/fabric/protos/gossip"
2323
"github.com/pkg/errors"
24-
"github.com/spf13/viper"
25-
)
26-
27-
const (
28-
reconcileSleepIntervalConfigKey = "peer.gossip.pvtData.reconcileSleepInterval"
29-
reconcileSleepIntervalDefault = time.Minute * 1
30-
reconcileBatchSizeConfigKey = "peer.gossip.pvtData.reconcileBatchSize"
31-
reconcileBatchSizeDefault = 10
32-
reconciliationEnabledConfigKey = "peer.gossip.pvtData.reconciliationEnabled"
3324
)
3425

3526
// ReconciliationFetcher interface which defines API to fetch
@@ -78,27 +69,11 @@ func (*NoOpReconciler) Stop() {
7869

7970
// ReconcilerConfig holds config flags that are read from core.yaml
8071
type ReconcilerConfig struct {
81-
sleepInterval time.Duration
82-
batchSize int
72+
SleepInterval time.Duration
73+
BatchSize int
8374
IsEnabled bool
8475
}
8576

86-
// this func reads reconciler configuration values from core.yaml and returns ReconcilerConfig
87-
func GetReconcilerConfig() *ReconcilerConfig {
88-
reconcileSleepInterval := viper.GetDuration(reconcileSleepIntervalConfigKey)
89-
if reconcileSleepInterval == 0 {
90-
logger.Warning("Configuration key", reconcileSleepIntervalConfigKey, "isn't set, defaulting to", reconcileSleepIntervalDefault)
91-
reconcileSleepInterval = reconcileSleepIntervalDefault
92-
}
93-
reconcileBatchSize := viper.GetInt(reconcileBatchSizeConfigKey)
94-
if reconcileBatchSize == 0 {
95-
logger.Warning("Configuration key", reconcileBatchSizeConfigKey, "isn't set, defaulting to", reconcileBatchSizeDefault)
96-
reconcileBatchSize = reconcileBatchSizeDefault
97-
}
98-
isEnabled := viper.GetBool(reconciliationEnabledConfigKey)
99-
return &ReconcilerConfig{sleepInterval: reconcileSleepInterval, batchSize: reconcileBatchSize, IsEnabled: isEnabled}
100-
}
101-
10277
// NewReconciler creates a new instance of reconciler
10378
func NewReconciler(channel string, metrics *metrics.PrivdataMetrics, c committer.Committer,
10479
fetcher ReconciliationFetcher, config *ReconcilerConfig) *Reconciler {
@@ -130,7 +105,7 @@ func (r *Reconciler) run() {
130105
select {
131106
case <-r.stopChan:
132107
return
133-
case <-time.After(r.config.sleepInterval):
108+
case <-time.After(r.config.SleepInterval):
134109
logger.Debug("Start reconcile missing private info")
135110
if err := r.reconcile(); err != nil {
136111
logger.Error("Failed to reconcile missing private info, error: ", err.Error())
@@ -156,7 +131,7 @@ func (r *Reconciler) reconcile() error {
156131
defer r.reportReconciliationDuration(time.Now())
157132

158133
for {
159-
missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForMostRecentBlocks(r.config.batchSize)
134+
missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForMostRecentBlocks(r.config.BatchSize)
160135
if err != nil {
161136
logger.Error("reconciliation error when trying to get missing pvt data info recent blocks:", err)
162137
return err

0 commit comments

Comments
 (0)