Skip to content

Commit

Permalink
[FAB-4431] Orderer multichain api consenter errors
Browse files Browse the repository at this point in the history
The orderer multichain consenter API currently does not provide a
mechanism to provide feedback for a channel when the consenter is out of
sync or is otherwise not producing data for that channel.

This CR adds an Errored() method to the common consenter Chain
interface, and consumes that API via the Deliver common code.

It provides only a skeleton implementation for the consenter
implementatinos (Kafka and Solo).  The Kafka implementation will provide
a more robust implementation of the API in the near future.

Change-Id: I22863f2a8b37932a5ecbd5241372e2a5c512571d
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jun 8, 2017
1 parent dee53d0 commit 2590cce
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 2 deletions.
19 changes: 18 additions & 1 deletion orderer/common/deliver/deliver.go
Expand Up @@ -50,6 +50,9 @@ type Support interface {

// Reader returns the chain Reader for the chain
Reader() ledger.Reader

// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}
}

type deliverServer struct {
Expand Down Expand Up @@ -103,6 +106,15 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("Rejecting deliver request because of consenter error")
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:

}

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
Expand Down Expand Up @@ -140,7 +152,12 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
<-cursor.ReadyChan()
select {
case <-erroredChan:
logger.Warningf("Aborting deliver request because of consenter error")
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
Expand Down
64 changes: 64 additions & 0 deletions orderer/common/deliver/deliver_test.go
Expand Up @@ -112,6 +112,11 @@ func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
type mockSupport struct {
ledger ledger.ReadWriter
policyManager *mockpolicies.Manager
erroredChan chan struct{}
}

func (mcs *mockSupport) Errored() <-chan struct{} {
return mcs.erroredChan
}

func (mcs *mockSupport) PolicyManager() policies.Manager {
Expand Down Expand Up @@ -147,6 +152,7 @@ func newMockMultichainManager() *mockSupportManager {
mm.chains[systemChainID] = &mockSupport{
ledger: rl,
policyManager: &mockpolicies.Manager{Policy: &mockpolicies.Policy{}},
erroredChan: make(chan struct{}),
}
return mm
}
Expand Down Expand Up @@ -382,6 +388,64 @@ func TestBlockingSeek(t *testing.T) {
}
}

func TestErroredSeek(t *testing.T) {
mm := newMockMultichainManager()
ms := mm.chains[systemChainID]
l := ms.ledger
close(ms.erroredChan)
for i := 1; i < ledgerSize; i++ {
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)

go ds.Handle(m)

m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})

select {
case deliverReply := <-m.sendChan:
assert.Equal(t, cb.Status_SERVICE_UNAVAILABLE, deliverReply.GetStatus(), "Mock support errored")
case <-time.After(time.Second):
t.Fatalf("Timed out waiting for error response")
}
}

func TestErroredBlockingSeek(t *testing.T) {
mm := newMockMultichainManager()
ms := mm.chains[systemChainID]
l := ms.ledger
for i := 1; i < ledgerSize; i++ {
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)

go ds.Handle(m)

m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})

select {
case deliverReply := <-m.sendChan:
assert.NotNil(t, deliverReply.GetBlock(), "Expected first block")
case <-time.After(time.Second):
t.Fatalf("Timed out waiting to get first block")
}

close(ms.erroredChan)

select {
case deliverReply := <-m.sendChan:
assert.Equal(t, cb.Status_SERVICE_UNAVAILABLE, deliverReply.GetStatus(), "Mock support errored")
case <-time.After(time.Second):
t.Fatalf("Timed out waiting for error response")
}
}

func TestSGracefulShutdown(t *testing.T) {
m := newMockD()
ds := NewHandlerImpl(nil)
Expand Down
5 changes: 5 additions & 0 deletions orderer/kafka/chain.go
Expand Up @@ -68,6 +68,11 @@ type chainImpl struct {
startCompleted bool // For testing
}

// Errored currently only closes on halt
func (chain *chainImpl) Errored() <-chan struct{} {
return chain.exitChan
}

// Start allocates the necessary resources for staying up to date with this
// Chain. Implements the multichain.Chain interface. Called by
// multichain.NewManagerImpl() which is invoked when the ordering process is
Expand Down
14 changes: 13 additions & 1 deletion orderer/multichain/chainsupport.go
Expand Up @@ -50,9 +50,14 @@ type Consenter interface {
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
// Enqueue accepts a message and returns true on acceptance, or false on failure
Enqueue(env *cb.Envelope) bool

// Errored returns a channel which will close when an error has occurred
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date
Errored() <-chan struct{}

// Start should allocate whatever resources are needed for staying up to date with the chain
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger
Expand Down Expand Up @@ -84,6 +89,9 @@ type ChainSupport interface {
// Reader returns the chain Reader for the chain
Reader() ledger.Reader

// Errored returns whether the backing consenter has errored
Errored() <-chan struct{}

broadcast.Support
ConsenterSupport

Expand Down Expand Up @@ -205,6 +213,10 @@ func (cs *chainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
}

func (cs *chainSupport) Errored() <-chan struct{} {
return cs.chain.Errored()
}

func (cs *chainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
return ledger.CreateNextBlock(cs.ledger, messages)
}
Expand Down
4 changes: 4 additions & 0 deletions orderer/multichain/util_test.go
Expand Up @@ -48,6 +48,10 @@ type mockChain struct {
done chan struct{}
}

func (mch *mockChain) Errored() <-chan struct{} {
return nil
}

func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
mch.queue <- env
return true
Expand Down
5 changes: 5 additions & 0 deletions orderer/solo/consensus.go
Expand Up @@ -79,6 +79,11 @@ func (ch *chain) Enqueue(env *cb.Envelope) bool {
}
}

// Errored only closes on exit
func (ch *chain) Errored() <-chan struct{} {
return ch.exitChan
}

func (ch *chain) main() {
var timer <-chan time.Time

Expand Down

0 comments on commit 2590cce

Please sign in to comment.