Skip to content

Commit

Permalink
[FAB-12801] Create ledger stats with basic metrics
Browse files Browse the repository at this point in the history
This CR introduces the ledger stats and populates
following basic metrics
 - Blockchain Height
 - Block Processing Time
 - Block storage commit time
 - State DB commit time

Change-Id: I7071ececad7a67edddaf665d4af39a3878e9ba35
Signed-off-by: manish <manish.sethi@gmail.com>
Signed-off-by: Saad Karim <skarim@us.ibm.com>
  • Loading branch information
manish-sethi committed Nov 20, 2018
1 parent bd5df09 commit 9d8fcbb
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 11 deletions.
46 changes: 38 additions & 8 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type kvLedger struct {
historyDB historydb.HistoryDB
configHistoryRetriever ledger.ConfigHistoryRetriever
blockAPIsRWLock *sync.RWMutex
stats *ledgerStats
}

// NewKVLedger constructs new `KVLedger`
Expand All @@ -52,6 +53,7 @@ func newKVLedger(
stateListeners []ledger.StateListener,
bookkeeperProvider bookkeeping.Provider,
ccInfoProvider ledger.DeployedChaincodeInfoProvider,
stats *ledgerStats,
) (*kvLedger, error) {
logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
// Create a kvLedger for this chain/ledger, which encasulates the underlying
Expand All @@ -76,6 +78,14 @@ func newKVLedger(
panic(errors.WithMessage(err, "error during state DB recovery"))
}
l.configHistoryRetriever = configHistoryMgr.GetRetriever(ledgerID, l)

info, err := l.GetBlockchainInfo()
if err != nil {
return nil, err
}
// initialize stat with the current height
stats.updateBlockchainHeight(info.Height)
l.stats = stats
return l, nil
}

Expand Down Expand Up @@ -254,13 +264,13 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
block := pvtdataAndBlock.Block
blockNo := pvtdataAndBlock.Block.Header.Number

startStateValidation := time.Now()
startBlockProcessing := time.Now()
logger.Debugf("[%s] Validating state for block [%d]", l.ledgerID, blockNo)
err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
if err != nil {
return err
}
elapsedStateValidation := time.Since(startStateValidation) / time.Millisecond // duration in ms
elapsedBlockProcessing := time.Since(startBlockProcessing)

startCommitBlockStorage := time.Now()
logger.Debugf("[%s] Committing block [%d] to storage", l.ledgerID, blockNo)
Expand All @@ -269,14 +279,14 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil {
return err
}
elapsedCommitBlockStorage := time.Since(startCommitBlockStorage) / time.Millisecond // duration in ms
elapsedCommitBlockStorage := time.Since(startCommitBlockStorage)

startCommitState := time.Now()
logger.Debugf("[%s] Committing block [%d] transactions to state database", l.ledgerID, blockNo)
if err = l.txtmgmt.Commit(); err != nil {
panic(errors.WithMessage(err, "error during commit to txmgr"))
}
elapsedCommitState := time.Since(startCommitState) / time.Millisecond // duration in ms
elapsedCommitState := time.Since(startCommitState)

// History database could be written in parallel with state and/or async as a future optimization,
// although it has not been a bottleneck...no need to clutter the log with elapsed duration.
Expand All @@ -287,15 +297,35 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
}
}

elapsedCommitWithPvtData := time.Since(startStateValidation) / time.Millisecond // total duration in ms
elapsedCommitWithPvtData := time.Since(startBlockProcessing)

logger.Infof("[%s] Committed block [%d] with %d transaction(s) in %dms (state_validation=%dms block_commit=%dms state_commit=%dms)",
l.ledgerID, block.Header.Number, len(block.Data.Data), elapsedCommitWithPvtData,
elapsedStateValidation, elapsedCommitBlockStorage, elapsedCommitState)

l.ledgerID, block.Header.Number, len(block.Data.Data),
elapsedCommitWithPvtData/time.Millisecond,
elapsedBlockProcessing/time.Millisecond,
elapsedCommitBlockStorage/time.Millisecond,
elapsedCommitState/time.Millisecond,
)
l.updateBlockStats(blockNo,
elapsedBlockProcessing,
elapsedCommitBlockStorage,
elapsedCommitState,
)
return nil
}

func (l *kvLedger) updateBlockStats(
blockNum uint64,
blockProcessingTime time.Duration,
blockstorageCommitTime time.Duration,
statedbCommitTime time.Duration,
) {
l.stats.updateBlockchainHeight(blockNum + 1)
l.stats.updateBlockProcessingTime(blockProcessingTime)
l.stats.updateBlockstorageCommitTime(blockstorageCommitTime)
l.stats.updateStatedbCommitTime(statedbCommitTime)
}

// GetMissingPvtDataInfoForMostRecentBlocks returns the missing private data information for the
// most recent `maxBlock` blocks which miss at least a private data of a eligible collection.
func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) {
Expand Down
12 changes: 9 additions & 3 deletions core/ledger/kvledger/kv_ledger_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Provider struct {
bookkeepingProvider bookkeeping.Provider
initializer *ledger.Initializer
collElgNotifier *collElgNotifier
stats *stats
}

// NewProvider instantiates a new Provider.
Expand All @@ -62,7 +63,7 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
historydbProvider := historyleveldb.NewHistoryDBProvider()
logger.Info("ledger provider Initialized")
provider := &Provider{idStore, ledgerStoreProvider,
nil, historydbProvider, nil, nil, nil, nil, nil}
nil, historydbProvider, nil, nil, nil, nil, nil, nil}
return provider, nil
}

Expand Down Expand Up @@ -91,6 +92,7 @@ func (provider *Provider) Initialize(initializer *ledger.Initializer) error {
if err != nil {
return err
}
provider.stats = newStats(initializer.MetricsProvider)
provider.recoverUnderConstructionLedger()
return nil
}
Expand Down Expand Up @@ -168,8 +170,12 @@ func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, erro

// Create a kvLedger for this chain/ledger, which encasulates the underlying data stores
// (id store, blockstore, state database, history database)
l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB, provider.configHistoryMgr,
provider.stateListeners, provider.bookkeepingProvider, provider.initializer.DeployedChaincodeInfoProvider)
l, err := newKVLedger(
ledgerID, blockStore, vDB, historyDB, provider.configHistoryMgr,
provider.stateListeners, provider.bookkeepingProvider,
provider.initializer.DeployedChaincodeInfoProvider,
provider.stats.ledgerStats(ledgerID),
)
if err != nil {
return nil, err
}
Expand Down
125 changes: 125 additions & 0 deletions core/ledger/kvledger/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package kvledger

import (
"time"

"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)

type stats struct {
blockchainHeight metrics.Gauge
blockProcessingTime metrics.Histogram
blockstorageCommitTime metrics.Histogram
statedbCommitTime metrics.Histogram
transactionsCount metrics.Counter
}

func newStats(metricsProvider metrics.Provider) *stats {
stats := &stats{}
stats.blockchainHeight = metricsProvider.NewGauge(blockchainHeightOpts)
stats.blockProcessingTime = metricsProvider.NewHistogram(blockProcessingTimeOpts)
stats.blockstorageCommitTime = metricsProvider.NewHistogram(blockstorageCommitTimeOpts)
stats.statedbCommitTime = metricsProvider.NewHistogram(statedbCommitTimeOpts)
stats.transactionsCount = metricsProvider.NewCounter(transactionCountOpts)
return stats
}

type ledgerStats struct {
stats *stats
ledgerid string
}

func (s *stats) ledgerStats(ledgerid string) *ledgerStats {
return &ledgerStats{
s, ledgerid,
}
}

func (s *ledgerStats) updateBlockchainHeight(height uint64) {
// casting uint64 to float64 guarentees precision for the numbers upto 9,007,199,254,740,992 (1<<53)
// since, we are not expecting the blockchains of this scale anytime soon, we go ahead with this for now.
s.stats.blockchainHeight.With("channel_name", s.ledgerid).Set(float64(height))
}

func (s *ledgerStats) updateBlockProcessingTime(timeTaken time.Duration) {
s.stats.blockProcessingTime.With("channel_name", s.ledgerid).Observe(timeTaken.Seconds())
}

func (s *ledgerStats) updateBlockstorageCommitTime(timeTaken time.Duration) {
s.stats.blockstorageCommitTime.With("channel_name", s.ledgerid).Observe(timeTaken.Seconds())
}

func (s *ledgerStats) updateStatedbCommitTime(timeTaken time.Duration) {
s.stats.statedbCommitTime.With("channel_name", s.ledgerid).Observe(timeTaken.Seconds())
}

func (s *ledgerStats) updateTransactionCounts(
transactionType common.HeaderType,
chaincodeName string,
validatioCode peer.TxValidationCode,
) {
s.stats.transactionsCount.
With(s.ledgerid,
transactionType.String(),
chaincodeName,
validatioCode.String(),
).
Add(1)
}

var (
blockchainHeightOpts = metrics.GaugeOpts{
Namespace: "ledger",
Subsystem: "",
Name: "blockchain_height",
Help: "Height of the chain in blocks.",
LabelNames: []string{"channel_name"},
StatsdFormat: "%{#fqname}.%{channel_name}",
}

blockProcessingTimeOpts = metrics.HistogramOpts{
Namespace: "ledger",
Subsystem: "",
Name: "block_processing_time",
Help: "Time taken in seconds for ledger block processing.",
LabelNames: []string{"channel_name"},
StatsdFormat: "%{#fqname}.%{channel_name}",
Buckets: []float64{0.005, 0.01, 0.015, 0.05, 0.1, 1, 10},
}

blockstorageCommitTimeOpts = metrics.HistogramOpts{
Namespace: "ledger",
Subsystem: "",
Name: "blockstorage_commit_time",
Help: "Time taken in seconds for committing block and private data to respective storage.",
LabelNames: []string{"channel_name"},
StatsdFormat: "%{#fqname}.%{channel_name}",
Buckets: []float64{0.005, 0.01, 0.015, 0.05, 0.1, 1, 10},
}

statedbCommitTimeOpts = metrics.HistogramOpts{
Namespace: "ledger",
Subsystem: "",
Name: "statedb_commit_time",
Help: "Time taken in seconds for committing block changes to state db.",
LabelNames: []string{"channel_name"},
StatsdFormat: "%{#fqname}.%{channel_name}",
Buckets: []float64{0.005, 0.01, 0.015, 0.05, 0.1, 1, 10},
}

transactionCountOpts = metrics.CounterOpts{
Namespace: "ledger",
Subsystem: "",
Name: "transaction_counts",
Help: "Number of transactions processed.",
LabelNames: []string{"channel_name", "transaction_type", "chaincode", "validation_code"},
StatsdFormat: "%{#fqname}.%{channel_name}.%{transaction_type}.%{chaincode}.%{validation_code}",
}
)
Loading

0 comments on commit 9d8fcbb

Please sign in to comment.