Skip to content

Commit 4cf2ff5

Browse files
author
Jason Yellick
committed
FAB-613 Add Kafka metrics for orderer
This CR adds a simple polling wrapper around the go-metrics metric registry logic that Sarama already implements. It converts all of the exposed per broker/topic metrics to gauges and exposes those gauges through the go-kit metrics. Change-Id: I0847cf01262f2fdd87b5ae3e4338c7a49314cd08 Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent a063c8a commit 4cf2ff5

16 files changed

+2518
-22
lines changed

Gopkg.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/metrics/provider.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type CounterOpts struct {
6161
StatsdFormat string
6262
}
6363

64-
// A Gauge is a memter that expresses the current value of some metric.
64+
// A Gauge is a meter that expresses the current value of some metric.
6565
type Gauge interface {
6666
// With is used to provide label values when recording a Gauge value. This
6767
// must be used to provide values for all LabelNames provided to GaugeOpts.
@@ -108,7 +108,7 @@ type GaugeOpts struct {
108108
StatsdFormat string
109109
}
110110

111-
// A Histogram is a memter that records an observed value into quantized
111+
// A Histogram is a meter that records an observed value into quantized
112112
// buckets.
113113
type Histogram interface {
114114
// With is used to provide label values when recording a Histogram

orderer/common/server/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func Start(cmd string, conf *localconfig.TopLevel) {
126126
}
127127
}
128128

129-
manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, tlsCallback)
129+
manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, tlsCallback)
130130
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
131131
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
132132

@@ -397,6 +397,7 @@ func initializeMultichannelRegistrar(isClusterType bool,
397397
srv *comm.GRPCServer,
398398
conf *localconfig.TopLevel,
399399
signer crypto.LocalSigner,
400+
metricsProvider metrics.Provider,
400401
callbacks ...func(bundle *channelconfig.Bundle)) *multichannel.Registrar {
401402
lf, _ := createLedgerFactory(conf)
402403
genesisBlock := extractGenesisBlock(conf)
@@ -412,7 +413,11 @@ func initializeMultichannelRegistrar(isClusterType bool,
412413
registrar := multichannel.NewRegistrar(lf, signer, callbacks...)
413414

414415
consenters["solo"] = solo.New()
415-
consenters["kafka"] = kafka.New(conf.Kafka)
416+
var kafkaMetrics *kafka.Metrics
417+
consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider)
418+
// Note, we pass a 'nil' channel here, we could pass a channel that
419+
// closes if we wished to cleanup this routine on exit.
420+
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
416421
if isClusterType {
417422
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar)
418423
consenters["etcdraft"] = raftConsenter

orderer/common/server/main_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ func TestInitializeMultiChainManager(t *testing.T) {
259259
conf := genesisConfig(t)
260260
assert.NotPanics(t, func() {
261261
initializeLocalMsp(conf)
262-
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner())
262+
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{})
263263
})
264264
}
265265

@@ -320,7 +320,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
320320
updateTrustedRoots(grpcServer, caSupport, bundle)
321321
}
322322
}
323-
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), callback)
323+
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
324324
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
325325
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
326326
// mutual TLS not required so no updates should have occurred
@@ -357,7 +357,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
357357
updateClusterDialer(caSupport, predDialer, clusterConf.SecOpts.ServerRootCAs)
358358
}
359359
}
360-
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), callback)
360+
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
361361
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
362362
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
363363
// mutual TLS is required so updates should have occurred

orderer/consensus/kafka/chain_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,20 @@ import (
1111
"testing"
1212
"time"
1313

14-
"github.com/Shopify/sarama"
15-
"github.com/Shopify/sarama/mocks"
16-
"github.com/golang/protobuf/proto"
1714
"github.com/hyperledger/fabric/common/channelconfig"
1815
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
1916
"github.com/hyperledger/fabric/orderer/common/blockcutter"
2017
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
18+
lmock "github.com/hyperledger/fabric/orderer/consensus/kafka/mock"
2119
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
2220
mockmultichannel "github.com/hyperledger/fabric/orderer/mocks/common/multichannel"
2321
cb "github.com/hyperledger/fabric/protos/common"
2422
ab "github.com/hyperledger/fabric/protos/orderer"
2523
"github.com/hyperledger/fabric/protos/utils"
24+
25+
"github.com/Shopify/sarama"
26+
"github.com/Shopify/sarama/mocks"
27+
"github.com/golang/protobuf/proto"
2628
"github.com/stretchr/testify/assert"
2729
"github.com/stretchr/testify/mock"
2830
"github.com/stretchr/testify/require"
@@ -3402,7 +3404,7 @@ func TestDeliverSession(t *testing.T) {
34023404
defer env.broker2.Close()
34033405

34043406
// initialize consenter
3405-
consenter := New(mockLocalConfig.Kafka)
3407+
consenter, _ := New(mockLocalConfig.Kafka, &lmock.MetricsProvider{})
34063408

34073409
// initialize chain
34083410
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
@@ -3491,7 +3493,7 @@ func TestDeliverSession(t *testing.T) {
34913493
defer env.broker0.Close()
34923494

34933495
// initialize consenter
3494-
consenter := New(mockLocalConfig.Kafka)
3496+
consenter, _ := New(mockLocalConfig.Kafka, &lmock.MetricsProvider{})
34953497

34963498
// initialize chain
34973499
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
@@ -3553,7 +3555,7 @@ func TestDeliverSession(t *testing.T) {
35533555
defer env.broker0.Close()
35543556

35553557
// initialize consenter
3556-
consenter := New(mockLocalConfig.Kafka)
3558+
consenter, _ := New(mockLocalConfig.Kafka, &lmock.MetricsProvider{})
35573559

35583560
// initialize chain
35593561
metadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}

orderer/consensus/kafka/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010
"crypto/tls"
1111
"crypto/x509"
1212

13-
"github.com/Shopify/sarama"
1413
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
14+
15+
"github.com/Shopify/sarama"
1516
)
1617

1718
func newBrokerConfig(

orderer/consensus/kafka/consenter.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,28 @@ SPDX-License-Identifier: Apache-2.0
77
package kafka
88

99
import (
10-
"github.com/Shopify/sarama"
10+
"github.com/hyperledger/fabric/common/metrics"
1111
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
1212
"github.com/hyperledger/fabric/orderer/consensus"
1313
cb "github.com/hyperledger/fabric/protos/common"
14+
15+
"github.com/Shopify/sarama"
1416
logging "github.com/op/go-logging"
1517
)
1618

1719
// New creates a Kafka-based consenter. Called by orderer's main.go.
18-
func New(config localconfig.Kafka) consensus.Consenter {
20+
func New(config localconfig.Kafka, metricsProvider metrics.Provider) (consensus.Consenter, *Metrics) {
1921
if config.Verbose {
2022
logging.SetLevel(logging.DEBUG, "orderer.consensus.kafka.sarama")
2123
}
24+
2225
brokerConfig := newBrokerConfig(
2326
config.TLS,
2427
config.SASLPlain,
2528
config.Retry,
2629
config.Version,
2730
defaultPartition)
31+
2832
return &consenterImpl{
2933
brokerConfigVal: brokerConfig,
3034
tlsConfigVal: config.TLS,
@@ -34,7 +38,7 @@ func New(config localconfig.Kafka) consensus.Consenter {
3438
NumPartitions: 1,
3539
ReplicationFactor: config.Topic.ReplicationFactor,
3640
},
37-
}
41+
}, NewMetrics(metricsProvider, brokerConfig.MetricRegistry)
3842
}
3943

4044
// consenterImpl holds the implementation of type that satisfies the
@@ -46,6 +50,7 @@ type consenterImpl struct {
4650
retryOptionsVal localconfig.Retry
4751
kafkaVersionVal sarama.KafkaVersion
4852
topicDetailVal *sarama.TopicDetail
53+
metricsProvider metrics.Provider
4954
}
5055

5156
// HandleChain creates/returns a reference to a consensus.Chain object for the

orderer/consensus/kafka/consenter_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@ import (
1212
"testing"
1313
"time"
1414

15-
"github.com/Shopify/sarama"
16-
"github.com/golang/protobuf/proto"
1715
"github.com/hyperledger/fabric/common/flogging"
1816
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
1917
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
2018
"github.com/hyperledger/fabric/orderer/consensus"
19+
"github.com/hyperledger/fabric/orderer/consensus/kafka/mock"
2120
mockmultichannel "github.com/hyperledger/fabric/orderer/mocks/common/multichannel"
2221
cb "github.com/hyperledger/fabric/protos/common"
2322
ab "github.com/hyperledger/fabric/protos/orderer"
2423
"github.com/hyperledger/fabric/protos/utils"
24+
25+
"github.com/Shopify/sarama"
26+
"github.com/golang/protobuf/proto"
2527
"github.com/stretchr/testify/assert"
2628
)
2729

@@ -69,11 +71,12 @@ func init() {
6971
}
7072

7173
func TestNew(t *testing.T) {
72-
_ = consensus.Consenter(New(mockLocalConfig.Kafka))
74+
c, _ := New(mockLocalConfig.Kafka, &mock.MetricsProvider{})
75+
_ = consensus.Consenter(c)
7376
}
7477

7578
func TestHandleChain(t *testing.T) {
76-
consenter := consensus.Consenter(New(mockLocalConfig.Kafka))
79+
consenter, _ := New(mockLocalConfig.Kafka, &mock.MetricsProvider{})
7780

7881
oldestOffset := int64(0)
7982
newestOffset := int64(5)
@@ -103,7 +106,6 @@ func TestHandleChain(t *testing.T) {
103106
}
104107

105108
mockMetadata := &cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: newestOffset - 1})}
106-
107109
_, err := consenter.HandleChain(mockSupport, mockMetadata)
108110
assert.NoError(t, err, "Expected the HandleChain call to return without errors")
109111
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package kafka_test
8+
9+
import (
10+
"testing"
11+
12+
"github.com/hyperledger/fabric/common/metrics"
13+
14+
. "github.com/onsi/ginkgo"
15+
. "github.com/onsi/gomega"
16+
17+
gometrics "github.com/rcrowley/go-metrics"
18+
)
19+
20+
//go:generate counterfeiter -o mock/metrics_registry.go --fake-name MetricsRegistry . metricsRegistry
21+
type metricsRegistry interface {
22+
gometrics.Registry
23+
}
24+
25+
//go:generate counterfeiter -o mock/metrics_meter.go --fake-name MetricsMeter . metricsMeter
26+
type metricsMeter interface {
27+
gometrics.Meter
28+
}
29+
30+
//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram
31+
type metricsHistogram interface {
32+
gometrics.Histogram
33+
}
34+
35+
//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider
36+
type metricsProvider interface {
37+
metrics.Provider
38+
}
39+
40+
//go:generate counterfeiter -o mock/metrics_gauge.go --fake-name MetricsGauge . metricsGauge
41+
type metricsGauge interface {
42+
metrics.Gauge
43+
}
44+
45+
func TestKafka(t *testing.T) {
46+
RegisterFailHandler(Fail)
47+
RunSpecs(t, "Kafka Suite")
48+
}

0 commit comments

Comments
 (0)