Skip to content

Commit

Permalink
FAB-1045 Gossip pull refactoring: uint64->string
Browse files Browse the repository at this point in the history
Currently the gossip pull mechanism can only handle items
uint64 as keys (for digests).
I refactored this to be string, because a string can hold
both ints, hashes and can also be used as a key in a map.

This is needed for disseminating entities such as certificates
of peers.

Change-Id: I06be757275dccd4e78f055bb703aeadd52787c33
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Dec 6, 2016
1 parent ede30a4 commit baea89c
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 194 deletions.
34 changes: 17 additions & 17 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
identified by uint64 numbers.
identified by string numbers.
The protocol is as follows:
1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
2) Each remote peer responds with a digest of its messages and returns that NONCE.
Expand Down Expand Up @@ -82,14 +82,14 @@ type PullAdapter interface {

// SendDigest sends a digest to a remote PullEngine.
// The context parameter specifies the remote engine to send to.
SendDigest(digest []uint64, nonce uint64, context interface{})
SendDigest(digest []string, nonce uint64, context interface{})

// SendReq sends an array of items to a certain remote PullEngine identified
// by a string
SendReq(dest string, items []uint64, nonce uint64)
SendReq(dest string, items []string, nonce uint64)

// SendRes sends an array of items to a remote PullEngine identified by a context.
SendRes(items []uint64, context interface{}, nonce uint64)
SendRes(items []string, context interface{}, nonce uint64)
}

// PullEngine is the component that actually invokes the pull algorithm
Expand All @@ -98,7 +98,7 @@ type PullEngine struct {
PullAdapter
stopFlag int32
state *util.Set
item2owners map[uint64][]string
item2owners map[string][]string
peers2nonces map[string]uint64
nonces2peers map[uint64]string
acceptingDigests int32
Expand All @@ -115,7 +115,7 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
PullAdapter: participant,
stopFlag: int32(0),
state: util.NewSet(),
item2owners: make(map[uint64][]string),
item2owners: make(map[string][]string),
peers2nonces: make(map[string]uint64),
nonces2peers: make(map[uint64]string),
acceptingDigests: int32(0),
Expand Down Expand Up @@ -190,12 +190,12 @@ func (engine *PullEngine) processIncomingDigests() {
engine.lock.Lock()
defer engine.lock.Unlock()

requestMapping := make(map[string][]uint64)
requestMapping := make(map[string][]string)
for n, sources := range engine.item2owners {
// select a random source
source := sources[rand.Intn(len(sources))]
if _, exists := requestMapping[source]; !exists {
requestMapping[source] = make([]uint64, 0)
requestMapping[source] = make([]string, 0)
}
// append the number to that source
requestMapping[source] = append(requestMapping[source], n)
Expand All @@ -218,13 +218,13 @@ func (engine *PullEngine) endPull() {
atomic.StoreInt32(&(engine.acceptingResponses), int32(0))
engine.outgoingNONCES.Clear()

engine.item2owners = make(map[uint64][]string)
engine.item2owners = make(map[string][]string)
engine.peers2nonces = make(map[string]uint64)
engine.nonces2peers = make(map[uint64]string)
}

// OnDigest notifies the engine that a digest has arrived
func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) {
func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
return
}
Expand All @@ -246,14 +246,14 @@ func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interf
}

// Add adds items to the state
func (engine *PullEngine) Add(seqs ...uint64) {
func (engine *PullEngine) Add(seqs ...string) {
for _, seq := range seqs {
engine.state.Add(seq)
}
}

// Remove removes items from the state
func (engine *PullEngine) Remove(seqs ...uint64) {
func (engine *PullEngine) Remove(seqs ...string) {
for _, seq := range seqs {
engine.state.Remove(seq)
}
Expand All @@ -267,21 +267,21 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
})

a := engine.state.ToArray()
digest := make([]uint64, len(a))
digest := make([]string, len(a))
for i, item := range a {
digest[i] = item.(uint64)
digest[i] = item.(string)
}
engine.SendDigest(digest, nonce, context)
}

// OnReq notifies the engine a request has arrived
func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) {
func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
if !engine.incomingNONCES.Exists(nonce) {
return
}
engine.lock.Lock()

var items2Send []uint64
var items2Send []string
for _, item := range items {
if engine.state.Exists(item) {
items2Send = append(items2Send, item)
Expand All @@ -294,7 +294,7 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{
}

// OnRes notifies the engine a response has arrived
func (engine *PullEngine) OnRes(items []uint64, nonce uint64) {
func (engine *PullEngine) OnRes(items []string, nonce uint64) {
if !engine.outgoingNONCES.Exists(nonce) || !engine.isAcceptingResponses() {
return
}
Expand Down
Loading

0 comments on commit baea89c

Please sign in to comment.