Skip to content

Commit d06c012

Browse files
author
Luis Sanchez
committed
[FAB-6167] use go-logging for sarama logging
- sarama kafka client library will log to a go-logging logger with id: orderer/consensus/kafka/sarama. - the logger can be enabled via Kafka.Verbose config or by explicitly setting to DEBUG in the log specification string. Change-Id: Ieb91ef06a7d7b8587b711439d26e116d12260dd9 Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
1 parent 40e41a5 commit d06c012

File tree

6 files changed

+83
-29
lines changed

6 files changed

+83
-29
lines changed

orderer/common/server/main.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package server
99
import (
1010
"fmt"
1111
"io/ioutil"
12-
"log"
1312
"net"
1413
"net/http"
1514
_ "net/http/pprof" // This is essentially the main package for the orderer
@@ -32,7 +31,6 @@ import (
3231
ab "github.com/hyperledger/fabric/protos/orderer"
3332
"github.com/hyperledger/fabric/protos/utils"
3433

35-
"github.com/Shopify/sarama"
3634
"github.com/hyperledger/fabric/common/localmsp"
3735
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
3836
"github.com/hyperledger/fabric/orderer/common/performance"
@@ -100,9 +98,6 @@ func Start(cmd string, conf *config.TopLevel) {
10098
func initializeLoggingLevel(conf *config.TopLevel) {
10199
flogging.InitBackend(flogging.SetFormat(conf.General.LogFormat), os.Stderr)
102100
flogging.InitFromSpec(conf.General.LogLevel)
103-
if conf.Kafka.Verbose {
104-
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
105-
}
106101
}
107102

108103
// Start the profiling service if enabled.
@@ -227,7 +222,7 @@ func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalS
227222

228223
consenters := make(map[string]consensus.Consenter)
229224
consenters["solo"] = solo.New()
230-
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version)
225+
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose)
231226

232227
return multichannel.NewRegistrar(lf, consenters, signer)
233228
}

orderer/common/server/main_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"testing"
1818
"time"
1919

20-
"github.com/Shopify/sarama"
2120
"github.com/hyperledger/fabric/bccsp/factory"
2221
"github.com/hyperledger/fabric/common/flogging"
2322
"github.com/hyperledger/fabric/common/localmsp"
@@ -40,11 +39,9 @@ func TestInitializeLoggingLevel(t *testing.T) {
4039
// global log level setting in tests of this package (for example,
4140
// the benchmark-related ones) that would occur otherwise.
4241
General: config.General{LogLevel: "foo=debug"},
43-
Kafka: config.Kafka{Verbose: true},
4442
},
4543
)
4644
assert.Equal(t, flogging.GetModuleLevel("foo"), "DEBUG")
47-
assert.NotNil(t, sarama.Logger)
4845
}
4946

5047
func TestInitializeProfilingService(t *testing.T) {

orderer/consensus/kafka/consenter.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,17 @@ package kafka
88

99
import (
1010
"github.com/Shopify/sarama"
11-
"github.com/hyperledger/fabric/common/flogging"
1211
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
1312
"github.com/hyperledger/fabric/orderer/consensus"
1413
cb "github.com/hyperledger/fabric/protos/common"
1514
logging "github.com/op/go-logging"
1615
)
1716

18-
const pkgLogID = "orderer/consensus/kafka"
19-
20-
var logger *logging.Logger
21-
22-
func init() {
23-
logger = flogging.MustGetLogger(pkgLogID)
24-
}
25-
2617
// New creates a Kafka-based consenter. Called by orderer's main.go.
27-
func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion) consensus.Consenter {
18+
func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion, verbose bool) consensus.Consenter {
19+
if verbose {
20+
logging.SetLevel(logging.DEBUG, saramaLogID)
21+
}
2822
brokerConfig := newBrokerConfig(tlsConfig, retryOptions, kafkaVersion, defaultPartition)
2923
return &consenterImpl{
3024
brokerConfigVal: brokerConfig,

orderer/consensus/kafka/consenter_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package kafka
88

99
import (
1010
"fmt"
11-
"log"
12-
"os"
1311
"strings"
1412
"testing"
1513
"time"
@@ -54,15 +52,15 @@ func init() {
5452
mockLocalConfig = newMockLocalConfig(false, mockRetryOptions, false)
5553
mockBrokerConfig = newMockBrokerConfig(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, defaultPartition)
5654
mockConsenter = newMockConsenter(mockBrokerConfig, mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version)
57-
setupTestLogging("ERROR", mockLocalConfig.Kafka.Verbose)
55+
setupTestLogging("ERROR")
5856
}
5957

6058
func TestNew(t *testing.T) {
61-
_ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version))
59+
_ = consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose))
6260
}
6361

6462
func TestHandleChain(t *testing.T) {
65-
consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version))
63+
consenter := consensus.Consenter(New(mockLocalConfig.General.TLS, mockLocalConfig.Kafka.Retry, mockLocalConfig.Kafka.Version, mockLocalConfig.Kafka.Verbose))
6664

6765
oldestOffset := int64(0)
6866
newestOffset := int64(5)
@@ -154,15 +152,11 @@ func newMockLocalConfig(enableTLS bool, retryOptions localconfig.Retry, verboseL
154152
}
155153
}
156154

157-
func setupTestLogging(logLevel string, verbose bool) {
155+
func setupTestLogging(logLevel string) {
158156
// This call allows us to (a) get the logging backend initialization that
159157
// takes place in the `flogging` package, and (b) adjust the verbosity of
160158
// the logs when running tests on this package.
161159
flogging.SetModuleLevel(pkgLogID, logLevel)
162-
163-
if verbose {
164-
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
165-
}
166160
}
167161

168162
func tamperBytes(original []byte) []byte {

orderer/consensus/kafka/logger.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package kafka
8+
9+
import (
10+
"fmt"
11+
12+
"github.com/Shopify/sarama"
13+
"github.com/hyperledger/fabric/common/flogging"
14+
logging "github.com/op/go-logging"
15+
)
16+
17+
const (
18+
pkgLogID = "orderer/consensus/kafka"
19+
saramaLogID = pkgLogID + "/sarama"
20+
)
21+
22+
var logger *logging.Logger
23+
24+
// init initializes the package logger
25+
func init() {
26+
logger = flogging.MustGetLogger(pkgLogID)
27+
}
28+
29+
// init initializes the samara logger
30+
func init() {
31+
loggingProvider := flogging.MustGetLogger(saramaLogID)
32+
loggingProvider.ExtraCalldepth = 3
33+
sarama.Logger = &saramaLoggerImpl{
34+
logger: loggingProvider,
35+
}
36+
}
37+
38+
type saramaLoggerImpl struct {
39+
logger *logging.Logger
40+
}
41+
42+
func (l saramaLoggerImpl) Print(args ...interface{}) {
43+
l.print(fmt.Sprint(args...))
44+
}
45+
46+
func (l saramaLoggerImpl) Printf(format string, args ...interface{}) {
47+
l.print(fmt.Sprintf(format, args...))
48+
}
49+
50+
func (l saramaLoggerImpl) Println(args ...interface{}) {
51+
l.print(fmt.Sprintln(args...))
52+
}
53+
54+
func (l saramaLoggerImpl) print(message string) {
55+
l.logger.Debug(message)
56+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package kafka
8+
9+
import (
10+
"testing"
11+
12+
"github.com/Shopify/sarama"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
func TestLoggerInit(t *testing.T) {
17+
assert.IsType(t, &saramaLoggerImpl{}, sarama.Logger, "Sarama logger not properly initialized")
18+
}

0 commit comments

Comments
 (0)