Skip to content

Commit 8351c8c

Browse files
committed
[FAB-13180] Orderer: auto-join existing inactive chains
This change set makes cluster type OSNs autonomously detect channels that exist and that they should be part of (the channel configuration has their public credentials as a consenter for the channel), but that they do not run chains for, or have the blocks in their ledger. This can happen from several reasons: - The OSN is added to an existing chain, and since it didn't participate in the chain so far, it didn't get the blocks that tell it is now part of the channel. - The OSN tried to detect whether it is part of a channel, but it wasn't able, because all OSNs of the system channel returned service-unavailable. This can happen if: - a leader election takes place - the network is acting up so the leadership was lost - a channel has been deserted (all OSNs left it). To take care of such use cases, all OSNs now: - Track inactive chains that they know of, but they do not participate in - Periodically(*) probe the system channel OSNs to see if they are now part of these chains or not. - If so, then they replicate the chains, and create instances of them, and replace the instances of the inactive chains in the registrar with the new instances of type etcdraft. (*) - 10 seconds after boot, then after 20 seconds, then after 40 seconds, etc. etc. eventually- every 5 minutes. Change-Id: I3c2a84e6f4f402e011e7a895345b3d3982247083 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent c20365c commit 8351c8c

File tree

16 files changed

+640
-78
lines changed

16 files changed

+640
-78
lines changed

integration/e2e/etcdraft_reconfig_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,30 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
316316
assertInvoke(network, peer, o4, mycc3.Name, "testchannel3", "channel testchannel3 is not serviced by me", 1)
317317
Expect(string(orderer4Runner.Err().Contents())).To(ContainSubstring("I do not belong to channel testchannel2 or am forbidden pulling it (not in the channel), skipping chain retrieval"))
318318
Expect(string(orderer4Runner.Err().Contents())).To(ContainSubstring("I do not belong to channel testchannel3 or am forbidden pulling it (forbidden), skipping chain retrieval"))
319+
320+
By("Adding orderer4 to testchannel2")
321+
nwo.AddConsenter(network, peer, o1, "testchannel2", etcdraft.Consenter{
322+
ServerTlsCert: orderer4Certificate,
323+
ClientTlsCert: orderer4Certificate,
324+
Host: "127.0.0.1",
325+
Port: uint32(network.OrdererPort(o4, nwo.ListenPort)),
326+
})
327+
328+
By("Waiting for orderer4 and to replicate testchannel2")
329+
assertBlockReception(map[string]int{
330+
"testchannel2": 2,
331+
}, []*nwo.Orderer{o4}, peer, network)
332+
319333
By("Ensuring that all orderers don't log errors to the log")
320334
assertNoErrorsAreLogged(ordererRunners)
335+
336+
By("Submitting a transaction through orderer4")
337+
assertInvoke(network, peer, o4, mycc2.Name, "testchannel2", "Chaincode invoke successful. result: status:200", 0)
338+
339+
By("And ensuring it is propagated amongst all orderers")
340+
assertBlockReception(map[string]int{
341+
"testchannel2": 3,
342+
}, orderers, peer, network)
321343
})
322344
})
323345
})
@@ -467,14 +489,17 @@ func waitForBlockReception(o *nwo.Orderer, submitter *nwo.Peer, network *nwo.Net
467489
Eventually(func() string {
468490
sess, err := network.OrdererAdminSession(o, submitter, c)
469491
Expect(err).NotTo(HaveOccurred())
470-
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
492+
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit())
493+
if sess.ExitCode() != 0 {
494+
return fmt.Sprintf("exit code is %d: %s", sess.ExitCode(), string(sess.Err.Contents()))
495+
}
471496
sessErr := string(sess.Err.Contents())
472497
expected := fmt.Sprintf("Received block: %d", blockSeq)
473498
if strings.Contains(sessErr, expected) {
474499
return ""
475500
}
476501
return sessErr
477-
}, network.EventuallyTimeout).Should(BeEmpty())
502+
}, time.Minute, time.Second).Should(BeEmpty())
478503
}
479504

480505
func assertNoErrorsAreLogged(ordererRunners []*ginkgomon.Runner) {

orderer/common/cluster/replication.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,15 @@ type ChannelLister interface {
7777

7878
// Replicator replicates chains
7979
type Replicator struct {
80-
Filter func(string) bool
81-
SystemChannel string
82-
ChannelLister ChannelLister
83-
Logger *flogging.FabricLogger
84-
Puller *BlockPuller
85-
BootBlock *common.Block
86-
AmIPartOfChannel selfMembershipPredicate
87-
LedgerFactory LedgerFactory
80+
DoNotPanicIfClusterNotReachable bool
81+
Filter func(string) bool
82+
SystemChannel string
83+
ChannelLister ChannelLister
84+
Logger *flogging.FabricLogger
85+
Puller *BlockPuller
86+
BootBlock *common.Block
87+
AmIPartOfChannel selfMembershipPredicate
88+
LedgerFactory LedgerFactory
8889
}
8990

9091
// IsReplicationNeeded returns whether replication is needed,
@@ -112,13 +113,20 @@ func (r *Replicator) IsReplicationNeeded() (bool, error) {
112113
}
113114

114115
// ReplicateChains pulls chains and commits them.
115-
func (r *Replicator) ReplicateChains() {
116+
// Returns the names of the chains replicated successfully.
117+
func (r *Replicator) ReplicateChains() []string {
118+
var replicatedChains []string
116119
channels := r.discoverChannels()
117120
pullHints := r.channelsToPull(channels)
118121
totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
119122
r.Logger.Info("Found myself in", len(pullHints.channelsToPull), "channels out of", totalChannelCount, ":", pullHints)
120123
for _, channel := range pullHints.channelsToPull {
121-
r.PullChannel(channel.ChannelName)
124+
err := r.PullChannel(channel.ChannelName)
125+
if err == nil {
126+
replicatedChains = append(replicatedChains, channel.ChannelName)
127+
} else {
128+
r.Logger.Warningf("Failed pulling channel %s: %v", channel.ChannelName, err)
129+
}
122130
}
123131
// Next, just commit the genesis blocks of the channels we shouldn't pull.
124132
for _, channel := range pullHints.channelsNotToPull {
@@ -137,9 +145,10 @@ func (r *Replicator) ReplicateChains() {
137145
}
138146

139147
// Last, pull the system chain
140-
if err := r.PullChannel(r.SystemChannel); err != nil {
148+
if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
141149
r.Logger.Panicf("Failed pulling system channel: %v", err)
142150
}
151+
return replicatedChains
143152
}
144153

145154
func (r *Replicator) discoverChannels() []ChannelGenesisBlock {
@@ -156,7 +165,7 @@ func (r *Replicator) discoverChannels() []ChannelGenesisBlock {
156165
func (r *Replicator) PullChannel(channel string) error {
157166
if !r.Filter(channel) {
158167
r.Logger.Info("Channel", channel, "shouldn't be pulled. Skipping it")
159-
return nil
168+
return ErrSkipped
160169
}
161170
r.Logger.Info("Pulling channel", channel)
162171
puller := r.Puller.Clone()
@@ -239,7 +248,7 @@ type channelPullHints struct {
239248
}
240249

241250
func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
242-
r.Logger.Info("Will now pull channels:", channels.Names())
251+
r.Logger.Info("Will now attempt to pull channels:", channels.Names())
243252
var channelsNotToPull []ChannelGenesisBlock
244253
var channelsToPull []ChannelGenesisBlock
245254
for _, channel := range channels {
@@ -266,9 +275,12 @@ func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
266275
continue
267276
}
268277
if err != nil {
269-
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel.ChannelName, err)
278+
if !r.DoNotPanicIfClusterNotReachable {
279+
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel.ChannelName, err)
280+
}
270281
continue
271282
}
283+
r.Logger.Infof("I need to pull channel %s", channel.ChannelName)
272284
channelsToPull = append(channelsToPull, channel)
273285
}
274286
return channelPullHints{
@@ -360,6 +372,9 @@ type ChainInspector struct {
360372
LastConfigBlock *common.Block
361373
}
362374

375+
// ErrSkipped denotes that replicating a chain was skipped
376+
var ErrSkipped = errors.New("skipped")
377+
363378
// ErrForbidden denotes that an ordering node refuses sending blocks due to access control.
364379
var ErrForbidden = errors.New("forbidden")
365380

@@ -520,6 +535,7 @@ func ChannelCreationBlockToGenesisBlock(block *common.Block) (*common.Block, err
520535
return nil, err
521536
}
522537
block.Data.Data = [][]byte{payload.Data}
538+
block.Header.DataHash = block.Data.Hash()
523539
block.Header.Number = 0
524540
block.Header.PreviousHash = nil
525541
metadata := &common.BlockMetadata{

orderer/common/cluster/replication_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1399,5 +1399,5 @@ func TestFilter(t *testing.T) {
13991399
},
14001400
Logger: logger,
14011401
}
1402-
assert.Nil(t, r.PullChannel("foo"))
1402+
assert.Equal(t, cluster.ErrSkipped, r.PullChannel("foo"))
14031403
}

orderer/common/localconfig/config.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,19 @@ type General struct {
5858
}
5959

6060
type Cluster struct {
61-
ListenAddress string
62-
ListenPort uint16
63-
ServerCertificate string
64-
ServerPrivateKey string
65-
ClientCertificate string
66-
ClientPrivateKey string
67-
RootCAs []string
68-
DialTimeout time.Duration
69-
RPCTimeout time.Duration
70-
ReplicationBufferSize int
71-
ReplicationPullTimeout time.Duration
72-
ReplicationRetryTimeout time.Duration
61+
ListenAddress string
62+
ListenPort uint16
63+
ServerCertificate string
64+
ServerPrivateKey string
65+
ClientCertificate string
66+
ClientPrivateKey string
67+
RootCAs []string
68+
DialTimeout time.Duration
69+
RPCTimeout time.Duration
70+
ReplicationBufferSize int
71+
ReplicationPullTimeout time.Duration
72+
ReplicationRetryTimeout time.Duration
73+
ReplicationBackgroundRefreshInterval time.Duration
7374
}
7475

7576
// Keepalive contains configuration for gRPC servers.

orderer/common/multichannel/registrar.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package multichannel
1111

1212
import (
1313
"fmt"
14+
"reflect"
1415
"sync"
1516

1617
"github.com/hyperledger/fabric/common/channelconfig"
@@ -104,7 +105,9 @@ type Registrar struct {
104105
callbacks []func(bundle *channelconfig.Bundle)
105106
}
106107

107-
func getConfigTx(reader blockledger.Reader) *cb.Envelope {
108+
// ConfigBlock retrieves the last configuration block from the given ledger.
109+
// Panics on failure.
110+
func ConfigBlock(reader blockledger.Reader) *cb.Block {
108111
lastBlock := blockledger.GetBlock(reader, reader.Height()-1)
109112
index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
110113
if err != nil {
@@ -115,7 +118,11 @@ func getConfigTx(reader blockledger.Reader) *cb.Envelope {
115118
logger.Panicf("Config block does not exist")
116119
}
117120

118-
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
121+
return configBlock
122+
}
123+
124+
func configTx(reader blockledger.Reader) *cb.Envelope {
125+
return utils.ExtractEnvelopeOrPanic(ConfigBlock(reader), 0)
119126
}
120127

121128
// NewRegistrar produces an instance of a *Registrar.
@@ -140,7 +147,7 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
140147
if err != nil {
141148
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
142149
}
143-
configTx := getConfigTx(rl)
150+
configTx := configTx(rl)
144151
if configTx == nil {
145152
logger.Panic("Programming error, configTx should never be nil here")
146153
}
@@ -275,12 +282,30 @@ func (r *Registrar) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
275282
}
276283
}
277284

285+
// CreateChain makes the Registrar create a chain with the given name.
286+
func (r *Registrar) CreateChain(chainName string) {
287+
lf, err := r.ledgerFactory.GetOrCreate(chainName)
288+
if err != nil {
289+
logger.Panicf("Failed obtaining ledger factory for %s: %v", chainName, err)
290+
}
291+
chain := r.GetChain(chainName)
292+
if chain != nil {
293+
logger.Infof("A chain of type %v for channel %s already exists. "+
294+
"Halting it.", reflect.TypeOf(chain.Chain), chainName)
295+
chain.Halt()
296+
}
297+
r.newChain(configTx(lf))
298+
}
299+
278300
func (r *Registrar) newChain(configtx *cb.Envelope) {
279301
r.lock.Lock()
280302
defer r.lock.Unlock()
281303

282304
ledgerResources := r.newLedgerResources(configtx)
283-
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
305+
// If we have no blocks, we need to create the genesis block ourselves.
306+
if ledgerResources.Height() == 0 {
307+
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
308+
}
284309

285310
// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
286311
newChains := make(map[string]*ChainSupport)

orderer/common/multichannel/registrar_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestGetConfigTx(t *testing.T) {
8080
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = utils.MarshalOrPanic(&cb.Metadata{Value: utils.MarshalOrPanic(&cb.LastConfig{Index: 7})})
8181
rl.Append(block)
8282

83-
pctx := getConfigTx(rl)
83+
pctx := configTx(rl)
8484
assert.True(t, proto.Equal(pctx, ctx), "Did not select most recent config transaction")
8585
}
8686

@@ -94,11 +94,11 @@ func TestGetConfigTxFailure(t *testing.T) {
9494
}))
9595
}
9696
rl.Append(blockledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(genesisconfig.TestChainID, 11)}))
97-
assert.Panics(t, func() { getConfigTx(rl) }, "Should have panicked because there was no config tx")
97+
assert.Panics(t, func() { configTx(rl) }, "Should have panicked because there was no config tx")
9898

9999
block := blockledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(genesisconfig.TestChainID, 12)})
100100
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = []byte("bad metadata")
101-
assert.Panics(t, func() { getConfigTx(rl) }, "Should have panicked because of bad last config metadata")
101+
assert.Panics(t, func() { configTx(rl) }, "Should have panicked because of bad last config metadata")
102102
}
103103

104104
// This test checks to make sure the orderer refuses to come up if it cannot find a system channel
@@ -167,6 +167,41 @@ func TestManagerImpl(t *testing.T) {
167167
}
168168
}
169169

170+
func TestCreateChain(t *testing.T) {
171+
lf, _ := NewRAMLedgerAndFactory(10)
172+
173+
consenters := make(map[string]consensus.Consenter)
174+
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
175+
176+
manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
177+
manager.Initialize(consenters)
178+
179+
ledger, err := lf.GetOrCreate("mychannel")
180+
assert.NoError(t, err)
181+
182+
genesisBlock := encoder.New(conf).GenesisBlockForChannel("mychannel")
183+
ledger.Append(genesisBlock)
184+
185+
// Before creating the chain, it doesn't exist
186+
assert.Nil(t, manager.GetChain("mychannel"))
187+
// After creating the chain, it exists
188+
manager.CreateChain("mychannel")
189+
chain := manager.GetChain("mychannel")
190+
assert.NotNil(t, chain)
191+
// A subsequent creation, replaces the chain.
192+
manager.CreateChain("mychannel")
193+
chain2 := manager.GetChain("mychannel")
194+
assert.NotNil(t, chain2)
195+
// They are not the same
196+
assert.NotEqual(t, chain, chain2)
197+
// The old chain is halted
198+
_, ok := <-chain.Chain.(*mockChain).queue
199+
assert.False(t, ok)
200+
// The new chain is not halted: Close the channel to prove that.
201+
close(chain2.Chain.(*mockChain).queue)
202+
203+
}
204+
170205
// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
171206
func TestNewChain(t *testing.T) {
172207
expectedLastConfigBlockNumber := uint64(0)

0 commit comments

Comments
 (0)