Improve concurrency design of EnterpriseGeoIpDownloader
#134223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Refactors `EnterpriseGeoIpDownloader` to avoid race conditions between the periodic and ad-hoc runs. See the discussion on elastic#126124 for more details on the previously existing race condition.

With this new approach, we make a distinction between the periodic and ad-hoc runs. The periodic runs simply run periodically on the configured poll interval. The ad-hoc runs are typically triggered by changes in the cluster state to the GeoIP metadata, and require running the downloader immediately to download any GeoIP databases that were just added by a user. By using a `Semaphore` and an `AtomicReference<ClusterState>`, we can guarantee that a new cluster state will result in the downloader running, and prevent the downloader from running concurrently.

While the (non-enterprise) `GeoIpDownloader` has the exact same concurrency implementation, we scope this PR to just the enterprise downloader to focus discussions on the design changes. A follow-up PR will modify the `GeoIpDownloader` to have the same implementation as the enterprise downloader.

Fixes elastic#126124
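As a rough illustration of the guarantee described above (every new cluster state leads to a run, and runs never overlap), here is a hypothetical, heavily simplified sketch. `DownloaderSketch`, the `String` stand-in for `ClusterState`, and the `runs` counter are all invented for illustration and do not mirror the actual implementation:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified sketch of the Semaphore + AtomicReference pattern;
// names are invented and do not correspond to the real Elasticsearch class.
class DownloaderSketch {
    private final Semaphore running = new Semaphore(1);
    private final AtomicReference<String> queue = new AtomicReference<>();
    volatile int runs = 0;

    void runOnState(String state) {
        // Publish the newest state. If an older state was still queued, some
        // caller is already waiting to run and will pick ours up instead, so
        // we can return without waiting ourselves.
        if (queue.getAndSet(state) != null) {
            return;
        }
        running.acquireUninterruptibly(); // wait for any in-flight run
        try {
            // Take whatever is queued now -- possibly a state newer than ours.
            String toProcess = queue.getAndSet(null);
            if (toProcess != null) {
                runs++; // stand-in for the actual download work
            }
        } finally {
            running.release();
        }
    }
}
```

The key property of the sketch: a caller either runs the downloader itself (holding the semaphore) or hands its state off to a caller that is already waiting, so no state is ever silently dropped.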
Pull Request Overview
This PR refactors the EnterpriseGeoIpDownloader to improve its concurrency design and eliminate race conditions between periodic and ad-hoc runs. The changes introduce a semaphore-based approach to ensure only one downloader run executes at a time while guaranteeing that cluster state changes trigger immediate downloads.
Key changes:
- Replace the previous scheduling mechanism with separate periodic and cluster state-triggered runs
- Introduce a `Semaphore` to prevent concurrent execution and an `AtomicReference<ClusterState>` to queue cluster state updates
- Update method signatures to accept `ClusterState` parameters instead of retrieving state from `ClusterService`
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| muted-tests.yml | Removes a muted test that was failing due to the race condition being fixed |
| EnterpriseGeoIpDownloaderTests.java | Updates test methods to pass ClusterState parameters to match new API |
| EnterpriseGeoIpDownloaderTaskExecutor.java | Updates method calls to use new API names and cluster state handling |
| EnterpriseGeoIpDownloader.java | Core refactoring with new concurrency mechanism using semaphore and atomic reference |
| EnterpriseGeoIpDownloaderIT.java | Updates test logging configuration for better debugging |
Pinging @elastic/es-data-management (Team:Data Management)

Hi @nielsbauman, I've created a changelog YAML for you.
Also, I've been running this commit for ~15 hours/4600 iterations on two VMs at the same time (!), and no failure yet. So I feel confident that this at least addresses the existing race condition, but it doesn't exclude the possibility that I introduced another one.
higher-level review, still trying to understand this class entirely
The idea of splitting it into a background process that runs one way and an ad-hoc process that runs another way is interesting. I left some concerns about an edge case and a potential simplification.
One thing to note here is that this has the potential to tie up multiple threads waiting on that lock if there's contention, whereas the old way seems to try to keep it to one task running at a time (except for when it races up to two).
It seems like the root of the issue is that we schedule a task and then set a volatile member variable and that we sneak in an update to that member before it can be updated the first time. Since it's the first thing we do in that scheduled task, I'm surprised this isn't happening all the time. From the looks of things, one could rapidly execute multiple threads by just adding geo ip databases over and over.
I threw on an extra idea to see if that is any simpler to work with. Thoughts?
```java
            return;
        }
        logger.trace("Requesting downloader run on cluster state [{}]", clusterState.version());
        if (queue.getAndSet(clusterState) == null) {
```
I'm concerned that this hand off might cause other race conditions or leaks. If we set a state here and the runOnState function is interrupted while waiting for the running lock, there's nothing that can clear this reference which means this conditional can never schedule it to run again.
Hmm, good point. I think a similar problem also existed in the current/previous code, as the thread could be interrupted before it scheduled the next run with `scheduleNextRun(pollIntervalSupplier.get());`.

I think we could improve the periodic aspect by using `ThreadPool#scheduleWithFixedDelay` once (and restart it when the poll interval changes) instead of calling `ThreadPool#schedule` every time. That would ensure that a thread interrupt doesn't prevent the periodic run from ever running again, but it doesn't fix the locking problem.

Thinking out loud, we could wrap `runOnState` in a try/catch where we catch thread interrupts and release the lock in the finally block, but I'm not sure if that guarantees that the lock is released - could the thread be interrupted while the lock release is running?
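For what it's worth, `Semaphore.release()` never blocks and does not check the thread's interrupt status, so a release in a `finally` block cannot itself be interrupted. A hypothetical sketch of the two ideas above (a single fixed-delay registration plus a try/finally release), using plain `java.util.concurrent` rather than Elasticsearch's `ThreadPool`, with invented names:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the Elasticsearch implementation.
class PeriodicSketch {
    final Semaphore running = new Semaphore(1);

    // The downloader body is passed in so a failing run is easy to simulate.
    void runOnce(Runnable body) {
        running.acquireUninterruptibly();
        try {
            body.run(); // may throw
        } finally {
            running.release(); // released on success, exception, or interrupt
        }
    }

    // Scheduling with a fixed delay once, instead of re-scheduling at the end
    // of every run, means a run that dies cannot "lose" the periodic schedule.
    ScheduledExecutorService start(Runnable body, long delayMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleWithFixedDelay(() -> {
            try {
                runOnce(body);
            } catch (RuntimeException e) {
                // Must swallow: with java.util.concurrent, an exception that
                // escapes the task suppresses all subsequent executions.
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        return pool;
    }
}
```

One caveat with the `java.util.concurrent` version: `scheduleWithFixedDelay` cancels the whole schedule if the task throws, hence the catch inside the scheduled lambda.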
```java
        // reference differs from `clusterState`), then we schedule another run to happen immediately after this one.
        if (queue.compareAndSet(clusterState, null) == false) {
            logger.debug("A new cluster state came in while running, scheduling another run");
            threadPool.schedule(this::runOnState, TimeValue.ZERO, threadPool.generic());
```
Just a thought: Does this need another schedule call? If we're aware of a new state could we loop to the start of the method instead?
Hmm, looping to the start of the current method wouldn't work because we could risk stack overflows, and we'd get deadlocks (because we'd never reach the `running.release()` below). We could essentially change this method to loop while the "queue" (I agree with Szymon that that variable name should be rephrased) is changed, but I'm not sure what benefit that would have other than potentially reducing the amount of locking and releasing we do. An advantage of the current implementation is that we're a friendlier neighbour to the generic threadpool, as we hoard threads for shorter periods of time.
We could essentially change this method to loop while the "queue"
That is what I was thinking yeah. I think I described it poorly.
An advantage of the current implementation is that we're a friendlier neighbour of the generic threadpool
The current logic waits for a lock on a thread which is consuming that generic thread during the wait period regardless. If multiple updates trickle in then we will be repeatedly rescheduling the operation with zero wait anyway, which means grabbing another thread to wait on immediately reacquiring the lock again.
I think if we move forward with this specific change we'll be consuming the time on generic threads no matter what. That said, if this keeps it simpler for folks to reason about then that's fine too.
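The loop variant being discussed could look roughly like this (hypothetical names; the semaphore acquisition around the whole loop is omitted for brevity):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the loop variant: instead of scheduling a fresh
// zero-delay task when the queue changed mid-run, the holder of the lock
// keeps draining the queue until it stays empty.
class DrainingSketch {
    final AtomicReference<String> queue = new AtomicReference<>();
    volatile int runs = 0;

    void drain() {
        String state;
        // Loop rather than recurse: bounded stack depth, and the lock (not
        // shown) would be acquired once around the whole loop.
        while ((state = queue.getAndSet(null)) != null) {
            runs++; // stand-in for running the downloader on `state`
        }
    }
}
```

The trade-off is exactly the one raised above: the loop holds one generic thread for as long as states keep arriving, whereas the reschedule approach releases the thread between runs at the cost of extra scheduling.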
```java
        // to N(>=2) databases in quick succession, then all but the first database wouldn't necessarily get downloaded, because the
        // requestReschedule call in the EnterpriseGeoIpDownloaderTaskExecutor's clusterChanged wouldn't have a scheduled future run to
        // reschedule. scheduling the next run at the beginning of this run means that there's a much smaller window (milliseconds?, rather
        // than seconds) in which such a race could occur. technically there's a window here, still, but i think it's _greatly_ reduced.
```
I wonder if this race condition is a result of chasing our tail instead of relying on the scheduling code. It looks like we schedule the next run immediately every execution because the first time we run the downloader (in nodeOperation) we don't have a scheduled task to cancel. But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field. We could even use a barrier or gate on adhoc runs to enforce that the scheduling thread sets the field before the scheduled function can in the case that the scheduler is suspended strangely. Maybe I'm missing something, thoughts?
But if we scheduled that first run (even at zero seconds) then we'd have the scheduled task field initialized for when we want to call it adhoc. And then when we run it adhoc, we aren't racing to set the scheduled field.
Do you mean that this would allow us to move the scheduleNextRun(pollIntervalSupplier.get()); call to the end of the runDownloader() method, which would reduce the chance of this race condition occurring? As in, because the body of runDownloader() takes time, we'd significantly reduce the chance that scheduleNextRun(pollIntervalSupplier.get()) is run before the scheduled field is set to the ad-hoc run?
I considered that too, but that doesn't address the race condition we have with clusterService.state(). If we have a significantly slow cluster state applier in the cluster, our clusterService.state() could run before ClusterStateApplier's state field is set to the new cluster state.
While typing this, I realized that EnterpriseGeoIpDownloaderTaskExecutor implements ClusterStateListener and not ClusterStateApplier, and listeners are only invoked after the state field is updated:
elasticsearch/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
Lines 531 to 538 in 4eeaae3

```java
callClusterStateAppliers(clusterChangedEvent, stopWatch);
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent, stopWatch);
```
So, I don't think we need to worry about that cluster state race condition anymore. That does simplify things a lot. Then I think we could indeed revert to scheduling the next run at the end of `runDownloader()` and ensure that works properly.
Ah, looking at this again, I realized I forgot to mention another part of my reasoning for the current changes in this PR. I am personally not a big fan of the "cancel scheduled run" approach, because it creates some uncertainty about what happens if a scheduled run is already in progress. We cancel with `mayInterruptIfRunning=false` here:

elasticsearch/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java
Line 32 in 4eeaae3

```java
return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads
```

But to me it's not clear what exactly happens with a scheduled run that is already in progress - does the `scheduled.cancel()` call block/wait for the scheduled run to finish? This was part of the reason why I took the approach of splitting the periodic and ad-hoc runs up, making them explicitly wait for the run to finish instead of cancelling one. Let me know what your thoughts are on that!
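To the question above: with plain `java.util.concurrent` (Elasticsearch's `ThreadPool` wraps this, so details may differ there), `cancel(false)` on an already-running task neither interrupts it nor waits for it. It marks the future cancelled and returns immediately, while the in-flight body runs to completion. A small experiment with invented latch names:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Demonstrates that cancel(false) on a running scheduled task does not block
// and does not stop the task; the body still finishes normally.
class CancelDemo {
    static boolean taskCompletedAfterCancel() throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Future<?> f = pool.schedule(() -> {
            started.countDown();
            try {
                finish.await(); // simulate a long-running download
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown(); // the body ran to completion
        }, 0, TimeUnit.MILLISECONDS);
        started.await();
        boolean cancelled = f.cancel(false); // returns immediately, no interrupt
        finish.countDown(); // the task was still running; let it proceed
        boolean completed = done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return cancelled && completed;
    }
}
```

So "cancelling" an in-progress run only prevents a not-yet-started execution; it provides no way to wait for the run to finish, which supports the explicit-wait design in this PR.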
@jbaiera @szybia @joegallo I pushed some changes to:
- Remove the cluster state queue, as I realized in #134223 (comment) that we don't need to hold a reference to a cluster state there; a simple boolean is sufficient.
- I added some `finally` blocks to ensure that locks are released.
I would greatly appreciate it if all of you could have another look and let me know what your thoughts are :)
```java
        when(clusterService.state()).thenReturn(state);
        geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
        geoIpDownloader.restartPeriodicRun();
        assertBusy(() -> assertEquals(1, calls.get()));
```
I'm not a big fan of using assertBusy here; I'd prefer to use something like a future to avoid waiting unnecessarily long in assertBusy, but I don't immediately have a good example of that at hand and wanted to push my changes already.
from reading assertBusy, seems quite efficient with exponentialBackoff so i'd be happy with this, more readable too
but couldn't turn down an opportunity to play with the code (haven't tested this)
```diff
diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
index 4fd2b7b751c..06913cd9cb6 100644
--- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
+++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTests.java
@@ -56,6 +56,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
@@ -576,6 +577,8 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
     public void testRestartPeriodicRunReleasesLock() throws Exception {
         ClusterState state = createClusterState(projectId, new PersistentTasksCustomMetadata(1L, Map.of()));
         AtomicInteger calls = new AtomicInteger();
+        final CompletableFuture<Void> firstRunInvoked = new CompletableFuture<>();
+        final CompletableFuture<Void> secondRunInvoked = new CompletableFuture<>();
         geoIpDownloader = new EnterpriseGeoIpDownloader(
             client,
             httpClient,
@@ -592,8 +595,13 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
         ) {
             @Override
             synchronized void runDownloader() {
-                if (calls.incrementAndGet() == 1) {
-                    throw new RuntimeException("test exception");
+                switch (calls.incrementAndGet()) {
+                    case 1 -> {
+                        firstRunInvoked.complete(null);
+                        throw new RuntimeException("test exception");
+                    }
+                    case 2 -> secondRunInvoked.complete(null);
+                    default -> throw new IllegalStateException("unexpected to be called > 2 times");
                 }
                 super.runDownloader();
             }
@@ -601,9 +609,9 @@ public class EnterpriseGeoIpDownloaderTests extends ESTestCase {
         when(clusterService.state()).thenReturn(state);
         geoIpDownloader.setState(EnterpriseGeoIpTaskState.EMPTY);
         geoIpDownloader.restartPeriodicRun();
-        assertBusy(() -> assertEquals(1, calls.get()));
+        firstRunInvoked.join();
         geoIpDownloader.restartPeriodicRun();
-        assertBusy(() -> { assertEquals(2, calls.get()); });
+        secondRunInvoked.join();
     }

     private static class MockClient extends NoOpClient {
```
private static class MockClient extends NoOpClient {
lgtm
thanks for the learning experience :)
```java
        // We synchronize to ensure we only have one scheduledPeriodicRun at a time.
        synchronized (this) {
            if (scheduledPeriodicRun != null) {
                final boolean cancelSuccessful = scheduledPeriodicRun.cancel();
```
don't actually want this comment addressed, but just for completeness:
from my reading (and as you mentioned) we do `Future.cancel(false)` here,
which iiuc means that if a `runPeriodic` is running when it's cancelled, it won't be interrupted, so it will schedule another run,
which i believe means we can still have multiple `runPeriodic`s scheduled in the thread pool,
but from my reading of the code that just means we'll run the downloader more often, so who cares 🤷
Ah yeah, I think you're right. Perhaps a simple fix would be doing a `scheduledPeriodicRun.cancel()` inside `runPeriodic` as well, just to be sure, but, like you, I'm not sure if that's worth it. Good catch!
I left one comment nit (I suspect the comment is just outdated and can be rewritten to avoid the wording that I don't like). Other than that, though, this LGTM.
Also, I think that managing things with an abstract 'queue' like this is really nice, and I like where this PR ended up quite a bit.
Great work, @nielsbauman!
💔 Backport failed
You can use sqren/backport to manually backport by running
…4223) Refactors `EnterpriseGeoIpDownloader` to avoid race conditions between the periodic and on-demand runs. See the discussion on elastic#126124 for more details on the previously existing race condition. With this new approach, we make a distinction between the periodic and on-demand runs. The periodic runs simply run periodically on the configured poll interval. The on-demand runs are typically triggered by changes in the cluster state to the GeoIP metadata, and require running the downloader immediately to download any GeoIP databases that were just added by a user. By using an `AtomicInteger` to track the number of on-demand runs that were requested concurrently, we can guarantee that a new cluster state will result in the downloader running and avoid the downloader from running concurrently. While the (non-enterprise) `GeoIpDownloader` has the exact same concurrency implementation, we scope this PR to just the enterprise downloader to focus discussions on the design changes. A follow-up PR will modify the `GeoIpDownloader` to have the same implementation as the enterprise downloader. Fixes elastic#126124 (cherry picked from commit e946ca8) # Conflicts: # muted-tests.yml
💚 All backports created successfully
Questions? Please refer to the Backport tool documentation.
The same backport commit message was repeated for #137052, #137053, and #137054, each cherry-picked from commit e946ca8 with conflicts in muted-tests.yml.
Refactors `EnterpriseGeoIpDownloader` to avoid race conditions between the periodic and on-demand runs. See the discussion on #126124 for more details on the previously existing race condition.

With this new approach, we make a distinction between the periodic and on-demand runs. The periodic runs simply run periodically on the configured poll interval. The on-demand runs are typically triggered by changes in the cluster state to the GeoIP metadata, and require running the downloader immediately to download any GeoIP databases that were just added by a user. By using an `AtomicInteger` to track the number of on-demand runs that were requested concurrently, we can guarantee that a new cluster state will result in the downloader running, and prevent the downloader from running concurrently.

While the (non-enterprise) `GeoIpDownloader` has the exact same concurrency implementation, we scope this PR to just the enterprise downloader to focus discussions on the design changes. A follow-up PR will modify the `GeoIpDownloader` to have the same implementation as the enterprise downloader.

Fixes #126124
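A rough sketch of the counter idea in the final description above: an `AtomicInteger` counts requested on-demand runs, so a request that arrives while a run is in flight is never lost, and the semaphore keeps runs from overlapping. Names are invented and the real implementation differs in detail:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the AtomicInteger-based design; not the real class.
class OnDemandSketch {
    final Semaphore running = new Semaphore(1);
    final AtomicInteger requested = new AtomicInteger();
    volatile int runs = 0;

    void requestRun() {
        // If a run was already requested, the thread that made that request
        // will loop below and observe ours too; no request is ever lost.
        if (requested.getAndIncrement() > 0) {
            return;
        }
        while (true) {
            int pending = requested.get(); // requests this run will cover
            running.acquireUninterruptibly(); // runs never overlap
            try {
                runs++; // stand-in for the actual download work
            } finally {
                running.release();
            }
            // Collapse all covered requests; anything that arrived during the
            // run leaves the counter positive, so we go around once more.
            if (requested.addAndGet(-pending) == 0) {
                return;
            }
        }
    }
}
```

Compared to the `AtomicReference<ClusterState>` version, this relies on cluster state listeners only firing after the applied state is visible (per the `ClusterApplierService` ordering discussed earlier), so the run can simply read the latest state instead of being handed one.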