
Memory leak when using metrics #1321

Closed
thomas-mangin opened this issue Mar 22, 2019 · 14 comments

Labels: maintenance, stale (Issues and pull requests without any recent activity)

Comments

thomas-mangin commented Mar 22, 2019

Versions

Sarama Version: 1.20.1 release
Kafka Version: N/A
Go Version: 1.12.1

using Sarama commit 03a43f9

Problem Description

We use Sarama to test the liveness of a Kafka server, calling NewSyncProducer in a new goroutine every second, and noticed that the application was slowly leaking memory.

The memory profiler showed that the memory was allocated by go-metrics (commit 3113b8401b8a98917cde58f8bbd42a1b1c03b1fd).

This issue was discussed in #897 and marked as resolved, but that may not be the case.

The issue was observed both when:

  • passing nil as the configuration to NewSyncProducer
  • passing a fresh NewConfig() to NewSyncProducer

The code which triggered the issue looked like:

```go
var endpoints []string
// ...
producer, err := sarama.NewSyncProducer(endpoints, nil)
if err != nil {
	// ...
}
defer producer.Close()

message := &sarama.ProducerMessage{
	Topic: "the-topic",
	Value: sarama.StringEncoder(...),
}

_, _, err = producer.SendMessage(message)
// ...
```

A workaround was to reuse the Sarama configuration object when re-connecting to the same destination (I only noticed the option to disable metrics while preparing this issue report).
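A minimal sketch of both workarounds (the broker address is a placeholder; `metrics.UseNilMetrics` is the go-metrics switch that Sarama's configuration documentation points to):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	metrics "github.com/rcrowley/go-metrics"
)

func main() {
	// Workaround 1: disable metrics gathering entirely. Must be set
	// before any Sarama configuration or producer is created.
	metrics.UseNilMetrics = true

	// Workaround 2: create the configuration once and reuse it, so every
	// producer shares a single MetricRegistry instead of each allocating
	// its own.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required for SyncProducer

	endpoints := []string{"localhost:9092"} // placeholder

	for i := 0; i < 3; i++ {
		producer, err := sarama.NewSyncProducer(endpoints, config)
		if err != nil {
			log.Fatal(err)
		}
		// ... send messages ...
		producer.Close()
	}
}
```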

@xiezhenye

I'm hitting the same problem.

varun06 (Contributor) commented May 3, 2019

@bai There is a real issue with go-metrics and Sarama's use of it. My old colleagues also reported the same issue. What are our options here?

bai (Contributor) commented May 4, 2019

If you ask me, I don't find Sarama's metrics useful and have no production workloads that rely on them in any way, preferring Prometheus or custom integrations instead. Also, go-metrics seems abandoned (or on life support, depending on your threshold for "abandoned"), with the last commit made in October last year.

In fact, it seems that you already raised this issue a month ago. Do you happen to have any data on the usage of Sarama's metrics?
I'd rather not provide functionality at all than provide something that doesn't work for the majority of users and impacts production workloads.

varun06 (Contributor) commented May 5, 2019

Yeah, I also think that we should announce and remove this functionality. It is a maintenance nightmare and not very good. How do we do that, @bai?

As far as usage goes, I was trying to get some broker/consumer-related metrics out of Sarama, but they are not good at all. You get an amalgamation of metric types and then have to convert each into its particular metric type (count, gauge, etc.) at the call site, which adds unnecessary complexity.

eapache (Contributor) commented May 6, 2019

I have historically managed to avoid any breaking changes to the API (hence the fact that we're still on v1 under semantic versioning). If this is worth ripping out, then perhaps it's finally time for a v2, and we can roll in a bunch of the things I'd collected in https://github.com/Shopify/sarama/wiki/Ideas-that-will-break-backwards-compatibility at the same time?

varun06 (Contributor) commented May 6, 2019

I agree.

nicolaenicora commented May 7, 2019

It would be nice if there were a way to disable the metrics from the Sarama configuration.

Meanwhile, I resolved this problem by writing my own metrics registry, because the default one keeps all metrics in memory. When you create a Sarama configuration, `c.MetricRegistry = metrics.NewRegistry()` is used by default, which is backed by a StandardRegistry.

The solution is to use an empty one:

```go
import (
	"fmt"
	"strings"

	metrics "github.com/rcrowley/go-metrics"
)

// EMPTY is the permanently empty snapshot returned by GetAll.
var EMPTY = make(map[string]map[string]interface{})

// EmptyRegistry satisfies metrics.Registry but never stores anything,
// so no metrics accumulate in memory.
type EmptyRegistry struct{}

func NewEmptyRegistry() metrics.Registry {
	return &EmptyRegistry{}
}

func (r *EmptyRegistry) Each(f func(string, interface{})) {}

func (r *EmptyRegistry) Get(name string) interface{} {
	return metrics.NilMeter{}
}

func (r *EmptyRegistry) GetAll() map[string]map[string]interface{} {
	return EMPTY
}

func (r *EmptyRegistry) GetOrRegister(name string, i interface{}) interface{} {
	// Sarama passes either a metric value or a constructor such as
	// func() metrics.Meter, so matching on the type name covers both.
	str := fmt.Sprintf("%T", i)
	switch {
	case strings.Contains(str, "metrics.Meter"):
		return metrics.NilMeter{}
	case strings.Contains(str, "metrics.Histogram"):
		return metrics.NilHistogram{}
	case strings.Contains(str, "metrics.Counter"):
		return metrics.NilCounter{}
	// GaugeFloat64 must be matched before Gauge, which is a substring of it.
	case strings.Contains(str, "metrics.GaugeFloat64"):
		return metrics.NilGaugeFloat64{}
	case strings.Contains(str, "metrics.Gauge"):
		return metrics.NilGauge{}
	case strings.Contains(str, "metrics.EWMA"):
		return metrics.NilEWMA{}
	case strings.Contains(str, "metrics.Healthcheck"):
		return metrics.NilHealthcheck{}
	}
	return metrics.NilMeter{}
}

func (r *EmptyRegistry) Register(name string, i interface{}) error {
	return nil
}

func (r *EmptyRegistry) RunHealthchecks() {}

// Unregister and UnregisterAll are required by the metrics.Registry
// interface; there is nothing to remove here.
func (r *EmptyRegistry) Unregister(name string) {}

func (r *EmptyRegistry) UnregisterAll() {}
```
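Wiring it in looks something like this (endpoints being your broker list):

```go
config := sarama.NewConfig()
config.MetricRegistry = NewEmptyRegistry() // replaces the default StandardRegistry

producer, err := sarama.NewSyncProducer(endpoints, config)
```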

luzhuqun commented Jun 18, 2019

I'm hitting this problem too, and I am using the latest version of the package.

My code:
```go
func StartTickDeal() {
	defer func() {
		if perr := recover(); perr != nil {
			log.Errorf("startTickDeal panic, errinfo:%v", perr)
		}
		go StartTickDeal()
	}()
	broker := strings.Join(config.DbConfig.KafkaSet.Host, ",")
	topic := config.Conf.Messagekafka.Topic
	snsTopic := config.Conf.Messagekafka.SnsTopic
	_ = snsTopic // declared but not used in this snippet
	for range time.Tick(2 * time.Second) {
		go consumer(broker, topic)
	}
}

func consumer(broker, topic string) {
	var wg sync.WaitGroup
	c, err := sarama.NewConsumer(strings.Split(broker, ","), nil)
	if err != nil {
		log.Errorf("Failed to start consumer: %s", err)
		return
	}
	defer c.Close()
	closeCh := make(chan int, 2)
	go func() {
		time.Sleep(2 * time.Second)
		close(closeCh)
	}()
	partitionList, err := c.Partitions(topic)
	if err != nil {
		log.Errorf("Failed to get the list of partitions: %v", err)
		return
	}
	for _, v := range partitionList {
		pc, err := c.ConsumePartition(topic, v, sarama.OffsetNewest)
		if err != nil {
			log.Errorf("Failed to start consumer for partition %d: %s\n", v, err)
			continue
		}
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			defer pc.AsyncClose()
			var i int
			for {
				select {
				case msg := <-pc.Messages():
					i++
					log.Tracef("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
					deal(msg.Value)
					if i > 1000 {
						closeCh <- 1
					}
				case <-closeCh:
					return
				}
			}
		}(pc)
	}
	wg.Wait()
}
```

varun06 (Contributor) commented Jun 19, 2019

@ToughK Are you also seeing the issue with using metrics from Sarama or without it?

ToughK commented Jun 19, 2019

> @ToughK Are you also seeing the issue with using metrics from Sarama or without it?

Using metrics from Sarama. There is no memory leak problem when not using metrics.

varun06 (Contributor) commented Jun 19, 2019

Yeah, it's a known problem; it happens because the library used to expose these metrics keeps them in memory.
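For illustration, a minimal sketch of that retention behavior, independent of Sarama (the metric name is arbitrary):

```go
package main

import (
	"time"

	metrics "github.com/rcrowley/go-metrics"
)

func main() {
	for {
		// Mimics what creating a producer with a fresh config does:
		// a new registry plus new meters.
		r := metrics.NewRegistry()
		metrics.GetOrRegisterMeter("outgoing-byte-rate", r)

		// The registry becomes unreachable here, but the meter does not:
		// go-metrics registered it with a package-global arbiter whose
		// goroutine ticks every meter forever, so memory grows until
		// Unregister()/Stop() is called.
		time.Sleep(time.Second)
	}
}
```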

d1egoaz changed the title from "Memory leak" to "Memory leak when using metrics" on Aug 22, 2019

d1egoaz (Contributor) commented Aug 22, 2019

Wondering if we could introduce something like this: #1321 (comment) without breaking the API, and then tell people the feature was removed?

slaunay (Contributor) commented Oct 29, 2019

If you do not care about metrics, you can disable them as documented here (this was referenced by @thomas-mangin but probably not that easy to find):
https://github.com/Shopify/sarama/blob/65e4c5f9905926359079cd336942cf8c23fc791e/config.go#L385-L390

If you do care about metrics (which probably does not make much sense if the producers are short-lived), then you can reuse the configuration:

```go
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true // Required for SyncProducer
producer, err := sarama.NewSyncProducer(endpoints, saramaConfig)
defer producer.Close()
// ...
producer, err = sarama.NewSyncProducer(endpoints, saramaConfig)
defer producer.Close()
```

Or you can unregister the "cluster" metrics once you are done with them; the "broker" metrics are already unregistered on producer.Close():

```go
defer producer.Close()
defer saramaConfig.MetricRegistry.UnregisterAll()
```

To be precise, only the "cluster" meters (e.g. outgoing-byte-rate) are never garbage collected, because they are referenced by a global meterArbiter goroutine started by go-metrics that never stops, and we never call Unregister()/Stop() on them 😢.

That being said, reusing an AsyncProducer, rather than creating a short-lived one for each message or every second, is a much more efficient approach for both your client and your brokers (similar to reusing a connection to a DBMS instead of opening one per query).
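For the liveness-check use case from the original report, that could look something like this (a sketch; the broker address, topic name, and probe interval are placeholders):

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required for SyncProducer

	// Create the producer once and reuse it for every probe.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	for range time.Tick(time.Second) {
		msg := &sarama.ProducerMessage{
			Topic: "liveness-probe",
			Value: sarama.StringEncoder("ping"),
		}
		if _, _, err := producer.SendMessage(msg); err != nil {
			log.Printf("Kafka liveness check failed: %v", err)
		}
	}
}
```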

ghost commented Feb 21, 2020

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

This issue was closed.