Describe the bug
In PartitionedProducerImpl, the getStats() method mutates the internal "stats" object every time it is called:
    public synchronized ProducerStatsRecorderImpl getStats() {
        if (stats == null) {
            return null;
        }
        stats.reset();
        for (int i = 0; i < topicMetadata.numPartitions(); i++) {
            stats.updateCumulativeStats(producers.get(i).getStats());
        }
        return stats;
    }
Making the method synchronized does not protect callers from seeing corrupted stats, because getStats() returns a reference to the internal stats object.
If two threads each call getStats() and then call methods on the returned object, the first thread can read corrupted values, because the second thread's call runs reset() in the meantime.
ProducerStats stats = producer.getStats();
// accessing an object that is mutated from a different thread
long totalMsgsSent = stats.getTotalMsgsSent();
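The hazard can be reproduced deterministically without threads. The sketch below is only an illustration under stated assumptions: SharedStats, RecyclingProducer and the betweenSteps hook are hypothetical stand-ins, not Pulsar classes. The hook runs at the points where a concurrent reader could be scheduled and records what a previously returned reference would show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the recycled stats object (not a Pulsar class).
class SharedStats {
    private long totalMsgsSent;
    void reset() { totalMsgsSent = 0; }
    void add(long n) { totalMsgsSent += n; }
    long getTotalMsgsSent() { return totalMsgsSent; }
}

// Hypothetical producer with the same reset / re-aggregate / return-shared shape.
class RecyclingProducer {
    private final SharedStats stats = new SharedStats();
    private final long[] perPartitionSent;

    RecyclingProducer(long... perPartitionSent) {
        this.perPartitionSent = perPartitionSent.clone();
    }

    // betweenSteps is an illustrative hook marking where another thread could read.
    synchronized SharedStats getStats(Runnable betweenSteps) {
        stats.reset();
        betweenSteps.run();
        for (long sent : perPartitionSent) {
            stats.add(sent);
            betweenSteps.run();
        }
        return stats;
    }
}

class AliasingDemo {
    // Values visible through an earlier reference while a later getStats() runs.
    static List<Long> observedDuringRefresh() {
        RecyclingProducer producer = new RecyclingProducer(10, 20, 30);
        SharedStats held = producer.getStats(() -> { });
        List<Long> observed = new ArrayList<>();
        SharedStats again = producer.getStats(() -> observed.add(held.getTotalMsgsSent()));
        if (held != again) {
            throw new IllegalStateException("expected the same shared instance");
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observedDuringRefresh()); // prints [0, 10, 30, 60]
    }
}
```

Only the last value, 60, is the correct total; a reader holding the earlier reference could observe any of the intermediate ones.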
This behaviour makes it unsafe to collect the stats from an external system that does not provide documented and consistent guarantees about concurrency.
For instance, I would like to create "Gauges" and attach one to each of the exposed metrics, but that is currently not possible to do safely.
I can work around it with this kind of code, but it is likely to break if we change the implementation of the PulsarProducer:
    Gauge gauge = () -> {
        // Relies on getStats() synchronizing on the producer instance itself.
        synchronized (producer) {
            ProducerStats stats = producer.getStats();
            long totalMsgsSent = stats.getTotalMsgsSent();
            return totalMsgsSent;
        }
    };
Suggestions for improvement
Create a new instance of ProducerStats in PartitionedProducerImpl on each getStats() call instead of recycling the existing object.
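A minimal sketch of what this suggestion could look like, assuming an immutable snapshot type. StatsSnapshot, SnapshottingProducer and recordSent are hypothetical names used only for illustration; they are not part of the Pulsar client, and the real fix would need to aggregate every field of the stats object, not just one counter.

```java
// Hypothetical immutable snapshot: once returned, nothing can mutate it.
final class StatsSnapshot {
    private final long totalMsgsSent;

    StatsSnapshot(long totalMsgsSent) {
        this.totalMsgsSent = totalMsgsSent;
    }

    long getTotalMsgsSent() {
        return totalMsgsSent;
    }
}

// Hypothetical producer that aggregates into a fresh object on every call.
class SnapshottingProducer {
    private final long[] perPartitionSent;

    SnapshottingProducer(long... perPartitionSent) {
        this.perPartitionSent = perPartitionSent.clone();
    }

    // Illustrative helper that simulates messages sent on one partition.
    synchronized void recordSent(int partition, long n) {
        perPartitionSent[partition] += n;
    }

    // The lock guards only the aggregation; the returned object is never reused.
    synchronized StatsSnapshot getStats() {
        long total = 0;
        for (long sent : perPartitionSent) {
            total += sent;
        }
        return new StatsSnapshot(total);
    }
}

class SnapshotDemo {
    public static void main(String[] args) {
        SnapshottingProducer producer = new SnapshottingProducer(10, 20, 30);
        StatsSnapshot before = producer.getStats();
        producer.recordSent(0, 5);
        StatsSnapshot after = producer.getStats();
        System.out.println(before.getTotalMsgsSent()); // prints 60
        System.out.println(after.getTotalMsgsSent());  // prints 65
    }
}
```

With this shape a gauge can call getStats() and read the result without any external locking, because a later call cannot change a snapshot that has already been handed out.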