Do not block on Broadcast responses
FAB-839

Added per-connection buffered channel of size QueueSize
for responses.

Added unit test TestBroadcastResponseQueueOverflow.

Change-Id: I317d127f74dcc8115201f8c127e83230d9d13a58
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
Luis Sanchez committed Nov 14, 2016
1 parent 82e72f4 commit 7442b12
Showing 6 changed files with 91 additions and 13 deletions.
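
Before the per-file diffs, here is a minimal, self-contained sketch of the pattern this change introduces: each connection gets a buffered response channel of size QueueSize, drained by its own goroutine, so the request-handling loop no longer waits on the client's stream for every reply. The names below are placeholders for illustration only, not the orderer's actual types; shutdown via context cancellation, which the real code also adds, is sketched separately after the broadcast.go diff.

package main

import (
	"fmt"
	"time"
)

// response stands in for ab.BroadcastResponse in this sketch.
type response struct{ status string }

// responder mirrors the idea behind broadcastSessionResponder: replies are
// buffered in a per-connection channel and sent by a dedicated goroutine.
type responder struct {
	queue chan *response
}

func newResponder(send func(*response) error, queueSize uint) *responder {
	r := &responder{queue: make(chan *response, queueSize)}
	go func() {
		for rep := range r.queue { // drain queued replies and forward them
			if err := send(rep); err != nil {
				fmt.Println("cannot send reply:", err)
			}
		}
	}()
	return r
}

// reply enqueues a response; it blocks only when queueSize replies are
// already waiting to be sent.
func (r *responder) reply(status string) {
	r.queue <- &response{status: status}
}

func main() {
	// A deliberately slow send, standing in for a gRPC stream's Send.
	slowSend := func(rep *response) error {
		time.Sleep(20 * time.Millisecond)
		fmt.Println("sent:", rep.status)
		return nil
	}

	r := newResponder(slowSend, 2)
	for i := 0; i < 3; i++ {
		r.reply("SUCCESS") // returns immediately while the queue has room
		fmt.Println("enqueued reply", i)
	}
	time.Sleep(100 * time.Millisecond) // let the sender goroutine drain
}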
43 changes: 36 additions & 7 deletions orderer/kafka/broadcast.go
@@ -21,6 +21,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/hyperledger/fabric/orderer/config"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
@@ -112,7 +114,7 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
case msg := <-b.batchChan:
data, err := proto.Marshal(msg)
if err != nil {
logger.Fatalf("Error marshaling what should be a valid proto message: %s", err)
panic(fmt.Errorf("Error marshaling what should be a valid proto message: %s", err))
}
b.messages = append(b.messages, data)
if len(b.messages) >= int(maxSize) {
@@ -136,7 +138,9 @@ func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
}

func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
reply := new(ab.BroadcastResponse)
context, cancel := context.WithCancel(context.Background())
defer cancel()
bsr := newBroadcastSessionResponder(context, stream, b.config.General.QueueSize)
for {
msg, err := stream.Recv()
if err != nil {
@@ -145,12 +149,37 @@
}

b.batchChan <- msg
reply.Status = cb.Status_SUCCESS // TODO This shouldn't always be a success
bsr.reply(cb.Status_SUCCESS) // TODO This shouldn't always be a success

if err := stream.Send(reply); err != nil {
logger.Info("Cannot send broadcast reply to client")
return err
}
}

type broadcastSessionResponder struct {
queue chan *ab.BroadcastResponse
}

func newBroadcastSessionResponder(context context.Context, stream ab.AtomicBroadcast_BroadcastServer, queueSize uint) *broadcastSessionResponder {
bsr := &broadcastSessionResponder{
queue: make(chan *ab.BroadcastResponse, queueSize),
}
go bsr.sendReplies(context, stream)
return bsr
}

func (bsr *broadcastSessionResponder) reply(status cb.Status) {
bsr.queue <- &ab.BroadcastResponse{Status: status}
}

func (bsr *broadcastSessionResponder) sendReplies(context context.Context, stream ab.AtomicBroadcast_BroadcastServer) {
for {
select {
case reply := <-bsr.queue:
if err := stream.Send(reply); err != nil {
logger.Info("Cannot send broadcast reply to client")
}
logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
case <-context.Done():
return
}
logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
}
}
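
Two notes on the code above. First, reply() blocks once QueueSize responses are already queued, which is exactly what the new TestBroadcastResponseQueueOverflow below exercises. Second, the sender goroutine's lifetime is tied to the stream handler through context.WithCancel: when recvRequests returns, its deferred cancel() fires and sendReplies exits via the <-context.Done() case. A tiny standalone sketch of that shutdown mechanism (illustrative only, not orderer code):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	// Stands in for the sendReplies goroutine.
	go func() {
		defer close(done)
		for {
			select {
			case <-time.After(10 * time.Millisecond):
				fmt.Println("sender: waiting for queued replies")
			case <-ctx.Done():
				fmt.Println("sender: handler returned, shutting down")
				return
			}
		}
	}()

	time.Sleep(25 * time.Millisecond)
	cancel() // what recvRequests' deferred cancel() does when the stream ends
	<-done
}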
2 changes: 1 addition & 1 deletion orderer/kafka/broadcast_mock_test.go
@@ -27,7 +27,7 @@ func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk ch
mb := &broadcasterImpl{
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
batchChan: make(chan *cb.Envelope, batchChanSize),
messages: [][]byte{[]byte("checkpoint")},
nextNumber: uint64(seek),
}
47 changes: 47 additions & 0 deletions orderer/kafka/broadcast_test.go
@@ -135,6 +135,53 @@ func TestBroadcastBatch(t *testing.T) {
}
}

// If the capacity of the response queue is smaller than the batch size,
// then once the response queue overflows, the orderer should not be able
// to send a block back to the client. (Sending replies and adding
// messages to the about-to-be-sent block happen on the same goroutine.)
func TestBroadcastResponseQueueOverflow(t *testing.T) {

// Make sure that the response queue is less than the batch size
originalQueueSize := testConf.General.QueueSize
defer func() { testConf.General.QueueSize = originalQueueSize }()
testConf.General.QueueSize = testConf.General.BatchSize - 1

disk := make(chan []byte)

mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
go func() {
if err := mb.Broadcast(mbs); err != nil {
t.Fatal("Broadcast error:", err)
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Force the response queue to overflow by blocking the broadcast stream's Send() method
mbs.closed = true
defer func() { mbs.closed = false }()

// Pump a batch's worth of messages into the system
go func() {
for i := 0; i < int(testConf.General.BatchSize); i++ {
mbs.incoming <- &cb.Envelope{Payload: []byte("message " + strconv.Itoa(i))}
}
}()

loop:
for {
select {
case <-mbs.outgoing:
t.Fatal("Client shouldn't have received anything from the orderer")
case <-time.After(testConf.General.BatchTimeout + timePadding):
break loop // This is the success path
}
}
}
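
The overflow scenario above hinges on a basic property of Go's buffered channels: a send on a full buffered channel blocks until someone receives. In the responder, that is what happens to bsr.reply() once QueueSize replies are waiting and stream.Send() is stuck. A minimal standalone illustration of that blocking behavior (not part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	queue := make(chan int, 2) // buffered, like a response queue with QueueSize = 2

	go func() {
		for i := 0; i < 3; i++ {
			queue <- i // the third send blocks: the buffer is full and nothing is receiving
			fmt.Println("queued", i)
		}
		fmt.Println("unreachable while the consumer is stuck")
	}()

	time.Sleep(50 * time.Millisecond)
	fmt.Printf("producer is blocked with %d replies pending\n", len(queue))
}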

func TestBroadcastIncompleteBatch(t *testing.T) {
if testConf.General.BatchSize <= 1 {
t.Skip("Skipping test as it requires a batchsize > 1")
9 changes: 5 additions & 4 deletions orderer/kafka/config_test.go
@@ -24,10 +24,11 @@ import (
)

var (
brokerID = int32(0)
oldestOffset = int64(100) // The oldest block available on the broker
newestOffset = int64(1100) // The offset that will be assigned to the next block
middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle
batchChanSize = 1000 // Size of batch channel (eventually sync with FAB-821)

// Amount of time to wait for block processing when doing time-based tests
// We generally want this value to be as small as possible so as to make tests execute faster
2 changes: 2 additions & 0 deletions orderer/kafka/orderer_mock_test.go
@@ -53,6 +53,8 @@ func (mbs *mockBroadcastStream) Recv() (*cb.Envelope, error) {
}

func (mbs *mockBroadcastStream) Send(reply *ab.BroadcastResponse) error {
for mbs.closed {
}
if !mbs.closed {
mbs.outgoing <- reply
}
1 change: 0 additions & 1 deletion orderer/orderer.yaml
@@ -25,7 +25,6 @@ General:
BatchSize: 10

# Queue Size: The maximum number of messages to allow pending from a gRPC client
# When Kafka is chosen as the OrdererType, this option is ignored.
QueueSize: 10

# Max Window Size: The maximum number of messages to for the orderer Deliver
