Skip to content

Commit 95d6f1e

Browse files
committed
FAB-13265 migration status in channelconfig
Change the channelconfig of an Orderer to reflect the extension of ConsensusType: - MigrationState - MigrationContext Add a method to the bundle to validate the migartion steps of a new versus old config. Add test-cases to bundle_test.go to unit-test said method. Improved comments language. Needed to regenerate and update mocks in 'common' and 'blockcutter' packages for unit tests to build correctly. Change-Id: If060c05bcb9a0e0ca81b1f754a2b0e69a7f6c896 Signed-off-by: Yoav Tock <tock@il.ibm.com>
1 parent 1bcc20b commit 95d6f1e

File tree

9 files changed

+976
-286
lines changed

9 files changed

+976
-286
lines changed

common/channelconfig/api.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ type Orderer interface {
8686
// ConsensusMetadata returns the metadata associated with the consensus type.
8787
ConsensusMetadata() []byte
8888

89+
// ConsensusMigrationState returns the consensus-type migration state.
90+
ConsensusMigrationState() ab.ConsensusType_MigrationState
91+
92+
// ConsensusMigrationContext returns the consensus-type migration context.
93+
ConsensusMigrationContext() uint64
94+
8995
// BatchSize returns the maximum number of messages to include in a block
9096
BatchSize() *ab.BatchSize
9197

@@ -181,6 +187,9 @@ type OrdererCapabilities interface {
181187
// ExpirationCheck specifies whether the orderer checks for identity expiration checks
182188
// when validating messages
183189
ExpirationCheck() bool
190+
191+
// Kafka2RaftMigration checks whether the orderer permits a Kafka to Raft migration.
192+
Kafka2RaftMigration() bool
184193
}
185194

186195
// PolicyMapper is an interface for

common/channelconfig/bundle.go

Lines changed: 120 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/hyperledger/fabric/common/policies"
1414
"github.com/hyperledger/fabric/msp"
1515
cb "github.com/hyperledger/fabric/protos/common"
16+
ab "github.com/hyperledger/fabric/protos/orderer"
1617
"github.com/hyperledger/fabric/protos/utils"
1718
"github.com/pkg/errors"
1819
)
@@ -35,58 +36,70 @@ type Bundle struct {
3536
configtxManager configtx.Validator
3637
}
3738

38-
// PolicyManager returns the policy manager constructed for this config
39+
// PolicyManager returns the policy manager constructed for this config.
3940
func (b *Bundle) PolicyManager() policies.Manager {
4041
return b.policyManager
4142
}
4243

43-
// MSPManager returns the MSP manager constructed for this config
44+
// MSPManager returns the MSP manager constructed for this config.
4445
func (b *Bundle) MSPManager() msp.MSPManager {
4546
return b.channelConfig.MSPManager()
4647
}
4748

48-
// ChannelConfig returns the config.Channel for the chain
49+
// ChannelConfig returns the config.Channel for the chain.
4950
func (b *Bundle) ChannelConfig() Channel {
5051
return b.channelConfig
5152
}
5253

5354
// OrdererConfig returns the config.Orderer for the channel
54-
// and whether the Orderer config exists
55+
// and whether the Orderer config exists.
5556
func (b *Bundle) OrdererConfig() (Orderer, bool) {
5657
result := b.channelConfig.OrdererConfig()
5758
return result, result != nil
5859
}
5960

60-
// ConsortiumsConfig() returns the config.Consortiums for the channel
61-
// and whether the consortiums config exists
61+
// ConsortiumsConfig returns the config.Consortiums for the channel
62+
// and whether the consortiums config exists.
6263
func (b *Bundle) ConsortiumsConfig() (Consortiums, bool) {
6364
result := b.channelConfig.ConsortiumsConfig()
6465
return result, result != nil
6566
}
6667

6768
// ApplicationConfig returns the configtxapplication.SharedConfig for the channel
68-
// and whether the Application config exists
69+
// and whether the Application config exists.
6970
func (b *Bundle) ApplicationConfig() (Application, bool) {
7071
result := b.channelConfig.ApplicationConfig()
7172
return result, result != nil
7273
}
7374

74-
// ConfigtxValidator returns the configtx.Validator for the channel
75+
// ConfigtxValidator returns the configtx.Validator for the channel.
7576
func (b *Bundle) ConfigtxValidator() configtx.Validator {
7677
return b.configtxManager
7778
}
7879

7980
// ValidateNew checks if a new bundle's contained configuration is valid to be derived from the current bundle.
80-
// This allows checks of the nature "Make sure that the consensus type did not change." which is otherwise
81+
// This allows checks of the nature "Make sure that the consensus type did not change".
8182
func (b *Bundle) ValidateNew(nb Resources) error {
8283
if oc, ok := b.OrdererConfig(); ok {
8384
noc, ok := nb.OrdererConfig()
8485
if !ok {
8586
return errors.New("Current config has orderer section, but new config does not")
8687
}
8788

88-
if oc.ConsensusType() != noc.ConsensusType() {
89-
return errors.Errorf("Attempted to change consensus type from %s to %s", oc.ConsensusType(), noc.ConsensusType())
89+
// Prevent consensus-type migration when capabilities Kafka2RaftMigration is disabled
90+
if !oc.Capabilities().Kafka2RaftMigration() {
91+
if oc.ConsensusType() != noc.ConsensusType() {
92+
return errors.Errorf("Attempted to change consensus type from %s to %s",
93+
oc.ConsensusType(), noc.ConsensusType())
94+
}
95+
if noc.ConsensusMigrationState() != ab.ConsensusType_MIG_STATE_NONE || noc.ConsensusMigrationContext() != 0 {
96+
return errors.Errorf("New config has unexpected consensus-migration state or context: (%s/%d) should be (MIG_STATE_NONE/0)",
97+
noc.ConsensusMigrationState().String(), noc.ConsensusMigrationContext())
98+
}
99+
} else {
100+
if err := validateMigrationStep(oc, noc); err != nil {
101+
return err
102+
}
90103
}
91104

92105
for orgName, org := range oc.Organizations() {
@@ -238,3 +251,99 @@ func preValidate(config *cb.Config) error {
238251

239252
return nil
240253
}
254+
255+
// validateMigrationStep checks the validity of the state transitions of a possible migration step.
256+
// Since at this point we don't know whether it is a system or standard channel, we allow a wider range of options.
257+
// The migration state machine (for both types of channels) is enforced in the chain implementation.
258+
func validateMigrationStep(oc Orderer, noc Orderer) error {
259+
oldType := oc.ConsensusType()
260+
oldState := oc.ConsensusMigrationState()
261+
newType := noc.ConsensusType()
262+
newState := noc.ConsensusMigrationState()
263+
newContext := noc.ConsensusMigrationContext()
264+
265+
// The following code explicitly checks for permitted transitions; all other transitions return an error.
266+
if oldType != newType {
267+
// Consensus-type changes from Kafka to Raft in the "green" path:
268+
// - The system channel starts the migration
269+
// - A standard channel prepares the context, type change Kafka to Raft
270+
// - The system channel commits the migration, type change Kafka to Raft
271+
// Consensus-type changes from Raft to Kafka in the "abort" path:
272+
// - The system channel starts the migration
273+
// - A standard channel prepares the context, type change Kafka to Raft
274+
// - The system channel aborts the migration
275+
// - The standard channel reverts the type back, type change Raft to Kafka
276+
if oldType == "kafka" && newType == "etcdraft" {
277+
// On the system channels, this is permitted, green path commit
278+
isSysCommit := oldState == ab.ConsensusType_MIG_STATE_START && newState == ab.ConsensusType_MIG_STATE_COMMIT
279+
// On the standard channels, this is permitted, green path context
280+
isStdCtx := oldState == ab.ConsensusType_MIG_STATE_NONE && newState == ab.ConsensusType_MIG_STATE_CONTEXT
281+
if isSysCommit || isStdCtx {
282+
logger.Debugf("Kafka-to-etcdraft migration, config update, state transition: %s to %s", oldState, newState)
283+
} else {
284+
return errors.Errorf("Attempted to change consensus type from %s to %s, unexpected migration state transition: %s to %s",
285+
oldType, newType, oldState, newState)
286+
}
287+
} else if oldType == "etcdraft" && newType == "kafka" {
288+
// On the standard channels, this is permitted, abort path
289+
if oldState == ab.ConsensusType_MIG_STATE_CONTEXT && newState == ab.ConsensusType_MIG_STATE_NONE {
290+
logger.Debugf("Kafka-to-etcdraft migration, config update, state transition: %s to %s", oldState, newState)
291+
} else {
292+
return errors.Errorf("Attempted to change consensus type from %s to %s, unexpected migration state transition: %s to %s",
293+
oldType, newType, oldState, newState)
294+
}
295+
} else {
296+
return errors.Errorf("Attempted to change consensus type from %s to %s, only kafka to etcdraft is supported",
297+
oldType, newType)
298+
}
299+
} else {
300+
// On the system channel & standard channels, this is always permitted, not a migration
301+
isNotMig := oldState == ab.ConsensusType_MIG_STATE_NONE && newState == ab.ConsensusType_MIG_STATE_NONE
302+
303+
// Migration state may change when the type stays the same
304+
if oldType == "kafka" {
305+
// On the system channel, these transitions are permitted
306+
// In the "green" path: the system channel starts migration
307+
isSysStart := oldState == ab.ConsensusType_MIG_STATE_NONE && newState == ab.ConsensusType_MIG_STATE_START
308+
// In the "abort" path: the system channel aborts a migration
309+
isSysAbort := oldState == ab.ConsensusType_MIG_STATE_START && newState == ab.ConsensusType_MIG_STATE_ABORT
310+
// In the "abort" path: the system channel reconfigures after an abort, not a migration
311+
isSysNotMigAfterAbort := oldState == ab.ConsensusType_MIG_STATE_ABORT && newState == ab.ConsensusType_MIG_STATE_NONE
312+
// In the "abort" path: the system channel starts a new migration attempt after an abort
313+
isSysStartAfterAbort := oldState == ab.ConsensusType_MIG_STATE_ABORT && newState == ab.ConsensusType_MIG_STATE_START
314+
if !(isNotMig || isSysStart || isSysAbort || isSysNotMigAfterAbort || isSysStartAfterAbort) {
315+
return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s",
316+
oldType, oldState, newState)
317+
} else if newState != ab.ConsensusType_MIG_STATE_NONE {
318+
logger.Debugf("Kafka-to-etcdraft migration, config update, state transition: %s to %s", oldState, newState)
319+
}
320+
} else if oldType == "etcdraft" {
321+
// On the system channel, this is permitted
322+
// In the "green" path: the system channel reconfigures after a successful migration
323+
isSysAfterSuccess := oldState == ab.ConsensusType_MIG_STATE_COMMIT && newState == ab.ConsensusType_MIG_STATE_NONE
324+
// On the standard channels, this is permitted
325+
// In the "green" path: a standard channel reconfigures after a successful migration
326+
isStdAfterSuccess := oldState == ab.ConsensusType_MIG_STATE_CONTEXT && newState == ab.ConsensusType_MIG_STATE_NONE
327+
if !(isNotMig || isSysAfterSuccess || isStdAfterSuccess) {
328+
return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s",
329+
oldType, oldState.String(), newState)
330+
}
331+
}
332+
}
333+
334+
// Check for a valid range on migration context
335+
switch newState {
336+
case ab.ConsensusType_MIG_STATE_START, ab.ConsensusType_MIG_STATE_ABORT, ab.ConsensusType_MIG_STATE_NONE:
337+
if newContext != 0 {
338+
return errors.Errorf("Consensus migration state %s, unexpected migration context: %d (expected: 0)",
339+
newState, newContext)
340+
}
341+
case ab.ConsensusType_MIG_STATE_CONTEXT, ab.ConsensusType_MIG_STATE_COMMIT:
342+
if newContext <= 0 {
343+
return errors.Errorf("Consensus migration state %s, unexpected migration context: %d (expected >0)",
344+
newState, newContext)
345+
}
346+
}
347+
348+
return nil
349+
}

0 commit comments

Comments
 (0)