Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix memory leak issue in publish scheduler #4282

merged 6 commits into from Jun 21, 2021


Copy link

@hongalex hongalex commented Jun 17, 2021

This PR builds off the work completed by @mzanibelli in #4253. A detailed description was written there, but essentially we were instantiating outstanding messages to be 2 instead of 1, leading bundles to never be freed, which leads to memory leaks when using large number of ordering keys. This switches the underlying implementation to a sync.Map to properly synchronize access to the bundlers and outstanding maps, allowing the user to safely call Flush and FlushAndStop without race conditions.

Running benchmarks against both the unordered and ordered publish benchmarks reveals no performance degradation. Below are results from the ordered case, publishing 100k 10kb messages, using different ordering keys.

prev(using built-in maps)

BenchmarkPublishThroughput-12    	       1	68024964685 ns/op	  14.70 MB/s	5010368568 B/op	35265687 allocs/o
BenchmarkPublishThroughput-12    	       1	68054813422 ns/op	  14.69 MB/s	5002350416 B/op	35229700 allocs/op

now(using sync.Map)

BenchmarkPublishThroughput-12    	       1	68124099897 ns/op	  14.68 MB/s	5023081976 B/op	36219292 allocs/op
BenchmarkPublishThroughput-12    	       1	68060299607 ns/op	  14.69 MB/s	5027381288 B/op	36268891 allocs/op

In addition, a test was added to topic_test to verify the correct behavior of calling PublishScheduler.Add while a Flush is in flight.

This also fixes a small issue when using the pstest fake's AddPublishResponse method which allows you to call Publish before calling AddPublishResponse by removing the lock acquisition.

Fixes #4288

if !ok {
s.outstanding[key] = 1
s.outstanding.Store(key, 1)
b = bundler.NewBundler(item, func(bundle interface{}) {
s.workers <- struct{}{}

nlen := reflect.ValueOf(bundle).Len()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that's it! You left the lock between Add() and handle() so you don't load non-existing keys from the map inside the else clause. This gave me a hard time and I thought I was doing something wrong when I tried this.


@hongalex hongalex merged commit 22ffc18 into googleapis:master Jun 21, 2021
4 checks passed
@hongalex hongalex deleted the pubsub-publish-leak branch Jun 21, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet
Linked issues

Successfully merging this pull request may close these issues.

3 participants