From bafb37fd065e4d073a28811f49ea0b079fd661e9 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Fri, 23 Dec 2016 21:48:04 +0200 Subject: [PATCH] Gossip messageStore- move to separate package This is part of FAB-872 Gossip MultiChannel support I want the channel handling to be in a different package than gossip, so I need this one to be also outside of gossip otherwise I'll have a dependency cycle. Change-Id: Ibed52855ab5709c9ef2e5fb87dbfd402e366c08a Signed-off-by: Yacov Manevich --- gossip/gossip/gossip_impl.go | 11 +++--- gossip/gossip/{ => msgstore}/msgs.go | 20 +++++----- gossip/gossip/{ => msgstore}/msgs_test.go | 46 +++++++++++------------ 3 files changed, 39 insertions(+), 38 deletions(-) rename gossip/gossip/{ => msgstore}/msgs.go (86%) rename gossip/gossip/{ => msgstore}/msgs_test.go (73%) diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 92cd4d9dbd3..28dacb75d42 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/msgstore" "github.com/hyperledger/fabric/gossip/gossip/pull" "github.com/hyperledger/fabric/gossip/identity" "github.com/hyperledger/fabric/gossip/proto" @@ -55,7 +56,7 @@ type gossipServiceImpl struct { conf *Config toDieChan chan struct{} stopFlag int32 - msgStore messageStore + msgStore msgstore.MessageStore emitter batchingEmitter goRoutines []uint64 discAdapter *discoveryAdapter @@ -108,7 +109,7 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{}, }, g.discAdapter, g.disSecAdap) - g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { + g.msgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { g.blocksPuller.Remove(m.(*proto.GossipMessage)) }) @@ -145,7 +146,7 @@ func (g *gossipServiceImpl) createBlockPuller() pull.Mediator { g.logger.Warning("Invalid DataMessage:", dataMsg) return } - added := g.msgStore.add(msg) + added := g.msgStore.Add(msg) // if we can't add the message to the msgStore, // no point in disseminating it to others... if !added { @@ -283,7 +284,7 @@ func (g *gossipServiceImpl) handleMessage(m comm.ReceivedMessage) { } } - added := g.msgStore.add(msg) + added := g.msgStore.Add(msg) if added { g.emitter.Add(msg) if dataMsg := m.GetGossipMessage().GetDataMsg(); dataMsg != nil { @@ -340,7 +341,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { g.logger.Info(msg) if dataMsg := msg.GetDataMsg(); dataMsg != nil { - g.msgStore.add(msg) + g.msgStore.Add(msg) g.blocksPuller.Add(msg) } g.emitter.Add(msg) diff --git a/gossip/gossip/msgs.go b/gossip/gossip/msgstore/msgs.go similarity index 86% rename from gossip/gossip/msgs.go rename to gossip/gossip/msgstore/msgs.go index 40e5fb0b6b4..7b7316fe8eb 100644 --- a/gossip/gossip/msgs.go +++ b/gossip/gossip/msgstore/msgs.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package gossip +package msgstore import ( "sync" @@ -27,26 +27,26 @@ import ( // then the invalidation trigger on 0 was called when 1 was added. type invalidationTrigger func(message interface{}) -func newMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) messageStore { +func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore { return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger} } -// messageStore adds messages to an internal buffer. +// MessageStore adds messages to an internal buffer. // When a message is received, it might: // - Be added to the buffer // - Discarded because of some message already in the buffer (invalidated) // - Make a message already in the buffer to be discarded (invalidates) // When a message is invalidated, the invalidationTrigger is invoked on that message. -type messageStore interface { +type MessageStore interface { // add adds a message to the store // returns true or false whether the message was added to the store - add(msg interface{}) bool + Add(msg interface{}) bool // size returns the amount of messages in the store - size() int + Size() int // get returns all messages in the store - get() []interface{} + Get() []interface{} } type messageStoreImpl struct { @@ -61,7 +61,7 @@ type msg struct { } // add adds a message to the store -func (s *messageStoreImpl) add(message interface{}) bool { +func (s *messageStoreImpl) Add(message interface{}) bool { s.lock.Lock() defer s.lock.Unlock() @@ -87,14 +87,14 @@ func (s *messageStoreImpl) add(message interface{}) bool { } // size returns the amount of messages in the store -func (s *messageStoreImpl) size() int { +func (s *messageStoreImpl) Size() int { s.lock.RLock() defer s.lock.RUnlock() return len(s.messages) } // get returns all messages in the store -func (s *messageStoreImpl) get() []interface{} { +func (s *messageStoreImpl) Get() []interface{} { s.lock.RLock() defer s.lock.RUnlock() diff --git a/gossip/gossip/msgs_test.go b/gossip/gossip/msgstore/msgs_test.go similarity index 73% rename from gossip/gossip/msgs_test.go rename to gossip/gossip/msgstore/msgs_test.go index 4ce64a561d3..e7bd8759adc 100644 --- a/gossip/gossip/msgs_test.go +++ b/gossip/gossip/msgstore/msgs_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package gossip +package msgstore import ( "math/rand" @@ -52,24 +52,24 @@ func compareInts(this interface{}, that interface{}) common.InvalidationResult { } func TestSize(t *testing.T) { - msgStore := newMessageStore(alwaysNoAction, noopTrigger) - msgStore.add(0) - msgStore.add(1) - msgStore.add(2) - assert.Equal(t, 3, msgStore.size()) + msgStore := NewMessageStore(alwaysNoAction, noopTrigger) + msgStore.Add(0) + msgStore.Add(1) + msgStore.Add(2) + assert.Equal(t, 3, msgStore.Size()) } func TestNewMessagesInvalidates(t *testing.T) { invalidated := make([]int, 9) - msgStore := newMessageStore(compareInts, func(m interface{}) { + msgStore := NewMessageStore(compareInts, func(m interface{}) { invalidated = append(invalidated, m.(int)) }) - assert.True(t, msgStore.add(0)) + assert.True(t, msgStore.Add(0)) for i := 1; i < 10; i++ { - assert.True(t, msgStore.add(i)) + assert.True(t, msgStore.Add(i)) assert.Equal(t, i-1, invalidated[len(invalidated)-1]) - assert.Equal(t, 1, msgStore.size()) - assert.Equal(t, i, msgStore.get()[0].(int)) + assert.Equal(t, 1, msgStore.Size()) + assert.Equal(t, i, msgStore.Get()[0].(int)) } } @@ -83,33 +83,33 @@ func TestMessagesGet(t *testing.T) { return false } - msgStore := newMessageStore(alwaysNoAction, noopTrigger) + msgStore := NewMessageStore(alwaysNoAction, noopTrigger) expected := []int{} for i := 0; i < 2; i++ { n := rand.Int() expected = append(expected, n) - msgStore.add(n) + msgStore.Add(n) } for _, num2Search := range expected { - assert.True(t, contains(msgStore.get(), num2Search), "Value %v not found in array", num2Search) + assert.True(t, contains(msgStore.Get(), num2Search), "Value %v not found in array", num2Search) } } func TestNewMessagesInvalidated(t *testing.T) { - msgStore := newMessageStore(compareInts, noopTrigger) - assert.True(t, msgStore.add(10)) + msgStore := NewMessageStore(compareInts, noopTrigger) + assert.True(t, msgStore.Add(10)) for i := 9; i >= 0; i-- { - assert.False(t, msgStore.add(i)) - assert.Equal(t, 1, msgStore.size()) - assert.Equal(t, 10, msgStore.get()[0].(int)) + assert.False(t, msgStore.Add(i)) + assert.Equal(t, 1, msgStore.Size()) + assert.Equal(t, 10, msgStore.Get()[0].(int)) } } func TestConcurrency(t *testing.T) { stopFlag := int32(0) - msgStore := newMessageStore(compareInts, noopTrigger) + msgStore := NewMessageStore(compareInts, noopTrigger) looper := func(f func()) func() { return func() { for { @@ -122,15 +122,15 @@ func TestConcurrency(t *testing.T) { } addProcess := looper(func() { - msgStore.add(rand.Int()) + msgStore.Add(rand.Int()) }) getProcess := looper(func() { - msgStore.get() + msgStore.Get() }) sizeProcess := looper(func() { - msgStore.size() + msgStore.Size() }) go addProcess()