Skip to content

Commit

Permalink
Gossip messageStore- move to separate package
Browse files Browse the repository at this point in the history
This is part of FAB-872 Gossip MultiChannel support
I want the channel handling to be in a different package
than gossip, so I need this one to be also outside
of gossip otherwise I'll have a dependency cycle.

Change-Id: Ibed52855ab5709c9ef2e5fb87dbfd402e366c08a
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Dec 25, 2016
1 parent 43f4e57 commit bafb37f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 38 deletions.
11 changes: 6 additions & 5 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/gossip/pull"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/proto"
Expand Down Expand Up @@ -55,7 +56,7 @@ type gossipServiceImpl struct {
conf *Config
toDieChan chan struct{}
stopFlag int32
msgStore messageStore
msgStore msgstore.MessageStore
emitter batchingEmitter
goRoutines []uint64
discAdapter *discoveryAdapter
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService
Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{},
}, g.discAdapter, g.disSecAdap)

g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) {
g.msgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) {
g.blocksPuller.Remove(m.(*proto.GossipMessage))
})

Expand Down Expand Up @@ -145,7 +146,7 @@ func (g *gossipServiceImpl) createBlockPuller() pull.Mediator {
g.logger.Warning("Invalid DataMessage:", dataMsg)
return
}
added := g.msgStore.add(msg)
added := g.msgStore.Add(msg)
// if we can't add the message to the msgStore,
// no point in disseminating it to others...
if !added {
Expand Down Expand Up @@ -283,7 +284,7 @@ func (g *gossipServiceImpl) handleMessage(m comm.ReceivedMessage) {
}
}

added := g.msgStore.add(msg)
added := g.msgStore.Add(msg)
if added {
g.emitter.Add(msg)
if dataMsg := m.GetGossipMessage().GetDataMsg(); dataMsg != nil {
Expand Down Expand Up @@ -340,7 +341,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) {
func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) {
g.logger.Info(msg)
if dataMsg := msg.GetDataMsg(); dataMsg != nil {
g.msgStore.add(msg)
g.msgStore.Add(msg)
g.blocksPuller.Add(msg)
}
g.emitter.Add(msg)
Expand Down
20 changes: 10 additions & 10 deletions gossip/gossip/msgs.go → gossip/gossip/msgstore/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip
package msgstore

import (
"sync"
Expand All @@ -27,26 +27,26 @@ import (
// then the invalidation trigger on 0 was called when 1 was added.
type invalidationTrigger func(message interface{})

func newMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) messageStore {
func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore {
return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger}
}

// messageStore adds messages to an internal buffer.
// MessageStore adds messages to an internal buffer.
// When a message is received, it might:
// - Be added to the buffer
// - Discarded because of some message already in the buffer (invalidated)
// - Make a message already in the buffer to be discarded (invalidates)
// When a message is invalidated, the invalidationTrigger is invoked on that message.
type messageStore interface {
type MessageStore interface {
// add adds a message to the store
// returns true or false whether the message was added to the store
add(msg interface{}) bool
Add(msg interface{}) bool

// size returns the amount of messages in the store
size() int
Size() int

// get returns all messages in the store
get() []interface{}
Get() []interface{}
}

type messageStoreImpl struct {
Expand All @@ -61,7 +61,7 @@ type msg struct {
}

// add adds a message to the store
func (s *messageStoreImpl) add(message interface{}) bool {
func (s *messageStoreImpl) Add(message interface{}) bool {
s.lock.Lock()
defer s.lock.Unlock()

Expand All @@ -87,14 +87,14 @@ func (s *messageStoreImpl) add(message interface{}) bool {
}

// size returns the amount of messages in the store
func (s *messageStoreImpl) size() int {
func (s *messageStoreImpl) Size() int {
s.lock.RLock()
defer s.lock.RUnlock()
return len(s.messages)
}

// get returns all messages in the store
func (s *messageStoreImpl) get() []interface{} {
func (s *messageStoreImpl) Get() []interface{} {
s.lock.RLock()
defer s.lock.RUnlock()

Expand Down
46 changes: 23 additions & 23 deletions gossip/gossip/msgs_test.go → gossip/gossip/msgstore/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip
package msgstore

import (
"math/rand"
Expand Down Expand Up @@ -52,24 +52,24 @@ func compareInts(this interface{}, that interface{}) common.InvalidationResult {
}

func TestSize(t *testing.T) {
msgStore := newMessageStore(alwaysNoAction, noopTrigger)
msgStore.add(0)
msgStore.add(1)
msgStore.add(2)
assert.Equal(t, 3, msgStore.size())
msgStore := NewMessageStore(alwaysNoAction, noopTrigger)
msgStore.Add(0)
msgStore.Add(1)
msgStore.Add(2)
assert.Equal(t, 3, msgStore.Size())
}

func TestNewMessagesInvalidates(t *testing.T) {
invalidated := make([]int, 9)
msgStore := newMessageStore(compareInts, func(m interface{}) {
msgStore := NewMessageStore(compareInts, func(m interface{}) {
invalidated = append(invalidated, m.(int))
})
assert.True(t, msgStore.add(0))
assert.True(t, msgStore.Add(0))
for i := 1; i < 10; i++ {
assert.True(t, msgStore.add(i))
assert.True(t, msgStore.Add(i))
assert.Equal(t, i-1, invalidated[len(invalidated)-1])
assert.Equal(t, 1, msgStore.size())
assert.Equal(t, i, msgStore.get()[0].(int))
assert.Equal(t, 1, msgStore.Size())
assert.Equal(t, i, msgStore.Get()[0].(int))
}
}

Expand All @@ -83,33 +83,33 @@ func TestMessagesGet(t *testing.T) {
return false
}

msgStore := newMessageStore(alwaysNoAction, noopTrigger)
msgStore := NewMessageStore(alwaysNoAction, noopTrigger)
expected := []int{}
for i := 0; i < 2; i++ {
n := rand.Int()
expected = append(expected, n)
msgStore.add(n)
msgStore.Add(n)
}

for _, num2Search := range expected {
assert.True(t, contains(msgStore.get(), num2Search), "Value %v not found in array", num2Search)
assert.True(t, contains(msgStore.Get(), num2Search), "Value %v not found in array", num2Search)
}

}

func TestNewMessagesInvalidated(t *testing.T) {
msgStore := newMessageStore(compareInts, noopTrigger)
assert.True(t, msgStore.add(10))
msgStore := NewMessageStore(compareInts, noopTrigger)
assert.True(t, msgStore.Add(10))
for i := 9; i >= 0; i-- {
assert.False(t, msgStore.add(i))
assert.Equal(t, 1, msgStore.size())
assert.Equal(t, 10, msgStore.get()[0].(int))
assert.False(t, msgStore.Add(i))
assert.Equal(t, 1, msgStore.Size())
assert.Equal(t, 10, msgStore.Get()[0].(int))
}
}

func TestConcurrency(t *testing.T) {
stopFlag := int32(0)
msgStore := newMessageStore(compareInts, noopTrigger)
msgStore := NewMessageStore(compareInts, noopTrigger)
looper := func(f func()) func() {
return func() {
for {
Expand All @@ -122,15 +122,15 @@ func TestConcurrency(t *testing.T) {
}

addProcess := looper(func() {
msgStore.add(rand.Int())
msgStore.Add(rand.Int())
})

getProcess := looper(func() {
msgStore.get()
msgStore.Get()
})

sizeProcess := looper(func() {
msgStore.size()
msgStore.Size()
})

go addProcess()
Expand Down

0 comments on commit bafb37f

Please sign in to comment.