KAFKA-2419; Garbage collect unused sensors #233
kafka-trunk-git-pr #494 FAILURE
kafka-trunk-git-pr #495 FAILURE
A heads-up, this branch doesn't merge cleanly anymore.
@@ -254,6 +297,13 @@ synchronized void registerMetric(KafkaMetric metric) {
*/
@Override
public void close() {
this.scheduler.shutdown();
try {
this.scheduler.awaitTermination(30, TimeUnit.SECONDS);
We should ideally set these threads as daemon to avoid the possibility of hanging shutdowns in clients/server. This requires associating a thread factory with the executor as we do in the server’s KafkaScheduler. This will also get rid of the requirement to formally close the metrics object in all the tests (although I think close is a good practice regardless if we are associating a thread pool with metrics).
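To illustrate the suggestion above, here is a minimal sketch of attaching a daemon thread factory to a scheduled executor. The class name `DaemonSchedulerSketch`, the helper names, and the `metrics-expiry-` thread-name prefix are hypothetical and chosen for illustration; this is not the code in the PR or in KafkaScheduler.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not the actual Kafka code: builds a scheduler whose
// worker threads are daemons, so a forgotten close() cannot block JVM exit.
final class DaemonSchedulerSketch {

    // Returns a factory that names threads "<prefix><n>" and marks them as daemon.
    static ThreadFactory daemonThreadFactory(final String namePrefix) {
        final AtomicInteger threadId = new AtomicInteger(0);
        return runnable -> {
            Thread t = new Thread(runnable, namePrefix + threadId.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    // Single-threaded scheduler backed by the daemon factory (prefix is an assumption).
    static ScheduledThreadPoolExecutor newScheduler() {
        return new ScheduledThreadPoolExecutor(1, daemonThreadFactory("metrics-expiry-"));
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor scheduler = newScheduler();
        scheduler.schedule(
            () -> System.out.println("daemon? " + Thread.currentThread().isDaemon()),
            10, TimeUnit.MILLISECONDS);
        // Explicit shutdown is still good practice even with daemon threads.
        scheduler.shutdown();
        scheduler.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```

With daemon threads, the JVM can exit even if `close()` is never called, while an explicit `shutdown()` plus `awaitTermination` still gives in-flight tasks a chance to finish.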
Done. I'm still closing the metrics instances though.
I've messed up the diff. Let me fix this.
Fixed the diff. Addressed comments.
@@ -37,16 +38,20 @@
private final List<KafkaMetric> metrics;
private final MetricConfig config;
private final Time time;
private volatile long lastRecordTime;
private final long expireInactiveSensorTimeMillis;
We normally use Ms as the suffix for milliseconds, right?
Also, it seems like inactiveSensorExpirationTimeMs would read a bit better.
Changed to ms. Having said that, I found numerous instances in code where we refer to values as "xxMillis" and the associated config as "Ms". Perhaps we should standardize this in the future and fix all existing instances.
Thanks. Yeah, we should standardise indeed. I'm trying to get us to be more consistent when it comes to naming.
73f8d49 to 2693fc9
// removeSensor also locks the sensor object. This is fine because synchronized is reentrant
// There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
// Calling record on C would cause a record on P as well.
// So expiration time for P == C1. If the record on P happens via C just after P is removed,
expiration time for P == expiration time for C
fixed
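On the reentrancy point in the code comment under review: a thread that already holds an object's monitor can enter another synchronized block or method on the same object without deadlocking. A minimal sketch, using a hypothetical `ReentrantLockSketch` class as a stand-in (it is not Kafka's Sensor or Metrics class):

```java
// Hypothetical stand-in, not Kafka's Sensor: shows that a thread holding
// an object's monitor can re-acquire it in a nested call.
final class ReentrantLockSketch {
    private int removals = 0;

    // Outer caller, e.g. an expiry task that locks the object before acting on it.
    int expire() {
        synchronized (this) {
            // remove() is itself synchronized on `this`; the nested acquisition
            // succeeds because intrinsic locks are reentrant.
            return remove();
        }
    }

    // Synchronized on the same monitor as the block in expire().
    synchronized int remove() {
        removals++;
        return removals;
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantLockSketch().expire());
    }
}
```

Reentrancy only rules out self-deadlock. It does not address the race described in the comment, where a record on the parent arrives via the child just after the parent has been removed.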
+1 apart from the above minor comments. I think this needs a rebase though.
Joel - I think I've addressed all your comments. Please take a look.
* Override all producer & consumer properties from connector * Fix tests * Fix checktyle issue
Reviewed-By: Ismael Juma <ismael@juma.me.uk>
Excuse me. Is this bug fixed now? I can see an ExpireSensorTask in Metrics, but it does not seem to be used anywhere. I also see more than 800,000 sensors in my application; they sit in the old generation and can't be GC'd.
TICKET = N/A LI_DESCRIPTION = The uploadArchivesAll is a root level gradle command, and thus shouldn't be called under the subproject. EXIT_CRITERIA = N/A
As discussed in KAFKA-2419 - I've added a time based sensor retention config to Sensor. Sensors that have not been "recorded" for 'n' seconds are eligible for expiration.
In addition to the time based retention, I've also altered several tests to close the Metrics and scheduler objects, since leaving them open can leak threads while running tests, which causes TestUtils.verifyNonDaemonThreadStatus to fail.
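The time based retention described above can be sketched roughly as follows. This is a simplified, hypothetical model: `SensorExpirySketch`, its nested `Sensor`, and all method names are illustrative assumptions rather than the Kafka Metrics API, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of time based sensor retention.
// Names are illustrative and are not the Kafka Metrics API.
final class SensorExpirySketch {

    static final class Sensor {
        private final long inactiveSensorExpirationTimeMs;
        private volatile long lastRecordTime;

        Sensor(long inactiveSensorExpirationTimeMs, long nowMs) {
            this.inactiveSensorExpirationTimeMs = inactiveSensorExpirationTimeMs;
            this.lastRecordTime = nowMs;
        }

        // Recording a value refreshes the sensor's last-activity timestamp.
        void record(long nowMs) {
            this.lastRecordTime = nowMs;
        }

        // A sensor is eligible for expiration once it has been idle too long.
        boolean hasExpired(long nowMs) {
            return nowMs - lastRecordTime > inactiveSensorExpirationTimeMs;
        }
    }

    private final ConcurrentMap<String, Sensor> sensors = new ConcurrentHashMap<>();

    // Returns the existing sensor with this name, or registers a new one.
    Sensor sensor(String name, long expirationMs, long nowMs) {
        return sensors.computeIfAbsent(name, n -> new Sensor(expirationMs, nowMs));
    }

    // What a periodic expiry task might do on each run: drop idle sensors.
    int expireInactive(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Sensor>> it = sensors.entrySet().iterator();
        while (it.hasNext()) {
            Sensor s = it.next().getValue();
            synchronized (s) {
                if (s.hasExpired(nowMs)) {
                    it.remove();
                    removed++;
                }
            }
        }
        return removed;
    }

    int size() {
        return sensors.size();
    }
}
```

For example, with two sensors created at time 0 and a 1000 ms expiration, recording on only one of them at 900 ms means a sweep at 1500 ms removes the other and keeps the recently recorded one.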