Skip to content

Commit

Permalink
FAB-12060 payload buf don't signal ready if empty
Browse files Browse the repository at this point in the history
When pushes and pops into the payload buffer interleave, many goroutines are
spun off to report "readiness" even though the buffer may already be empty.
This commit fixes the issue by handling the signaling logic properly: it
prevents multiple ready messages from being queued at a time, and if the
buffer is empty after a pop operation it drains the channel so the buffer
won't report ready while empty.

Added test reported in bug description.

FAB-12060 - #done

Change-Id: Idae7a5290f024a436798bdb16abdb65614b95eb6
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Sep 22, 2018
1 parent b7aeb21 commit 47ba9b9
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 6 deletions.
28 changes: 22 additions & 6 deletions gossip/state/payloads_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type PayloadsBufferImpl struct {
func NewPayloadsBuffer(next uint64) PayloadsBuffer {
return &PayloadsBufferImpl{
buf: make(map[uint64]*proto.Payload),
readyChan: make(chan struct{}, 0),
readyChan: make(chan struct{}, 1),
next: next,
logger: util.GetLogger(util.LoggingStateModule, ""),
}
Expand Down Expand Up @@ -87,11 +87,8 @@ func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
b.buf[seqNum] = payload

// Send notification that next sequence has arrived
if seqNum == b.next {
// Do not block execution of current routine
go func() {
b.readyChan <- struct{}{}
}()
if seqNum == b.next && len(b.readyChan) == 0 {
b.readyChan <- struct{}{}
}
}

Expand All @@ -114,10 +111,29 @@ func (b *PayloadsBufferImpl) Pop() *proto.Payload {
delete(b.buf, b.Next())
// Increment next expect block index
atomic.AddUint64(&b.next, 1)

b.drainReadChannel()

}

return result
}

// drainReadChannel empties the ready channel in case the last
// payload has been popped and there are still pending
// "ready" notifications sitting in the channel.
//
// Without this, a consumer could observe a Ready() signal for a
// buffer that no longer holds any payloads. Callers are expected
// to invoke it while holding the buffer's mutex (it is called from
// Pop), so the len checks are not racing with concurrent Push/Pop.
func (b *PayloadsBufferImpl) drainReadChannel() {
	// Only drain once the buffer is fully empty; otherwise a queued
	// notification is still legitimate.
	if len(b.buf) != 0 {
		return
	}
	// Non-blocking drain: consume buffered signals until none remain.
	for len(b.readyChan) > 0 {
		<-b.readyChan
	}
}

// Size returns current number of payloads stored within buffer
func (b *PayloadsBufferImpl) Size() int {
b.mutex.RLock()
Expand Down
116 changes: 116 additions & 0 deletions gossip/state/payloads_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,119 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
// Buffer size has to be only one
assert.Equal(t, 1, buffer.Size())
}

// Tests the scenario where payload pushes and pops are interleaved after a Ready() signal.
// Reproduces FAB-12060: stale ready notifications must not accumulate when the
// consumer drains the buffer while new sequences keep arriving.
func TestPayloadsBufferImpl_Interleave(t *testing.T) {
buffer := NewPayloadsBuffer(1)
assert.Equal(t, buffer.Next(), uint64(1))

//
// The first two sequences arrive and the buffer is emptied without interleaving.
//
// This is also an example of the producer/consumer pattern in Fabric.
// Producer:
//
// Payloads are pushed into the buffer. These payloads can be out of order.
// When the buffer has a sequence of payloads ready (in order), it fires a signal
// on its Ready() channel.
//
// The consumer waits for the signal and then drains all ready payloads.

payload, err := randomPayloadWithSeqNum(1)
assert.NoError(t, err, "generating random payload failed")
buffer.Push(payload)

payload, err = randomPayloadWithSeqNum(2)
assert.NoError(t, err, "generating random payload failed")
buffer.Push(payload)

select {
case <-buffer.Ready():
case <-time.After(500 * time.Millisecond):
t.Error("buffer wasn't ready after 500 ms for first sequence")
}

// The consumer empties the buffer.
for payload := buffer.Pop(); payload != nil; payload = buffer.Pop() {
}

// The buffer isn't ready since no new sequences have come since emptying the buffer.
select {
case <-buffer.Ready():
t.Error("buffer should not be ready as no new sequences have come")
case <-time.After(500 * time.Millisecond):
}

//
// Next sequences are incoming at the same time the buffer is being emptied by the consumer.
//
payload, err = randomPayloadWithSeqNum(3)
assert.NoError(t, err, "generating random payload failed")
buffer.Push(payload)

select {
case <-buffer.Ready():
case <-time.After(500 * time.Millisecond):
t.Error("buffer wasn't ready after 500 ms for second sequence")
}
payload = buffer.Pop()
assert.NotNil(t, payload, "payload should not be nil")

// ... Block processing now happens on sequence 3.

// In the meantime, sequence 4 is pushed into the queue.
payload, err = randomPayloadWithSeqNum(4)
assert.NoError(t, err, "generating random payload failed")
buffer.Push(payload)

// ... Block processing completes on sequence 3, the consumer loop grabs the next one (4).
payload = buffer.Pop()
assert.NotNil(t, payload, "payload should not be nil")

// In the meantime, sequence 5 is pushed into the queue.
payload, err = randomPayloadWithSeqNum(5)
assert.NoError(t, err, "generating random payload failed")
buffer.Push(payload)

// ... Block processing completes on sequence 4, the consumer loop grabs the next one (5).
payload = buffer.Pop()
assert.NotNil(t, payload, "payload should not be nil")

//
// Before the fix, goroutines signaling readiness would build up from the
// interleaved pushes and pops above; verify no stale ready signals remain.
//
select {
case <-buffer.Ready():
//
// Should be error - no payloads are ready
//
t.Log("buffer ready (1) -- should be error")
t.Fail()
case <-time.After(500 * time.Millisecond):
t.Log("buffer not ready (1)")
}
payload = buffer.Pop()
t.Logf("payload: %v", payload)
assert.Nil(t, payload, "payload should be nil")

select {
case <-buffer.Ready():
//
// Should be error - no payloads are ready
//
t.Log("buffer ready (2) -- should be error")
t.Fail()
case <-time.After(500 * time.Millisecond):
t.Log("buffer not ready (2)")
}
payload = buffer.Pop()
assert.Nil(t, payload, "payload should be nil")
t.Logf("payload: %v", payload)

select {
case <-buffer.Ready():
t.Error("buffer ready (3)")
case <-time.After(500 * time.Millisecond):
t.Log("buffer not ready (3) -- good")
}
}

0 comments on commit 47ba9b9

Please sign in to comment.