From 65306698173075c6bf22c692ec664301c53f7620 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Tue, 27 Sep 2016 13:27:51 +0300 Subject: [PATCH] gossip component- datastructures This commit adds 2 datastructures used by the gossip forwarding mechanism: - messageStore: A storage cache for disseminated data items It is used to decide whether a message is "fresh", and needs to be forwarded to other peers, or maybe if its too "old" and it needs to be discarded. This cache is also in use by the pull mechanism the gossip code that'll be pushed later on will connect between them because it provides the data source for answering pull requests from other peers. - forwardingBatcher: A batch and time based component that causes the gossip to forward messages When a timer expires or if too many messages are outstanding. Change-Id: Ic23e9c168f6f387e85a0a10b8b0edf5ba30617ee Signed-off-by: Yacov Manevich --- gossip/gossip/batcher.go | 134 ++++++++++++++++++++++++++++++++ gossip/gossip/batcher_test.go | 118 ++++++++++++++++++++++++++++ gossip/gossip/msgs.go | 118 ++++++++++++++++++++++++++++ gossip/gossip/msgs_test.go | 141 ++++++++++++++++++++++++++++++++++ 4 files changed, 511 insertions(+) create mode 100644 gossip/gossip/batcher.go create mode 100644 gossip/gossip/batcher_test.go create mode 100644 gossip/gossip/msgs.go create mode 100644 gossip/gossip/msgs_test.go diff --git a/gossip/gossip/batcher.go b/gossip/gossip/batcher.go new file mode 100644 index 00000000000..cd39960d776 --- /dev/null +++ b/gossip/gossip/batcher.go @@ -0,0 +1,134 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "sync" + "sync/atomic" + "time" +) + +type emitBatchCallback func([]interface{}) + +//batchingEmitter is used for the gossip push/forwarding phase. +// Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded. +// If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch +type batchingEmitter interface { + // Add adds a message to be batched + Add(interface{}) + + // Stop stops the component + Stop() + + // Size returns the amount of pending messages to be emitted + Size() int +} + +// newBatchingEmitter accepts the following parameters: +// iterations: number of times each message is forwarded +// burstSize: a threshold that triggers a forwarding because of message count +// latency: the maximum delay that each message can be stored without being forwarded +// cb: a callback that is called in order for the forwarding to take place +func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emitBatchCallback) batchingEmitter { + p := &batchingEmitterImpl{ + cb: cb, + delay: latency, + iterations: iterations, + burstSize: burstSize, + lock: &sync.Mutex{}, + buff: make([]*batchedMessage, 0), + stopFlag: int32(0), + } + + go p.periodicEmit() + return p +} + +func (p *batchingEmitterImpl) periodicEmit() { + for !p.toDie() { + time.Sleep(p.delay) + p.lock.Lock() + p.emit() + p.lock.Unlock() + } +} + +func (p *batchingEmitterImpl) emit() { + if len(p.buff) == 0 { + return + } + msgs2beEmitted := make([]interface{}, len(p.buff)) + for i, v := range p.buff { + msgs2beEmitted[i] = v.data + } + + p.cb(msgs2beEmitted) + p.decrementCounters() +} + +func (p *batchingEmitterImpl) decrementCounters() { + n := len(p.buff) + for i := 0; i < n; i++ { + msg := p.buff[i] + msg.iterationsLeft-- + if msg.iterationsLeft == 0 { + p.buff = append(p.buff[:i], p.buff[i+1:]...) + n-- + i-- + } + } +} + +func (p *batchingEmitterImpl) toDie() bool { + return atomic.LoadInt32(&(p.stopFlag)) == int32(1) +} + +type batchingEmitterImpl struct { + iterations int + burstSize int + delay time.Duration + cb emitBatchCallback + lock *sync.Mutex + buff []*batchedMessage + stopFlag int32 +} + +type batchedMessage struct { + data interface{} + iterationsLeft int +} + +func (p *batchingEmitterImpl) Stop() { + atomic.StoreInt32(&(p.stopFlag), int32(1)) +} + +func (p *batchingEmitterImpl) Size() int { + p.lock.Lock() + defer p.lock.Unlock() + return len(p.buff) +} + +func (p *batchingEmitterImpl) Add(message interface{}) { + p.lock.Lock() + defer p.lock.Unlock() + + p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations}) + + if len(p.buff) >= p.burstSize { + p.emit() + } +} diff --git a/gossip/gossip/batcher_test.go b/gossip/gossip/batcher_test.go new file mode 100644 index 00000000000..9ced01f8a33 --- /dev/null +++ b/gossip/gossip/batcher_test.go @@ -0,0 +1,118 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "github.com/stretchr/testify/assert" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestBatchingEmitterAddAndSize(t *testing.T) { + emitter := newBatchingEmitter(1, 10, time.Second, func(a []interface{}) {}) + defer emitter.Stop() + emitter.Add(1) + emitter.Add(2) + emitter.Add(3) + assert.Equal(t, 3, emitter.Size()) +} + +func TestBatchingEmitterStop(t *testing.T) { + // In this test we make sure the emitter doesn't do anything after it's stopped + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + + emitter := newBatchingEmitter(10, 1, time.Duration(10)*time.Millisecond, cb) + emitter.Add(1) + time.Sleep(time.Duration(10) * time.Millisecond) + emitter.Stop() + time.Sleep(time.Duration(100) * time.Millisecond) + assert.True(t, atomic.LoadInt32(&disseminationAttempts) < int32(5)) +} + +func TestBatchingEmitterExpiration(t *testing.T) { + // In this test we make sure that a message is expired and is discarded after enough time + // and that it was forwarded an adequate amount of times + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + + emitter := newBatchingEmitter(10, 1, time.Duration(1)*time.Millisecond, cb) + defer emitter.Stop() + + emitter.Add(1) + time.Sleep(time.Duration(50) * time.Millisecond) + assert.Equal(t, int32(10), atomic.LoadInt32(&disseminationAttempts), "Inadaquate amount of dissemination attempts detected") + assert.Equal(t, 0, emitter.Size()) +} + +func TestBatchingEmitterCounter(t *testing.T) { + // In this test we count the number of times each message is forwarded, with relation to the time passed + counters := make(map[int]int) + lock := &sync.Mutex{} + cb := func(a []interface{}) { + lock.Lock() + defer lock.Unlock() + for _, e := range a { + n := e.(int) + if _, exists := counters[n]; !exists { + counters[n] = 0 + } else { + counters[n]++ + } + } + } + + emitter := newBatchingEmitter(5, 100, time.Duration(50)*time.Millisecond, cb) + defer emitter.Stop() + + for i := 1; i <= 5; i++ { + emitter.Add(i) + if i == 5 { + break + } + time.Sleep(time.Duration(60) * time.Millisecond) + } + emitter.Stop() + + lock.Lock() + assert.Equal(t, 0, counters[4]) + assert.Equal(t, 1, counters[3]) + assert.Equal(t, 2, counters[2]) + assert.Equal(t, 3, counters[1]) + lock.Unlock() +} + +// TestBatchingEmitterBurstSizeCap tests that the emitter +func TestBatchingEmitterBurstSizeCap(t *testing.T) { + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + emitter := newBatchingEmitter(1, 10, time.Duration(800)*time.Millisecond, cb) + defer emitter.Stop() + + for i := 0; i < 50; i++ { + emitter.Add(i) + } + assert.Equal(t, int32(5), atomic.LoadInt32(&disseminationAttempts)) +} diff --git a/gossip/gossip/msgs.go b/gossip/gossip/msgs.go new file mode 100644 index 00000000000..f9ebc769508 --- /dev/null +++ b/gossip/gossip/msgs.go @@ -0,0 +1,118 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import "sync" + +type invalidationResult int + +const ( + MESSAGE_NO_ACTION = invalidationResult(0) + MESSAGE_INVALIDATES = invalidationResult(1) + MESSAGE_INVALIDATED = invalidationResult(2) +) + +// Returns: +// MESSAGE_INVALIDATES if this message invalidates that +// MESSAGE_INVALIDATED if this message is invalidated by that +// MESSAGE_NO_ACTION otherwise +type messageReplacingPolicy func(this interface{}, that interface{}) invalidationResult + +// invalidationTrigger is invoked on each message that was invalidated because of a message addition +// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations +// then the invalidation trigger on 0 was called when 1 was added. +type invalidationTrigger func(message interface{}) + +func newMessageStore(pol messageReplacingPolicy, trigger invalidationTrigger) messageStore { + return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger} +} + +// messageStore adds messages to an internal buffer. +// When a message is received, it might: +// - Be added to the buffer +// - Discarded because of some message already in the buffer (invalidated) +// - Make a message already in the buffer to be discarded (invalidates) +// When a message is invalidated, the invalidationTrigger is invoked on that message. +type messageStore interface { + // add adds a message to the store + // returns true or false whether the message was added to the store + add(msg interface{}) bool + + // size returns the amount of messages in the store + size() int + + // get returns all messages in the store + get() []interface{} +} + +type messageStoreImpl struct { + pol messageReplacingPolicy + lock *sync.RWMutex + messages []*msg + invTrigger invalidationTrigger +} + +type msg struct { + data interface{} +} + +// add adds a message to the store +func (s *messageStoreImpl) add(message interface{}) bool { + s.lock.Lock() + defer s.lock.Unlock() + + n := len(s.messages) + for i := 0; i < n; i++ { + m := s.messages[i] + switch s.pol(message, m.data) { + case MESSAGE_INVALIDATED: + return false + break + case MESSAGE_INVALIDATES: + s.invTrigger(m.data) + s.messages = append(s.messages[:i], s.messages[i+1:]...) + n-- + i-- + break + default: + break + } + } + + s.messages = append(s.messages, &msg{data: message}) + return true +} + +// size returns the amount of messages in the store +func (s *messageStoreImpl) size() int { + s.lock.RLock() + defer s.lock.RUnlock() + return len(s.messages) +} + +// get returns all messages in the store +func (s *messageStoreImpl) get() []interface{} { + s.lock.RLock() + defer s.lock.RUnlock() + + n := len(s.messages) + res := make([]interface{}, n) + for i := 0; i < n; i++ { + res[i] = s.messages[i].data + } + return res +} diff --git a/gossip/gossip/msgs_test.go b/gossip/gossip/msgs_test.go new file mode 100644 index 00000000000..083a31857ba --- /dev/null +++ b/gossip/gossip/msgs_test.go @@ -0,0 +1,141 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "github.com/stretchr/testify/assert" + "math/rand" + "sync/atomic" + "testing" + "time" +) + +func init() { + rand.Seed(42) +} + +func alwaysNoAction(this interface{}, that interface{}) invalidationResult { + return MESSAGE_NO_ACTION +} + +func noopTrigger(m interface{}) { + +} + +func compareInts(this interface{}, that interface{}) invalidationResult { + a := this.(int) + b := that.(int) + if a == b { + return MESSAGE_NO_ACTION + } + if a > b { + return MESSAGE_INVALIDATES + } + + return MESSAGE_INVALIDATED +} + +func TestSize(t *testing.T) { + msgStore := newMessageStore(alwaysNoAction, noopTrigger) + msgStore.add(0) + msgStore.add(1) + msgStore.add(2) + assert.Equal(t, 3, msgStore.size()) +} + +func TestNewMessagesInvalidates(t *testing.T) { + invalidated := make([]int, 9) + msgStore := newMessageStore(compareInts, func(m interface{}) { + invalidated = append(invalidated, m.(int)) + }) + assert.True(t, msgStore.add(0)) + for i := 1; i < 10; i++ { + assert.True(t, msgStore.add(i)) + assert.Equal(t, i - 1, invalidated[len(invalidated) - 1]) + assert.Equal(t, 1, msgStore.size()) + assert.Equal(t, i, msgStore.get()[0].(int)) + } +} + +func TestMessagesGet(t *testing.T) { + contains := func(a []interface{}, e interface{}) bool { + for _, v := range a { + if v == e { + return true + } + } + return false + } + + msgStore := newMessageStore(alwaysNoAction, noopTrigger) + expected := make([]int, 0) + for i := 0; i < 2; i++ { + n := rand.Int() + expected = append(expected, n) + msgStore.add(n) + } + + for _, num2Search := range expected { + assert.True(t, contains(msgStore.get(), num2Search), "Value %v not found in array", num2Search) + } + +} + +func TestNewMessagesInvalidated(t *testing.T) { + msgStore := newMessageStore(compareInts, noopTrigger) + assert.True(t, msgStore.add(10)) + for i := 9; i >= 0; i-- { + assert.False(t, msgStore.add(i)) + assert.Equal(t, 1, msgStore.size()) + assert.Equal(t, 10, msgStore.get()[0].(int)) + } +} + +func TestConcurrency(t *testing.T) { + stopFlag := int32(0) + msgStore := newMessageStore(compareInts, noopTrigger) + looper := func(f func()) func() { + return func() { + for { + if atomic.LoadInt32(&stopFlag) == int32(1) { + return + } + f() + } + } + } + + addProcess := looper(func() { + msgStore.add(rand.Int()) + }) + + getProcess := looper(func() { + msgStore.get() + }) + + sizeProcess := looper(func() { + msgStore.size() + }) + + go addProcess() + go getProcess() + go sizeProcess() + + time.Sleep(time.Duration(3) * time.Second) + + atomic.CompareAndSwapInt32(&stopFlag, 0, 1) +}