Skip to content

Commit

Permalink
[FAB-4408] Add retry logic to Chain.Start steps
Browse files Browse the repository at this point in the history
The following steps take place during the Start call on a Chain object:

1. Creation of Kafka producer.
2. Posting of no-op CONNECT message to partition that corresponds to
channel.
3. Creation of Kafka consumer.

All of these steps need to succeed before we can have an OSN that can
write to and read from a channel.

This changeset:

1. Introduces a `retryProcess` concept, where a given function is
executed until it succeeds, following a short/long retry logic.
2. Turns each of the steps above into a `retryProcess`.
3. Moves all of the logic under the `Start` method into a goroutine so
as to minimize blocking; the expectation of the `multichain.Manager` is
that `Chain.Start` returns quickly.
4. Modifies `Enqueue` so that it returns false (which results in a
SERVICE_UNAVAILABLE response to the Broadcast call) when all retriable
steps above haven't completed successfully.
5. Makes the orderer panic if LongRetryTotal elapses.

This changeset also introduces some unit test changes, beyond the ones
warranted by the production path changes described above, the main one
being the closing of some client objects that were leaking.

This changeset is part of fixing FAB-4136.

Change-Id: I2198e020affa56a877ba61a928aae6a45707524d
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Jun 11, 2017
1 parent cd44fba commit 5dd2e33
Show file tree
Hide file tree
Showing 7 changed files with 513 additions and 265 deletions.
16 changes: 9 additions & 7 deletions bddtests/features/bootstrap.feature
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ Feature: Bootstrap

And the user "dev0Org0" using cert alias "consortium1-cert" broadcasts ConfigUpdate Tx "configUpdateTx1" to orderer "<orderer0>" to create channel "com.acme.blockchain.jdoe.Channel1"

# Sleep as the local orderer ledger needs to create the block that corresponds to the start number of the seek request
And I wait "<BroadcastWaitTime>" seconds
# Sleep as the local orderer needs to bring up the resources that correspond to the new channel
# For the Kafka orderer, this includes setting up a producer and consumer for the channel's partition
# Requesting a deliver earlier may result in a SERVICE_UNAVAILABLE response and a connection drop
And I wait "<ChannelJoinDelay>" seconds

When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "<orderer0>"
And user "dev0Org0" sends deliver a seek request on orderer "<orderer0>" with properties:
Expand Down Expand Up @@ -342,8 +344,8 @@ Feature: Bootstrap
# TODO: Once events are working, consider listen event listener as well.

Examples: Orderer Options
| ComposeFile | SystemUpWaitTime | ConsensusType | BroadcastWaitTime | orderer0 | orderer1 | orderer2 |Orderer Specific Info|
| dc-base.yml | 0 | solo | 2 | orderer0 | orderer0 | orderer0 | |
# | dc-base.yml dc-peer-couchdb.yml | 10 | solo | 2 | orderer0 | orderer0 | orderer0 | |
# | dc-base.yml dc-orderer-kafka.yml | 30 | kafka | 7 | orderer0 | orderer1 | orderer2 | |
# | dc-base.yml dc-peer-couchdb.yml dc-orderer-kafka.yml | 30 | kafka | 7 | orderer0 | orderer1 | orderer2 | |
| ComposeFile | SystemUpWaitTime | ConsensusType | ChannelJoinDelay | BroadcastWaitTime | orderer0 | orderer1 | orderer2 |Orderer Specific Info|
| dc-base.yml | 0 | solo | 2 | 2 | orderer0 | orderer0 | orderer0 | |
# | dc-base.yml dc-peer-couchdb.yml | 10 | solo | 2 | 2 | orderer0 | orderer0 | orderer0 | |
# | dc-base.yml dc-orderer-kafka.yml | 40 | kafka | 10 | 5 | orderer0 | orderer1 | orderer2 | |
# | dc-base.yml dc-peer-couchdb.yml dc-orderer-kafka.yml | 40 | kafka | 10 | 5 | orderer0 | orderer1 | orderer2 | |
1 change: 0 additions & 1 deletion orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
}

if !support.Enqueue(msg) {
logger.Infof("Consenter instructed us to shut down")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

Expand Down
163 changes: 92 additions & 71 deletions orderer/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func newChain(consenter commonConsenter, support multichain.ConsenterSupport, la
channel: newChannel(support.ChainID(), defaultPartition),
lastOffsetPersisted: lastOffsetPersisted,
lastCutBlockNumber: lastCutBlockNumber,
halted: false, // Redundant as the default value for booleans is false but added for readability
started: false, // Redundant as the default value for booleans is false but added for readability
startChan: make(chan struct{}),
halted: false,
exitChan: make(chan struct{}),
}, nil
}
Expand All @@ -62,10 +64,12 @@ type chainImpl struct {
parentConsumer sarama.Consumer
channelConsumer sarama.PartitionConsumer

halted bool // For the Enqueue() calls
exitChan chan struct{} // For the Chain's Halt() method
// Set the flag to true and close the channel when the retriable steps in `Start` have completed successfully
started bool
startChan chan struct{}

startCompleted bool // For testing
halted bool
exitChan chan struct{}
}

// Errored currently only closes on halt
Expand All @@ -76,47 +80,51 @@ func (chain *chainImpl) Errored() <-chan struct{} {
// Start allocates the necessary resources for staying up to date with this
// Chain. Implements the multichain.Chain interface. Called by
// multichain.NewManagerImpl() which is invoked when the ordering process is
// launched, before the call to NewServer().
// launched, before the call to NewServer(). Launches a goroutine so as not to
// block the multichain.Manager.
func (chain *chainImpl) Start() {
go startThread(chain)
}

// Called by Start().
func startThread(chain *chainImpl) {
var err error

// Set up the producer
chain.producer, err = setupProducerForChannel(chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel, chain.consenter.retryOptions())
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
if err != nil {
logger.Criticalf("[channel: %s] Cannot set up producer = %s", chain.channel.topic(), err)
close(chain.exitChan)
chain.halted = true
return
logger.Panicf("[channel: %s] Cannot set up producer = %s", chain.channel.topic(), err)
}
logger.Infof("[channel: %s] Producer set up successfully", chain.support.ChainID())

// Have the producer post the CONNECT message
if err = sendConnectMessage(chain.producer, chain.channel); err != nil {
logger.Criticalf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
close(chain.exitChan)
chain.halted = true
chain.producer.Close()
return
if err = sendConnectMessage(chain.consenter.retryOptions(), chain.exitChan, chain.producer, chain.channel); err != nil {
logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
}
logger.Infof("[channel: %s] CONNECT message posted successfully", chain.channel.topic())

// Set up the consumer
chain.parentConsumer, chain.channelConsumer, err = setupConsumerForChannel(chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel, chain.lastOffsetPersisted+1)
// Set up the parent consumer
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
if err != nil {
logger.Criticalf("[channel: %s] Cannot set up consumer = %s", chain.channel.topic(), err)
close(chain.exitChan)
chain.halted = true
chain.producer.Close()
return
logger.Panicf("[channel: %s] Cannot set up parent consumer = %s", chain.channel.topic(), err)
}
logger.Infof("[channel: %s] Parent consumer set up successfully", chain.channel.topic())

// Set up the channel consumer
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
if err != nil {
logger.Panicf("[channel: %s] Cannot set up channel consumer = %s", chain.channel.topic(), err)
}
logger.Infof("[channel: %s] Consumer set up successfully", chain.channel.topic())
logger.Infof("[channel: %s] Channel consumer set up successfully", chain.channel.topic())

chain.started = true
close(chain.startChan)

go listenForErrors(chain.channelConsumer.Errors(), chain.exitChan)

// Keep up to date with the channel
go processMessagesToBlock(chain.support, chain.producer, chain.parentConsumer, chain.channelConsumer,
processMessagesToBlock(chain.support, chain.producer, chain.parentConsumer, chain.channelConsumer,
chain.channel, &chain.lastCutBlockNumber, &chain.halted, &chain.exitChan)

chain.startCompleted = true
}

// Halt frees the resources which were allocated for this Chain. Implements the
Expand All @@ -130,15 +138,21 @@ func (chain *chainImpl) Halt() {
logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
default:
logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
chain.halted = true
close(chain.exitChan)
}
}

// Enqueue accepts a message and returns true on acceptance, or false on
// shutdown. Implements the multichain.Chain interface. Called by Broadcast.
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool {
if !chain.started {
logger.Warningf("[channel: %s] Will not enqueue because the chain hasn't completed its initialization yet", chain.support.ChainID())
return false
}

if chain.halted {
logger.Warningf("[channel: %s] Will not enqueue cause the chain has been halted", chain.support.ChainID())
logger.Warningf("[channel: %s] Will not enqueue because the chain has been halted", chain.support.ChainID())
return false
}

Expand Down Expand Up @@ -382,15 +396,38 @@ func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.C
return nil
}

// Post a CONNECT message to the channel. This prevents the panicking that would
// occur if we were to set up a consumer and seek on a partition that hadn't
// been written to yet.
func sendConnectMessage(producer sarama.SyncProducer, channel channel) error {
logger.Infof("[channel: %s] Posting the CONNECT message...", channel.topic())
// Sets up the partition consumer for a channel using the given retry options.
func setupChannelConsumerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, parentConsumer sarama.Consumer, channel channel, startFrom int64) (sarama.PartitionConsumer, error) {
var err error
var channelConsumer sarama.PartitionConsumer

logger.Infof("[channel: %s] Setting up the channel consumer for this channel...", channel.topic())

retryMsg := "Connecting to the Kafka cluster"
setupChannelConsumer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
channelConsumer, err = parentConsumer.ConsumePartition(channel.topic(), channel.partition(), startFrom)
return err
})

return channelConsumer, setupChannelConsumer.retry()
}

// Post a CONNECT message to the channel using the given retry options. This
// prevents the panicking that would occur if we were to set up a consumer and
// seek on a partition that hadn't been written to yet.
func sendConnectMessage(retryOptions localconfig.Retry, exitChan chan struct{}, producer sarama.SyncProducer, channel channel) error {
logger.Infof("[channel: %s] About to post the CONNECT message...", channel.topic())

payload := utils.MarshalOrPanic(newConnectMessage())
message := newProducerMessage(channel, payload)
_, _, err := producer.SendMessage(message)
return err

retryMsg := "Attempting to post the CONNECT message..."
postConnect := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
_, _, err := producer.SendMessage(message)
return err
})

return postConnect.retry()
}

func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlockNumber uint64, timer *<-chan time.Time) error {
Expand All @@ -402,50 +439,34 @@ func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlock
return err
}

// Sets up the listener/consumer for a channel.
func setupConsumerForChannel(brokers []string, brokerConfig *sarama.Config, channel channel, startFrom int64) (sarama.Consumer, sarama.PartitionConsumer, error) {
logger.Infof("[channel: %s] Setting up the consumer for this channel...", channel.topic())
// Sets up the parent consumer for a channel using the given retry options.
func setupParentConsumerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.Consumer, error) {
var err error
var parentConsumer sarama.Consumer

parentConsumer, err := sarama.NewConsumer(brokers, brokerConfig)
if err != nil {
return nil, nil, err
}
logger.Debugf("[channel: %s] Created new parent consumer", channel.topic())
logger.Infof("[channel: %s] Setting up the parent consumer for this channel...", channel.topic())

channelConsumer, err := parentConsumer.ConsumePartition(channel.topic(), channel.partition(), startFrom)
if err != nil {
_ = parentConsumer.Close()
return nil, nil, err
}
logger.Debugf("[channel: %s] Created new channel consumer", channel.topic())
retryMsg := "Connecting to the Kafka cluster"
setupParentConsumer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
parentConsumer, err = sarama.NewConsumer(brokers, brokerConfig)
return err
})

return parentConsumer, channelConsumer, nil
return parentConsumer, setupParentConsumer.retry()
}

// Sets up the writer/producer for a channel.
func setupProducerForChannel(brokers []string, brokerConfig *sarama.Config, channel channel, retryOptions localconfig.Retry) (sarama.SyncProducer, error) {
// Sets up the writer/producer for a channel using the given retry options.
func setupProducerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.SyncProducer, error) {
var err error
var producer sarama.SyncProducer

// This will be revised in: https://jira.hyperledger.org/browse/FAB-4136
repeatTick := time.NewTicker(retryOptions.ShortInterval)
panicTick := time.NewTicker(retryOptions.ShortTotal)
logger.Debugf("[channel: %s] Retrying every %s for a total of %s", channel.topic(), retryOptions.ShortInterval.String(), retryOptions.ShortTotal.String())
defer repeatTick.Stop()
defer panicTick.Stop()
logger.Infof("[channel: %s] Setting up the producer for this channel...", channel.topic())

loop:
for {
select {
case <-panicTick.C:
return nil, err
case <-repeatTick.C:
logger.Debugf("[channel: %s] Connecting to Kafka cluster: %s", channel.topic(), brokers)
if producer, err = sarama.NewSyncProducer(brokers, brokerConfig); err == nil {
break loop
}
}
}
retryMsg := "Connecting to the Kafka cluster"
setupProducer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
producer, err = sarama.NewSyncProducer(brokers, brokerConfig)
return err
})

return producer, err
return producer, setupProducer.retry()
}
Loading

0 comments on commit 5dd2e33

Please sign in to comment.