Skip to content

Commit

Permalink
[FAB-1524] Reinitialize chains on orderer restart
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1524

This changeset utilizes the last configuration embedded in the block
metadata to properly initialize any existing chains.

It also fixes the template based configuration transaction creation in
the utilties to omit the ChainCreators configuration item which
inappropriately flags a chain as an ordering system chain.

Change-Id: Ia998b3c5d40d6fef6dad41c28cd86627c3d4b4e9
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jan 10, 2017
1 parent 75909aa commit 141ab4c
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 87 deletions.
2 changes: 0 additions & 2 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ func main() {
}
} else {
logger.Infof("Not bootstrapping because of existing chains")
logger.Warningf("XXXXXXX RESTART IS NOT CURRENTLY SUPPORTED XXXXXXXXX")
// XXX Remove this once restart is supported
}

if conf.Kafka.Verbose {
Expand Down
12 changes: 2 additions & 10 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/golang/protobuf/proto"
)

type mockLedgerReadWriter struct {
Expand Down Expand Up @@ -94,17 +92,11 @@ func TestWriteLastConfiguration(t *testing.T) {
cs := &chainSupport{ledger: ml, configManager: cm}

lastConfig := func(block *cb.Block) uint64 {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION], md)
if err != nil {
panic(err)
}
lc := &cb.LastConfiguration{}
err = proto.Unmarshal(md.Value, lc)
index, err := utils.GetLastConfigurationIndexFromBlock(block)
if err != nil {
panic(err)
}
return lc.Index
return index
}

expected := uint64(0)
Expand Down
58 changes: 14 additions & 44 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/sharedconfig"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -59,48 +59,18 @@ type multiLedger struct {
sysChain *systemChain
}

// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one
func getConfigTx(reader rawledger.Reader) *cb.Envelope {
var lastConfigTx *cb.Envelope

it, _ := reader.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
// Iterate over the blockchain, looking for config transactions, track the most recent one encountered
// this will be the transaction which is returned
for {
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
logger.Fatalf("Error parsing blockchain at startup: %v", status)
}
// ConfigTxs should always be by themselves
if len(block.Data.Data) != 1 {
continue
}

maybeConfigTx := &cb.Envelope{}

err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)

if err != nil {
logger.Fatalf("Found data which was not an envelope: %s", err)
}

payload := &cb.Payload{}
if err = proto.Unmarshal(maybeConfigTx.Payload, payload); err != nil {
logger.Fatalf("Unable to unmarshal transaction payload: %s", err)
}

if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
continue
}

logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number)
lastConfigTx = maybeConfigTx
default:
return lastConfigTx
}
lastBlock := rawledger.GetBlock(reader, reader.Height()-1)
index, err := utils.GetLastConfigurationIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last configuration in its latest block: %s", err)
}
configBlock := rawledger.GetBlock(reader, index)
if configBlock == nil {
logger.Panicf("Configuration block does not exist")
}

return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}

// NewManagerImpl produces an instance of a Manager
Expand All @@ -126,16 +96,16 @@ func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Conse

if sharedConfigManager.ChainCreators() != nil {
if ml.sysChain != nil {
logger.Fatalf("There appear to be two system chains %x and %x", ml.sysChain.support.ChainID(), chainID)
logger.Fatalf("There appear to be two system chains %s and %s", ml.sysChain.support.ChainID(), chainID)
}
logger.Debugf("Starting with system chain: %x", chainID)
logger.Debugf("Starting with system chain: %s", chainID)
chain := newChainSupport(createSystemChainFilters(ml, configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
ml.chains[string(chainID)] = chain
ml.sysChain = newSystemChain(chain)
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
logger.Debugf("Starting chain: %x", chainID)
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(createStandardFilters(configManager), configManager, policyManager, backingLedger, sharedConfigManager, consenters)
ml.chains[string(chainID)] = chain
chain.start()
Expand Down
15 changes: 10 additions & 5 deletions orderer/multichain/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func TestGetConfigTx(t *testing.T) {
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)}))
ctx := makeConfigTx(provisional.TestChainID, 6)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{ctx}))
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}))

block := rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)})
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION] = utils.MarshalOrPanic(&cb.Metadata{Value: utils.MarshalOrPanic(&cb.LastConfiguration{Index: 7})})
rl.Append(block)

pctx := getConfigTx(rl)

Expand All @@ -104,11 +107,13 @@ func TestGetConfigTxFailure(t *testing.T) {
}))
}
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)}))
pctx := getConfigTx(rl)
defer func() {
if recover() == nil {
t.Fatalf("Should have panic-ed because there was no configuration tx")
}
}()
getConfigTx(rl)

if pctx != nil {
t.Fatalf("Should not have found a configuration tx")
}
}

// This test essentially brings the entire system up and is ultimately what main.go will replicate
Expand Down
26 changes: 6 additions & 20 deletions orderer/rawledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,6 @@ type ledgerTestFactory interface {

var testables []ledgerTestable

func getBlock(number uint64, li ReadWriter) *cb.Block {
i, _ := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: number}}})
select {
case <-i.ReadyChan():
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
default:
return nil
}
}

func allTest(t *testing.T, test func(ledgerTestFactory, *testing.T)) {
for _, lt := range testables {

Expand Down Expand Up @@ -83,7 +69,7 @@ func testInitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 1 {
t.Fatalf("Block height should be 1")
}
block := getBlock(0, li)
block := GetBlock(li, 0)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand All @@ -109,7 +95,7 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := getBlock(1, li)
block := GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving block 1")
}
Expand All @@ -124,7 +110,7 @@ func TestAddition(t *testing.T) {

func testAddition(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
genesis := getBlock(0, li)
genesis := GetBlock(li, 0)
if genesis == nil {
t.Fatalf("Could not retrieve genesis block")
}
Expand All @@ -134,7 +120,7 @@ func testAddition(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := getBlock(1, li)
block := GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand Down Expand Up @@ -255,7 +241,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain1: %s", err)
}

if b := getBlock(1, c1); !reflect.DeepEqual(c1b1, b) {
if b := GetBlock(c1, 1); !reflect.DeepEqual(c1b1, b) {
t.Fatalf("Did not properly store block 1 on chain 1:")
}

Expand All @@ -264,7 +250,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain2: %s", err)
}

if b := getBlock(0, c2); reflect.DeepEqual(c2b0, b) {
if b := GetBlock(c2, 0); reflect.DeepEqual(c2b0, b) {
t.Fatalf("Did not properly store block 1 on chain 1")
}
}
15 changes: 15 additions & 0 deletions orderer/rawledger/rawledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,18 @@ func CreateNextBlock(rl Reader, messages []*cb.Envelope) *cb.Block {

return block
}

// GetBlock is a utility method for retrieving a single block
func GetBlock(rl Reader, index uint64) *cb.Block {
i, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: index}}})
select {
case <-i.ReadyChan():
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
default:
return nil
}
}
15 changes: 15 additions & 0 deletions protos/utils/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func GetChainIDFromBlock(block *cb.Block) (string, error) {
return payload.Header.ChainHeader.ChainID, nil
}

// GetLastConfigurationIndexFromBlock retrieves the index of the last configuration block as encoded in the block metadata
func GetLastConfigurationIndexFromBlock(block *cb.Block) (uint64, error) {
md := &cb.Metadata{}
err := proto.Unmarshal(block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIGURATION], md)
if err != nil {
return 0, err
}
lc := &cb.LastConfiguration{}
err = proto.Unmarshal(md.Value, lc)
if err != nil {
return 0, err
}
return lc.Index, nil
}

// GetBlockFromBlockBytes marshals the bytes into Block
func GetBlockFromBlockBytes(blockBytes []byte) (*cb.Block, error) {
block := &cb.Block{}
Expand Down
20 changes: 14 additions & 6 deletions protos/utils/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,32 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
)

const CreationPolicyKey = "CreationPolicy"
const (
CreationPolicyKey = "CreationPolicy"
ChainCreatorsKey = "ChainCreators"
)

// ChainCreationConfiguration creates a new chain creation configuration envelope from
// the supplied creationPolicy, new chainID, and a template configuration envelope
// The template configuration envelope will have the correct chainID set on all items,
// and the first item will be a CreationPolicy which is ready for the signatures as
// required by the policy
// required by the policy, it also strips out the ChainCreators item as this is invalid
// for the ordering system chain
func ChainCreationConfiguration(creationPolicy, newChainID string, template *cb.ConfigurationEnvelope) *cb.ConfigurationEnvelope {
newConfigItems := make([]*cb.SignedConfigurationItem, len(template.Items))
var newConfigItems []*cb.SignedConfigurationItem
var hashBytes []byte

for i, item := range template.Items {
for _, item := range template.Items {
configItem := UnmarshalConfigurationItemOrPanic(item.ConfigurationItem)
if configItem.Type == cb.ConfigurationItem_Orderer && configItem.Key == ChainCreatorsKey {
continue
}
configItem.Header.ChainID = newChainID
newConfigItems[i] = &cb.SignedConfigurationItem{
newConfigItem := &cb.SignedConfigurationItem{
ConfigurationItem: MarshalOrPanic(configItem),
}
hashBytes = append(hashBytes, newConfigItems[i].ConfigurationItem...)
newConfigItems = append(newConfigItems, newConfigItem)
hashBytes = append(hashBytes, newConfigItem.ConfigurationItem...)
}

digest := cu.ComputeCryptoHash(hashBytes)
Expand Down

0 comments on commit 141ab4c

Please sign in to comment.