Skip to content

Commit

Permalink
MB-51340 : Prevent resetting sendMsgBuffer if cpp consumer is still
Browse files Browse the repository at this point in the history
spawning

Change-Id: I4c38affa97ebcb9a40a5fd5f439df9352bdcc06a
Reviewed-on: https://review.couchbase.org/c/eventing/+/172175
Well-Formed: Restriction Checker
Tested-by: abhishekjindal <abhishek.jindal@couchbase.com>
Reviewed-by: CI Bot
Reviewed-by: <ankit.prabhu@couchbase.com>
  • Loading branch information
abhijpes committed Dec 5, 2022
1 parent 497a9da commit 82f3c77
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumer/handle_messages.go
Expand Up @@ -712,6 +712,7 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.connMutex.Lock()
defer c.connMutex.Unlock()

msgSent := false
if !m.sendToDebugger && c.conn != nil {

err := io.ErrShortWrite
Expand All @@ -737,6 +738,7 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.killAndRespawn()
return err
}
msgSent = true
} else if c.debugConn != nil {
_, err := c.sendMsgBuffer.WriteTo(c.debugConn)
if err != nil {
Expand All @@ -745,12 +747,15 @@ func (c *Consumer) sendMessage(m *msgToTransmit) error {
c.debugConn.Close()
return err
}
msgSent = true
}

// Reset the sendMessage buffer and message counter
c.aggMessagesSentCounter += c.sendMsgCounter
c.sendMsgBuffer.Reset()
c.sendMsgCounter = 0
// Reset the sendMessage buffer and message counter only if message has been successfully sent
if msgSent {
c.aggMessagesSentCounter += c.sendMsgCounter
c.sendMsgBuffer.Reset()
c.sendMsgCounter = 0
}
}

return nil
Expand Down

0 comments on commit 82f3c77

Please sign in to comment.