From f1e741959f2b17a01e6099590e968c9785bbcd01 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 11:26:19 +0200 Subject: [PATCH 01/23] Remove worst-case additional 50ms latency for non-rate limited requests --- .../http/sender/RequestExecutorService.java | 196 +++++++++++++++--- .../services/settings/RateLimitSettings.java | 2 +- .../http/sender/HttpRequestSenderTests.java | 21 +- .../sender/RequestExecutorServiceTests.java | 159 +++++++++----- .../http/sender/RequestManagerTests.java | 26 ++- 5 files changed, 314 insertions(+), 90 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 7138cd30aa4d1..bd2bf72f22252 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,6 +54,29 @@ * {@link org.apache.http.client.methods.HttpUriRequest} to set a timeout for how long this executor will wait * attempting to execute a task (aka waiting for the connection manager to lease a connection). See * {@link org.apache.http.client.config.RequestConfig.Builder#setConnectionRequestTimeout} for more info. + * + * The request flow looks as follows: + * + * -------------> Execute request immediately. + * | + * | + * request NOT supporting + * rate limiting + * | + * | + * Request ---> [-Request Queue-] + * | + * | + * request supporting + * rate limiting + * | + * | + * ------------> {Rate Limit Group 1 -> Queue 1, ..., Rate Limit Group N -> Queue N} + * + * Explanation: Submit request to the queue for the specific rate limiting group. + * The rate limiting groups are polled at the same specified interval, + * which in the worst cases introduces an additional latency of + * {@link RequestExecutorServiceSettings#getTaskPollFrequency()}. */ public class RequestExecutorService implements RequestExecutor { @@ -109,6 +133,8 @@ interface RateLimiterCreator { private final RateLimiterCreator rateLimiterCreator; private final AtomicReference cancellableCleanupTask = new AtomicReference<>(); private final AtomicBoolean started = new AtomicBoolean(false); + private final AdjustableCapacityBlockingQueue requestQueue; + private volatile Future requestQueueTask; public RequestExecutorService( ThreadPool threadPool, @@ -135,10 +161,16 @@ public RequestExecutorService( this.settings = Objects.requireNonNull(settings); this.clock = Objects.requireNonNull(clock); this.rateLimiterCreator = Objects.requireNonNull(rateLimiterCreator); + this.requestQueue = new AdjustableCapacityBlockingQueue<>(queueCreator, settings.getQueueCapacity()); } public void shutdown() { if (shutdown.compareAndSet(false, true)) { + if (requestQueueTask != null) { + boolean cancelled = requestQueueTask.cancel(true); + logger.debug(() -> format("Request queue cancellation successful: %s", cancelled)); + } + if (cancellableCleanupTask.get() != null) { logger.debug(() -> "Stopping clean up thread"); cancellableCleanupTask.get().cancel(); @@ -159,7 +191,7 @@ public boolean isTerminated() { } public int queueSize() { - return rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum(); + return requestQueue.size() + rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum(); } /** @@ -175,8 +207,8 @@ public void start() { startCleanupTask(); signalStartInitiated(); - - handleTasks(); + startRequestQueueTask(); + startHandlingRateLimitedTasks(); } catch (Exception e) { logger.warn("Failed to start request executor", e); cleanup(); @@ -194,6 +226,11 @@ private void startCleanupTask() { cancellableCleanupTask.set(startCleanupThread(RATE_LIMIT_GROUP_CLEANUP_INTERVAL)); } + private void startRequestQueueTask() { + assert requestQueueTask == null : "The request queue can only be started once"; + requestQueueTask = threadPool.executor(UTILITY_THREAD_POOL_NAME).submit(this::processRequestQueue); + } + private Scheduler.Cancellable startCleanupThread(TimeValue interval) { logger.debug(() -> Strings.format("Clean up task scheduled with interval [%s]", interval)); @@ -221,7 +258,83 @@ private void scheduleNextHandleTasks(TimeValue timeToWait) { return; } - threadPool.schedule(this::handleTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME)); + threadPool.schedule(this::startHandlingRateLimitedTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME)); + } + + private void processRequestQueue() { + try { + while (isShutdown() == false) { + // Blocks the request queue thread until a new request comes in + var task = (RequestTask) requestQueue.take(); + + if (isShutdown()) { + logger.debug("Shutdown requested while handling request tasks, cleaning up"); + cleanup(); + return; + } else { + var requestManager = task.getRequestManager(); + + if (rateLimitingEnabled(requestManager)) { + submitTaskToRateLimitedExecutionPath(task); + } else { + executeTaskImmediately(task); + } + } + } + } catch (InterruptedException e) { + // Restore interrupt to propagate to the calling thread + Thread.currentThread().interrupt(); + logger.debug("Inference request queue interrupted, exiting"); + } catch (Exception e) { + logger.warn("Error processing task in inference request queue", e); + cleanup(); + } + } + + private void executeTaskImmediately(RequestTask task) { + try { + task.getRequestManager() + .execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener()); + } catch (Exception e) { + logger.warn( + format("Failed to execute fast-path request for inference id [%s]", task.getRequestManager().inferenceEntityId()), + e + ); + + task.onRejection( + new EsRejectedExecutionException( + format("Failed to execute request for inference id [%s]", task.getRequestManager().inferenceEntityId()), + false + ) + ); + } + } + + // visible for testing + void submitTaskToRateLimitedExecutionPath(RequestTask task) { + var requestManager = task.getRequestManager(); + var endpoint = rateLimitGroupings.computeIfAbsent(requestManager.rateLimitGrouping(), key -> { + var endpointHandler = new RateLimitingEndpointHandler( + Integer.toString(requestManager.rateLimitGrouping().hashCode()), + queueCreator, + settings, + requestSender, + clock, + requestManager.rateLimitSettings(), + this::isShutdown, + rateLimiterCreator, + rateLimitDivisor.get() + ); + + endpointHandler.init(); + return endpointHandler; + }); + + endpoint.enqueue(task); + } + + private boolean rateLimitingEnabled(RequestManager requestManager) { + return requestManager.rateLimitSettings() != null && requestManager.rateLimitSettings().isEnabled(); } private void cleanup() { @@ -234,12 +347,12 @@ private void cleanup() { } } - private void handleTasks() { + private void startHandlingRateLimitedTasks() { try { TimeValue timeToWait; do { - if (shutdown.get()) { - logger.debug("Shutdown requested while handling tasks, cleaning up"); + if (isShutdown()) { + logger.debug("Shutdown requested while handling rate limited tasks, cleaning up"); cleanup(); return; } @@ -253,7 +366,7 @@ private void handleTasks() { scheduleNextHandleTasks(timeToWait); } catch (Exception e) { - logger.warn("Encountered an error while handling tasks", e); + logger.warn("Encountered an error while handling rate limited tasks", e); cleanup(); } } @@ -261,9 +374,39 @@ private void handleTasks() { private void notifyRequestsOfShutdown() { assert isShutdown() : "Requests should only be notified if the executor is shutting down"; + // Reject rate-limited requests for (var endpoint : rateLimitGroupings.values()) { endpoint.notifyRequestsOfShutdown(); } + + // Reject non-rate-limited requests + List requests = new ArrayList<>(); + requestQueue.drainTo(requests); + + for (var request : requests) { + rejectRequest(request); + } + } + + private void rejectRequest(RejectableTask task) { + try { + task.onRejection( + new EsRejectedExecutionException( + format( + "Failed to send request for inference id [%s] has shutdown prior to executing request", + task.getRequestManager().inferenceEntityId() + ), + true + ) + ); + } catch (Exception e) { + logger.warn( + format( + "Failed to notify request for inference id [%s] of rejection after executor service shutdown", + task.getRequestManager().inferenceEntityId() + ) + ); + } } // default for testing @@ -308,26 +451,29 @@ public void execute( ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()) ); - var endpoint = rateLimitGroupings.computeIfAbsent(requestManager.rateLimitGrouping(), key -> { - var endpointHandler = new RateLimitingEndpointHandler( - Integer.toString(requestManager.rateLimitGrouping().hashCode()), - queueCreator, - settings, - requestSender, - clock, - requestManager.rateLimitSettings(), - this::isShutdown, - rateLimiterCreator, - rateLimitDivisor.get() + if (isShutdown()) { + task.onRejection( + new EsRejectedExecutionException( + format( + "Failed to enqueue request task for inference id [%s] because the request executor service has been shutdown", + requestManager.inferenceEntityId() + ), + true + ) ); + return; + } - // TODO: add or create/compute if absent set for new map (service/task-type-key -> rate limit endpoint handler) + boolean taskAccepted = requestQueue.offer(task); - endpointHandler.init(); - return endpointHandler; - }); - - endpoint.enqueue(task); + if (taskAccepted == false) { + task.onRejection( + new EsRejectedExecutionException( + format("Failed to enqueue request task for inference id [%s]", requestManager.inferenceEntityId()), + false + ) + ); + } } /** diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettings.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettings.java index c66bc8d33c05b..2bf687a8992da 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettings.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettings.java @@ -125,7 +125,7 @@ public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit) { } // This should only be used for testing. - RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) { + public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) { if (requestsPerTimeUnit <= 0) { throw new IllegalArgumentException("requests per minute must be positive"); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/HttpRequestSenderTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/HttpRequestSenderTests.java index 027a19aca6d1f..2cde70720dabe 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/HttpRequestSenderTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/HttpRequestSenderTests.java @@ -354,7 +354,12 @@ public void testHttpRequestSender_Throws_WhenCallingSendBeforeStart() throws Exc PlainActionFuture listener = new PlainActionFuture<>(); var thrownException = expectThrows( AssertionError.class, - () -> sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), null, listener) + () -> sender.send( + RequestManagerTests.createMockWithRateLimitingEnabled(), + new EmbeddingsInput(List.of(), null), + null, + listener + ) ); assertThat(thrownException.getMessage(), is("call start() before sending a request")); } @@ -375,7 +380,12 @@ public void testHttpRequestSender_Throws_WhenATimeoutOccurs() throws Exception { sender.startSynchronously(); PlainActionFuture listener = new PlainActionFuture<>(); - sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), TimeValue.timeValueNanos(1), listener); + sender.send( + RequestManagerTests.createMockWithRateLimitingEnabled(), + new EmbeddingsInput(List.of(), null), + TimeValue.timeValueNanos(1), + listener + ); var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); @@ -397,7 +407,12 @@ public void testHttpRequestSenderWithTimeout_Throws_WhenATimeoutOccurs() throws sender.startSynchronously(); PlainActionFuture listener = new PlainActionFuture<>(); - sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), TimeValue.timeValueNanos(1), listener); + sender.send( + RequestManagerTests.createMockWithRateLimitingEnabled(), + new EmbeddingsInput(List.of(), null), + TimeValue.timeValueNanos(1), + listener + ); var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 163c4b84f1780..903213a5fc6aa 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -37,6 +37,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -82,7 +83,7 @@ public void testQueueSize_IsEmpty() { public void testQueueSize_IsOne() { var service = createRequestExecutorServiceWithMocks(); service.execute( - RequestManagerTests.createMock(), + RequestManagerTests.createMockWithRateLimitingEnabled(), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>() @@ -161,7 +162,7 @@ public void testExecute_AfterShutdown_Throws() { service.shutdown(); - var requestManager = RequestManagerTests.createMock("id"); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled("id"); var listener = new PlainActionFuture(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); @@ -171,7 +172,7 @@ public void testExecute_AfterShutdown_Throws() { thrownException.getMessage(), is( Strings.format( - "Failed to enqueue task for inference id [id] because the request service [%s] has already shutdown", + "Failed to enqueue request task for inference id [id] because the request executor service is shutdown", requestManager.rateLimitGrouping().hashCode() ) ) @@ -179,17 +180,18 @@ public void testExecute_AfterShutdown_Throws() { assertTrue(thrownException.isExecutorShutdown()); } - public void testExecute_Throws_WhenQueueIsFull() { + public void testExecute_Throws_WhenRateLimitedQueueIsFull() { var service = new RequestExecutorService(threadPool, null, createRequestExecutorServiceSettings(1), mock(RetryingHttpSender.class)); + service.start(); service.execute( - RequestManagerTests.createMock(), + RequestManagerTests.createMockWithRateLimitingEnabled(), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>() ); - var requestManager = RequestManagerTests.createMock("id"); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled("id"); var listener = new PlainActionFuture(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); @@ -251,7 +253,7 @@ public void testExecute_CallsOnFailure_WhenRequestTimesOut() { var listener = new PlainActionFuture(); service.execute( - RequestManagerTests.createMock(), + RequestManagerTests.createMockWithRateLimitingEnabled(), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), TimeValue.timeValueNanos(1), listener @@ -312,7 +314,7 @@ public void onFailure(Exception e) { }; service.execute( - RequestManagerTests.createMock(requestSender), + RequestManagerTests.createMockWithRateLimitingEnabled(requestSender), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener @@ -326,10 +328,10 @@ public void onFailure(Exception e) { finishedOnResponse.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); } - public void testExecute_NotifiesTasksOfShutdown() { + public void testExecute_NotifiesNonRateLimitedTasksOfShutdown() { var service = createRequestExecutorServiceWithMocks(); - var requestManager = RequestManagerTests.createMock(mock(RequestSender.class), "id"); + var requestManager = RequestManagerTests.createMockWithRateLimitingDisabled(mock(RequestSender.class), "id"); var listener = new PlainActionFuture(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); @@ -342,7 +344,34 @@ public void testExecute_NotifiesTasksOfShutdown() { thrownException.getMessage(), is( Strings.format( - "Failed to send request, request service [%s] for inference id [id] has shutdown prior to executing request", + "Failed to send request for inference id [id] has shutdown prior to executing request", + requestManager.rateLimitGrouping().hashCode() + ) + ) + ); + assertTrue(thrownException.isExecutorShutdown()); + assertTrue(service.isTerminated()); + } + + public void testExecute_NotifiesRateLimitedTasksOfShutdown() { + var service = createRequestExecutorServiceWithMocks(); + + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(mock(RequestSender.class), "id"); + var listener = new PlainActionFuture(); + service.submitTaskToRateLimitedExecutionPath( + new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) + ); + + service.shutdown(); + service.start(); + + var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); + + assertThat( + thrownException.getMessage(), + is( + Strings.format( + "Failed to send request, request service [3355] for inference id [id] has shutdown prior to executing request", requestManager.rateLimitGrouping().hashCode() ) ) @@ -368,8 +397,10 @@ public void testQueuePoll_DoesNotCauseServiceToTerminate_WhenItThrows() throws I ); PlainActionFuture listener = new PlainActionFuture<>(); - var requestManager = RequestManagerTests.createMock(requestSender, "id"); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); + service.submitTaskToRateLimitedExecutionPath( + new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) + ); when(queue.poll()).thenThrow(new ElasticsearchException("failed")).thenAnswer(invocation -> { service.shutdown(); @@ -388,7 +419,7 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, var service = new RequestExecutorService(threadPool, null, settings, requestSender); service.execute( - RequestManagerTests.createMock(requestSender), + RequestManagerTests.createMockWithRateLimitingEnabled(requestSender), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>() @@ -396,19 +427,11 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); - var requestManager = RequestManagerTests.createMock(requestSender, "id"); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); - assertThat( - thrownException.getMessage(), - is( - Strings.format( - "Failed to execute task for inference id [id] because the request service [%s] queue is full", - requestManager.rateLimitGrouping().hashCode() - ) - ) - ); + assertThat(thrownException.getMessage(), is("Failed to enqueue request task for inference id [id]")); settings.setQueueCapacity(2); @@ -437,22 +460,30 @@ public void testChangingCapacity_DoesNotRejectsOverflowTasks_BecauseOfQueueFull( var settings = createRequestExecutorServiceSettings(3); var service = new RequestExecutorService(threadPool, null, settings, requestSender); - service.execute( - RequestManagerTests.createMock(requestSender, "id"), - new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), - null, - new PlainActionFuture<>() + service.submitTaskToRateLimitedExecutionPath( + new RequestTask( + RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"), + new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), + null, + threadPool, + new PlainActionFuture<>() + ) ); - service.execute( - RequestManagerTests.createMock(requestSender, "id"), - new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), - null, - new PlainActionFuture<>() + service.submitTaskToRateLimitedExecutionPath( + new RequestTask( + RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"), + new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), + null, + threadPool, + new PlainActionFuture<>() + ) ); PlainActionFuture listener = new PlainActionFuture<>(); - var requestManager = RequestManagerTests.createMock(requestSender, "id"); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); + service.submitTaskToRateLimitedExecutionPath( + new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) + ); assertThat(service.queueSize(), is(3)); settings.setQueueCapacity(1); @@ -497,29 +528,21 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO var settings = createRequestExecutorServiceSettings(1); var service = new RequestExecutorService(threadPool, null, settings, requestSender); - var requestManager = RequestManagerTests.createMock(requestSender); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>()); assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); service.execute( - RequestManagerTests.createMock(requestSender, "id"), + RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener ); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); - assertThat( - thrownException.getMessage(), - is( - Strings.format( - "Failed to execute task for inference id [id] because the request service [%s] queue is full", - requestManager.rateLimitGrouping().hashCode() - ) - ) - ); + assertThat(thrownException.getMessage(), is("Failed to enqueue request task for inference id [id]")); settings.setQueueCapacity(0); @@ -556,7 +579,7 @@ public void testDoesNotExecuteTask_WhenCannotReserveTokens() { Clock.systemUTC(), rateLimiterCreator ); - var requestManager = RequestManagerTests.createMock(requestSender); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender); PlainActionFuture listener = new PlainActionFuture<>(); service.execute(requestManager, new EmbeddingsInput(List.of(), null), null, listener); @@ -628,7 +651,7 @@ public void testDoesNotExecuteTask_WhenCannotReserveTokens_AndThenCanReserve_And Clock.systemUTC(), rateLimiterCreator ); - var requestManager = RequestManagerTests.createMock(requestSender); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender); PlainActionFuture listener = new PlainActionFuture<>(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); @@ -665,10 +688,12 @@ public void testRemovesRateLimitGroup_AfterStaleDuration() { clock, RequestExecutorService.DEFAULT_RATE_LIMIT_CREATOR ); - var requestManager = RequestManagerTests.createMock(requestSender, "id1"); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id1"); PlainActionFuture listener = new PlainActionFuture<>(); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + service.submitTaskToRateLimitedExecutionPath( + new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) + ); assertThat(service.numberOfRateLimitGroups(), is(1)); // the time is moved to after the stale duration, so now we should remove this grouping @@ -676,8 +701,10 @@ public void testRemovesRateLimitGroup_AfterStaleDuration() { service.removeStaleGroupings(); assertThat(service.numberOfRateLimitGroups(), is(0)); - var requestManager2 = RequestManagerTests.createMock(requestSender, "id2"); - service.execute(requestManager2, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + var requestManager2 = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id2"); + service.submitTaskToRateLimitedExecutionPath( + new RequestTask(requestManager2, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) + ); assertThat(service.numberOfRateLimitGroups(), is(1)); } @@ -707,6 +734,32 @@ public void testStartsCleanupThread() { assertThat(argument.getValue(), is(TimeValue.timeValueDays(1))); } + public void testStartsRequestQueueTask() { + var mockExecutorService = mock(ExecutorService.class); + @SuppressWarnings("unchecked") + var stubbing = when(mockExecutorService.submit(any(Runnable.class))).thenReturn(mock(Future.class)); + + var mockThreadPool = mock(ThreadPool.class); + when(mockThreadPool.executor(any())).thenReturn(mockExecutorService); + + var requestSender = mock(RetryingHttpSender.class); + var settings = createRequestExecutorServiceSettings(2, TimeValue.timeValueDays(1)); + var service = new RequestExecutorService( + mockThreadPool, + RequestExecutorService.DEFAULT_QUEUE_CREATOR, + null, + settings, + requestSender, + Clock.systemUTC(), + RequestExecutorService.DEFAULT_RATE_LIMIT_CREATOR + ); + + service.shutdown(); + service.start(); + + verify(mockExecutorService, times(1)).submit(any(Runnable.class)); + } + private Future submitShutdownRequest( CountDownLatch waitToShutdown, CountDownLatch waitToReturnFromSend, diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestManagerTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestManagerTests.java index d8a1f2c4227e4..773590fb0b0c6 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestManagerTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestManagerTests.java @@ -15,26 +15,36 @@ import org.elasticsearch.xpack.inference.external.request.RequestTests; import org.elasticsearch.xpack.inference.services.settings.RateLimitSettings; +import java.util.concurrent.TimeUnit; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RequestManagerTests { - public static RequestManager createMock() { - return createMock(mock(RequestSender.class)); + public static RequestManager createMockWithRateLimitingDisabled(RequestSender requestSender, String inferenceEntityId) { + return createMock(requestSender, inferenceEntityId, new RateLimitSettings(1, TimeUnit.MINUTES, false)); + } + + public static RequestManager createMockWithRateLimitingDisabled(String inferenceEntityId) { + return createMock(mock(RequestSender.class), inferenceEntityId, new RateLimitSettings(1, TimeUnit.MINUTES, false)); + } + + public static RequestManager createMockWithRateLimitingEnabled() { + return createMockWithRateLimitingEnabled(mock(RequestSender.class)); } - public static RequestManager createMock(String inferenceEntityId) { - return createMock(mock(RequestSender.class), inferenceEntityId); + public static RequestManager createMockWithRateLimitingEnabled(String inferenceEntityId) { + return createMockWithRateLimitingEnabled(mock(RequestSender.class), inferenceEntityId); } - public static RequestManager createMock(RequestSender requestSender) { - return createMock(requestSender, "id", new RateLimitSettings(1)); + public static RequestManager createMockWithRateLimitingEnabled(RequestSender requestSender) { + return createMock(requestSender, "id", new RateLimitSettings(1, TimeUnit.MINUTES, true)); } - public static RequestManager createMock(RequestSender requestSender, String inferenceEntityId) { - return createMock(requestSender, inferenceEntityId, new RateLimitSettings(1)); + public static RequestManager createMockWithRateLimitingEnabled(RequestSender requestSender, String inferenceEntityId) { + return createMock(requestSender, inferenceEntityId, new RateLimitSettings(1, TimeUnit.MINUTES, true)); } public static RequestManager createMock(RequestSender requestSender, String inferenceEntityId, RateLimitSettings settings) { From a326e313423d03c72dca1ca574a19921cf9b8b3d Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 11:33:20 +0200 Subject: [PATCH 02/23] Update docs/changelog/136167.yaml --- docs/changelog/136167.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/136167.yaml diff --git a/docs/changelog/136167.yaml b/docs/changelog/136167.yaml new file mode 100644 index 0000000000000..6b63e41225ed3 --- /dev/null +++ b/docs/changelog/136167.yaml @@ -0,0 +1,6 @@ +pr: 136167 +summary: "[Inference API] Remove worst-case additional 50ms latency for non-rate limited\ + \ requests" +area: Machine Learning +type: bug +issues: [] From edaa0f06a83d8fc91d112021beae4802688c39f1 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 14:03:00 +0200 Subject: [PATCH 03/23] Do not use forbidden API --- .../inference/external/http/sender/RequestExecutorService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index bd2bf72f22252..5f9b181ea9387 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; @@ -167,7 +168,7 @@ public RequestExecutorService( public void shutdown() { if (shutdown.compareAndSet(false, true)) { if (requestQueueTask != null) { - boolean cancelled = requestQueueTask.cancel(true); + boolean cancelled = FutureUtils.cancel(requestQueueTask); logger.debug(() -> format("Request queue cancellation successful: %s", cancelled)); } From f17ec009dc174bd09d06d4060bddab61e0a21ead Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 16:56:31 +0200 Subject: [PATCH 04/23] Move startRequestQueueTask before start signal --- .../inference/external/http/sender/RequestExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 5f9b181ea9387..1411975461d9c 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -207,8 +207,8 @@ public void start() { started.set(true); startCleanupTask(); - signalStartInitiated(); startRequestQueueTask(); + signalStartInitiated(); startHandlingRateLimitedTasks(); } catch (Exception e) { logger.warn("Failed to start request executor", e); From a9e7610c921d2e06f7cf6774bb6a61dd448456a0 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 16:58:39 +0200 Subject: [PATCH 05/23] Cleanup in finally block --- .../inference/external/http/sender/RequestExecutorService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 1411975461d9c..80f33cd3125c4 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -288,6 +288,7 @@ private void processRequestQueue() { logger.debug("Inference request queue interrupted, exiting"); } catch (Exception e) { logger.warn("Error processing task in inference request queue", e); + } finally { cleanup(); } } From ec513be6ae70f8a8ce5afff3597e7cc86b04d6fa Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 17:00:16 +0200 Subject: [PATCH 06/23] Reject request on shutdown --- .../inference/external/http/sender/RequestExecutorService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 80f33cd3125c4..ccc733c971014 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -270,8 +270,7 @@ private void processRequestQueue() { if (isShutdown()) { logger.debug("Shutdown requested while handling request tasks, cleaning up"); - cleanup(); - return; + rejectRequest(task); } else { var requestManager = task.getRequestManager(); From 174526cb314142d1158aa9d4a0c8c1f2f980ed4c Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Wed, 8 Oct 2025 17:04:39 +0200 Subject: [PATCH 07/23] Reuse rateLimitSettingsEnabled check --- .../external/http/sender/RequestExecutorService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index ccc733c971014..506996174044d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -274,7 +274,7 @@ private void processRequestQueue() { } else { var requestManager = task.getRequestManager(); - if (rateLimitingEnabled(requestManager)) { + if (rateLimitingEnabled(requestManager.rateLimitSettings())) { submitTaskToRateLimitedExecutionPath(task); } else { executeTaskImmediately(task); @@ -334,8 +334,8 @@ void submitTaskToRateLimitedExecutionPath(RequestTask task) { endpoint.enqueue(task); } - private boolean rateLimitingEnabled(RequestManager requestManager) { - return requestManager.rateLimitSettings() != null && requestManager.rateLimitSettings().isEnabled(); + private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) { + return rateLimitSettings != null && rateLimitSettings.isEnabled(); } private void cleanup() { @@ -570,7 +570,7 @@ public synchronized TimeValue executeEnqueuedTask() { } private TimeValue executeEnqueuedTaskInternal() { - if (rateLimitSettings.isEnabled()) { + if (rateLimitingEnabled(rateLimitSettings)) { var timeBeforeAvailableToken = rateLimiter.timeToReserve(1); if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) { return timeBeforeAvailableToken; From f506cb3398011431e2985e9caded8f4f434e3383 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Thu, 9 Oct 2025 11:11:28 +0200 Subject: [PATCH 08/23] Add NoopTask to wake up queue on shutdown --- .../http/sender/RequestExecutorService.java | 59 ++++++++++++++++--- .../sender/RequestExecutorServiceTests.java | 2 +- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 506996174044d..160309a649780 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; @@ -168,8 +167,8 @@ public RequestExecutorService( public void shutdown() { if (shutdown.compareAndSet(false, true)) { if (requestQueueTask != null) { - boolean cancelled = FutureUtils.cancel(requestQueueTask); - logger.debug(() -> format("Request queue cancellation successful: %s", cancelled)); + // Wakes up the queue in processRequestQueue + requestQueue.offer(NoopTask); } if (cancellableCleanupTask.get() != null) { @@ -265,19 +264,30 @@ private void scheduleNextHandleTasks(TimeValue timeToWait) { private void processRequestQueue() { try { while (isShutdown() == false) { - // Blocks the request queue thread until a new request comes in - var task = (RequestTask) requestQueue.take(); + // Blocks the request queue thread until a new task comes in + var task = requestQueue.take(); + + if (task == NoopTask) { + if (isShutdown()) { + logger.debug("Shutdown requested, exiting request queue processing"); + break; + } else { + // Skip processing NoopTask + continue; + } + } if (isShutdown()) { logger.debug("Shutdown requested while handling request tasks, cleaning up"); rejectRequest(task); } else { - var requestManager = task.getRequestManager(); + RequestTask requestTask = (RequestTask) task; + var requestManager = requestTask.getRequestManager(); if (rateLimitingEnabled(requestManager.rateLimitSettings())) { - submitTaskToRateLimitedExecutionPath(task); + submitTaskToRateLimitedExecutionPath(requestTask); } else { - executeTaskImmediately(task); + executeTaskImmediately(requestTask); } } } @@ -696,4 +706,37 @@ public void close() { requestExecutorServiceSettings.deregisterQueueCapacityCallback(id); } } + + private static final RejectableTask NoopTask = new RejectableTask() { + @Override + public void onRejection(Exception e) { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + + @Override + public RequestManager getRequestManager() { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + + @Override + public InferenceInputs getInferenceInputs() { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + + @Override + public ActionListener getListener() { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + + @Override + public boolean hasCompleted() { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + + @Override + public Supplier getRequestCompletedFunction() { + throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue"); + } + }; + } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 903213a5fc6aa..7e18a5ac0de60 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -172,7 +172,7 @@ public void testExecute_AfterShutdown_Throws() { thrownException.getMessage(), is( Strings.format( - "Failed to enqueue request task for inference id [id] because the request executor service is shutdown", + "Failed to enqueue request task for inference id [id] because the request executor service has been shutdown", requestManager.rateLimitGrouping().hashCode() ) ) From ae349fd716635937e169ebf123c34efc31eba531 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Thu, 9 Oct 2025 11:51:00 +0200 Subject: [PATCH 09/23] Only add non-rate-limited tasks to fast-path request queue --- .../http/sender/RequestExecutorService.java | 44 +++++++++---------- .../sender/RequestExecutorServiceTests.java | 10 ++++- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 160309a649780..3edfe3d3e0389 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -57,14 +57,14 @@ * * The request flow looks as follows: * - * -------------> Execute request immediately. + * -------------> Add request to fast-path request queue. * | * | * request NOT supporting * rate limiting * | * | - * Request ---> [-Request Queue-] + * Request ------------| * | * | * request supporting @@ -264,35 +264,27 @@ private void scheduleNextHandleTasks(TimeValue timeToWait) { private void processRequestQueue() { try { while (isShutdown() == false) { - // Blocks the request queue thread until a new task comes in var task = requestQueue.take(); if (task == NoopTask) { if (isShutdown()) { logger.debug("Shutdown requested, exiting request queue processing"); break; - } else { - // Skip processing NoopTask - continue; } + + // Skip processing NoopTask + continue; } if (isShutdown()) { logger.debug("Shutdown requested while handling request tasks, cleaning up"); rejectRequest(task); - } else { - RequestTask requestTask = (RequestTask) task; - var requestManager = requestTask.getRequestManager(); - - if (rateLimitingEnabled(requestManager.rateLimitSettings())) { - submitTaskToRateLimitedExecutionPath(requestTask); - } else { - executeTaskImmediately(requestTask); - } + break; } + + executeTaskImmediately((RequestTask) task); } } catch (InterruptedException e) { - // Restore interrupt to propagate to the calling thread Thread.currentThread().interrupt(); logger.debug("Inference request queue interrupted, exiting"); } catch (Exception e) { @@ -475,15 +467,19 @@ public void execute( return; } - boolean taskAccepted = requestQueue.offer(task); + if (rateLimitingEnabled(requestManager.rateLimitSettings())) { + submitTaskToRateLimitedExecutionPath(task); + } else { + boolean taskAccepted = requestQueue.offer(task); - if (taskAccepted == false) { - task.onRejection( - new EsRejectedExecutionException( - format("Failed to enqueue request task for inference id [%s]", requestManager.inferenceEntityId()), - false - ) - ); + if (taskAccepted == false) { + task.onRejection( + new EsRejectedExecutionException( + format("Failed to enqueue request task for inference id [%s]", requestManager.inferenceEntityId()), + false + ) + ); + } } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 7e18a5ac0de60..6bd00097e102d 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -431,7 +431,10 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); - assertThat(thrownException.getMessage(), is("Failed to enqueue request task for inference id [id]")); + assertThat( + thrownException.getMessage(), + is("Failed to execute task for inference id [id] because the request service [3355] queue is full") + ); settings.setQueueCapacity(2); @@ -542,7 +545,10 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO ); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); - assertThat(thrownException.getMessage(), is("Failed to enqueue request task for inference id [id]")); + assertThat( + thrownException.getMessage(), + is("Failed to execute task for inference id [id] because the request service [3355] queue is full") + ); settings.setQueueCapacity(0); From 540f49d537dc45ee2839480971c59244cb0d8cef Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Thu, 9 Oct 2025 12:14:13 +0200 Subject: [PATCH 10/23] Extract rejection logic to common static method --- .../http/sender/RequestExecutorService.java | 60 ++++++++----------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 3edfe3d3e0389..0160f16118381 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -278,7 +278,7 @@ private void processRequestQueue() { if (isShutdown()) { logger.debug("Shutdown requested while handling request tasks, cleaning up"); - rejectRequest(task); + rejectNonRateLimitedRequest(task); break; } @@ -387,28 +387,25 @@ private void notifyRequestsOfShutdown() { requestQueue.drainTo(requests); for (var request : requests) { - rejectRequest(request); + rejectNonRateLimitedRequest(request); } } - private void rejectRequest(RejectableTask task) { + private void rejectNonRateLimitedRequest(RejectableTask task) { + var inferenceEntityId = task.getRequestManager().inferenceEntityId(); + + rejectRequest( + task, + format("Failed to send request for inference id [%s] has shutdown prior to executing request", inferenceEntityId), + format("Failed to notify request for inference id [%s] of rejection after executor service shutdown", inferenceEntityId) + ); + } + + private static void rejectRequest(RejectableTask task, String rejectionMessage, String rejectionFailedMessage) { try { - task.onRejection( - new EsRejectedExecutionException( - format( - "Failed to send request for inference id [%s] has shutdown prior to executing request", - task.getRequestManager().inferenceEntityId() - ), - true - ) - ); + task.onRejection(new EsRejectedExecutionException(rejectionMessage, true)); } catch (Exception e) { - logger.warn( - format( - "Failed to notify request for inference id [%s] of rejection after executor service shutdown", - task.getRequestManager().inferenceEntityId() - ) - ); + logger.warn(rejectionFailedMessage); } } @@ -667,27 +664,18 @@ public synchronized void notifyRequestsOfShutdown() { private void rejectTasks(List tasks) { for (var task : tasks) { - rejectTaskForShutdown(task); - } - } + var inferenceEntityId = task.getRequestManager().inferenceEntityId(); - private void rejectTaskForShutdown(RejectableTask task) { - try { - task.onRejection( - new EsRejectedExecutionException( - format( - "Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request", - id, - task.getRequestManager().inferenceEntityId() - ), - true - ) - ); - } catch (Exception e) { - logger.warn( + rejectRequest( + task, + format( + "Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request", + id, + inferenceEntityId + ), format( "Failed to notify request for inference id [%s] of rejection after executor service grouping [%s] shutdown", - task.getRequestManager().inferenceEntityId(), + inferenceEntityId, id ) ); From 0dca88a46bde8d503a13685d627eebd26d815be1 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 10 Oct 2025 10:07:56 +0200 Subject: [PATCH 11/23] Remove unnecessary cast --- .../external/http/sender/RequestExecutorService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 0160f16118381..971572b9cba1d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -282,7 +282,7 @@ private void processRequestQueue() { break; } - executeTaskImmediately((RequestTask) task); + executeTaskImmediately(task); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -294,7 +294,7 @@ private void processRequestQueue() { } } - private void executeTaskImmediately(RequestTask task) { + private void executeTaskImmediately(RejectableTask task) { try { task.getRequestManager() .execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener()); From 05905613e98508ed2d7e41fa3b104b96e66613e9 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 10 Oct 2025 10:12:31 +0200 Subject: [PATCH 12/23] Use string placeholder in assertion --- .../external/http/sender/RequestExecutorServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 6bd00097e102d..61bdf6eb35aa3 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -371,7 +371,7 @@ public void testExecute_NotifiesRateLimitedTasksOfShutdown() { thrownException.getMessage(), is( Strings.format( - "Failed to send request, request service [3355] for inference id [id] has shutdown prior to executing request", + "Failed to send request, request service [%s] for inference id [id] has shutdown prior to executing request", requestManager.rateLimitGrouping().hashCode() ) ) From 2e6547502839c072b89870373a414c5fc9f5febb Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 10 Oct 2025 11:09:31 +0200 Subject: [PATCH 13/23] Adjust test to check that a throwing task does not terminate the service --- .../http/sender/RequestExecutorService.java | 2 +- .../sender/RequestExecutorServiceTests.java | 46 +++++++++---------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 971572b9cba1d..2db91ab41384c 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -288,7 +288,7 @@ private void processRequestQueue() { Thread.currentThread().interrupt(); logger.debug("Inference request queue interrupted, exiting"); } catch (Exception e) { - logger.warn("Error processing task in inference request queue", e); + logger.warn("Error taking next task in inference request queue", e); } finally { cleanup(); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 61bdf6eb35aa3..39113a40969bd 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -34,18 +34,15 @@ import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors; -import static org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueueTests.mockQueueCreator; import static org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceSettingsTests.createRequestExecutorServiceSettings; import static org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceSettingsTests.createRequestExecutorServiceSettingsEmpty; import static org.hamcrest.Matchers.instanceOf; @@ -380,35 +377,34 @@ public void testExecute_NotifiesRateLimitedTasksOfShutdown() { assertTrue(service.isTerminated()); } - public void testQueuePoll_DoesNotCauseServiceToTerminate_WhenItThrows() throws InterruptedException { - @SuppressWarnings("unchecked") - BlockingQueue queue = mock(LinkedBlockingQueue.class); - + public void testTask_DoesNotCauseServiceToTerminate_WhenItThrows() throws InterruptedException { var requestSender = mock(RetryingHttpSender.class); + var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); + CountDownLatch taskProcessedLatch = new CountDownLatch(1); - var service = new RequestExecutorService( - threadPool, - mockQueueCreator(queue), - null, - createRequestExecutorServiceSettingsEmpty(), - requestSender, - Clock.systemUTC(), - RequestExecutorService.DEFAULT_RATE_LIMIT_CREATOR - ); + doAnswer(invocation -> { + taskProcessedLatch.countDown(); + throw new ElasticsearchException("failed"); + }).when(requestManager).execute(any(), any(), any(), any()); - PlainActionFuture listener = new PlainActionFuture<>(); - var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); - service.submitTaskToRateLimitedExecutionPath( - new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener) - ); + var service = new RequestExecutorService(threadPool, null, createRequestExecutorServiceSettingsEmpty(), requestSender); - when(queue.poll()).thenThrow(new ElasticsearchException("failed")).thenAnswer(invocation -> { - service.shutdown(); - return null; - }); service.start(); + PlainActionFuture listener = new PlainActionFuture<>(); + service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + + // Wait for throwing task to be executed + assertTrue(taskProcessedLatch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + + // Make sure service is still running after processing a task, which threw an Exception + assertFalse(service.isShutdown()); + assertFalse(service.isTerminated()); + + service.shutdown(); + service.awaitTermination(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(service.isShutdown()); assertTrue(service.isTerminated()); } From 4fb2372dd5afb5c836d956decd05f2c04774b9b5 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 10 Oct 2025 11:18:56 +0200 Subject: [PATCH 14/23] Adjust error message in general exception handler --- .../inference/external/http/sender/RequestExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 2db91ab41384c..88e14e811f2e0 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -288,7 +288,7 @@ private void processRequestQueue() { Thread.currentThread().interrupt(); logger.debug("Inference request queue interrupted, exiting"); } catch (Exception e) { - logger.warn("Error taking next task in inference request queue", e); + logger.warn("Unexpected error processing request queue, terminating", e); } finally { cleanup(); } From 2930151a22549a7443504999357b396ad0193c6e Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 10 Oct 2025 11:20:16 +0200 Subject: [PATCH 15/23] Adjust warn to error --- .../inference/external/http/sender/RequestExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 88e14e811f2e0..ca6d89c734780 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -288,7 +288,7 @@ private void processRequestQueue() { Thread.currentThread().interrupt(); logger.debug("Inference request queue interrupted, exiting"); } catch (Exception e) { - logger.warn("Unexpected error processing request queue, terminating", e); + logger.error("Unexpected error processing request queue, terminating", e); } finally { cleanup(); } From b2fd85f67c0a715cc380f28b88c2f17fb1ea3094 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 13 Oct 2025 09:44:51 +0200 Subject: [PATCH 16/23] Adjust error message when request gets rejected --- .../external/http/sender/RequestExecutorService.java | 5 ++++- .../external/http/sender/RequestExecutorServiceTests.java | 7 +------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index ca6d89c734780..325ef7171ad85 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -396,7 +396,10 @@ private void rejectNonRateLimitedRequest(RejectableTask task) { rejectRequest( task, - format("Failed to send request for inference id [%s] has shutdown prior to executing request", inferenceEntityId), + format( + "Failed to send request for inference id [%s] because the request executor service has been shutdown", + inferenceEntityId + ), format("Failed to notify request for inference id [%s] of rejection after executor service shutdown", inferenceEntityId) ); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 39113a40969bd..1861639ed4e57 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -339,12 +339,7 @@ public void testExecute_NotifiesNonRateLimitedTasksOfShutdown() { assertThat( thrownException.getMessage(), - is( - Strings.format( - "Failed to send request for inference id [id] has shutdown prior to executing request", - requestManager.rateLimitGrouping().hashCode() - ) - ) + is("Failed to send request for inference id [id] because the request executor service has been shutdown") ); assertTrue(thrownException.isExecutorShutdown()); assertTrue(service.isTerminated()); From 91f387ac9dd26f0ff0a7ecce9a94602c1d93d38b Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 13 Oct 2025 09:47:06 +0200 Subject: [PATCH 17/23] Rename id in RateLimitingEndpointHandler to rateLimitGroupingId --- .../http/sender/RequestExecutorService.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 325ef7171ad85..0f0ddd0a116bd 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -498,7 +498,7 @@ static class RateLimitingEndpointHandler { private final AdjustableCapacityBlockingQueue queue; private final Supplier isShutdownMethod; private final RequestSender requestSender; - private final String id; + private final String rateLimitGroupingId; private final AtomicReference timeOfLastEnqueue = new AtomicReference<>(); private final Clock clock; private final RateLimiter rateLimiter; @@ -507,7 +507,7 @@ static class RateLimitingEndpointHandler { private final Long originalRequestsPerTimeUnit; RateLimitingEndpointHandler( - String id, + String rateLimitGroupingId, AdjustableCapacityBlockingQueue.QueueCreator createQueue, RequestExecutorServiceSettings settings, RequestSender requestSender, @@ -518,7 +518,7 @@ static class RateLimitingEndpointHandler { Integer rateLimitDivisor ) { this.requestExecutorServiceSettings = Objects.requireNonNull(settings); - this.id = Objects.requireNonNull(id); + this.rateLimitGroupingId = Objects.requireNonNull(rateLimitGroupingId); this.queue = new AdjustableCapacityBlockingQueue<>(createQueue, settings.getQueueCapacity()); this.requestSender = Objects.requireNonNull(requestSender); this.clock = Objects.requireNonNull(clock); @@ -536,20 +536,25 @@ static class RateLimitingEndpointHandler { } public void init() { - requestExecutorServiceSettings.registerQueueCapacityCallback(id, this::onCapacityChange); - } - - public String id() { - return id; + requestExecutorServiceSettings.registerQueueCapacityCallback(rateLimitGroupingId, this::onCapacityChange); } private void onCapacityChange(int capacity) { - logger.debug(() -> Strings.format("Executor service grouping [%s] setting queue capacity to [%s]", id, capacity)); + logger.debug( + () -> Strings.format("Executor service grouping [%s] setting queue capacity to [%s]", rateLimitGroupingId, capacity) + ); try { queue.setCapacity(capacity); } catch (Exception e) { - logger.warn(format("Executor service grouping [%s] failed to set the capacity of the task queue to [%s]", id, capacity), e); + logger.warn( + format( + "Executor service grouping [%s] failed to set the capacity of the task queue to [%s]", + rateLimitGroupingId, + capacity + ), + e + ); } } @@ -569,7 +574,7 @@ public synchronized TimeValue executeEnqueuedTask() { try { return executeEnqueuedTaskInternal(); } catch (Exception e) { - logger.warn(format("Executor service grouping [%s] failed to execute request", id), e); + logger.warn(format("Executor service grouping [%s] failed to execute request", rateLimitGroupingId), e); // we tried to do some work but failed, so we'll say we did something to try looking for more work return EXECUTED_A_TASK; } @@ -625,7 +630,7 @@ public void enqueue(RequestTask task) { format( "Failed to enqueue task for inference id [%s] because the request service [%s] has already shutdown", task.getRequestManager().inferenceEntityId(), - id + rateLimitGroupingId ), true ); @@ -641,7 +646,7 @@ public void enqueue(RequestTask task) { format( "Failed to execute task for inference id [%s] because the request service [%s] queue is full", task.getRequestManager().inferenceEntityId(), - id + rateLimitGroupingId ), false ); @@ -661,7 +666,7 @@ public synchronized void notifyRequestsOfShutdown() { rejectTasks(notExecuted); } catch (Exception e) { - logger.warn(format("Failed to notify tasks of executor service grouping [%s] shutdown", id)); + logger.warn(format("Failed to notify tasks of executor service grouping [%s] shutdown", rateLimitGroupingId)); } } @@ -673,13 +678,13 @@ private void rejectTasks(List tasks) { task, format( "Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request", - id, + rateLimitGroupingId, inferenceEntityId ), format( "Failed to notify request for inference id [%s] of rejection after executor service grouping [%s] shutdown", inferenceEntityId, - id + rateLimitGroupingId ) ); } @@ -690,7 +695,7 @@ public int remainingCapacity() { } public void close() { - requestExecutorServiceSettings.deregisterQueueCapacityCallback(id); + requestExecutorServiceSettings.deregisterQueueCapacityCallback(rateLimitGroupingId); } } From 90e672b27ba60f857bbbb68e391a2828fa2c03c4 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 13 Oct 2025 09:49:05 +0200 Subject: [PATCH 18/23] Use Strings.format(...) in assertion --- .../external/http/sender/RequestExecutorServiceTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 1861639ed4e57..de54b95b6e682 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -538,7 +538,12 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is("Failed to execute task for inference id [id] because the request service [3355] queue is full") + is( + Strings.format( + "Failed to execute task for inference id [id] because the request service [%s] queue is full", + requestManager.rateLimitGrouping().hashCode() + ) + ) ); settings.setQueueCapacity(0); From 1cf24dc611f5677e4be2a31b1337dea0905c28f3 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 13 Oct 2025 09:50:24 +0200 Subject: [PATCH 19/23] Use thenAnswer instead of suppression --- .../external/http/sender/RequestExecutorServiceTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index de54b95b6e682..f89cd45f80c45 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -738,8 +738,7 @@ public void testStartsCleanupThread() { public void testStartsRequestQueueTask() { var mockExecutorService = mock(ExecutorService.class); - @SuppressWarnings("unchecked") - var stubbing = when(mockExecutorService.submit(any(Runnable.class))).thenReturn(mock(Future.class)); + when(mockExecutorService.submit(any(Runnable.class))).thenAnswer(i -> mock(Future.class)); var mockThreadPool = mock(ThreadPool.class); when(mockThreadPool.executor(any())).thenReturn(mockExecutorService); From a92a7c024b0242e38d090f54e58c1ba611454ca1 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 13 Oct 2025 16:10:54 +0200 Subject: [PATCH 20/23] Only reject requests of the respective execution path (rate-limited vs. not rate-limited) --- .../http/sender/RequestExecutorService.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 0f0ddd0a116bd..689c8e7b3c927 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -211,7 +211,7 @@ public void start() { startHandlingRateLimitedTasks(); } catch (Exception e) { logger.warn("Failed to start request executor", e); - cleanup(); + cleanup(CleanupStrategy.RATE_LIMITED_REQUEST_QUEUES_ONLY); } } @@ -254,7 +254,7 @@ void removeStaleGroupings() { private void scheduleNextHandleTasks(TimeValue timeToWait) { if (shutdown.get()) { logger.debug("Shutdown requested while scheduling next handle task call, cleaning up"); - cleanup(); + cleanup(CleanupStrategy.RATE_LIMITED_REQUEST_QUEUES_ONLY); return; } @@ -290,7 +290,7 @@ private void processRequestQueue() { } catch (Exception e) { logger.error("Unexpected error processing request queue, terminating", e); } finally { - cleanup(); + cleanup(CleanupStrategy.REQUEST_QUEUE_ONLY); } } @@ -340,10 +340,16 @@ private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) return rateLimitSettings != null && rateLimitSettings.isEnabled(); } - private void cleanup() { + private void cleanup(CleanupStrategy cleanupStrategy) { try { shutdown(); - notifyRequestsOfShutdown(); + + switch (cleanupStrategy) { + case RATE_LIMITED_REQUEST_QUEUES_ONLY -> notifyRateLimitedRequestsOfShutdown(); + case REQUEST_QUEUE_ONLY -> rejectRequestsInRequestQueue(); + default -> logger.error(Strings.format("Unknown clean up strategy for request executor: [%s]", cleanupStrategy.toString())); + } + terminationLatch.countDown(); } catch (Exception e) { logger.warn("Encountered an error while cleaning up", e); @@ -356,7 +362,7 @@ private void startHandlingRateLimitedTasks() { do { if (isShutdown()) { logger.debug("Shutdown requested while handling rate limited tasks, cleaning up"); - cleanup(); + cleanup(CleanupStrategy.RATE_LIMITED_REQUEST_QUEUES_ONLY); return; } @@ -370,24 +376,29 @@ private void startHandlingRateLimitedTasks() { scheduleNextHandleTasks(timeToWait); } catch (Exception e) { logger.warn("Encountered an error while handling rate limited tasks", e); - cleanup(); + cleanup(CleanupStrategy.RATE_LIMITED_REQUEST_QUEUES_ONLY); } } - private void notifyRequestsOfShutdown() { + private void notifyRateLimitedRequestsOfShutdown() { assert isShutdown() : "Requests should only be notified if the executor is shutting down"; - // Reject rate-limited requests for (var endpoint : rateLimitGroupings.values()) { endpoint.notifyRequestsOfShutdown(); } + } + + private void rejectRequestsInRequestQueue() { + assert isShutdown() : "Requests in request queue should only be notified if the executor is shutting down"; - // Reject non-rate-limited requests List requests = new ArrayList<>(); requestQueue.drainTo(requests); for (var request : requests) { - rejectNonRateLimitedRequest(request); + // NoopTask does not implement being rejected, therefore we need to skip it + if (request != NoopTask) { + rejectNonRateLimitedRequest(request); + } } } @@ -731,4 +742,8 @@ public Supplier getRequestCompletedFunction() { } }; + private enum CleanupStrategy { + REQUEST_QUEUE_ONLY, + RATE_LIMITED_REQUEST_QUEUES_ONLY + } } From a868152b68e2c889a0431765a2a39ceda53fddad Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Tue, 14 Oct 2025 11:28:31 +0200 Subject: [PATCH 21/23] Submit only ingest embeddings requests to rate-limited execution path --- .../org/elasticsearch/inference/InputType.java | 4 ++++ .../http/sender/RequestExecutorService.java | 9 +++++---- .../xpack/inference/InputTypeTests.java | 8 ++++++++ .../http/sender/RequestExecutorServiceTests.java | 14 +++++++------- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/inference/InputType.java b/server/src/main/java/org/elasticsearch/inference/InputType.java index c930acdc0f45e..2c81db1bed116 100644 --- a/server/src/main/java/org/elasticsearch/inference/InputType.java +++ b/server/src/main/java/org/elasticsearch/inference/InputType.java @@ -61,6 +61,10 @@ public static boolean isInternalTypeOrUnspecified(InputType inputType) { return inputType == InputType.INTERNAL_INGEST || inputType == InputType.INTERNAL_SEARCH || inputType == InputType.UNSPECIFIED; } + public static boolean isIngest(InputType inputType) { + return inputType == InputType.INGEST || inputType == InputType.INTERNAL_INGEST; + } + public static boolean isSpecified(InputType inputType) { return inputType != null && inputType != InputType.UNSPECIFIED; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 689c8e7b3c927..0aa55da7341a3 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -16,6 +16,7 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; +import org.elasticsearch.inference.InputType; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue; @@ -336,8 +337,8 @@ void submitTaskToRateLimitedExecutionPath(RequestTask task) { endpoint.enqueue(task); } - private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) { - return rateLimitSettings != null && rateLimitSettings.isEnabled(); + private static boolean isEmbeddingsIngestInput(InferenceInputs inputs) { + return inputs instanceof EmbeddingsInput embeddingsInput && InputType.isIngest(embeddingsInput.getInputType()); } private void cleanup(CleanupStrategy cleanupStrategy) { @@ -478,7 +479,7 @@ public void execute( return; } - if (rateLimitingEnabled(requestManager.rateLimitSettings())) { + if (isEmbeddingsIngestInput(inferenceInputs)) { submitTaskToRateLimitedExecutionPath(task); } else { boolean taskAccepted = requestQueue.offer(task); @@ -592,7 +593,7 @@ public synchronized TimeValue executeEnqueuedTask() { } private TimeValue executeEnqueuedTaskInternal() { - if (rateLimitingEnabled(rateLimitSettings)) { + if (rateLimitSettings.isEnabled()) { var timeBeforeAvailableToken = rateLimiter.timeToReserve(1); if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) { return timeBeforeAvailableToken; diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/InputTypeTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/InputTypeTests.java index c6e08a8d5bdf3..9f2055d825fd7 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/InputTypeTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/InputTypeTests.java @@ -64,6 +64,10 @@ public static InputType randomWithInternalAndUnspecified() { return randomFrom(InputType.INTERNAL_SEARCH, InputType.INTERNAL_INGEST, InputType.UNSPECIFIED); } + public static InputType randomIngest() { + return randomFrom(InputType.INGEST, InputType.INTERNAL_INGEST); + } + public void testFromRestString_ValidInputType() { for (String internal : List.of("search", "ingest", "classification", "clustering", "unspecified")) { assertEquals(InputType.fromRestString(internal), InputType.fromString(internal)); @@ -211,4 +215,8 @@ public void testValidateInputTypeTranslationValues_ThrowsAnException_WhenValueIs ) ); } + + public void testIsIngest() { + assertTrue(InputType.isIngest(randomFrom(InputType.INGEST, InputType.INTERNAL_INGEST))); + } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index f89cd45f80c45..3cfecd5a9cd2e 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -183,14 +183,14 @@ public void testExecute_Throws_WhenRateLimitedQueueIsFull() { service.execute( RequestManagerTests.createMockWithRateLimitingEnabled(), - new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), + new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, new PlainActionFuture<>() ); var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled("id"); var listener = new PlainActionFuture(); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); @@ -411,7 +411,7 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, service.execute( RequestManagerTests.createMockWithRateLimitingEnabled(requestSender), - new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), + new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, new PlainActionFuture<>() ); @@ -419,7 +419,7 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, PlainActionFuture listener = new PlainActionFuture<>(); var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener); var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); assertThat( @@ -524,13 +524,13 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO var service = new RequestExecutorService(threadPool, null, settings, requestSender); var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>()); + service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, new PlainActionFuture<>()); assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); service.execute( RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"), - new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), + new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener ); @@ -656,7 +656,7 @@ public void testDoesNotExecuteTask_WhenCannotReserveTokens_AndThenCanReserve_And var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender); PlainActionFuture listener = new PlainActionFuture<>(); - service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); + service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener); when(mockRateLimiter.timeToReserve(anyInt())).thenReturn(TimeValue.timeValueDays(1)).thenReturn(TimeValue.timeValueDays(0)); From c575eba8bf32eeadee0c313f1dc0ce5dbe8ec4c9 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Tue, 14 Oct 2025 15:52:23 +0200 Subject: [PATCH 22/23] Add rate limiting check --- .../external/http/sender/RequestExecutorService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 0aa55da7341a3..c91387ad234c8 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -341,6 +341,10 @@ private static boolean isEmbeddingsIngestInput(InferenceInputs inputs) { return inputs instanceof EmbeddingsInput embeddingsInput && InputType.isIngest(embeddingsInput.getInputType()); } + private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) { + return rateLimitSettings != null && rateLimitSettings.isEnabled(); + } + private void cleanup(CleanupStrategy cleanupStrategy) { try { shutdown(); @@ -479,7 +483,7 @@ public void execute( return; } - if (isEmbeddingsIngestInput(inferenceInputs)) { + if (isEmbeddingsIngestInput(inferenceInputs) || rateLimitingEnabled(requestManager.rateLimitSettings())) { submitTaskToRateLimitedExecutionPath(task); } else { boolean taskAccepted = requestQueue.offer(task); From 69db0e1d2949bb828a9368b005a826d739297135 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Tue, 14 Oct 2025 15:52:57 +0200 Subject: [PATCH 23/23] Make NoopTask all caps --- .../external/http/sender/RequestExecutorService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index c91387ad234c8..4ce3b65866de1 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -169,7 +169,7 @@ public void shutdown() { if (shutdown.compareAndSet(false, true)) { if (requestQueueTask != null) { // Wakes up the queue in processRequestQueue - requestQueue.offer(NoopTask); + requestQueue.offer(NOOP_TASK); } if (cancellableCleanupTask.get() != null) { @@ -267,7 +267,7 @@ private void processRequestQueue() { while (isShutdown() == false) { var task = requestQueue.take(); - if (task == NoopTask) { + if (task == NOOP_TASK) { if (isShutdown()) { logger.debug("Shutdown requested, exiting request queue processing"); break; @@ -401,7 +401,7 @@ private void rejectRequestsInRequestQueue() { for (var request : requests) { // NoopTask does not implement being rejected, therefore we need to skip it - if (request != NoopTask) { + if (request != NOOP_TASK) { rejectNonRateLimitedRequest(request); } } @@ -715,7 +715,7 @@ public void close() { } } - private static final RejectableTask NoopTask = new RejectableTask() { + private static final RejectableTask NOOP_TASK = new RejectableTask() { @Override public void onRejection(Exception e) { throw new UnsupportedOperationException("NoopTask is a pure marker class for signals in the request queue");