Skip to content

Commit

Permalink
FAB-14699 Kafka2Raft renames & skeletons
Browse files Browse the repository at this point in the history
Rename structs & vars in migration packge, in response to
review comments.

Add skeleton function signatures in preparation for for
abort & recovery. Update comments.

No functional change.

Change-Id: I1d36b1011b16821ce45b78bbd96d9a180d95c292
Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm committed Mar 19, 2019
1 parent c97b9bf commit a0a9189
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 150 deletions.
2 changes: 1 addition & 1 deletion orderer/common/multichannel/registrar.go
Expand Up @@ -308,7 +308,7 @@ func (r *Registrar) ConsensusMigrationCommit() error {
}

// ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration.
func (r *Registrar) ConsensusMigrationAbort() (err error) {
func (r *Registrar) ConsensusMigrationAbort() error {
//TODO implement the consensus-type migration abort path
return fmt.Errorf("Not implemented yet")
}
Expand Down
2 changes: 1 addition & 1 deletion orderer/common/multichannel/util_test.go
Expand Up @@ -30,7 +30,7 @@ func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadat
support: support,
metadata: metadata,
done: make(chan struct{}),
migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()),
migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/etcdraft/chain.go
Expand Up @@ -267,7 +267,7 @@ func NewChain(
},
logger: lg,
opts: opts,
migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
}

// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/inactive/inactive_chain.go
Expand Up @@ -45,5 +45,5 @@ func (c *Chain) Halt() {
}

func (c *Chain) MigrationStatus() migration.Status {
return &migration.StatusImpl{}
return migration.NewManager(false, "inactive")
}
37 changes: 19 additions & 18 deletions orderer/consensus/kafka/chain.go
Expand Up @@ -79,7 +79,7 @@ func newChain(
haltChan: make(chan struct{}),
startChan: make(chan struct{}),
doneReprocessingMsgInFlight: doneReprocessingMsgInFlight,
migrationStatusStepper: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()),
migrationManager: migration.NewManager(support.IsSystemChannel(), support.ChainID()),
}, nil
}

Expand Down Expand Up @@ -128,12 +128,12 @@ type chainImpl struct {

// provides access to the consensus-type migration status of the chain,
// and allows stepping through the state machine.
migrationStatusStepper migration.StatusStepper
migrationManager migration.Manager
}

// MigrationStatus provides access to the consensus-type migration status of the chain.
func (chain *chainImpl) MigrationStatus() migration.Status {
return chain.migrationStatusStepper
return chain.migrationManager
}

// Errored returns a channel which will close when a partition consumer error
Expand Down Expand Up @@ -226,8 +226,9 @@ func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
}

func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
// During consensus-type migration: stop all normal txs on the system-channel and standard-channels.
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
// During consensus-type migration: stop all normal txs on the system-channel and standard-channels. This
// happens in the broadcast-phase, and will prevent new transactions from entering Kafka.
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
return fmt.Errorf("[channel: %s] cannot enqueue, consensus-type migration pending", chain.ChainID())
}

Expand All @@ -248,7 +249,7 @@ func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error {

func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error {
// During consensus-type migration, stop channel creation
if chain.ConsenterSupport.IsSystemChannel() && chain.migrationStatusStepper.IsPending() {
if chain.ConsenterSupport.IsSystemChannel() && chain.migrationManager.IsPending() {
ordererTx, err := isOrdererTx(config)
if err != nil {
err = errors.Wrap(err, "cannot determine if config-tx is of type ORDERER_TX, on system channel")
Expand All @@ -272,7 +273,7 @@ func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, origina
return nil
}

// enqueue accepts a message and returns true on acceptance, or false otheriwse.
// enqueue accepts a message and returns true on acceptance, or false otherwise.
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
select {
Expand Down Expand Up @@ -855,8 +856,8 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
offset = chain.lastOriginalOffsetProcessed
}

// During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
// During consensus-type migration, drop normal messages on the channel.
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
logger.Warningf("[channel: %s] Normal message is dropped, consensus-type migration pending", chain.ChainID())
return nil
}
Expand Down Expand Up @@ -935,7 +936,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
if doCommit {
commitConfigMsg(env, offset)
} else {
logger.Infof("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset)
logger.Warningf("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset)
}

default:
Expand Down Expand Up @@ -968,10 +969,10 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
switch chdr.Type {

case int32(cb.HeaderType_ORDERER_TRANSACTION):
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() {
commitBlock = false
logger.Debugf("[channel: %s] Consensus-type migration: Dropping ORDERER_TRANSACTION because consensus-type migration pending; Status: %s",
chain.ChainID(), chain.migrationStatusStepper)
chain.ChainID(), chain.migrationManager)
} else {
commitBlock = true
}
Expand Down Expand Up @@ -999,8 +1000,8 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
logger.Infof("[channel: %s] Consensus-type migration: Processing config tx: type: %s, state: %s, context: %d",
chain.ChainID(), nextConsensusType, nextMigState.String(), nextMigContext)

commitMigration := false // Prevent shadowing of commitBlock
commitBlock, commitMigration = chain.migrationStatusStepper.Step( // Evaluate the migration state machine
commitMigration := false // Prevent shadowing of commitBlock
commitBlock, commitMigration = chain.migrationManager.Step( // Evaluate the migration state machine
chain.ChainID(), nextConsensusType, nextMigState, nextMigContext, chain.lastCutBlockNumber, chain.consenter.migrationController())
logger.Debugf("[channel: %s] Consensus-type migration: commitBlock=%v, commitMigration=%v", chain.ChainID(), commitBlock, commitMigration)

Expand All @@ -1017,15 +1018,15 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock
block := chain.CreateNextBlock([]*cb.Envelope{configTx})
replacer := file.NewReplacer(chain.consenter.bootstrapFile())
if err = replacer.ReplaceGenesisBlockFile(block); err != nil {
_, context := chain.migrationStatusStepper.StateContext()
chain.migrationStatusStepper.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit
_, context := chain.migrationManager.StateContext()
chain.migrationManager.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit
logger.Warningf("[channel: %s] Consensus-type migration: Reject Config tx on system channel, cannot replace bootstrap file; Status: %s",
chain.ChainID(), chain.migrationStatusStepper.String())
chain.ChainID(), chain.migrationManager.String())
return false, err
}

logger.Infof("[channel: %s] Consensus-type migration: committed; Replaced bootstrap file: %s; Status: %s",
chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationStatusStepper.String())
chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationManager.String())
}

default:
Expand Down
4 changes: 2 additions & 2 deletions orderer/consensus/kafka/chain_test.go
Expand Up @@ -2581,7 +2581,7 @@ func TestResubmission(t *testing.T) {
errorChan: errorChan,
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
}

var counts []uint64
Expand Down Expand Up @@ -2771,7 +2771,7 @@ func TestResubmission(t *testing.T) {
haltChan: haltChan,
doneProcessingMessagesToBlocks: make(chan struct{}),
doneReprocessingMsgInFlight: doneReprocessing,
migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()),
}

var counts []uint64
Expand Down

0 comments on commit a0a9189

Please sign in to comment.