From 43544a139eb666dc62f97109ab9a1e39e92f54a7 Mon Sep 17 00:00:00 2001 From: Kevin Parsons Date: Fri, 10 May 2024 21:42:50 -0700 Subject: [PATCH] client: Fix deadlock when writing to pipe blocks Swap to calling send, which handles taking sendLock for us rather than doing it directly in createStream. This means that streamLock is now released before taking sendLock. Taking sendLock before releasing streamLock means that if a goroutine blocks writing to the pipe, it can make another goroutine get stuck trying to take sendLock, and therefore streamLock will be kept locked as well. This can lead to the receiver goroutine no longer being able to read responses from the pipe, since it needs to take streamLock when processing a response. This ultimately leads to a complete deadlock of the client. It is reasonable for a server to block writes to the pipe if the client is not reading responses fast enough. So we can't expect writes to never block. I have repro'd the hang with a simple ttrpc client and server. The client spins up 100 goroutines that spam the server with requests constantly. After a few seconds of running I can see it hang. I have set the buffer size for the pipe to 0 to more easily repro, but it would still be possible to hit with a larger buffer size (just may take a higher volume of requests or larger payloads). I also validated that I no longer see the hang with this fix, by leaving the test client/server running for a few minutes. Obviously not 100% conclusive, but before I could get a hang within several seconds of running. Signed-off-by: Kevin Parsons --- client.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index 482a68e3d..532476684 100644 --- a/client.go +++ b/client.go @@ -386,25 +386,36 @@ func (c *Client) receiveLoop() error { // createStream creates a new stream and registers it with the client // Introduce stream types for multiple or single response func (c *Client) createStream(flags uint8, b []byte) (*stream, error) { - c.streamLock.Lock() + c.sendLock.Lock() + defer c.sendLock.Unlock() // Check if closed since lock acquired to prevent adding // anything after cleanup completes select { case <-c.ctx.Done(): - c.streamLock.Unlock() return nil, ErrClosed default: } - // Stream ID should be allocated at same time - s := newStream(c.nextStreamID, c) - c.streams[s.id] = s - c.nextStreamID = c.nextStreamID + 2 + var s *stream + if err := func() error { + c.streamLock.Lock() + defer c.streamLock.Unlock() - c.sendLock.Lock() - defer c.sendLock.Unlock() - c.streamLock.Unlock() + select { + case <-c.ctx.Done(): + return ErrClosed + default: + } + + s = newStream(c.nextStreamID, c) + c.streams[s.id] = s + c.nextStreamID = c.nextStreamID + 2 + + return nil + }(); err != nil { + return nil, err + } if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil { return s, filterCloseErr(err)