Skip to content

Commit

Permalink
Gossip implementation
Browse files Browse the repository at this point in the history
This commit adds the gossip code itself, which is the code
that connects between the comm layer, the pull process, and the discovery layer

Change-Id: I9650790c61f318837fb7a68072386dfea09fc54f
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Nov 1, 2016
1 parent a293bc9 commit 898ea6e
Show file tree
Hide file tree
Showing 9 changed files with 1,205 additions and 22 deletions.
12 changes: 10 additions & 2 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,18 @@ func TestHandshake(t *testing.T) {
m := <-comm1.Accept(acceptAll)
rcvChan <- m.GetGossipMessage()
}()
stream.Send(msg2Send)
go stream.Send(msg2Send)
time.Sleep(time.Second)
assert.Equal(t, 1, len(rcvChan))
receivedMsg := <-rcvChan
var receivedMsg *proto.GossipMessage
select {
case receivedMsg = <-rcvChan:
break
case <- time.NewTicker(time.Duration(time.Second * 2)).C:
assert.Fail(t, "Timed out waiting for received message")
break
}

assert.Equal(t, nonce, receivedMsg.Nonce)

// negative path, nothing should be read from the channel because the signature is wrong
Expand Down
15 changes: 9 additions & 6 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func init() {
rand.Seed(42)
}

var digestWaitTime = time.Duration(4) * time.Second
var requestWaitTime = time.Duration(4) * time.Second
var responseWaitTime = time.Duration(7) * time.Second
var digestWaitTime = time.Duration(1) * time.Second
var requestWaitTime = time.Duration(1) * time.Second
var responseWaitTime = time.Duration(2) * time.Second

// SetDigestWaitTime sets the digest wait time
func SetDigestWaitTime(time time.Duration) {
Expand Down Expand Up @@ -127,8 +127,10 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
go func() {
for !engine.toDie() {
time.Sleep(sleepTime)
if engine.toDie() {
return
}
engine.initiatePull()

}
}()

Expand Down Expand Up @@ -278,7 +280,6 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{
return
}
engine.lock.Lock()
defer engine.lock.Unlock()

var items2Send []uint64
for _, item := range items {
Expand All @@ -287,7 +288,9 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{
}
}

engine.SendRes(items2Send, context, nonce)
engine.lock.Unlock()

go engine.SendRes(items2Send, context, nonce)
}

// OnRes notifies the engine a response has arrived
Expand Down
3 changes: 3 additions & 0 deletions gossip/gossip/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (p *batchingEmitterImpl) periodicEmit() {
}

func (p *batchingEmitterImpl) emit() {
if p.toDie() {
return
}
if len(p.buff) == 0 {
return
}
Expand Down
3 changes: 2 additions & 1 deletion gossip/gossip/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package gossip

import (
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBatchingEmitterAddAndSize(t *testing.T) {
Expand Down

0 comments on commit 898ea6e

Please sign in to comment.