Skip to content

Commit

Permalink
Merge "[FAB-2061] Gossip inter-org confidentiality - P2"
Browse files Browse the repository at this point in the history
  • Loading branch information
hacera-jonathan authored and Gerrit Code Review committed Apr 20, 2017
2 parents 37f411c + 90b4c72 commit 3c62974
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 23 deletions.
35 changes: 29 additions & 6 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func SetResponseWaitTime(time time.Duration) {
viper.Set("peer.gossip.responseWaitTime", time)
}

// DigestFilter filters digests to be sent to a remote peer that
// sent a hello or a request, based on its messages's context
type DigestFilter func(context interface{}) func(digestItem string) bool

// PullAdapter is needed by the PullEngine in order to
// send messages to the remote PullEngine instances.
// The PullEngine expects to be invoked with
Expand Down Expand Up @@ -109,11 +113,12 @@ type PullEngine struct {
lock sync.Mutex
outgoingNONCES *util.Set
incomingNONCES *util.Set
digFilter DigestFilter
}

// NewPullEngine creates an instance of a PullEngine with a certain sleep time
// between pull initiations
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
// NewPullEngineWithFilter creates an instance of a PullEngine with a certain sleep time
// between pull initiations, and uses the given filters when sending digests and responses
func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
engine := &PullEngine{
PullAdapter: participant,
stopFlag: int32(0),
Expand All @@ -125,6 +130,7 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
acceptingResponses: int32(0),
incomingNONCES: util.NewSet(),
outgoingNONCES: util.NewSet(),
digFilter: df,
}

go func() {
Expand All @@ -140,8 +146,19 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
return engine
}

// NewPullEngine creates an instance of a PullEngine with a certain sleep time
// between pull initiations
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
acceptAllFilter := func(_ interface{}) func(string) bool {
return func(_ string) bool {
return true
}
}
return NewPullEngineWithFilter(participant, sleepTime, acceptAllFilter)
}

func (engine *PullEngine) toDie() bool {
return (atomic.LoadInt32(&(engine.stopFlag)) == int32(1))
return atomic.LoadInt32(&(engine.stopFlag)) == int32(1)
}

func (engine *PullEngine) acceptResponses() {
Expand Down Expand Up @@ -275,8 +292,13 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {

a := engine.state.ToArray()
digest := make([]string, len(a))
filter := engine.digFilter(context)
for i, item := range a {
digest[i] = item.(string)
dig := item.(string)
if !filter(dig) {
continue
}
digest[i] = dig
}
engine.SendDigest(digest, nonce, context)
}
Expand All @@ -288,9 +310,10 @@ func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{
}
engine.lock.Lock()

filter := engine.digFilter(context)
var items2Send []string
for _, item := range items {
if engine.state.Exists(item) {
if engine.state.Exists(item) && filter(item) {
items2Send = append(items2Send, item)
}
}
Expand Down
51 changes: 48 additions & 3 deletions gossip/gossip/algo/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ limitations under the License.
package algo

import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"fmt"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -501,6 +501,51 @@ func TestSpread(t *testing.T) {
}
}
lock.Unlock()
}

func TestFilter(t *testing.T) {
t.Parallel()
// Scenario: 3 instances, items [0-5] are found only in the first instance, the other 2 have none.
// and also the first instance only gives the 2nd instance even items, and odd items to the 3rd.
// also, instances 2 and 3 don't know each other.
// Expected outcome: inst2 has only even items, and inst3 has only odd items
peers := make(map[string]*pullTestInstance)
inst1 := newPushPullTestInstance("p1", peers)
inst2 := newPushPullTestInstance("p2", peers)
inst3 := newPushPullTestInstance("p3", peers)
defer inst1.stop()
defer inst2.stop()
defer inst3.stop()

inst1.PullEngine.digFilter = func(context interface{}) func(digestItem string) bool {
return func(digestItem string) bool {
n, _ := strconv.ParseInt(digestItem, 10, 64)
if context == "p2" {
return n%2 == 0
}
return n%2 == 1
}
}

inst1.Add("0", "1", "2", "3", "4", "5")
inst2.setNextPeerSelection([]string{"p1"})
inst3.setNextPeerSelection([]string{"p1"})

time.Sleep(time.Second * 2)

assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "0", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "1", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "2", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "3", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "4", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "5", Strcmp) == -1)

assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "0", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "1", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "2", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "3", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "4", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "5", Strcmp) != -1)

}

Expand Down
15 changes: 10 additions & 5 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,17 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.Receiv
sender := &senderMock{}
memberSvc := &membershipSvcMock{}
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
adapter := pull.PullAdapter{
Sndr: sender,
MemSvc: memberSvc,
IdExtractor: func(msg *proto.SignedGossipMessage) string {
return string(msg.GetPeerIdentity().PkiId)
},
MsgCons: func(msg *proto.SignedGossipMessage) {

pullMediator := pull.NewPullMediator(config,
sender,
memberSvc,
func(msg *proto.SignedGossipMessage) string { return string(msg.GetPeerIdentity().PkiId) },
func(msg *proto.SignedGossipMessage) {})
},
}
pullMediator := pull.NewPullMediator(config, adapter)
certStore := newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{})
Expand Down
8 changes: 7 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator {
}
gc.DeMultiplex(msg)
}
return pull.NewPullMediator(conf, gc, gc.memFilter, seqNumFromMsg, blockConsumer)
adapter := pull.PullAdapter{
Sndr: gc,
MemSvc: gc.memFilter,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
}
return pull.NewPullMediator(conf, adapter)
}

// IsMemberInChan checks whether the given member is eligible to be in the channel
Expand Down
8 changes: 7 additions & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,13 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
g.logger.Info("Learned of a new certificate:", idMsg.Cert)

}
return pull.NewPullMediator(conf, g.comm, g.disc, pkiIDFromMsg, certConsumer)
adapter := pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
}
return pull.NewPullMediator(conf, adapter)
}

func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
Expand Down
48 changes: 42 additions & 6 deletions gossip/gossip/pull/pullstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ type MembershipService interface {
GetMembership() []discovery.NetworkMember
}

// DigestFilter filters digests to be sent to a remote peer, that
// sent a hello with the following message
type DigestFilter func(helloMsg proto.ReceivedMessage) func(digestItem string) bool

// byContext converts this DigestFilter to an algo.DigestFilter
func (df DigestFilter) byContext() algo.DigestFilter {
return func(context interface{}) func(digestItem string) bool {
return func(digestItem string) bool {
return df(context.(proto.ReceivedMessage))(digestItem)
}
}
}

// PullConfig defines the configuration of the pull mediator
type PullConfig struct {
ID string
Expand All @@ -65,6 +78,16 @@ type PullConfig struct {
MsgType proto.PullMsgType
}

// PullAdapter defines methods of the pullStore to interact
// with various modules of gossip
type PullAdapter struct {
Sndr Sender
MemSvc MembershipService
IdExtractor proto.IdentifierExtractor
MsgCons proto.MsgConsumer
DigFilter DigestFilter
}

// Mediator is a component wrap a PullEngine and provides the methods
// it needs to perform pull synchronization.
// The specialization of a pull mediator to a certain type of message is
Expand Down Expand Up @@ -103,18 +126,31 @@ type pullMediatorImpl struct {
}

// NewPullMediator returns a new Mediator
func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator {
func NewPullMediator(config PullConfig, adapter PullAdapter) Mediator {
digFilter := adapter.DigFilter

acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool {
return func(_ string) bool {
return true
}
}

if digFilter == nil {
digFilter = acceptAllFilter
}

p := &pullMediatorImpl{
msgCons: msgCons,
msgCons: adapter.MsgCons,
msgType2Hook: make(map[PullMsgType][]MessageHook),
idExtractor: idExtractor,
idExtractor: adapter.IdExtractor,
config: config,
logger: util.GetLogger(util.LoggingPullModule, config.ID),
itemID2Msg: make(map[string]*proto.SignedGossipMessage),
memBvc: memSvc,
Sender: sndr,
memBvc: adapter.MemSvc,
Sender: adapter.Sndr,
}
p.engine = algo.NewPullEngine(p, config.PullInterval)

p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext())
return p
}

Expand Down
49 changes: 48 additions & 1 deletion gossip/gossip/pull/pullstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pull

import (
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -106,6 +107,10 @@ func (p *pullInstance) wrapPullMsg(msg *proto.SignedGossipMessage) proto.Receive
}

func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) *pullInstance {
return createPullInstanceWithFilters(endpoint, peer2PullInst, nil)
}

func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pullInstance, df DigestFilter) *pullInstance {
inst := &pullInstance{
items: util.NewSet(),
stopChan: make(chan struct{}),
Expand Down Expand Up @@ -137,7 +142,14 @@ func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance)
blockConsumer := func(msg *proto.SignedGossipMessage) {
inst.items.Add(msg.GetDataMsg().Payload.SeqNum)
}
inst.mediator = NewPullMediator(conf, inst, inst, seqNumFromMsg, blockConsumer)
adapter := PullAdapter{
Sndr: inst,
MemSvc: inst,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
DigFilter: df,
}
inst.mediator = NewPullMediator(conf, adapter)
go func() {
for {
select {
Expand Down Expand Up @@ -182,6 +194,41 @@ func TestRegisterMsgHook(t *testing.T) {

}

func TestFilter(t *testing.T) {
t.Parallel()
peer2pullInst := make(map[string]*pullInstance)

eq := func(a interface{}, b interface{}) bool {
return a == b
}
df := func(msg proto.ReceivedMessage) func(string) bool {
if msg.GetGossipMessage().IsDataReq() {
req := msg.GetGossipMessage().GetDataReq()
return func(item string) bool {
return util.IndexInSlice(req.Digests, item, eq) != -1
}
}
return func(digestItem string) bool {
n, _ := strconv.ParseInt(digestItem, 10, 64)
return n%2 == 0
}
}
inst1 := createPullInstanceWithFilters("localhost:5611", peer2pullInst, df)
inst2 := createPullInstance("localhost:5612", peer2pullInst)
defer inst1.stop()
defer inst2.stop()

inst1.mediator.Add(dataMsg(0))
inst1.mediator.Add(dataMsg(1))
inst1.mediator.Add(dataMsg(2))
inst1.mediator.Add(dataMsg(3))

waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(0)) })
waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(2)) })
assert.False(t, inst2.items.Exists(uint64(1)))
assert.False(t, inst2.items.Exists(uint64(3)))
}

func TestAddAndRemove(t *testing.T) {
t.Parallel()
peer2pullInst := make(map[string]*pullInstance)
Expand Down

0 comments on commit 3c62974

Please sign in to comment.