Skip to content

Commit

Permalink
gossip component- datastructures
Browse files Browse the repository at this point in the history
This commit adds 2 datastructures used by the gossip forwarding mechanism:
 - messageStore: A storage cache for disseminated data items
 It is used to decide whether a message is "fresh",
 and needs to be forwarded to other peers,
 or maybe if its too "old" and it needs to be discarded.
 This cache is also in use by the pull mechanism
 the gossip code that'll be pushed later on will connect between them
 because it provides the data source for answering pull requests from other peers.
- forwardingBatcher: A batch and time based component that causes
  the gossip to forward messages
  When a timer expires or if too many messages are outstanding.

Change-Id: Ic23e9c168f6f387e85a0a10b8b0edf5ba30617ee
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Oct 5, 2016
1 parent 8a40a51 commit 6530669
Show file tree
Hide file tree
Showing 4 changed files with 511 additions and 0 deletions.
134 changes: 134 additions & 0 deletions gossip/gossip/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip

import (
"sync"
"sync/atomic"
"time"
)

type emitBatchCallback func([]interface{})

//batchingEmitter is used for the gossip push/forwarding phase.
// Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded.
// If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch
type batchingEmitter interface {
// Add adds a message to be batched
Add(interface{})

// Stop stops the component
Stop()

// Size returns the amount of pending messages to be emitted
Size() int
}

// newBatchingEmitter accepts the following parameters:
// iterations: number of times each message is forwarded
// burstSize: a threshold that triggers a forwarding because of message count
// latency: the maximum delay that each message can be stored without being forwarded
// cb: a callback that is called in order for the forwarding to take place
func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emitBatchCallback) batchingEmitter {
p := &batchingEmitterImpl{
cb: cb,
delay: latency,
iterations: iterations,
burstSize: burstSize,
lock: &sync.Mutex{},
buff: make([]*batchedMessage, 0),
stopFlag: int32(0),
}

go p.periodicEmit()
return p
}

func (p *batchingEmitterImpl) periodicEmit() {
for !p.toDie() {
time.Sleep(p.delay)
p.lock.Lock()
p.emit()
p.lock.Unlock()
}
}

func (p *batchingEmitterImpl) emit() {
if len(p.buff) == 0 {
return
}
msgs2beEmitted := make([]interface{}, len(p.buff))
for i, v := range p.buff {
msgs2beEmitted[i] = v.data
}

p.cb(msgs2beEmitted)
p.decrementCounters()
}

func (p *batchingEmitterImpl) decrementCounters() {
n := len(p.buff)
for i := 0; i < n; i++ {
msg := p.buff[i]
msg.iterationsLeft--
if msg.iterationsLeft == 0 {
p.buff = append(p.buff[:i], p.buff[i+1:]...)
n--
i--
}
}
}

func (p *batchingEmitterImpl) toDie() bool {
return atomic.LoadInt32(&(p.stopFlag)) == int32(1)
}

type batchingEmitterImpl struct {
iterations int
burstSize int
delay time.Duration
cb emitBatchCallback
lock *sync.Mutex
buff []*batchedMessage
stopFlag int32
}

type batchedMessage struct {
data interface{}
iterationsLeft int
}

func (p *batchingEmitterImpl) Stop() {
atomic.StoreInt32(&(p.stopFlag), int32(1))
}

func (p *batchingEmitterImpl) Size() int {
p.lock.Lock()
defer p.lock.Unlock()
return len(p.buff)
}

func (p *batchingEmitterImpl) Add(message interface{}) {
p.lock.Lock()
defer p.lock.Unlock()

p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations})

if len(p.buff) >= p.burstSize {
p.emit()
}
}
118 changes: 118 additions & 0 deletions gossip/gossip/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip

import (
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestBatchingEmitterAddAndSize(t *testing.T) {
emitter := newBatchingEmitter(1, 10, time.Second, func(a []interface{}) {})
defer emitter.Stop()
emitter.Add(1)
emitter.Add(2)
emitter.Add(3)
assert.Equal(t, 3, emitter.Size())
}

func TestBatchingEmitterStop(t *testing.T) {
// In this test we make sure the emitter doesn't do anything after it's stopped
disseminationAttempts := int32(0)
cb := func(a []interface{}) {
atomic.AddInt32(&disseminationAttempts, int32(1))
}

emitter := newBatchingEmitter(10, 1, time.Duration(10)*time.Millisecond, cb)
emitter.Add(1)
time.Sleep(time.Duration(10) * time.Millisecond)
emitter.Stop()
time.Sleep(time.Duration(100) * time.Millisecond)
assert.True(t, atomic.LoadInt32(&disseminationAttempts) < int32(5))
}

func TestBatchingEmitterExpiration(t *testing.T) {
// In this test we make sure that a message is expired and is discarded after enough time
// and that it was forwarded an adequate amount of times
disseminationAttempts := int32(0)
cb := func(a []interface{}) {
atomic.AddInt32(&disseminationAttempts, int32(1))
}

emitter := newBatchingEmitter(10, 1, time.Duration(1)*time.Millisecond, cb)
defer emitter.Stop()

emitter.Add(1)
time.Sleep(time.Duration(50) * time.Millisecond)
assert.Equal(t, int32(10), atomic.LoadInt32(&disseminationAttempts), "Inadaquate amount of dissemination attempts detected")
assert.Equal(t, 0, emitter.Size())
}

func TestBatchingEmitterCounter(t *testing.T) {
// In this test we count the number of times each message is forwarded, with relation to the time passed
counters := make(map[int]int)
lock := &sync.Mutex{}
cb := func(a []interface{}) {
lock.Lock()
defer lock.Unlock()
for _, e := range a {
n := e.(int)
if _, exists := counters[n]; !exists {
counters[n] = 0
} else {
counters[n]++
}
}
}

emitter := newBatchingEmitter(5, 100, time.Duration(50)*time.Millisecond, cb)
defer emitter.Stop()

for i := 1; i <= 5; i++ {
emitter.Add(i)
if i == 5 {
break
}
time.Sleep(time.Duration(60) * time.Millisecond)
}
emitter.Stop()

lock.Lock()
assert.Equal(t, 0, counters[4])
assert.Equal(t, 1, counters[3])
assert.Equal(t, 2, counters[2])
assert.Equal(t, 3, counters[1])
lock.Unlock()
}

// TestBatchingEmitterBurstSizeCap tests that the emitter
func TestBatchingEmitterBurstSizeCap(t *testing.T) {
disseminationAttempts := int32(0)
cb := func(a []interface{}) {
atomic.AddInt32(&disseminationAttempts, int32(1))
}
emitter := newBatchingEmitter(1, 10, time.Duration(800)*time.Millisecond, cb)
defer emitter.Stop()

for i := 0; i < 50; i++ {
emitter.Add(i)
}
assert.Equal(t, int32(5), atomic.LoadInt32(&disseminationAttempts))
}
118 changes: 118 additions & 0 deletions gossip/gossip/msgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip

import "sync"

type invalidationResult int

const (
MESSAGE_NO_ACTION = invalidationResult(0)
MESSAGE_INVALIDATES = invalidationResult(1)
MESSAGE_INVALIDATED = invalidationResult(2)
)

// Returns:
// MESSAGE_INVALIDATES if this message invalidates that
// MESSAGE_INVALIDATED if this message is invalidated by that
// MESSAGE_NO_ACTION otherwise
type messageReplacingPolicy func(this interface{}, that interface{}) invalidationResult

// invalidationTrigger is invoked on each message that was invalidated because of a message addition
// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations
// then the invalidation trigger on 0 was called when 1 was added.
type invalidationTrigger func(message interface{})

func newMessageStore(pol messageReplacingPolicy, trigger invalidationTrigger) messageStore {
return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger}
}

// messageStore adds messages to an internal buffer.
// When a message is received, it might:
// - Be added to the buffer
// - Discarded because of some message already in the buffer (invalidated)
// - Make a message already in the buffer to be discarded (invalidates)
// When a message is invalidated, the invalidationTrigger is invoked on that message.
type messageStore interface {
// add adds a message to the store
// returns true or false whether the message was added to the store
add(msg interface{}) bool

// size returns the amount of messages in the store
size() int

// get returns all messages in the store
get() []interface{}
}

type messageStoreImpl struct {
pol messageReplacingPolicy
lock *sync.RWMutex
messages []*msg
invTrigger invalidationTrigger
}

type msg struct {
data interface{}
}

// add adds a message to the store
func (s *messageStoreImpl) add(message interface{}) bool {
s.lock.Lock()
defer s.lock.Unlock()

n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
switch s.pol(message, m.data) {
case MESSAGE_INVALIDATED:
return false
break
case MESSAGE_INVALIDATES:
s.invTrigger(m.data)
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
break
default:
break
}
}

s.messages = append(s.messages, &msg{data: message})
return true
}

// size returns the amount of messages in the store
func (s *messageStoreImpl) size() int {
s.lock.RLock()
defer s.lock.RUnlock()
return len(s.messages)
}

// get returns all messages in the store
func (s *messageStoreImpl) get() []interface{} {
s.lock.RLock()
defer s.lock.RUnlock()

n := len(s.messages)
res := make([]interface{}, n)
for i := 0; i < n; i++ {
res[i] = s.messages[i].data
}
return res
}

0 comments on commit 6530669

Please sign in to comment.