
Merge "[FAB-2913] Prepend channel name to log output"
hacera-jonathan authored and Gerrit Code Review committed Mar 29, 2017
2 parents 561443e + b7166b7 commit e79ab45
Showing 10 changed files with 61 additions and 49 deletions.
18 changes: 12 additions & 6 deletions orderer/kafka/broker.go
@@ -53,20 +53,23 @@ func newBroker(brokers []string, cp ChainPartition) (Broker, error) {
}

if connectedBroker == nil {
return nil, fmt.Errorf("Failed to connect to any of the given brokers (%v) for metadata request", brokers)
return nil, fmt.Errorf("failed to connect to any of the given brokers (%v) for metadata request", brokers)
}
logger.Debugf("Connected to broker %s", connectedBroker.Addr())

// Get metadata for the topic that corresponds to this chain
metadata, err := connectedBroker.GetMetadata(&sarama.MetadataRequest{Topics: []string{cp.Topic()}})
if err != nil {
return nil, fmt.Errorf("Failed to get metadata for topic %s: %s", cp, err)
return nil, fmt.Errorf("failed to get metadata for topic %s: %s", cp, err)
}

// Get the leader broker for this chain partition
if (cp.Partition() >= 0) && (cp.Partition() < int32(len(metadata.Topics[0].Partitions))) {
leaderBrokerID := metadata.Topics[0].Partitions[cp.Partition()].Leader
logger.Debugf("Leading broker for chain %s is broker ID %d", cp, leaderBrokerID)
// ATTN: If we ever switch to more than one partition per topic, the message
// below should be updated to print `cp` (i.e. Topic/Partition) instead of
// `cp.Topic()`.
logger.Debugf("[channel: %s] Leading broker: %d", cp.Topic(), leaderBrokerID)
for _, availableBroker := range metadata.Brokers {
if availableBroker.ID() == leaderBrokerID {
leaderBroker = availableBroker
@@ -76,15 +79,18 @@ func newBroker(brokers []string, cp ChainPartition) (Broker, error) {
}

if leaderBroker == nil {
return nil, fmt.Errorf("Can't find leader for chain %s", cp)
// ATTN: If we ever switch to more than one partition per topic, the message
// below should be updated to print `cp` (i.e. Topic/Partition) instead of
// `cp.Topic()`.
return nil, fmt.Errorf("[channel: %s] cannot find leader", cp.Topic())
}

// Connect to broker
if err := leaderBroker.Open(nil); err != nil {
return nil, fmt.Errorf("Failed to connect ho Kafka broker: %s", err)
return nil, fmt.Errorf("failed to connect to Kafka broker: %s", err)
}
if connected, err := leaderBroker.Connected(); !connected {
return nil, fmt.Errorf("Failed to connect to Kafka broker: %s", err)
return nil, fmt.Errorf("failed to connect to Kafka broker: %s", err)
}

return &brokerImpl{broker: leaderBroker}, nil
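The gist of FAB-2913: every log line emitted by the orderer/kafka package now carries a "[channel: <name>]" prefix, with the name taken from cp.Topic() or ch.support.ChainID(). Below is a minimal, standalone sketch of that convention using github.com/op/go-logging (the logging library this package already uses); the chanLogf helper is an illustrative assumption, not code from this commit.

package main

import (
	"os"

	logging "github.com/op/go-logging"
)

var logger = logging.MustGetLogger("orderer/kafka")

// chanLogf prepends the channel (topic) name to a debug-level log line,
// mirroring the "[channel: %s] ..." format applied throughout this diff.
func chanLogf(channel string, format string, args ...interface{}) {
	logger.Debugf("[channel: "+channel+"] "+format, args...)
}

func main() {
	logging.SetBackend(logging.NewLogBackend(os.Stderr, "", 0))
	logging.SetLevel(logging.DEBUG, "orderer/kafka")

	chanLogf("testchainid", "Leading broker: %d", 0)
	chanLogf("testchainid", "Created new consumer for session (beginning offset: %d)", int64(3))
}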
2 changes: 1 addition & 1 deletion orderer/kafka/broker_mock_test.go
@@ -47,7 +47,7 @@ func mockNewBroker(t *testing.T, cp ChainPartition) (Broker, error) {

broker := sarama.NewBroker(mockBroker.Addr())
if err := broker.Open(nil); err != nil {
return nil, fmt.Errorf("Cannot connect to mock broker: %s", err)
return nil, fmt.Errorf("cannot connect to mock broker: %s", err)
}

return &mockBrockerImpl{
2 changes: 1 addition & 1 deletion orderer/kafka/consumer.go
@@ -47,7 +47,7 @@ func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.
parent: parent,
partition: partition,
}
logger.Debugf("Created new consumer for session (partition %s, beginning offset %d)", cp, offset)
logger.Debugf("[channel: %s] Created new consumer for session (beginning offset: %d)", cp.Topic(), offset)
return c, nil
}

2 changes: 1 addition & 1 deletion orderer/kafka/log_test.go
@@ -21,5 +21,5 @@ import (
)

func init() {
logging.SetLevel(logging.INFO, "") // Silence debug-level outputs when testing
logging.SetLevel(logging.DEBUG, "") // Enable debug-level outputs when testing
}
62 changes: 35 additions & 27 deletions orderer/kafka/orderer.go
@@ -80,15 +80,16 @@ type consenterImpl struct {
// Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which
// is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
return newChain(co, cs, getLastOffsetPersisted(metadata)), nil
return newChain(co, cs, getLastOffsetPersisted(metadata, cs.ChainID())), nil
}

func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
func getLastOffsetPersisted(metadata *cb.Metadata, chainID string) int64 {
if metadata.Value != nil {
// Extract orderer-related metadata from the tip of the ledger first
kafkaMetadata := &ab.KafkaMetadata{}
if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
panic("Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block")
logger.Panicf("[channel: %s] Ledger may be corrupted:"+
"cannot unmarshal orderer metadata in most recent block", chainID)
}
return kafkaMetadata.LastOffsetPersisted
}
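For illustration, here is a self-contained sketch of the recovery logic in getLastOffsetPersisted: read the last persisted Kafka offset from the orderer metadata of the newest block, or fall back to one before the oldest offset when the chain has no blocks yet (the chain later resumes consuming at lastOffsetPersisted+1). The JSON stand-in type and the offsetOldest constant are assumptions; the real code uses the ab.KafkaMetadata protobuf and sarama.OffsetOldest.

package main

import (
	"encoding/json"
	"fmt"
)

// offsetOldest mirrors sarama.OffsetOldest (-2); returning offsetOldest-1
// means the consumer later seeks to (offsetOldest-1)+1, i.e. the very
// beginning of the partition.
const offsetOldest = int64(-2)

// kafkaMetadata is a JSON stand-in for the ab.KafkaMetadata protobuf stored
// in the metadata of the newest block.
type kafkaMetadata struct {
	LastOffsetPersisted int64 `json:"last_offset_persisted"`
}

func getLastOffsetPersisted(metadataValue []byte, chainID string) int64 {
	if metadataValue != nil {
		m := &kafkaMetadata{}
		if err := json.Unmarshal(metadataValue, m); err != nil {
			panic(fmt.Sprintf("[channel: %s] Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block", chainID))
		}
		return m.LastOffsetPersisted
	}
	return offsetOldest - 1 // no blocks yet: replay the partition from the start
}

func main() {
	raw, _ := json.Marshal(&kafkaMetadata{LastOffsetPersisted: 100})
	fmt.Println(getLastOffsetPersisted(raw, "testchainid")) // 100
	fmt.Println(getLastOffsetPersisted(nil, "testchainid")) // -3
}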
@@ -104,7 +105,7 @@ func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
// be satisfied by both the actual and the mock object and will allow
// us to retrieve these constructors.
func newChain(consenter testableConsenter, support multichain.ConsenterSupport, lastOffsetPersisted int64) *chainImpl {
logger.Debug("Starting chain with last persisted offset:", lastOffsetPersisted)
logger.Debugf("[channel: %s] Starting chain with last persisted offset: %d", support.ChainID(), lastOffsetPersisted)
return &chainImpl{
consenter: consenter,
support: support,
@@ -163,18 +164,19 @@ type chainImpl struct {
func (ch *chainImpl) Start() {
// 1. Post the CONNECT message to prevent panicking that occurs
// when seeking on a partition that hasn't been created yet.
logger.Debug("Posting the CONNECT message...")
logger.Debugf("[channel: %s] Posting the CONNECT message...", ch.support.ChainID())
if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newConnectMessage())); err != nil {
logger.Criticalf("Couldn't post CONNECT message to %s: %s", ch.partition, err)
logger.Criticalf("[channel: %s] Cannot post CONNECT message: %s", ch.support.ChainID(), err)
close(ch.exitChan)
ch.halted = true
return
}
logger.Debugf("[channel: %s] CONNECT message posted successfully", ch.support.ChainID())

// 2. Set up the listener/consumer for this partition.
consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.consenter.tlsConfig(), ch.partition, ch.lastOffsetPersisted+1)
if err != nil {
logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err)
logger.Criticalf("[channel: %s] Cannot retrieve requested offset from Kafka cluster: %s", ch.support.ChainID(), err)
close(ch.exitChan)
ch.halted = true
return
Expand Down Expand Up @@ -204,7 +206,9 @@ func (ch *chainImpl) Halt() {
// This construct is useful because it allows Halt() to be
// called multiple times w/o panicking. Recall that a receive
// from a closed channel returns (the zero value) immediately.
logger.Debugf("[channel: %s] Halting of chain requested again", ch.support.ChainID())
default:
logger.Debugf("[channel: %s] Halting of chain requested", ch.support.ChainID())
close(ch.exitChan)
}
}
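The comment in Halt() above describes a common Go idiom: guard close(exitChan) with a select so repeated Halt() calls never close an already-closed channel (which would panic), while a receive from the closed channel returns immediately. A standalone sketch of just that idiom, using a hypothetical halter type:

package main

import "fmt"

type halter struct {
	exitChan chan struct{}
}

// Halt closes exitChan at most once, so it is safe to call repeatedly.
func (h *halter) Halt() {
	select {
	case <-h.exitChan:
		// Already closed: a receive from a closed channel returns the zero
		// value immediately, so this branch is a no-op.
		fmt.Println("halt requested again")
	default:
		fmt.Println("halt requested")
		close(h.exitChan)
	}
}

func main() {
	h := &halter{exitChan: make(chan struct{})}
	h.Halt() // closes the channel
	h.Halt() // safe no-op instead of a panic
}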
@@ -217,11 +221,12 @@ func (ch *chainImpl) Enqueue(env *cb.Envelope) bool {
return false
}

logger.Debug("Enqueueing:", env)
logger.Debugf("[channel: %s] Enqueueing envelope...", ch.support.ChainID())
if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newRegularMessage(utils.MarshalOrPanic(env)))); err != nil {
logger.Errorf("Couldn't post to %s: %s", ch.partition, err)
logger.Errorf("[channel: %s] cannot enqueue envelope: %s", ch.support.ChainID(), err)
return false
}
logger.Debugf("[channel: %s] Envelope enqueued successfully", ch.support.ChainID())

return !ch.halted // If ch.halted has been set to true while sending, we should return false
}
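Enqueue wraps each envelope in a "regular" Kafka message, just as Start posts a "connect" message and the timer path posts "time-to-cut"; loop() below switches on that wrapper type. Here is a rough stand-in for the ab.KafkaMessage oneof using plain Go types; the real messages are protobufs, and these structs are illustrative only.

package main

import "fmt"

type kafkaMessage interface{ isKafkaMessage() }

type connectMessage struct{}
type timeToCutMessage struct{ BlockNumber uint64 }
type regularMessage struct{ Payload []byte }

func (connectMessage) isKafkaMessage()   {}
func (timeToCutMessage) isKafkaMessage() {}
func (regularMessage) isKafkaMessage()   {}

// process mirrors the type switch in loop(): ignore connects, cut blocks on
// time-to-cut, and order the payload of regular messages.
func process(msg kafkaMessage) {
	switch m := msg.(type) {
	case connectMessage:
		fmt.Printf("%T: connect message - ignoring\n", m)
	case timeToCutMessage:
		fmt.Println("time-to-cut message for block", m.BlockNumber)
	case regularMessage:
		fmt.Printf("regular message with %d payload bytes\n", len(m.Payload))
	}
}

func main() {
	process(connectMessage{})
	process(timeToCutMessage{BlockNumber: 1})
	process(regularMessage{Payload: []byte("marshalled envelope")})
}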
@@ -242,49 +247,52 @@ func (ch *chainImpl) loop() {
case in := <-ch.consumer.Recv():
if err := proto.Unmarshal(in.Value, msg); err != nil {
// This shouldn't happen, it should be filtered at ingress
logger.Critical("Unable to unmarshal consumed message:", err)
logger.Criticalf("[channel: %s] Unable to unmarshal consumed message:", ch.support.ChainID(), err)
}
logger.Debug("Received:", msg)
logger.Debugf("[channel: %s] Successfully unmarshalled consumed message. Inspecting type...", ch.support.ChainID())
switch msg.Type.(type) {
case *ab.KafkaMessage_Connect:
logger.Debug("It's a connect message - ignoring")
logger.Debugf("[channel: %s] It's a connect message - ignoring", ch.support.ChainID())
continue
case *ab.KafkaMessage_TimeToCut:
ttcNumber = msg.GetTimeToCut().BlockNumber
logger.Debug("It's a time-to-cut message for block", ttcNumber)
logger.Debugf("[channel: %s] It's a time-to-cut message for block %d", ch.support.ChainID(), ttcNumber)
if ttcNumber == ch.lastCutBlock+1 {
timer = nil
logger.Debug("Nil'd the timer")
logger.Debugf("[channel: %s] Nil'd the timer", ch.support.ChainID())
batch, committers := ch.support.BlockCutter().Cut()
if len(batch) == 0 {
logger.Warningf("Got right time-to-cut message (%d) but no pending requests - this might indicate a bug", ch.lastCutBlock)
logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
logger.Warningf("[channel: %s] Got right time-to-cut message (for block %d),"+
" no pending requests though; this might indicate a bug", ch.support.ChainID(), ch.lastCutBlock)
logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
return
}
block := ch.support.CreateNextBlock(batch)
encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
ch.support.WriteBlock(block, committers, encodedLastOffsetPersisted)
ch.lastCutBlock++
logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock)
logger.Debugf("[channel: %s] Proper time-to-cut received, just cut block %d",
ch.support.ChainID(), ch.lastCutBlock)
continue
} else if ttcNumber > ch.lastCutBlock+1 {
logger.Warningf("Got larger time-to-cut message (%d) than allowed (%d) - this might indicate a bug", ttcNumber, ch.lastCutBlock+1)
logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
logger.Warningf("[channel: %s] Got larger time-to-cut message (%d) than allowed (%d)"+
" - this might indicate a bug", ch.support.ChainID(), ttcNumber, ch.lastCutBlock+1)
logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
return
}
logger.Debug("Ignoring stale time-to-cut-message for", ch.lastCutBlock)
logger.Debugf("[channel: %s] Ignoring stale time-to-cut-message for block %d", ch.support.ChainID(), ch.lastCutBlock)
case *ab.KafkaMessage_Regular:
env := new(cb.Envelope)
if err := proto.Unmarshal(msg.GetRegular().Payload, env); err != nil {
// This shouldn't happen, it should be filtered at ingress
logger.Critical("Unable to unmarshal consumed regular message:", err)
logger.Criticalf("[channel: %s] Unable to unmarshal consumed regular message:", ch.support.ChainID(), err)
continue
}
batches, committers, ok := ch.support.BlockCutter().Ordered(env)
logger.Debugf("Ordering results: batches: %v, ok: %v", batches, ok)
logger.Debugf("[channel: %s] Ordering results: batches: %v, ok: %v", ch.support.ChainID(), batches, ok)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.batchTimeout)
logger.Debugf("Just began %s batch timer", ch.batchTimeout.String())
logger.Debugf("[channel: %s] Just began %s batch timer", ch.support.ChainID(), ch.batchTimeout.String())
continue
}
// If !ok, batches == nil, so this will be skipped
@@ -293,21 +301,21 @@ func (ch *chainImpl) loop() {
encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
ch.support.WriteBlock(block, committers[i], encodedLastOffsetPersisted)
ch.lastCutBlock++
logger.Debug("Batch filled, just cut block", ch.lastCutBlock)
logger.Debugf("[channel: %s] Batch filled, just cut block %d", ch.support.ChainID(), ch.lastCutBlock)
}
if len(batches) > 0 {
timer = nil
}
}
case <-timer:
logger.Debugf("Time-to-cut block %d timer expired", ch.lastCutBlock+1)
logger.Debugf("[channel: %s] Time-to-cut block %d timer expired", ch.support.ChainID(), ch.lastCutBlock+1)
timer = nil
if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+1))); err != nil {
logger.Errorf("Couldn't post to %s: %s", ch.partition, err)
logger.Errorf("[channel: %s] Cannot post time-to-cut message: %s", ch.support.ChainID(), err)
// Do not exit
}
case <-ch.exitChan: // When Halt() is called
logger.Infof("Consenter for chain %s exiting", ch.partition.Topic())
logger.Infof("[channel: %s] Consenter for channel exiting", ch.support.ChainID())
return
}
}
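The batch-timeout handling in loop() leans on the fact that a receive from a nil channel blocks forever: setting timer = nil disarms the timeout, while timer = time.After(batchTimeout) re-arms it when the first pending message arrives. A self-contained sketch of that pattern; the message source and counts are made up for the example.

package main

import (
	"fmt"
	"time"
)

func main() {
	messages := make(chan string)
	go func() {
		messages <- "tx1"
		time.Sleep(300 * time.Millisecond)
		messages <- "tx2"
	}()

	var timer <-chan time.Time // nil: blocks forever in the select below
	batchTimeout := 100 * time.Millisecond
	pending := 0

	for i := 0; i < 3; i++ {
		select {
		case msg := <-messages:
			pending++
			fmt.Println("received", msg)
			if timer == nil {
				timer = time.After(batchTimeout) // arm the timer on the first pending message
			}
		case <-timer:
			fmt.Printf("batch timer expired with %d pending message(s): post a time-to-cut\n", pending)
			timer = nil // disarm until new messages arrive
			pending = 0
		}
	}
}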
6 changes: 3 additions & 3 deletions orderer/kafka/orderer_test.go
@@ -593,15 +593,15 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {

func TestGetLastOffsetPersistedEmpty(t *testing.T) {
expected := sarama.OffsetOldest - 1
actual := getLastOffsetPersisted(&cb.Metadata{})
actual := getLastOffsetPersisted(&cb.Metadata{}, "")
if actual != expected {
t.Fatalf("Expected last offset %d, got %d", expected, actual)
}
}

func TestGetLastOffsetPersistedRight(t *testing.T) {
expected := int64(100)
actual := getLastOffsetPersisted(&cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})})
actual := getLastOffsetPersisted(&cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})}, "")
if actual != expected {
t.Fatalf("Expected last offset %d, got %d", expected, actual)
}
@@ -655,7 +655,7 @@ func TestKafkaConsenterRestart(t *testing.T) {
logger.Fatalf("Error extracting orderer metadata for chain %x: %s", cs.ChainIDVal, err)
}

lastPersistedOffset = getLastOffsetPersisted(metadata)
lastPersistedOffset = getLastOffsetPersisted(metadata, ch.support.ChainID())
nextProducedOffset = lastPersistedOffset + 1

co = mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
9 changes: 4 additions & 5 deletions orderer/kafka/producer.go
@@ -17,7 +17,6 @@ limitations under the License.
package kafka

import (
"fmt"
"time"

"github.com/Shopify/sarama"
@@ -48,7 +47,7 @@
for {
select {
case <-panicTick.C:
panic(fmt.Errorf("Failed to create Kafka producer: %v", err))
logger.Panicf("Failed to create Kafka producer: %v", err)
case <-repeatTick.C:
logger.Debug("Connecting to Kafka cluster:", brokers)
p, err = sarama.NewSyncProducer(brokers, brokerConfig)
@@ -72,12 +71,12 @@ func (p *producerImpl) Send(cp ChainPartition, payload []byte) error {
prt, ofs, err := p.producer.SendMessage(newProducerMessage(cp, payload))
if prt != cp.Partition() {
// If this happens, something's up with the partitioner
logger.Warningf("Blob destined for partition %d, but posted to %d instead", cp.Partition(), prt)
logger.Warningf("[channel: %s] Blob destined for partition %d, but posted to %d instead", cp.Topic(), cp.Partition(), prt)
}
if err == nil {
logger.Debugf("Forwarded blob with offset number %d to chain partition %s on the Kafka cluster", ofs, cp)
logger.Debugf("[channel %s] Posted blob to the Kafka cluster (offset number: %d)", cp.Topic(), ofs)
} else {
logger.Infof("Failed to send message to chain partition %s on the Kafka cluster: %s", cp, err)
logger.Infof("[channel %s] Failed to post blob to the Kafka cluster: %s", cp.Topic(), err)
}
return err
}
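newProducer (partially shown above) retries the broker connection on a short ticker and gives up when a longer "panic" ticker fires first. A hedged sketch of that retry shape, returning an error instead of panicking; the helper name and intervals are assumptions, not part of this commit.

package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry keeps calling connect on every repeat tick and bails out
// if the give-up tick fires before a connection succeeds.
func connectWithRetry(connect func() error, retryEvery, giveUpAfter time.Duration) error {
	repeatTick := time.NewTicker(retryEvery)
	giveUpTick := time.NewTicker(giveUpAfter)
	defer repeatTick.Stop()
	defer giveUpTick.Stop()

	var err error
	for {
		select {
		case <-giveUpTick.C:
			return fmt.Errorf("failed to connect before the deadline: %v", err)
		case <-repeatTick.C:
			if err = connect(); err == nil {
				return nil
			}
		}
	}
}

func main() {
	attempts := 0
	err := connectWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("broker not reachable yet")
		}
		return nil
	}, 10*time.Millisecond, time.Second)
	fmt.Println("attempts:", attempts, "err:", err)
}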
2 changes: 1 addition & 1 deletion orderer/kafka/producer_mock_test.go
@@ -87,7 +87,7 @@ func (mp *mockProducerImpl) init(cp ChainPartition, offset int64) {
// on that chain partition gives you blob #offset.
mp.testFillWithBlocks(cp, offset-1)
} else {
panic(fmt.Errorf("Out of range offset (seek number) given to producer: %d", offset))
logger.Panicf("Out of range offset (seek number) given to producer: %d", offset)
}
}

5 changes: 2 additions & 3 deletions orderer/kafka/util.go
@@ -19,7 +19,6 @@ package kafka
import (
"crypto/tls"
"crypto/x509"
"fmt"
"strconv"

"github.com/Shopify/sarama"
@@ -37,13 +36,13 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int
// create public/private key pair structure
keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey))
if err != nil {
panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err))
logger.Panicf("Unable to decode public/private key pair: %s", err)
}
// create root CA pool
rootCAs := x509.NewCertPool()
for _, certificate := range tlsConfig.RootCAs {
if !rootCAs.AppendCertsFromPEM([]byte(certificate)) {
panic(fmt.Errorf("Unable to decode certificate. Error: %v", err))
logger.Panic("Unable to parse the root certificate authority certificates (Kafka.Tls.RootCAs)")
}
}
brokerConfig.Net.TLS.Config = &tls.Config{
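For context, here is a sketch of the TLS assembly newBrokerConfig performs with the standard library: load the client key pair, build a root-CA pool from PEM blocks, and produce the tls.Config handed to sarama via brokerConfig.Net.TLS.Config. The helper name, the MinVersion setting, and the placeholder PEM parameters are assumptions, not part of this commit.

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
)

// newTLSConfig builds a client TLS configuration from PEM-encoded material,
// panicking on malformed input much like the diff above.
func newTLSConfig(certPEM, keyPEM string, rootCAPEMs []string) *tls.Config {
	keyPair, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
	if err != nil {
		log.Panicf("Unable to decode public/private key pair: %s", err)
	}
	rootCAs := x509.NewCertPool()
	for _, ca := range rootCAPEMs {
		if !rootCAs.AppendCertsFromPEM([]byte(ca)) {
			log.Panic("Unable to parse the root certificate authority certificates (Kafka.Tls.RootCAs)")
		}
	}
	return &tls.Config{
		Certificates: []tls.Certificate{keyPair},
		RootCAs:      rootCAs,
		MinVersion:   tls.VersionTLS12, // assumed; pick what your deployment requires
	}
}

func main() {
	// With real PEM material this would yield a *tls.Config suitable for
	// brokerConfig.Net.TLS.Config.
	_ = newTLSConfig
}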
2 changes: 1 addition & 1 deletion orderer/multichain/manager.go
@@ -104,7 +104,7 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
ledgerResources,
consenters,
signer)
logger.Infof("Starting with system channel: %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
ml.chains[string(chainID)] = chain
ml.systemChannelID = chainID
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
