diff --git a/orderer/common/follower/follower_chain.go b/orderer/common/follower/follower_chain.go index 1138191e666..1bc7e10dd8a 100644 --- a/orderer/common/follower/follower_chain.go +++ b/orderer/common/follower/follower_chain.go @@ -9,6 +9,7 @@ package follower import ( "bytes" "sync" + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -60,7 +61,7 @@ type BlockPullerFactory interface { // ChainCreator defines a function that creates a new consensus.Chain for this channel, to replace the current // follower.Chain. This interface is meant to be implemented by the multichannel.Registrar. type ChainCreator interface { - SwitchFollowerToChain(chainName string) + SwitchFollowerToChain(chainName string) bool } //go:generate counterfeiter -o mocks/channel_participation_metrics_reporter.go -fake-name ChannelParticipationMetricsReporter . ChannelParticipationMetricsReporter @@ -100,6 +101,7 @@ type Chain struct { doneChan chan struct{} // The go-routine signals the 'closer' that it is done by closing this channel. consensusRelation types.ConsensusRelation status types.Status + removingExternal atomic.Int32 // External channel removal once. ledgerResources LedgerResources // ledger & config resources clusterConsenter consensus.ClusterConsenter // detects whether a block indicates channel membership @@ -224,6 +226,7 @@ func (c *Chain) Start() { // Halt signals the Chain to stop and waits for the internal go-routine to exit. func (c *Chain) Halt() { + c.removingExternal.Store(1) c.halt() <-c.doneChan } @@ -354,7 +357,22 @@ func (c *Chain) pull() error { // Trigger creation of a new consensus.Chain. c.logger.Info("Block pulling finished successfully, going to switch from follower to a consensus.Chain") c.halt() - c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID()) + + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + + // SwitchFollowerToChain is trying to take over the mutex. + // If it fails, false is returned. + // Perhaps at this moment the command "remove channel" is being executed, + // let's check it `c.removingExternal.Load() == 1`. If so, leave the loop. + // If SwitchFollowerToChain returns true, we will also leave the loop. + // In other cases, we need to repeat the loop. + for range ticker.C { + if isExec := c.chainCreator.SwitchFollowerToChain(c.ledgerResources.ChannelID()); isExec || + c.removingExternal.Load() == 1 { + break + } + } return nil } diff --git a/orderer/common/follower/follower_chain_test.go b/orderer/common/follower/follower_chain_test.go index 825a6118c3b..c9e45c580c1 100644 --- a/orderer/common/follower/follower_chain_test.go +++ b/orderer/common/follower/follower_chain_test.go @@ -195,7 +195,10 @@ func TestFollowerPullUpToJoin(t *testing.T) { wgChain = sync.WaitGroup{} wgChain.Add(1) - mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) + mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool { + wgChain.Done() + return true + }) wgChain = sync.WaitGroup{} wgChain.Add(1) @@ -559,7 +562,10 @@ func TestFollowerPullAfterJoin(t *testing.T) { } return nil }) - mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created + mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool { + wgChain.Done() + return true + }) // Stop when a new chain is created require.Equal(t, joinNum+1, localBlockchain.Height()) chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, nil, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter) @@ -604,7 +610,10 @@ func TestFollowerPullAfterJoin(t *testing.T) { } return nil }) - mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created + mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool { + wgChain.Done() + return true + }) // Stop when a new chain is created require.Equal(t, joinNum+1, localBlockchain.Height()) failPull := 10 @@ -794,7 +803,10 @@ func TestFollowerPullPastJoin(t *testing.T) { } return nil }) - mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created + mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool { + wgChain.Done() + return true + }) // Stop when a new chain is created require.Equal(t, uint64(6), localBlockchain.Height()) chain, err := follower.NewChain(ledgerResources, mockClusterConsenter, joinBlockAppRaft, options, pullerFactory, mockChainCreator, cryptoProvider, mockChannelParticipationMetricsReporter) @@ -839,7 +851,10 @@ func TestFollowerPullPastJoin(t *testing.T) { } return nil }) - mockChainCreator.SwitchFollowerToChainCalls(func(_ string) { wgChain.Done() }) // Stop when a new chain is created + mockChainCreator.SwitchFollowerToChainCalls(func(_ string) bool { + wgChain.Done() + return true + }) // Stop when a new chain is created require.Equal(t, uint64(0), localBlockchain.Height()) failPull := 10 @@ -935,7 +950,7 @@ func (mbc *memoryBlockChain) fill(numBlocks uint64) { defer mbc.lock.Unlock() height := uint64(len(mbc.chain)) - prevHash := []byte{} + var prevHash []byte for i := height; i < height+numBlocks; i++ { if i > 0 { diff --git a/orderer/common/follower/mocks/chain_creator.go b/orderer/common/follower/mocks/chain_creator.go index 9d3b28a560c..63aa73d0b52 100644 --- a/orderer/common/follower/mocks/chain_creator.go +++ b/orderer/common/follower/mocks/chain_creator.go @@ -8,26 +8,37 @@ import ( ) type ChainCreator struct { - SwitchFollowerToChainStub func(string) + SwitchFollowerToChainStub func(string) bool switchFollowerToChainMutex sync.RWMutex switchFollowerToChainArgsForCall []struct { arg1 string } + switchFollowerToChainReturns struct { + result1 bool + } + switchFollowerToChainReturnsOnCall map[int]struct { + result1 bool + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *ChainCreator) SwitchFollowerToChain(arg1 string) { +func (fake *ChainCreator) SwitchFollowerToChain(arg1 string) bool { fake.switchFollowerToChainMutex.Lock() + ret, specificReturn := fake.switchFollowerToChainReturnsOnCall[len(fake.switchFollowerToChainArgsForCall)] fake.switchFollowerToChainArgsForCall = append(fake.switchFollowerToChainArgsForCall, struct { arg1 string }{arg1}) - stub := fake.SwitchFollowerToChainStub fake.recordInvocation("SwitchFollowerToChain", []interface{}{arg1}) fake.switchFollowerToChainMutex.Unlock() - if stub != nil { - fake.SwitchFollowerToChainStub(arg1) + if fake.SwitchFollowerToChainStub != nil { + return fake.SwitchFollowerToChainStub(arg1) + } + if specificReturn { + return ret.result1 } + fakeReturns := fake.switchFollowerToChainReturns + return fakeReturns.result1 } func (fake *ChainCreator) SwitchFollowerToChainCallCount() int { @@ -36,7 +47,7 @@ func (fake *ChainCreator) SwitchFollowerToChainCallCount() int { return len(fake.switchFollowerToChainArgsForCall) } -func (fake *ChainCreator) SwitchFollowerToChainCalls(stub func(string)) { +func (fake *ChainCreator) SwitchFollowerToChainCalls(stub func(string) bool) { fake.switchFollowerToChainMutex.Lock() defer fake.switchFollowerToChainMutex.Unlock() fake.SwitchFollowerToChainStub = stub @@ -49,6 +60,29 @@ func (fake *ChainCreator) SwitchFollowerToChainArgsForCall(i int) string { return argsForCall.arg1 } +func (fake *ChainCreator) SwitchFollowerToChainReturns(result1 bool) { + fake.switchFollowerToChainMutex.Lock() + defer fake.switchFollowerToChainMutex.Unlock() + fake.SwitchFollowerToChainStub = nil + fake.switchFollowerToChainReturns = struct { + result1 bool + }{result1} +} + +func (fake *ChainCreator) SwitchFollowerToChainReturnsOnCall(i int, result1 bool) { + fake.switchFollowerToChainMutex.Lock() + defer fake.switchFollowerToChainMutex.Unlock() + fake.SwitchFollowerToChainStub = nil + if fake.switchFollowerToChainReturnsOnCall == nil { + fake.switchFollowerToChainReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.switchFollowerToChainReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *ChainCreator) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() diff --git a/orderer/common/multichannel/registrar.go b/orderer/common/multichannel/registrar.go index 61f10d44236..92f0fe78206 100644 --- a/orderer/common/multichannel/registrar.go +++ b/orderer/common/multichannel/registrar.go @@ -488,8 +488,10 @@ func (r *Registrar) createNewChain(configtx *cb.Envelope) *ChainSupport { // SwitchFollowerToChain creates a consensus.Chain from the tip of the ledger, and removes the follower. // It is called when a follower detects a config block that indicates cluster membership and halts, transferring // execution to the consensus.Chain. -func (r *Registrar) SwitchFollowerToChain(channelID string) { - r.lock.Lock() +func (r *Registrar) SwitchFollowerToChain(channelID string) bool { + if !r.lock.TryLock() { + return false + } defer r.lock.Unlock() lf, err := r.ledgerFactory.GetOrCreate(channelID) @@ -504,11 +506,13 @@ func (r *Registrar) SwitchFollowerToChain(channelID string) { delete(r.followers, channelID) logger.Debugf("Removed follower for channel %s", channelID) cs := r.createNewChain(configTx(lf)) - if err := r.removeJoinBlock(channelID); err != nil { + if err = r.removeJoinBlock(channelID); err != nil { logger.Panicf("Failed removing join-block for channel: %s: %v", channelID, err) } cs.start() logger.Infof("Created and started channel %s", cs.ChannelID()) + + return true } // SwitchChainToFollower creates a follower.Chain from the tip of the ledger and removes the consensus.Chain.