Skip to content

Commit

Permalink
Merge pull request #121867 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-121817

release-24.1: changefeedccl: fix memory leak in deprecatedGcpPubsubClient
  • Loading branch information
wenyihu6 committed Apr 11, 2024
2 parents 4d746a2 + d71a928 commit 8a6a600
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
14 changes: 11 additions & 3 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const numOfWorkers = 128

type deprecatedPubsubClient interface {
init() error
closeTopics()
close() error
flushTopics()
sendMessage(content []byte, topic string, key string) error
sendMessageToAllTopics(content []byte) error
Expand Down Expand Up @@ -278,7 +278,6 @@ func (p *deprecatedPubsubSink) flush(ctx context.Context) error {

// Close closes all the channels and shutdowns the topic
func (p *deprecatedPubsubSink) Close() error {
p.client.closeTopics()
p.exitWorkers()
_ = p.workerGroup.Wait()
if p.errChan != nil {
Expand All @@ -292,6 +291,9 @@ func (p *deprecatedPubsubSink) Close() error {
close(p.eventsChans[i])
}
}
if err := p.client.close(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -496,11 +498,17 @@ func (p *deprecatedGcpPubsubClient) openTopic(topicName string) (*pubsub.Topic,
return t, nil
}

func (p *deprecatedGcpPubsubClient) closeTopics() {
func (p *deprecatedGcpPubsubClient) close() error {
_ = p.forEachTopic(func(_ string, t *pubsub.Topic) error {
t.Stop()
return nil
})
if p.client != nil {
// Close the client to release resources held by the client to avoid memory
// leaks.
return p.client.Close()
}
return nil
}

// sendMessage sends a message to the topic
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2248,7 +2248,8 @@ func (p *deprecatedFakePubsubClient) init() error {
return nil
}

func (p *deprecatedFakePubsubClient) closeTopics() {
func (p *deprecatedFakePubsubClient) close() error {
return nil
}

// sendMessage sends a message to the topic
Expand Down

0 comments on commit 8a6a600

Please sign in to comment.