
Goroutine leak in DeltaStreamHandler #913

Closed
zhiyanfoo opened this issue Apr 4, 2024 · 0 comments


zhiyanfoo commented Apr 4, 2024

go-control-plane version: 2259f26

In our control-plane implementation, we are seeing a significant memory leak in specific cases.
On closer inspection, the memory leak can be attributed to a goroutine leak.
[Screenshot 2024-04-04 at 2:20:25 PM]

Goroutine profiles show far more goroutines originating from the DeltaStreamHandler method
than there are active streams.

This occurs when using the delta protocol with ADS from an Envoy client,
targeting a snapshot cache.

The goroutine in DeltaStreamHandler that reads requests from the stream and forwards them on reqCh can leak when s.processDelta returns an error.

Here is an example of how the goroutine leaks.

  1. Two or more requests are made on the same stream (as is the case when using ADS).
  2. The first request is sent on reqCh and read from the channel by processDelta.
  3. The goroutine's for-loop moves on to the next request and blocks sending it on the (unbuffered) reqCh.
  4. While the first request is being processed, an error is returned (e.g. from a callback), so processDelta exits and stops reading from reqCh.
  5. The goroutine remains blocked on the send. Because processDelta has returned, nothing will ever read from reqCh again, and the goroutine leaks.

A possible fix is to stop blocking on the send once the stream's context is done:

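```go
for {
	req, err := str.Recv()
	if err != nil {
		// The stream has ended; tell processDelta there are no more requests.
		close(reqCh)
		return
	}

	// Don't block forever on the send: once the stream's context is done,
	// processDelta may already have stopped reading reqCh.
	select {
	case reqCh <- req:
	case <-str.Context().Done():
		close(reqCh)
		return
	}
}
```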

This relies on the fact that if processDelta returns an error, the stream is closed, so str.Context().Done() fires in this scenario and unblocks the pending send.

This is similar to what is already implemented for the SotW (state-of-the-world) stream handler, and we could reuse that implementation instead:

```go
go func() {
	defer close(reqCh)
	for {
		req, err := stream.Recv()
		if err != nil {
			return
		}
		// Stop forwarding if either the stream or the whole server is shutting down.
		select {
		case reqCh <- req:
		case <-stream.Context().Done():
			return
		case <-s.ctx.Done():
			return
		}
	}
}()
```
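The SotW variant differs from the proposed fix in two ways: a single `defer close(reqCh)` covers every exit path, and the send also watches the server-wide `s.ctx`. The hypothetical sketch below isolates the second point: `forward` mirrors the loop's shape, `recv` stands in for `stream.Recv`, and `serverCtx` stands in for `s.ctx`. Only the server context is cancelled, yet the blocked sender still exits and closes the channel.

```go
package main

import (
	"context"
	"fmt"
)

// forward mirrors the SotW loop: one deferred close, and a select that
// watches both the stream context and a server-wide context.
func forward(streamCtx, serverCtx context.Context, recv func() (string, error), reqCh chan<- string) {
	defer close(reqCh) // runs on every exit path
	for {
		req, err := recv()
		if err != nil {
			return
		}
		select {
		case reqCh <- req:
		case <-streamCtx.Done():
			return
		case <-serverCtx.Done():
			return
		}
	}
}

// shutdownReleasesSender checks that cancelling only the server context frees
// a sender that is blocked because its consumer stopped reading.
func shutdownReleasesSender() bool {
	streamCtx := context.Background() // the stream itself never ends here
	serverCtx, stopServer := context.WithCancel(context.Background())
	reqCh := make(chan string)

	n := 0
	recv := func() (string, error) { // an endless supply of requests
		n++
		return fmt.Sprintf("req-%d", n), nil
	}

	done := make(chan struct{})
	go func() {
		forward(streamCtx, serverCtx, recv, reqCh)
		close(done)
	}()

	<-reqCh      // the consumer takes one request, then stops reading
	stopServer() // server shutdown: the server context is cancelled
	<-done       // forward returns instead of blocking forever

	_, open := <-reqCh
	return !open // reqCh was closed by the deferred close
}

func main() {
	fmt.Println("sender released:", shutdownReleasesSender())
}
```

Watching the server context as well means a server shutdown can release stream goroutines even if a particular stream's context has not ended yet.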
