Fix multi-controller race conditions in ResponseStoreCleaner cursor cleanup#18113
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18113 +/- ##
============================================
+ Coverage 63.75% 63.98% +0.22%
- Complexity 1573 1617 +44
============================================
Files 3167 3181 +14
Lines 191658 193753 +2095
Branches 29469 29917 +448
============================================
+ Hits 122198 123970 +1772
- Misses 59851 59981 +130
- Partials 9609 9802 +193
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR aims to eliminate multi-controller races during cursor response-store cleanup by ensuring only one controller performs cleanup work and by making broker-side deletion idempotent under concurrent deletes.
Changes:
- Gate
ResponseStoreCleanerexecution to a single controller using lead-controller coordination. - Make broker
deleteResponse()resilient to TOCTOU races betweenexists()/ metadata read / delete. - Prevent cleanup batches from aborting due to individual DELETE failures by logging warnings instead of throwing.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java |
Adds a lead-controller gate and changes per-broker delete failure handling to avoid aborting cleanup. |
pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java |
Handles concurrent disappearance of response metadata during deletion to avoid failing deletes. |
pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java |
Treats already-deleted response directories as successful deletes to make deletion idempotent. |
| @Override | ||
| protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) { | ||
| // Make it so that only one controller is responsible for cleanup. | ||
| if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
ControllerPeriodicTask.runTask() only invokes processTables() when this controller is leader for at least one entry returned by getTablesToProcess(). With the new isLeaderForTable(TASK_NAME) gate inside processTables(), it’s possible that the controller that is leader for TASK_NAME is not leader for any actual table (e.g., few tables / many controllers), causing processTables() to never be called on that controller and the cleaner to never run. Consider overriding getTablesToProcess() to return a singleton list containing TASK_NAME (or switching this task to BasePeriodicTask) so exactly one controller runs cleanup independent of table distribution, and then remove/avoid the additional early-return gate.
There was a problem hiding this comment.
Good catch, this should be fixed.
| // readResponse() (TOCTOU race), so handle that gracefully. | ||
| long bytesWritten = 0; | ||
| try { | ||
| bytesWritten = readResponse(requestId).getBytesWritten(); | ||
| } catch (Exception e) { | ||
| LOGGER.debug("Could not read response metadata for requestId={} (may have been deleted concurrently)", |
There was a problem hiding this comment.
The intent here is to handle the TOCTOU “deleted between exists() and readResponse()” race, but catching a generic Exception can also hide non-race failures (e.g., deserialization bugs, permission issues, transient FS errors), and it will silently skip metrics adjustments in those cases. Consider narrowing the catch to the expected “not found”/IO exceptions (or rethrow unexpected exceptions) so real read failures aren’t masked.
| // readResponse() (TOCTOU race), so handle that gracefully. | |
| long bytesWritten = 0; | |
| try { | |
| bytesWritten = readResponse(requestId).getBytesWritten(); | |
| } catch (Exception e) { | |
| LOGGER.debug("Could not read response metadata for requestId={} (may have been deleted concurrently)", | |
| // readResponse() (TOCTOU race), so only suppress read failures when the response no longer exists. | |
| long bytesWritten = 0; | |
| try { | |
| bytesWritten = readResponse(requestId).getBytesWritten(); | |
| } catch (Exception e) { | |
| if (exists(requestId)) { | |
| throw e; | |
| } | |
| LOGGER.debug("Could not read response metadata for requestId={} because it was deleted concurrently", |
…leanup
When multiple controllers run the ResponseStoreCleaner concurrently, they
race to delete the same expired cursor responses on each broker. This causes
TOCTOU races in the broker's delete path (files disappear between exists()
and read/delete), resulting in HTTP 500s that cascade into aborting the
entire cleanup batch for that broker.
Three fixes:
1. Only the lead controller runs the cleaner by gating processTables() on
isLeaderForTable(TASK_NAME), eliminating the multi-controller race.
2. Handle concurrent deletion gracefully on the broker side:
- AbstractResponseStore.deleteResponse() catches exceptions from
readResponse() when files vanish between exists() and read.
- FsResponseStore.deleteResponseImpl() catches exceptions from
pinotFS.delete() and treats already-gone directories as success.
3. ResponseStoreCleaner.deleteExpiredResponses() logs individual delete
failures as warnings instead of throwing, so one failed DELETE no longer
aborts the entire broker batch.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3e90ba3 to
4455238
Compare
Summary
ResponseStoreCleanerby gatingprocessTables()onisLeaderForTable(TASK_NAME), preventing all controllers from racing to delete the same expired responses on each broker.AbstractResponseStore.deleteResponse()now catches exceptions fromreadResponse()when files vanish between theexists()check and the read (TOCTOU race).FsResponseStore.deleteResponseImpl()catches exceptions frompinotFS.delete()and treats already-gone directories as success instead of throwing.ResponseStoreCleaner.deleteExpiredResponses()logs individual DELETE failures as warnings instead of throwing aRuntimeException, so one failed DELETE no longer aborts the entire broker's cleanup batch.Root cause
When multiple controllers run the
ResponseStoreCleanerconcurrently (all controllers run it becauseprocessTables()ignores the table leadership list), they race to delete the same expired cursor responses on each broker. The broker'sdeleteResponse()has a TOCTOU race betweenexists()→readResponse()→deleteResponseImpl()— when one controller deletes a cursor's files, the others hitFileNotFoundException/IOException, and the broker returns HTTP 500 instead of 404. The controller'sdeleteExpiredResponses()then throws on any single 500, aborting the remaining successful deletes' logging for that broker.Test plan
ResponseStoreCleanerTesttests pass (includingtestPartialBrokerFailureDoesNotBlockOthersandtestCleanupTreats404AsSuccess)🤖 Generated with Claude Code