Skip to content

Commit

Permalink
Merge "[FAB-14265] de-vipererize gossip comm"
Browse files Browse the repository at this point in the history
  • Loading branch information
yacovm authored and Gerrit Code Review committed Feb 25, 2019
2 parents 30c781f + 7c874de commit f82f74a
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 96 deletions.
57 changes: 37 additions & 20 deletions gossip/comm/comm_impl.go
Expand Up @@ -24,17 +24,16 @@ import (
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/pkg/errors"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

const (
handshakeTimeout = time.Second * time.Duration(10)
defDialTimeout = time.Second * time.Duration(3)
defConnTimeout = time.Second * time.Duration(2)
defRecvBuffSize = 20
defSendBuffSize = 20
handshakeTimeout = time.Second * 10
DefDialTimeout = time.Second * 3
DefConnTimeout = time.Second * 2
DefRecvBuffSize = 20
DefSendBuffSize = 20
)

// SecurityAdvisor defines an external auxiliary object
Expand All @@ -44,11 +43,6 @@ type SecurityAdvisor interface {
OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType
}

// SetDialTimeout sets the dial timeout
func SetDialTimeout(timeout time.Duration) {
viper.Set("peer.gossip.dialTimeout", timeout)
}

func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
if len(opts) == 0 {
c.logger.Warning("Given an empty set of grpc.DialOption, aborting")
Expand All @@ -60,7 +54,7 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper,
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, sa api.SecurityAdvisor,
commMetrics *metrics.CommMetrics, dialOpts ...grpc.DialOption) (Comm, error) {
commMetrics *metrics.CommMetrics, config CommConfig, dialOpts ...grpc.DialOption) (Comm, error) {

commInst := &commImpl{
sa: sa,
Expand All @@ -77,18 +71,34 @@ func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore iden
stopping: int32(0),
exitChan: make(chan struct{}),
subscriptions: make([]chan proto.ReceivedMessage, 0),
dialTimeout: util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout),
tlsCerts: certs,
metrics: commMetrics,
dialTimeout: config.DialTimeout,
connTimeout: config.ConnTimeout,
recvBuffSize: config.RecvBuffSize,
sendBuffSize: config.SendBuffSize,
}

commInst.connStore = newConnStore(commInst, commInst.logger)
connConfig := ConnConfig{
RecvBuffSize: config.RecvBuffSize,
SendBuffSize: config.SendBuffSize,
}

commInst.connStore = newConnStore(commInst, commInst.logger, connConfig)

proto.RegisterGossipServer(s, commInst)

return commInst, nil
}

// CommConfig is the configuration required to initialize a new comm
type CommConfig struct {
DialTimeout time.Duration // Dial timeout
ConnTimeout time.Duration // Connection timeout
RecvBuffSize int // Buffer size of received messages
SendBuffSize int // Buffer size of sending messages
}

type commImpl struct {
sa api.SecurityAdvisor
tlsCerts *common.TLSCertificates
Expand All @@ -107,8 +117,11 @@ type commImpl struct {
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
stopping int32
dialTimeout time.Duration
metrics *metrics.CommMetrics
dialTimeout time.Duration
connTimeout time.Duration
recvBuffSize int
sendBuffSize int
}

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
Expand Down Expand Up @@ -138,7 +151,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT

cl := proto.NewGossipClient(cc)

ctx, cancel = context.WithTimeout(context.Background(), defConnTimeout)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
cc.Close()
Expand All @@ -164,7 +177,11 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
return nil, errors.New("authentication failure")
}
}
conn := newConnection(cl, cc, stream, nil, c.metrics)
connConfig := ConnConfig{
RecvBuffSize: c.recvBuffSize,
SendBuffSize: c.sendBuffSize,
}
conn := newConnection(cl, cc, stream, nil, c.metrics, connConfig)
conn.pkiID = pkiID
conn.info = connInfo
conn.logger = c.logger
Expand Down Expand Up @@ -248,7 +265,7 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
}
defer cc.Close()
cl := proto.NewGossipClient(cc)
ctx, cancel = context.WithTimeout(context.Background(), defConnTimeout)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
_, err = cl.Ping(ctx, &proto.Empty{})
c.logger.Debugf("Returning %v", err)
Expand All @@ -270,7 +287,7 @@ func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, erro
defer cc.Close()

cl := proto.NewGossipClient(cc)
ctx, cancel = context.WithTimeout(context.Background(), defConnTimeout)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
return nil, err
Expand Down Expand Up @@ -411,7 +428,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*proto

c.logger.Debug("Sending", cMsg, "to", remoteAddress)
stream.Send(cMsg.Envelope)
m, err := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout), remoteAddress)
m, err := readWithTimeout(stream, c.connTimeout, remoteAddress)
if err != nil {
c.logger.Warningf("Failed reading messge from %s, reason: %v", remoteAddress, err)
return nil, err
Expand Down
49 changes: 14 additions & 35 deletions gossip/comm/comm_test.go
Expand Up @@ -15,9 +15,7 @@ import (
"fmt"
"math/rand"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -26,15 +24,13 @@ import (
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/mocks"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
Expand All @@ -48,6 +44,13 @@ func init() {
naiveSec.On("OrgByPeerIdentity", mock.Anything).Return(api.OrgIdentityType{})
}

var testCommConfig = CommConfig{
DialTimeout: 300 * time.Millisecond,
ConnTimeout: DefConnTimeout,
RecvBuffSize: DefRecvBuffSize,
SendBuffSize: DefSendBuffSize,
}

func acceptAll(msg interface{}) bool {
return true
}
Expand Down Expand Up @@ -128,7 +131,7 @@ func newCommInstanceOnlyWithMetrics(t *testing.T, commMetrics *metrics.CommMetri
identityMapper := identity.NewIdentityMapper(sec, id, noopPurgeIdentity, sec)

commInst, err := NewCommInstance(gRPCServer.Server(), certs, identityMapper, id, secureDialOpts,
sec, commMetrics, dialOpts...)
sec, commMetrics, testCommConfig, dialOpts...)
assert.NoError(t, err)

go func() {
Expand Down Expand Up @@ -234,23 +237,6 @@ func handshaker(port int, endpoint string, comm Comm, t *testing.T, connMutator
return acceptChan
}

func TestViperConfig(t *testing.T) {
viper.SetConfigName("core")
viper.SetEnvPrefix("CORE")
configtest.AddDevConfigPath(nil)
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
err := viper.ReadInConfig()
if err != nil { // Handle errors reading the config file
panic(fmt.Errorf("fatal error config file: %s", err))
}

assert.Equal(t, time.Duration(2)*time.Second, util.GetDurationOrDefault("peer.gossip.connTimeout", 0))
assert.Equal(t, time.Duration(300)*time.Millisecond, util.GetDurationOrDefault("peer.gossip.dialTimeout", 0))
assert.Equal(t, 20, util.GetIntOrDefault("peer.gossip.recvBuffSize", 0))
assert.Equal(t, 200, util.GetIntOrDefault("peer.gossip.sendBuffSize", 0))
}

func TestMutualParallelSendWithAck(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -332,7 +318,7 @@ func TestHandshake(t *testing.T) {
idMapper := identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity, naiveSec)
inst, err := NewCommInstance(s, nil, idMapper, api.PeerIdentityType(endpoint), func() []grpc.DialOption {
return []grpc.DialOption{grpc.WithInsecure()}
}, naiveSec, disabledMetrics)
}, naiveSec, disabledMetrics, testCommConfig)
go s.Serve(ll)
assert.NoError(t, err)
var msg proto.ReceivedMessage
Expand Down Expand Up @@ -623,7 +609,7 @@ func TestCloseConn(t *testing.T) {
Data: make([]byte, 1024*1024),
}
msg2Send.NoopSign()
for i := 0; i < defRecvBuffSize; i++ {
for i := 0; i < DefRecvBuffSize; i++ {
err := stream.Send(msg2Send.Envelope)
if err != nil {
gotErr = true
Expand All @@ -640,7 +626,7 @@ func TestParallelSend(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

messages2Send := util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize)
messages2Send := DefRecvBuffSize

wg := sync.WaitGroup{}
wg.Add(messages2Send)
Expand Down Expand Up @@ -780,7 +766,7 @@ func TestAccept(t *testing.T) {
var evenResults []uint64
var oddResults []uint64

out := make(chan uint64, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
out := make(chan uint64, DefRecvBuffSize)
sem := make(chan struct{}, 0)

readIntoSlice := func(a *[]uint64, ch <-chan proto.ReceivedMessage) {
Expand All @@ -794,11 +780,11 @@ func TestAccept(t *testing.T) {
go readIntoSlice(&evenResults, evenNONCES)
go readIntoSlice(&oddResults, oddNONCES)

for i := 0; i < util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize); i++ {
for i := 0; i < DefRecvBuffSize; i++ {
comm2.Send(createGossipMsg(), remotePeer(port1))
}

waitForMessages(t, out, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize), "Didn't receive all messages sent")
waitForMessages(t, out, DefRecvBuffSize, "Didn't receive all messages sent")

comm1.Stop()
comm2.Stop()
Expand Down Expand Up @@ -976,13 +962,6 @@ func waitForMessages(t *testing.T, msgChan chan uint64, count int, errMsg string
assert.Equal(t, count, c, errMsg)
}

func TestMain(m *testing.M) {
SetDialTimeout(time.Duration(300) * time.Millisecond)

ret := m.Run()
os.Exit(ret)
}

func TestConcurrentCloseSend(t *testing.T) {
t.Parallel()
var stopping int32
Expand Down
20 changes: 15 additions & 5 deletions gossip/comm/conn.go
Expand Up @@ -35,6 +35,7 @@ type connFactory interface {
}

type connectionStore struct {
config ConnConfig
logger util.Logger // logger
isClosing bool // whether this connection store is shutting down
connFactory connFactory // creates a connection to remote peer
Expand All @@ -44,13 +45,14 @@ type connectionStore struct {
// used to prevent concurrent connection establishment to the same remote endpoint
}

func newConnStore(connFactory connFactory, logger util.Logger) *connectionStore {
func newConnStore(connFactory connFactory, logger util.Logger, config ConnConfig) *connectionStore {
return &connectionStore{
connFactory: connFactory,
isClosing: false,
pki2Conn: make(map[string]*connection),
destinationLocks: make(map[string]*sync.Mutex),
logger: logger,
config: config,
}
}

Expand Down Expand Up @@ -176,7 +178,7 @@ func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamSer

func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo,
serverStream proto.Gossip_GossipStreamServer, metrics *metrics.CommMetrics) *connection {
conn := newConnection(nil, nil, nil, serverStream, metrics)
conn := newConnection(nil, nil, nil, serverStream, metrics, cs.config)
conn.pkiID = connInfo.ID
conn.info = connInfo
conn.logger = cs.logger
Expand All @@ -194,21 +196,29 @@ func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) {
}

func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient,
ss proto.Gossip_GossipStreamServer, metrics *metrics.CommMetrics) *connection {
ss proto.Gossip_GossipStreamServer, metrics *metrics.CommMetrics, config ConnConfig) *connection {
connection := &connection{
metrics: metrics,
outBuff: make(chan *msgSending, util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize)),
outBuff: make(chan *msgSending, config.SendBuffSize),
cl: cl,
conn: c,
clientStream: cs,
serverStream: ss,
stopFlag: int32(0),
stopChan: make(chan struct{}, 1),
recvBuffSize: config.RecvBuffSize,
}
return connection
}

// ConnConfig is the configuration required to initialize a new conn
type ConnConfig struct {
RecvBuffSize int
SendBuffSize int
}

type connection struct {
recvBuffSize int
metrics *metrics.CommMetrics
cancel context.CancelFunc
info *proto.ConnectionInfo
Expand Down Expand Up @@ -283,7 +293,7 @@ func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error),

func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
msgChan := make(chan *proto.SignedGossipMessage, conn.recvBuffSize)
quit := make(chan struct{})
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
Expand Down

0 comments on commit f82f74a

Please sign in to comment.