Skip to content

Commit

Permalink
MB-51275 : Prevent resetting sendMsgBuffer if cpp consumer is still
Browse files Browse the repository at this point in the history
spawning

Change-Id: I3d57e495a342322c0616952ae036d5aae62b10ac
Reviewed-on: https://review.couchbase.org/c/eventing/+/171835
Tested-by: <abhishek.jindal@couchbase.com>
Reviewed-by: <ankit.prabhu@couchbase.com>
Reviewed-by: CI Bot
  • Loading branch information
abhijpes committed Mar 9, 2022
1 parent 59d06d3 commit 26ce763
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumer/handle_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.connMutex.Lock()
defer c.connMutex.Unlock()

msgSent := false
if !m.sendToDebugger && c.conn != nil {

err := io.ErrShortWrite
Expand All @@ -774,6 +775,7 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.killAndRespawn()
return err
}
msgSent = true
} else if c.debugConn != nil {
_, err := c.sendMsgBuffer.WriteTo(c.debugConn)
if err != nil {
Expand All @@ -782,12 +784,15 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.debugConn.Close()
return err
}
msgSent = true
}

// Reset the sendMessage buffer and message counter
c.aggMessagesSentCounter += c.sendMsgCounter
c.sendMsgBuffer.Reset()
c.sendMsgCounter = 0
// Reset the sendMessage buffer and message counter only if message has been successfully sent
if msgSent {
c.aggMessagesSentCounter += c.sendMsgCounter
c.sendMsgBuffer.Reset()
c.sendMsgCounter = 0
}
}

return nil
Expand Down

0 comments on commit 26ce763

Please sign in to comment.