Skip to content

Commit

Permalink
Apply QueueSize on a per-client basis.
Browse files Browse the repository at this point in the history
FAB-507 https://jira.hyperledger.org/browse/FAB-507

The implementation of the solo orderer has a send queue of
size QueueSize that is shared amoungst all clients. This
changeset provides each client with it's own send queue of
size QueueSize.

Change-Id: I2ecb690f484ba1e531c99c3f5e6ff046239cb0a0
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Oct 12, 2016
1 parent 801f1e4 commit 5659656
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
50 changes: 46 additions & 4 deletions orderer/solo/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
)

type broadcastServer struct {
queue chan *ab.BroadcastMessage
queueSize int
batchSize int
batchTimeout time.Duration
rl rawledger.Writer
sendChan chan *ab.BroadcastMessage
exitChan chan struct{}
}

Expand All @@ -39,10 +40,11 @@ func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl

func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer {
bs := &broadcastServer{
queue: make(chan *ab.BroadcastMessage, queueSize),
queueSize: queueSize,
batchSize: batchSize,
batchTimeout: batchTimeout,
rl: rl,
sendChan: make(chan *ab.BroadcastMessage),
exitChan: make(chan struct{}),
}
return bs
Expand All @@ -59,7 +61,7 @@ outer:
timer := time.After(bs.batchTimeout)
for {
select {
case msg := <-bs.queue:
case msg := <-bs.sendChan:
curBatch = append(curBatch, msg)
if len(curBatch) < bs.batchSize {
continue
Expand All @@ -83,6 +85,38 @@ outer:
}

func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
b := newBroadcaster(bs)
defer close(b.queue)
go b.drainQueue()
return b.queueBroadcastMessages(srv)
}

type broadcaster struct {
bs *broadcastServer
queue chan *ab.BroadcastMessage
}

func (b *broadcaster) drainQueue() {
for {
select {
case msg, ok := <-b.queue:
if ok {
select {
case b.bs.sendChan <- msg:
case <-b.bs.exitChan:
return
}
} else {
return
}
case <-b.bs.exitChan:
return
}
}
}

func (b *broadcaster) queueBroadcastMessages(srv ab.AtomicBroadcast_BroadcastServer) error {

for {
msg, err := srv.Recv()
if err != nil {
Expand All @@ -97,7 +131,7 @@ func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServe
}

select {
case bs.queue <- msg:
case b.queue <- msg:
err = srv.Send(&ab.BroadcastResponse{ab.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{ab.Status_SERVICE_UNAVAILABLE})
Expand All @@ -108,3 +142,11 @@ func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServe
}
}
}

func newBroadcaster(bs *broadcastServer) *broadcaster {
b := &broadcaster{
bs: bs,
queue: make(chan *ab.BroadcastMessage, bs.queueSize),
}
return b
}
38 changes: 36 additions & 2 deletions orderer/solo/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ func (m *mockB) Recv() (*ab.BroadcastMessage, error) {
func TestQueueOverflow(t *testing.T) {
bs := newPlainBroadcastServer(2, 1, time.Second, nil) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused)
m := newMockB()
go bs.handleBroadcast(m)
b := newBroadcaster(bs)
go b.queueBroadcastMessages(m)
defer close(m.recvChan)

bs.halt()

for i := 0; i < 2; i++ {
m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")}
reply := <-m.sendChan
Expand All @@ -73,6 +76,37 @@ func TestQueueOverflow(t *testing.T) {
if reply.Status != ab.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}

}

func TestMultiQueueOverflow(t *testing.T) {
bs := newPlainBroadcastServer(2, 1, time.Second, nil) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused)
// m := newMockB()
ms := []*mockB{newMockB(), newMockB(), newMockB()}

for _, m := range ms {
b := newBroadcaster(bs)
go b.queueBroadcastMessages(m)
defer close(m.recvChan)
}

for _, m := range ms {
for i := 0; i < 2; i++ {
m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != ab.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}
}
}

for _, m := range ms {
m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")}
reply := <-m.sendChan
if reply.Status != ab.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}
}
}

func TestEmptyBroadcastMessage(t *testing.T) {
Expand Down Expand Up @@ -103,7 +137,7 @@ func TestFilledBatch(t *testing.T) {
defer bs.halt()
messages := 11 // Sending 11 messages, with a batch size of 2, ensures the 10th message is processed before we proceed for 5 blocks
for i := 0; i < messages; i++ {
bs.queue <- &ab.BroadcastMessage{[]byte("Some bytes")}
bs.sendChan <- &ab.BroadcastMessage{[]byte("Some bytes")}
}
expected := uint64(1 + messages/batchSize)
if bs.rl.(rawledger.Reader).Height() != expected {
Expand Down

0 comments on commit 5659656

Please sign in to comment.