Skip to content

Commit 406c80c

Browse files
committed
[FAB-15031] Collect common ledger stats in blockstorage
- move ledger stats blockchain_height and blockstorage_commit_time from core/ledger/kvledger to common/ledger/blkstorage so that they are collected for orderers and peers - blockstorage_commit_time does not include pvtdata time any more - original blockstorage_commit_time is renamed to blockstorage_and_pvtdata_commit_time Change-Id: I4981638161987912b6c439b586d1453602c3e012 Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
1 parent dbaeb77 commit 406c80c

File tree

22 files changed

+376
-156
lines changed

22 files changed

+376
-156
lines changed

common/ledger/blkstorage/fsblkstorage/blockindex_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/hyperledger/fabric/common/ledger/blkstorage"
1414
"github.com/hyperledger/fabric/common/ledger/testutil"
15+
"github.com/hyperledger/fabric/common/metrics/disabled"
1516
"github.com/hyperledger/fabric/core/ledger/util"
1617
"github.com/hyperledger/fabric/protos/common"
1718
"github.com/hyperledger/fabric/protos/peer"
@@ -123,7 +124,7 @@ func testBlockIndexSelectiveIndexingWrongConfig(t *testing.T, indexItems []blkst
123124
testName = testName + string(s)
124125
}
125126
t.Run(testName, func(t *testing.T) {
126-
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems)
127+
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems, &disabled.Provider{})
127128
defer env.Cleanup()
128129

129130
assert.Panics(t, func() {
@@ -150,7 +151,7 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index
150151
testName = testName + string(s)
151152
}
152153
t.Run(testName, func(t *testing.T) {
153-
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems)
154+
env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems, &disabled.Provider{})
154155
defer env.Cleanup()
155156
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testledger")
156157
defer blkfileMgrWrapper.close()

common/ledger/blkstorage/fsblkstorage/fs_blockstore.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package fsblkstorage
1818

1919
import (
20+
"time"
21+
2022
"github.com/hyperledger/fabric/common/ledger"
2123
"github.com/hyperledger/fabric/common/ledger/blkstorage"
2224
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
@@ -29,17 +31,32 @@ type fsBlockStore struct {
2931
id string
3032
conf *Conf
3133
fileMgr *blockfileMgr
34+
stats *ledgerStats
3235
}
3336

3437
// NewFsBlockStore constructs a `FsBlockStore`
3538
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
36-
dbHandle *leveldbhelper.DBHandle) *fsBlockStore {
37-
return &fsBlockStore{id, conf, newBlockfileMgr(id, conf, indexConfig, dbHandle)}
39+
dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
40+
fileMgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)
41+
42+
// create ledgerStats and initialize blockchain_height stat
43+
ledgerStats := stats.ledgerStats(id)
44+
info := fileMgr.getBlockchainInfo()
45+
ledgerStats.updateBlockchainHeight(info.Height)
46+
47+
return &fsBlockStore{id, conf, fileMgr, ledgerStats}
3848
}
3949

4050
// AddBlock adds a new block
4151
func (store *fsBlockStore) AddBlock(block *common.Block) error {
42-
return store.fileMgr.addBlock(block)
52+
// track elapsed time to collect block commit time
53+
startBlockCommit := time.Now()
54+
result := store.fileMgr.addBlock(block)
55+
elapsedBlockCommit := time.Since(startBlockCommit)
56+
57+
store.updateBlockStats(block.Header.Number, elapsedBlockCommit)
58+
59+
return result
4360
}
4461

4562
// GetBlockchainInfo returns the current info about blockchain
@@ -85,3 +102,8 @@ func (store *fsBlockStore) Shutdown() {
85102
logger.Debugf("closing fs blockStore:%s", store.id)
86103
store.fileMgr.close()
87104
}
105+
106+
func (store *fsBlockStore) updateBlockStats(blockNum uint64, blockstorageCommitTime time.Duration) {
107+
store.stats.updateBlockchainHeight(blockNum + 1)
108+
store.stats.updateBlockstorageCommitTime(blockstorageCommitTime)
109+
}

common/ledger/blkstorage/fsblkstorage/fs_blockstore_provider.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,23 @@ import (
2020
"github.com/hyperledger/fabric/common/ledger/blkstorage"
2121
"github.com/hyperledger/fabric/common/ledger/util"
2222
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
23+
"github.com/hyperledger/fabric/common/metrics"
2324
)
2425

2526
// FsBlockstoreProvider provides handle to block storage - this is not thread-safe
2627
type FsBlockstoreProvider struct {
2728
conf *Conf
2829
indexConfig *blkstorage.IndexConfig
2930
leveldbProvider *leveldbhelper.Provider
31+
stats *stats
3032
}
3133

3234
// NewProvider constructs a filesystem based block store provider
33-
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig) blkstorage.BlockStoreProvider {
35+
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig, metricsProvider metrics.Provider) blkstorage.BlockStoreProvider {
3436
p := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: conf.getIndexDir()})
35-
return &FsBlockstoreProvider{conf, indexConfig, p}
37+
// create stats instance at provider level and pass to newFsBlockStore
38+
stats := newStats(metricsProvider)
39+
return &FsBlockstoreProvider{conf, indexConfig, p, stats}
3640
}
3741

3842
// CreateBlockStore simply calls OpenBlockStore
@@ -45,7 +49,7 @@ func (p *FsBlockstoreProvider) CreateBlockStore(ledgerid string) (blkstorage.Blo
4549
// This method should be invoked only once for a particular ledgerid
4650
func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
4751
indexStoreHandle := p.leveldbProvider.GetDBHandle(ledgerid)
48-
return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle), nil
52+
return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle, p.stats), nil
4953
}
5054

5155
// Exists tells whether the BlockStore with given id exists
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package fsblkstorage
7+
8+
import (
9+
"time"
10+
11+
"github.com/hyperledger/fabric/common/metrics"
12+
)
13+
14+
type stats struct {
15+
blockchainHeight metrics.Gauge
16+
blockstorageCommitTime metrics.Histogram
17+
}
18+
19+
func newStats(metricsProvider metrics.Provider) *stats {
20+
stats := &stats{}
21+
stats.blockchainHeight = metricsProvider.NewGauge(blockchainHeightOpts)
22+
stats.blockstorageCommitTime = metricsProvider.NewHistogram(blockstorageCommitTimeOpts)
23+
return stats
24+
}
25+
26+
// ledgerStats defines block metrics that are common for both orderer and peer
27+
type ledgerStats struct {
28+
stats *stats
29+
ledgerid string
30+
}
31+
32+
func (s *stats) ledgerStats(ledgerid string) *ledgerStats {
33+
return &ledgerStats{
34+
s, ledgerid,
35+
}
36+
}
37+
38+
func (s *ledgerStats) updateBlockchainHeight(height uint64) {
39+
// casting uint64 to float64 guarantees precision for the numbers upto 9,007,199,254,740,992 (1<<53)
40+
// since, we are not expecting the blockchains of this scale anytime soon, we go ahead with this for now.
41+
s.stats.blockchainHeight.With("channel", s.ledgerid).Set(float64(height))
42+
}
43+
44+
func (s *ledgerStats) updateBlockstorageCommitTime(timeTaken time.Duration) {
45+
s.stats.blockstorageCommitTime.With("channel", s.ledgerid).Observe(timeTaken.Seconds())
46+
}
47+
48+
var (
49+
blockchainHeightOpts = metrics.GaugeOpts{
50+
Namespace: "ledger",
51+
Subsystem: "",
52+
Name: "blockchain_height",
53+
Help: "Height of the chain in blocks.",
54+
LabelNames: []string{"channel"},
55+
StatsdFormat: "%{#fqname}.%{channel}",
56+
}
57+
58+
blockstorageCommitTimeOpts = metrics.HistogramOpts{
59+
Namespace: "ledger",
60+
Subsystem: "",
61+
Name: "blockstorage_commit_time",
62+
Help: "Time taken in seconds for committing the block to storage.",
63+
LabelNames: []string{"channel"},
64+
StatsdFormat: "%{#fqname}.%{channel}",
65+
Buckets: []float64{0.005, 0.01, 0.015, 0.05, 0.1, 1, 10},
66+
}
67+
)
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package fsblkstorage
7+
8+
import (
9+
"testing"
10+
"time"
11+
12+
"github.com/hyperledger/fabric/common/ledger/testutil"
13+
"github.com/hyperledger/fabric/common/metrics"
14+
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
15+
"github.com/hyperledger/fabric/common/util"
16+
"github.com/stretchr/testify/assert"
17+
)
18+
19+
func TestStatsBlockchainHeight(t *testing.T) {
20+
testMetricProvider := testutilConstructMetricProvider()
21+
env := newTestEnvWithMetricsProvider(t, NewConf(testPath(), 0), testMetricProvider.fakeProvider)
22+
defer env.Cleanup()
23+
24+
provider := env.provider
25+
ledgerid := "ledger-stats"
26+
store, err := provider.OpenBlockStore(ledgerid)
27+
assert.NoError(t, err)
28+
defer store.Shutdown()
29+
30+
// add genesis block
31+
blockGenerator, genesisBlock := testutil.NewBlockGenerator(t, util.GetTestChainID(), false)
32+
err = store.AddBlock(genesisBlock)
33+
assert.NoError(t, err)
34+
35+
// add one more block
36+
b1 := blockGenerator.NextBlock([][]byte{})
37+
err = store.AddBlock(b1)
38+
assert.NoError(t, err)
39+
40+
// should have 3 calls for fakeBlockchainHeightGauge: OpenBlockStore, genesis block, and block b1
41+
fakeBlockchainHeightGauge := testMetricProvider.fakeBlockchainHeightGauge
42+
expectedCallCount := 3
43+
assert.Equal(t, expectedCallCount, fakeBlockchainHeightGauge.SetCallCount())
44+
45+
// verify the call for OpenBlockStore
46+
assert.Equal(t, float64(0), fakeBlockchainHeightGauge.SetArgsForCall(0))
47+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(0))
48+
49+
// verify the call for adding genesis block
50+
assert.Equal(t, float64(1), fakeBlockchainHeightGauge.SetArgsForCall(1))
51+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(1))
52+
53+
// verify the call for adding block b1
54+
assert.Equal(t, float64(2), fakeBlockchainHeightGauge.SetArgsForCall(2))
55+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(2))
56+
57+
// shutdown and reopen the store to verify blockchain height
58+
store.Shutdown()
59+
store, err = provider.OpenBlockStore(ledgerid)
60+
assert.NoError(t, err)
61+
62+
// verify the call when opening an existing ledger - should set height correctly
63+
assert.Equal(t, float64(2), fakeBlockchainHeightGauge.SetArgsForCall(3))
64+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(3))
65+
66+
// invoke updateBlockStats api explicitly and verify the call with fake metrics
67+
store.(*fsBlockStore).updateBlockStats(10, 1*time.Second)
68+
assert.Equal(t, float64(11), fakeBlockchainHeightGauge.SetArgsForCall(4))
69+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(4))
70+
}
71+
72+
func TestStatsBlockCommit(t *testing.T) {
73+
testMetricProvider := testutilConstructMetricProvider()
74+
env := newTestEnvWithMetricsProvider(t, NewConf(testPath(), 0), testMetricProvider.fakeProvider)
75+
defer env.Cleanup()
76+
77+
provider := env.provider
78+
ledgerid := "ledger-stats"
79+
store, err := provider.OpenBlockStore(ledgerid)
80+
assert.NoError(t, err)
81+
defer store.Shutdown()
82+
83+
// add a genesis block
84+
blockGenerator, genesisBlock := testutil.NewBlockGenerator(t, util.GetTestChainID(), false)
85+
err = store.AddBlock(genesisBlock)
86+
assert.NoError(t, err)
87+
88+
// add 3 more blocks
89+
for i := 1; i <= 3; i++ {
90+
b := blockGenerator.NextBlock([][]byte{})
91+
err = store.AddBlock(b)
92+
assert.NoError(t, err)
93+
}
94+
95+
fakeBlockstorageCommitTimeHist := testMetricProvider.fakeBlockstorageCommitTimeHist
96+
97+
// should have 4 calls to fakeBlockstorageCommitTimeHist: genesis block, and 3 blocks
98+
expectedCallCount := 1 + 3
99+
assert.Equal(t, expectedCallCount, fakeBlockstorageCommitTimeHist.ObserveCallCount())
100+
101+
// verify the value of channel in each call (0, 1, 2, 3)
102+
for i := 0; i < expectedCallCount; i++ {
103+
assert.Equal(t, []string{"channel", ledgerid}, fakeBlockstorageCommitTimeHist.WithArgsForCall(i))
104+
}
105+
106+
// invoke updateBlockStats api explicitly and verify with fake metrics (call number is 4)
107+
store.(*fsBlockStore).updateBlockStats(4, 10*time.Second)
108+
assert.Equal(t,
109+
[]string{"channel", ledgerid},
110+
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(4),
111+
)
112+
assert.Equal(t,
113+
float64(10),
114+
testMetricProvider.fakeBlockstorageCommitTimeHist.ObserveArgsForCall(4),
115+
)
116+
}
117+
118+
type testMetricProvider struct {
119+
fakeProvider *metricsfakes.Provider
120+
fakeBlockchainHeightGauge *metricsfakes.Gauge
121+
fakeBlockstorageCommitTimeHist *metricsfakes.Histogram
122+
}
123+
124+
func testutilConstructMetricProvider() *testMetricProvider {
125+
fakeProvider := &metricsfakes.Provider{}
126+
fakeBlockchainHeightGauge := testutilConstructGauge()
127+
fakeBlockstorageCommitTimeHist := testutilConstructHist()
128+
fakeProvider.NewGaugeStub = func(opts metrics.GaugeOpts) metrics.Gauge {
129+
switch opts.Name {
130+
case blockchainHeightOpts.Name:
131+
return fakeBlockchainHeightGauge
132+
default:
133+
return nil
134+
}
135+
}
136+
fakeProvider.NewHistogramStub = func(opts metrics.HistogramOpts) metrics.Histogram {
137+
switch opts.Name {
138+
case blockstorageCommitTimeOpts.Name:
139+
return fakeBlockstorageCommitTimeHist
140+
default:
141+
return nil
142+
}
143+
}
144+
145+
return &testMetricProvider{
146+
fakeProvider,
147+
fakeBlockchainHeightGauge,
148+
fakeBlockstorageCommitTimeHist,
149+
}
150+
}
151+
152+
func testutilConstructGauge() *metricsfakes.Gauge {
153+
fakeGauge := &metricsfakes.Gauge{}
154+
fakeGauge.WithStub = func(lableValues ...string) metrics.Gauge {
155+
return fakeGauge
156+
}
157+
return fakeGauge
158+
}
159+
160+
func testutilConstructHist() *metricsfakes.Histogram {
161+
fakeHist := &metricsfakes.Histogram{}
162+
fakeHist.WithStub = func(lableValues ...string) metrics.Histogram {
163+
return fakeHist
164+
}
165+
return fakeHist
166+
}

common/ledger/blkstorage/fsblkstorage/pkg_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
"github.com/hyperledger/fabric/common/flogging"
2626
"github.com/hyperledger/fabric/common/ledger/blkstorage"
27+
"github.com/hyperledger/fabric/common/metrics"
28+
"github.com/hyperledger/fabric/common/metrics/disabled"
2729
"github.com/hyperledger/fabric/protos/common"
2830
"github.com/hyperledger/fabric/protoutil"
2931
"github.com/stretchr/testify/assert"
@@ -48,6 +50,10 @@ type testEnv struct {
4850
}
4951

5052
func newTestEnv(t testing.TB, conf *Conf) *testEnv {
53+
return newTestEnvWithMetricsProvider(t, conf, &disabled.Provider{})
54+
}
55+
56+
func newTestEnvWithMetricsProvider(t testing.TB, conf *Conf, metricsProvider metrics.Provider) *testEnv {
5157
attrsToIndex := []blkstorage.IndexableAttr{
5258
blkstorage.IndexableAttrBlockHash,
5359
blkstorage.IndexableAttrBlockNum,
@@ -56,12 +62,12 @@ func newTestEnv(t testing.TB, conf *Conf) *testEnv {
5662
blkstorage.IndexableAttrBlockTxID,
5763
blkstorage.IndexableAttrTxValidationCode,
5864
}
59-
return newTestEnvSelectiveIndexing(t, conf, attrsToIndex)
65+
return newTestEnvSelectiveIndexing(t, conf, attrsToIndex, metricsProvider)
6066
}
6167

62-
func newTestEnvSelectiveIndexing(t testing.TB, conf *Conf, attrsToIndex []blkstorage.IndexableAttr) *testEnv {
68+
func newTestEnvSelectiveIndexing(t testing.TB, conf *Conf, attrsToIndex []blkstorage.IndexableAttr, metricsProvider metrics.Provider) *testEnv {
6369
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
64-
return &testEnv{t, NewProvider(conf, indexConfig).(*FsBlockstoreProvider)}
70+
return &testEnv{t, NewProvider(conf, indexConfig, metricsProvider).(*FsBlockstoreProvider)}
6571
}
6672

6773
func (env *testEnv) Cleanup() {

0 commit comments

Comments
 (0)