Skip to content

Commit

Permalink
Cleanups and refactoring of payloads buffer
Browse files Browse the repository at this point in the history
Removed redundant methonds to keep API cleaner.

Change-Id: I9610062f82bb5a2a95ac9e2656fe6b2284017307
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Nov 15, 2016
1 parent 2bed988 commit 5981d37
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 132 deletions.
51 changes: 16 additions & 35 deletions gossip/state/payloads_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package state

import (
"fmt"
"github.com/hyperledger/fabric/gossip/proto"
"strconv"
"sync"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/proto"
"github.com/op/go-logging"
)

// PayloadsBuffer is used to store payloads into which used to
Expand All @@ -41,35 +43,35 @@ type PayloadsBuffer interface {
// Get current buffer size
Size() int

// Minimum available seq number
MinAvail() (uint64, error)

// Channel to indicate event when new payload pushed with sequence
// number equal to the next expected value.
Ready() chan struct{}

Close()
}

// PayloadsBufferImpl structure to implement PayloadsBuffer interface
// store inner state of available payloads and sequence numbers
type PayloadsBufferImpl struct {
buf map[uint64]*proto.Payload

minQueue []uint64
buf map[uint64]*proto.Payload

next uint64
next uint64

readyChan chan struct{}

mutex sync.RWMutex
mutex sync.RWMutex

logger *logging.Logger
}

// NewPayloadsBuffer is factory function to create new payloads buffer
func NewPayloadsBuffer(next uint64) PayloadsBuffer {
logger, _ := logging.GetLogger("GossipStateProvider")
return &PayloadsBufferImpl{
buf: make(map[uint64]*proto.Payload),
minQueue: make([]uint64, 0),
readyChan: make(chan struct{}),
readyChan: make(chan struct{}, 0),
next: next,
logger: logger,
}
}

Expand All @@ -96,18 +98,6 @@ func (b *PayloadsBufferImpl) Push(payload *proto.Payload) error {

b.buf[seqNum] = payload

lenMinQueue := len(b.minQueue)
if lenMinQueue == 0 {
// New element to insert
b.minQueue = append(b.minQueue, seqNum)
} else {
if b.minQueue[lenMinQueue - 1] > seqNum {
// in case new sequence number is lower than
// available one add it to the queue
b.minQueue = append(b.minQueue, seqNum)
}
}

// Send notification that next sequence has arrived
if seqNum == b.next {
// Do not block execution of current routine
Expand Down Expand Up @@ -135,7 +125,6 @@ func (b *PayloadsBufferImpl) Pop() *proto.Payload {
if result != nil {
// If there is such sequence in the buffer need to delete it
delete(b.buf, b.Next())
b.minQueue = b.minQueue[:len(b.minQueue) - 1]
// Increment next expect block index
atomic.AddUint64(&b.next, 1)
}
Expand All @@ -149,15 +138,7 @@ func (b *PayloadsBufferImpl) Size() int {
return len(b.buf)
}

// MinAvail returns minimum available payload sequence number, if no payloads
// within buffer results with error "Empty buffer".
func (b *PayloadsBufferImpl) MinAvail() (uint64, error) {
b.mutex.Lock()
defer b.mutex.Unlock()

if len(b.buf) == 0 {
return ^uint64(0), fmt.Errorf("Empty buffer")
}

return b.minQueue[len(b.minQueue) - 1], nil
// Close cleanups resources and channels in maintained
func (b *PayloadsBufferImpl) Close() {
close(b.readyChan)
}
108 changes: 11 additions & 97 deletions gossip/state/payloads_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package state
import (
"crypto/rand"
"fmt"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/stretchr/testify/assert"
"testing"
"time"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hyperledger/fabric/gossip/proto"
"github.com/stretchr/testify/assert"
)

func uuid() (string, error) {
Expand All @@ -33,9 +34,9 @@ func uuid() (string, error) {
if err != nil {
return "", err
}
uuid[8] = uuid[8] &^ 0xc0 | 0x80
uuid[8] = uuid[8]&^0xc0 | 0x80

uuid[6] = uuid[6] &^ 0xf0 | 0x40
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}

Expand Down Expand Up @@ -102,7 +103,7 @@ func TestPayloadsBufferImpl_Ready(t *testing.T) {
fin <- struct{}{}
}()

time.AfterFunc(100 * time.Millisecond, func() {
time.AfterFunc(100*time.Millisecond, func() {
payload, err := randomPayloadWithSeqNum(1)

if err != nil {
Expand All @@ -120,85 +121,6 @@ func TestPayloadsBufferImpl_Ready(t *testing.T) {
}
}

func TestPayloadsBufferImpl_MinAvail(t *testing.T) {
buffer := NewPayloadsBuffer(1)

assert.Equal(t, buffer.Next(), uint64(1))

// Buffer is empty no messages expected,
// hence no min shoyld be value available
_, err := buffer.MinAvail()
assert.Error(t, err)

pushNewRandomPayload(t, buffer, 10)

min, err := buffer.MinAvail()
assert.NoError(t, err)
assert.Equal(t, min, uint64(10))

pushNewRandomPayload(t, buffer, 17)

// Presence of payload w/ sequence number 17 should not affect the minimum available block
min, err = buffer.MinAvail()
assert.NoError(t, err)
assert.Equal(t, min, uint64(10))

// Add new block w/ lower sequence number
pushNewRandomPayload(t, buffer, 6)

min, err = buffer.MinAvail()
assert.NoError(t, err)
// New sequence number now should be the minimum
assert.Equal(t, min, uint64(6))
}

func TestPayloadsBufferImpl_MinAvail2(t *testing.T) {
buffer := NewPayloadsBuffer(1)

assert.Equal(t, buffer.Next(), uint64(1))

_, err := buffer.MinAvail()
assert.Error(t, err)

pushNewRandomPayload(t, buffer, 3)
min, err := buffer.MinAvail()
assert.NoError(t, err)
assert.Equal(t, min, uint64(3))

pushNewRandomPayload(t, buffer, 1)
min, err = buffer.MinAvail()
assert.NoError(t, err)
assert.Equal(t, min, uint64(1))

done := sync.WaitGroup{}
done.Add(1)

go func() {
select {
case <-buffer.Ready():
{
// Once payload is ready extract it
assert.Equal(t, buffer.Next(), uint64(1))
payload := buffer.Pop()
assert.Equal(t, payload.SeqNum, uint64(1))

// Next min sequence number has to be 3
min, err = buffer.MinAvail()
assert.NoError(t, err)
assert.Equal(t, min, uint64(3))
}
case <-time.After(500 * time.Millisecond):
{
t.Fatalf("Expected to receive notification with next payload")
}
}
done.Done()
}()

// Wait to make sure that payload was extracted
done.Wait()
}

// Test to push several concurrent blocks into the buffer
// with same sequence number, only one expected to succeed
func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
Expand All @@ -220,7 +142,7 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
payload, err := randomPayloadWithSeqNum(nextSeqNum)
assert.NoError(t, err)

errors := make([]error, 0)
var errors []error

ready := int32(0)
go func() {
Expand All @@ -235,7 +157,7 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
startWG.Wait()
errors = append(errors, buffer.Push(payload))
finishWG.Done()
}();
}()
}
startWG.Done()
finishWG.Wait()
Expand All @@ -245,7 +167,7 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
// Only one push attempt expected to succeed
for _, err := range errors {
if err == nil {
success ++
success++
}
}

Expand All @@ -254,11 +176,3 @@ func TestPayloadsBufferImpl_ConcurrentPush(t *testing.T) {
// Buffer size has to be only one
assert.Equal(t, 1, buffer.Size())
}

func pushNewRandomPayload(t *testing.T, b PayloadsBuffer, seqNum uint64) {
// Add new block w/ lower sequence number
payload, err := randomPayloadWithSeqNum(seqNum);
assert.NoError(t, err)
err = b.Push(payload)
assert.NoError(t, err)
}

0 comments on commit 5981d37

Please sign in to comment.