Skip to content

Commit

Permalink
Extract common gossip data type
Browse files Browse the repository at this point in the history
Right now PKIidType data type separetely placed
in two different packages, while it serves exactly
same purpose and for one who would like to use
both dicsovery and communication module from the
gossip it will require to make needless convertations.

This commits converges and extracts that data type
into the common place so it could be used by both
packages.

Change-Id: Iedfa689e745a472cdb39c70b1ff99cc7880ff91d
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Nov 11, 2016
1 parent 148322b commit 7703c81
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 82 deletions.
18 changes: 7 additions & 11 deletions gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,29 @@ package comm
import (
"fmt"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
)

// Comm is an object that enables to communicate with other peers
// that also embed a CommModule.
type Comm interface {

// GetPKIid returns this instance's PKI id
GetPKIid() PKIidType
GetPKIid() common.PKIidType

// Send sends a message to remote peers
Send(msg *proto.GossipMessage, peers ...*RemotePeer)

// Probe probes a remote node and returns nil if its responsive
Probe(endpoint string, pkiID PKIidType) error
Probe(endpoint string, pkiID common.PKIidType) error

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
Accept(util.MessageAcceptor) <-chan ReceivedMessage
Accept(common.MessageAcceptor) <-chan ReceivedMessage

// PresumedDead returns a read-only channel for node endpoints that are suspected to be offline
PresumedDead() <-chan PKIidType
PresumedDead() <-chan common.PKIidType

// CloseConn closes a connection to a certain endpoint
CloseConn(peer *RemotePeer)
Expand All @@ -50,17 +50,13 @@ type Comm interface {
Stop()

// BlackListPKIid prohibits the module communicating with the given PKIid
BlackListPKIid(PKIid PKIidType)
BlackListPKIid(PKIid common.PKIidType)
}

// PKIidType defines the type that holds the PKI-id
// which is the security identifier of a peer
type PKIidType []byte

// RemotePeer defines a peer's endpoint and its PKIid
type RemotePeer struct {
Endpoint string
PKIID PKIidType
PKIID common.PKIidType
}

// String converts a RemotePeer to a string
Expand Down
33 changes: 17 additions & 16 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"crypto/tls"
"os"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
"github.com/op/go-logging"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
}

// NewCommInstanceWithServer creates a comm instance that creates an underlying gRPC server
func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, dialOpts ...grpc.DialOption) (Comm, error) {
func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIidType, dialOpts ...grpc.DialOption) (Comm, error) {
var ll net.Listener
var s *grpc.Server
var secOpt grpc.DialOption
Expand All @@ -86,11 +87,11 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, d
gSrv: s,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.RWMutex{},
deadEndpoints: make(chan PKIidType, 100),
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan ReceivedMessage, 0),
blackListedPKIIDs: make([]PKIidType, 0),
blackListedPKIIDs: make([]common.PKIidType, 0),
}
commInst.connStore = newConnStore(commInst, pkID, commInst.logger)

Expand All @@ -112,7 +113,7 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, d
}

// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID PKIidType, dialOpts ...grpc.DialOption) (Comm, error) {
func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID common.PKIidType, dialOpts ...grpc.DialOption) (Comm, error) {
commInst, err := NewCommInstanceWithServer(-1, sec, PKIID)
if err != nil {
return nil, err
Expand All @@ -128,7 +129,7 @@ type commImpl struct {
connStore *connectionStore
PKIID []byte
port int
deadEndpoints chan PKIidType
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
Expand All @@ -137,10 +138,10 @@ type commImpl struct {
stopping int32
stopWG sync.WaitGroup
subscriptions []chan ReceivedMessage
blackListedPKIIDs []PKIidType
blackListedPKIIDs []common.PKIidType
}

func (c *commImpl) createConnection(endpoint string, expectedPKIID PKIidType) (*connection, error) {
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
c.logger.Debug("Entering", endpoint, expectedPKIID)
defer c.logger.Debug("Exiting")
if c.isStopping() {
Expand Down Expand Up @@ -204,7 +205,7 @@ func (c *commImpl) Send(msg *proto.GossipMessage, peers ...*RemotePeer) {
}
}

func (c *commImpl) BlackListPKIid(PKIID PKIidType) {
func (c *commImpl) BlackListPKIid(PKIID common.PKIidType) {
c.logger.Info("Entering", PKIID)
defer c.logger.Info("Exiting")
c.lock.Lock()
Expand All @@ -213,7 +214,7 @@ func (c *commImpl) BlackListPKIid(PKIID PKIidType) {
c.blackListedPKIIDs = append(c.blackListedPKIIDs, PKIID)
}

func (c *commImpl) isPKIblackListed(p PKIidType) bool {
func (c *commImpl) isPKIblackListed(p common.PKIidType) bool {
c.lock.RLock()
defer c.lock.RUnlock()
for _, pki := range c.blackListedPKIIDs {
Expand Down Expand Up @@ -251,7 +252,7 @@ func (c *commImpl) isStopping() bool {
return atomic.LoadInt32(&c.stopping) == int32(1)
}

func (c *commImpl) Probe(endpoint string, pkiID PKIidType) error {
func (c *commImpl) Probe(endpoint string, pkiID common.PKIidType) error {
if c.isStopping() {
return fmt.Errorf("Stopping!")
}
Expand All @@ -274,7 +275,7 @@ func (c *commImpl) Probe(endpoint string, pkiID PKIidType) error {
return err
}

func (c *commImpl) Accept(acceptor util.MessageAcceptor) <-chan ReceivedMessage {
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan ReceivedMessage {
genericChan := c.msgPublisher.AddChannel(acceptor)
specificChan := make(chan ReceivedMessage, 10)

Expand Down Expand Up @@ -311,7 +312,7 @@ func (c *commImpl) Accept(acceptor util.MessageAcceptor) <-chan ReceivedMessage
return specificChan
}

func (c *commImpl) PresumedDead() <-chan PKIidType {
func (c *commImpl) PresumedDead() <-chan common.PKIidType {
return c.deadEndpoints
}

Expand Down Expand Up @@ -351,7 +352,7 @@ func (c *commImpl) Stop() {
c.stopWG.Wait()
}

func (c *commImpl) GetPKIid() PKIidType {
func (c *commImpl) GetPKIid() common.PKIidType {
return c.PKIID
}

Expand All @@ -366,7 +367,7 @@ func extractRemoteAddress(stream stream) string {
return remoteAddress
}

func (c *commImpl) authenticateRemotePeer(stream stream) (PKIidType, error) {
func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
tlsUnique := ExtractTLSUnique(ctx)
Expand Down Expand Up @@ -455,7 +456,7 @@ func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
return &proto.Empty{}, nil
}

func (c *commImpl) disconnect(pkiID PKIidType) {
func (c *commImpl) disconnect(pkiID common.PKIidType) {
if c.isStopping() {
return
}
Expand Down Expand Up @@ -485,7 +486,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMes
}
}

func createConnectionMsg(pkiID PKIidType, sig []byte) *proto.GossipMessage {
func createConnectionMsg(pkiID common.PKIidType, sig []byte) *proto.GossipMessage {
return &proto.GossipMessage{
Nonce: 0,
Content: &proto.GossipMessage_Conn{
Expand Down
5 changes: 3 additions & 2 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/hyperledger/fabric/gossip/common"
)

func init() {
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestHandshake(t *testing.T) {
clientTLSUnique := ExtractTLSUnique(stream.Context())
sig, err := naiveSec.Sign(clientTLSUnique)
assert.NoError(t, err, "%v", err)
msg := createConnectionMsg(PKIidType("localhost:9610"), sig)
msg := createConnectionMsg(common.PKIidType("localhost:9610"), sig)
stream.Send(msg)
msg, err = stream.Recv()
assert.NoError(t, err, "%v", err)
Expand Down Expand Up @@ -150,7 +151,7 @@ func TestHandshake(t *testing.T) {
} else {
sig[0] = 0
}
msg = createConnectionMsg(PKIidType("localhost:9612"), sig)
msg = createConnectionMsg(common.PKIidType("localhost:9612"), sig)
stream.Send(msg)
msg, err = stream.Recv()
assert.Equal(t, []byte("localhost:9611"), msg.GetConn().PkiID)
Expand Down
15 changes: 8 additions & 7 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import (
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
"google.golang.org/grpc"
"github.com/hyperledger/fabric/gossip/common"
)

type handler func(*proto.GossipMessage)

type connFactory interface {
createConnection(endpoint string, pkiID PKIidType) (*connection, error)
createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}

type connectionStore struct {
logger *util.Logger // logger
selfPKIid PKIidType // pkiID of this peer
selfPKIid common.PKIidType // pkiID of this peer
isClosing bool // whether this connection store is shutting down
connFactory connFactory // creates a connection to remote peer
sync.RWMutex // synchronize access to shared variables
Expand All @@ -43,7 +44,7 @@ type connectionStore struct {
// used to prevent concurrent connection establishment to the same remote endpoint
}

func newConnStore(connFactory connFactory, pkiID PKIidType, logger *util.Logger) *connectionStore {
func newConnStore(connFactory connFactory, pkiID common.PKIidType, logger *util.Logger) *connectionStore {
return &connectionStore{
connFactory: connFactory,
isClosing: false,
Expand Down Expand Up @@ -154,7 +155,7 @@ func (cs *connectionStore) shutdown() {
wg.Wait()
}

func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID PKIidType) *connection {
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID common.PKIidType) *connection {
cs.Lock()
defer cs.Unlock()

Expand All @@ -165,15 +166,15 @@ func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamSer
return cs.registerConn(pkiID, serverStream)
}

func (cs *connectionStore) registerConn(pkiID PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection {
func (cs *connectionStore) registerConn(pkiID common.PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection {
conn := newConnection(nil, nil, nil, serverStream)
conn.pkiID = pkiID
conn.logger = cs.logger
cs.pki2Conn[string(pkiID)] = conn
return conn
}

func (cs *connectionStore) closeByPKIid(pkiID PKIidType) {
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) {
cs.Lock()
defer cs.Unlock()
if conn, exists := cs.pki2Conn[string(pkiID)]; exists {
Expand All @@ -199,7 +200,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
type connection struct {
outBuff chan *msgSending
logger *util.Logger // logger
pkiID PKIidType // pkiID of the remote endpoint
pkiID common.PKIidType // pkiID of the remote endpoint
handler handler // function to invoke upon a message reception
conn *grpc.ClientConn // gRPC connection to remote endpoint
cl proto.GossipClient // gRPC stub of remote endpoint
Expand Down
6 changes: 3 additions & 3 deletions gossip/comm/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/gossip/common"
)

// ChannelDeMultiplexer is a struct that can receive channel registrations (AddChannel)
Expand All @@ -42,7 +42,7 @@ func NewChannelDemultiplexer() *ChannelDeMultiplexer {
}

type channel struct {
pred util.MessageAcceptor
pred common.MessageAcceptor
ch chan interface{}
}

Expand All @@ -66,7 +66,7 @@ func (m *ChannelDeMultiplexer) Close() {
}

// AddChannel registers a channel with a certain predicate
func (m *ChannelDeMultiplexer) AddChannel(predicate util.MessageAcceptor) chan interface{} {
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
m.lock.Lock()
defer m.lock.Unlock()
ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
Expand Down
23 changes: 23 additions & 0 deletions gossip/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

// PKIidType defines the type that holds the PKI-id
// which is the security identifier of a peer
type PKIidType []byte

type MessageAcceptor func(interface{}) bool
12 changes: 6 additions & 6 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package discovery

import "github.com/hyperledger/fabric/gossip/proto"
import (
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/common"
)

// CryptoService is an interface that the discovery expects to be implemented and passed on creation
type CryptoService interface {
Expand All @@ -43,20 +46,17 @@ type CommService interface {
Accept() <-chan *proto.GossipMessage

// PresumedDead returns a read-only channel for peers that are presumed to be dead
PresumedDead() <-chan PKIidType
PresumedDead() <-chan common.PKIidType

// CloseConn orders to close the connection with a certain peer
CloseConn(peer *NetworkMember)
}

// PKIidType represents a peer's security identity
type PKIidType []byte

// NetworkMember is a peer's representation
type NetworkMember struct {
Endpoint string
Metadata []byte
PKIid PKIidType
PKIid common.PKIidType
}

// Discovery is the interface that represents a discovery module
Expand Down

0 comments on commit 7703c81

Please sign in to comment.