Skip to content

Commit

Permalink
changefeedccl: fix memory leak in deprecatedGcpPubsubClient
Browse files Browse the repository at this point in the history
Previously, `deprecatedGcpPubsubClient` did not close `pubsub.Client` properly
which could lead to memory leaks. This patch fixes the problem by invoking
`pubsub.Client.Close()` during `deprecatedGcpPubsubClient.Close()`.

Epic: none
Release note: Fixed a slow memory leak in deprecated pubsub changefeeds which
can accumulate when restarting/cancelling many deprecated pubsub changefeeds.
The bug had been there since the deprecated pubsub was introduced in 22.1
(beta).
  • Loading branch information
wenyihu6 committed Apr 5, 2024
1 parent ab217f1 commit aca5a10
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
14 changes: 11 additions & 3 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const numOfWorkers = 128

type deprecatedPubsubClient interface {
init() error
closeTopics()
close() error
flushTopics()
sendMessage(content []byte, topic string, key string) error
sendMessageToAllTopics(content []byte) error
Expand Down Expand Up @@ -278,7 +278,6 @@ func (p *deprecatedPubsubSink) flush(ctx context.Context) error {

// Close closes all the channels and shutdowns the topic
func (p *deprecatedPubsubSink) Close() error {
p.client.closeTopics()
p.exitWorkers()
_ = p.workerGroup.Wait()
if p.errChan != nil {
Expand All @@ -292,6 +291,9 @@ func (p *deprecatedPubsubSink) Close() error {
close(p.eventsChans[i])
}
}
if err := p.client.close(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -496,11 +498,17 @@ func (p *deprecatedGcpPubsubClient) openTopic(topicName string) (*pubsub.Topic,
return t, nil
}

func (p *deprecatedGcpPubsubClient) closeTopics() {
func (p *deprecatedGcpPubsubClient) close() error {
_ = p.forEachTopic(func(_ string, t *pubsub.Topic) error {
t.Stop()
return nil
})
if p.client != nil {
// Close the client to release resources held by the client to avoid memory
// leaks.
return p.client.Close()
}
return nil
}

// sendMessage sends a message to the topic
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2248,7 +2248,8 @@ func (p *deprecatedFakePubsubClient) init() error {
return nil
}

func (p *deprecatedFakePubsubClient) closeTopics() {
func (p *deprecatedFakePubsubClient) close() error {
return nil
}

// sendMessage sends a message to the topic
Expand Down

0 comments on commit aca5a10

Please sign in to comment.