Skip to content

Commit

Permalink
FAB-1016 Gossip comm layer send buffering
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1016
FAB-1016
Added send buffering to gossip comm layer.
Now each send simply queues the message, and a dedicated goroutine per
remote peer dispatches the queued messages to that peer.

Previously, every send spawned its own goroutine to handle the send, and
that didn't preserve the FIFO order of the messages sent to a peer.

Change-Id: Ia34616324e28c81920ad0c31a231487ac03ae9c0
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Nov 7, 2016
1 parent 48a117a commit e7e93aa
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 33 deletions.
22 changes: 10 additions & 12 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ import (
const (
defDialTimeout = time.Second * time.Duration(3)
defConnTimeout = time.Second * time.Duration(2)
defRecvBuffSize = 100
defRecvBuffSize = 20
defSendBuffSize = 20
sendOverflowErr = "Send buffer overflow"
)

var errSendOverflow = fmt.Errorf(sendOverflowErr)
var dialTimeout = defDialTimeout

func init() {
Expand Down Expand Up @@ -195,7 +198,6 @@ func (c *commImpl) Send(msg *proto.GossipMessage, peers ...*RemotePeer) {
c.logger.Info("Entering, sending", msg, "to ", len(peers), "peers")

for _, peer := range peers {
// TODO: create outgoing buffers and flow control per connection
go func(peer *RemotePeer, msg *proto.GossipMessage) {
c.sendToEndpoint(peer, msg)
}(peer, msg)
Expand Down Expand Up @@ -224,29 +226,25 @@ func (c *commImpl) isPKIblackListed(p PKIidType) bool {
return false
}

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.GossipMessage) error {
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.GossipMessage) {
if c.isStopping() {
return nil
return
}
c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
defer c.logger.Debug("Exiting")
var err error

conn, err := c.connStore.getConnection(peer)
if err == nil {
t1 := time.Now()
err = conn.send(msg)
if err != nil {
disConnectOnErr := func(err error) {
c.logger.Warning(peer, "isn't responsive:", err)
c.disconnect(peer.PKIID)
return err
}
c.logger.Debug("Send took", time.Since(t1))
return nil
conn.send(msg, disConnectOnErr)
return
}
c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
c.disconnect(peer.PKIID)
return err
}

func (c *commImpl) isStopping() bool {
Expand Down Expand Up @@ -450,7 +448,7 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
c.connStore.closeByPKIid(PKIID)
}()

return conn.serviceInput()
return conn.serviceConnection()
}

func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
Expand Down
14 changes: 9 additions & 5 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ func TestHandshake(t *testing.T) {
m := <-comm1.Accept(acceptAll)
rcvChan <- m.GetGossipMessage()
}()
time.Sleep(time.Second)
go stream.Send(msg2Send)
time.Sleep(time.Second)
assert.Equal(t, 1, len(rcvChan))
var receivedMsg *proto.GossipMessage
select {
case receivedMsg = <-rcvChan:
break
case <- time.NewTicker(time.Duration(time.Second * 2)).C:
case <-time.NewTicker(time.Duration(time.Second * 2)).C:
assert.Fail(t, "Timed out waiting for received message")
break
}
Expand Down Expand Up @@ -255,7 +256,7 @@ func TestParallelSend(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

messages2Send := 100
messages2Send := 20

wg := sync.WaitGroup{}
go func() {
Expand All @@ -272,13 +273,16 @@ func TestParallelSend(t *testing.T) {

c := 0
waiting := true
ticker := time.NewTicker(time.Duration(1) * time.Second)
ticker := time.NewTicker(time.Duration(5) * time.Second)
ch := comm2.Accept(acceptAll)
for waiting {
select {
case <-ch:
c++
continue
if c == messages2Send {
waiting = false
}
break
case <-ticker.C:
waiting = false
break
Expand Down Expand Up @@ -359,7 +363,7 @@ func TestAccept(t *testing.T) {
comm2.Send(createGossipMsg(), &RemotePeer{Endpoint: "localhost:7611", PKIID: []byte("localhost:7611")})
}

time.Sleep(time.Duration(1) * time.Second)
time.Sleep(time.Duration(5) * time.Second)

comm1.Stop()
comm2.Stop()
Expand Down
49 changes: 37 additions & 12 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
conn = createdConnection
cs.pki2Conn[string(createdConnection.pkiID)] = conn

go conn.serviceInput()
go conn.serviceConnection()

return conn, nil
}
Expand Down Expand Up @@ -184,6 +184,7 @@ func (cs *connectionStore) closeByPKIid(pkiID PKIidType) {

func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection {
connection := &connection{
outBuff: make(chan *msgSending, defSendBuffSize),
cl: cl,
conn: c,
clientStream: cs,
Expand All @@ -196,6 +197,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
}

type connection struct {
outBuff chan *msgSending
logger *util.Logger // logger
pkiID PKIidType // pkiID of the remote endpoint
handler handler // function to invoke upon a message reception
Expand Down Expand Up @@ -237,26 +239,24 @@ func (conn *connection) toDie() bool {
return atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
}

func (conn *connection) send(msg *proto.GossipMessage) error {
func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {
conn.Lock()
defer conn.Unlock()

if conn.toDie() {
return fmt.Errorf("Connection aborted")
}

if conn.clientStream != nil {
return conn.clientStream.Send(msg)
if len(conn.outBuff) == defSendBuffSize {
go onErr(errSendOverflow)
return
}

if conn.serverStream != nil {
return conn.serverStream.Send(msg)
m := &msgSending{
msg: msg,
onErr: onErr,
}

return fmt.Errorf("Both streams are nil")
conn.outBuff <- m
}

func (conn *connection) serviceInput() error {
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.GossipMessage, defRecvBuffSize)
defer close(msgChan)
Expand All @@ -268,6 +268,8 @@ func (conn *connection) serviceInput() error {
// readFromStream() method
go conn.readFromStream(errChan, msgChan)

go conn.writeToStream()

for !conn.toDie() {
select {
case stop := <-conn.stopChan:
Expand All @@ -283,6 +285,29 @@ func (conn *connection) serviceInput() error {
return nil
}

// writeToStream dispatches queued outgoing messages to the connection's stream.
//
// Each iteration re-fetches the stream via conn.getStream() and then waits for
// either a queued message on conn.outBuff or a value on conn.stopChan.
// The method returns when:
//   - conn.toDie() reports true,
//   - conn.getStream() returns nil (logged as an error),
//   - a Send on the stream fails (the message's onErr callback is invoked
//     in a separate goroutine with the error), or
//   - a value is received on conn.stopChan.
func (conn *connection) writeToStream() {
	for {
		if conn.toDie() {
			return
		}

		out := conn.getStream()
		if out == nil {
			conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
			return
		}

		select {
		case pending := <-conn.outBuff:
			if sendErr := out.Send(pending.msg); sendErr != nil {
				// Report the failure asynchronously so the callback cannot
				// hold up this goroutine's exit.
				go pending.onErr(sendErr)
				return
			}
		case sig := <-conn.stopChan:
			conn.logger.Warning("Closing writing to stream")
			// Put the received value back on stopChan before exiting —
			// presumably so another listener on the channel also sees it;
			// verify against the other stopChan readers.
			conn.stopChan <- sig
			return
		}
	}
}

func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.GossipMessage) {
defer func() {
recover()
Expand Down
6 changes: 3 additions & 3 deletions gossip/comm/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func generateCertificates(privKeyFile string, certKeyFile string) error {

sn, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
template := x509.Certificate{
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
SerialNumber: sn,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
SerialNumber: sn,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}
rawBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ReceivedMessageImpl struct {

// Respond sends a msg to the source that sent the ReceivedMessageImpl
func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
m.conn.send(msg)
m.conn.send(msg, func(e error) {})
}

// GetGossipMessage returns the inner GossipMessage
Expand Down

0 comments on commit e7e93aa

Please sign in to comment.