From 7bc86fee7de1d79036ea472669ab79a6f0940fa7 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 24 Nov 2025 09:01:10 -0500 Subject: [PATCH 1/3] Use wrapped action listeners --- .../ShardBulkInferenceActionFilter.java | 184 ++++++++---------- 1 file changed, 86 insertions(+), 98 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index 4c286d56d839a..e2792dbdc94a5 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -325,61 +325,55 @@ private void executeChunkedInferenceAsync( final Releasable onFinish ) { if (inferenceProvider == null) { - ActionListener<UnparsedModel> modelLoadingListener = new ActionListener<>() { - @Override - public void onResponse(UnparsedModel unparsedModel) { - var service = inferenceServiceRegistry.getService(unparsedModel.service()); - if (service.isEmpty() == false) { - var provider = new InferenceProvider( - service.get(), - service.get() - .parsePersistedConfigWithSecrets( - inferenceId, - unparsedModel.taskType(), - unparsedModel.settings(), - unparsedModel.secrets() + ActionListener<UnparsedModel> modelLoadingListener = ActionListener.wrap(unparsedModel -> { + var service = inferenceServiceRegistry.getService(unparsedModel.service()); + if (service.isEmpty() == false) { + var provider = new InferenceProvider( + service.get(), + service.get() + .parsePersistedConfigWithSecrets( + inferenceId, + unparsedModel.taskType(), + unparsedModel.settings(), + unparsedModel.secrets() + ) + ); + executeChunkedInferenceAsync(inferenceId, provider, requests, onFinish); + } else { + try (onFinish) { + for (FieldInferenceRequest request : requests) 
{ + inferenceResults.get(request.bulkItemIndex).failures.add( + new ResourceNotFoundException( + "Inference service [{}] not found for field [{}]", + unparsedModel.service(), + request.field ) - ); - executeChunkedInferenceAsync(inferenceId, provider, requests, onFinish); - } else { - try (onFinish) { - for (FieldInferenceRequest request : requests) { - inferenceResults.get(request.bulkItemIndex).failures.add( - new ResourceNotFoundException( - "Inference service [{}] not found for field [{}]", - unparsedModel.service(), - request.field - ) - ); - } + ); } } } - - @Override - public void onFailure(Exception exc) { - try (onFinish) { - for (FieldInferenceRequest request : requests) { - Exception failure; - if (ExceptionsHelper.unwrap(exc, ResourceNotFoundException.class) instanceof ResourceNotFoundException) { - failure = new ResourceNotFoundException( - "Inference id [{}] not found for field [{}]", - inferenceId, - request.field - ); - } else { - failure = new InferenceException( - "Error loading inference for inference id [{}] on field [{}]", - exc, - inferenceId, - request.field - ); - } - inferenceResults.get(request.bulkItemIndex).failures.add(failure); + }, exc -> { + try (onFinish) { + for (FieldInferenceRequest request : requests) { + Exception failure; + if (ExceptionsHelper.unwrap(exc, ResourceNotFoundException.class) instanceof ResourceNotFoundException) { + failure = new ResourceNotFoundException( + "Inference id [{}] not found for field [{}]", + inferenceId, + request.field + ); + } else { + failure = new InferenceException( + "Error loading inference for inference id [{}] on field [{}]", + exc, + inferenceId, + request.field + ); } + inferenceResults.get(request.bulkItemIndex).failures.add(failure); } } - }; + }); modelRegistry.getModelWithSecrets(inferenceId, modelLoadingListener); return; } @@ -398,65 +392,59 @@ public void onFailure(Exception exc) { .map(r -> new ChunkInferenceInput(new InferenceString(r.input, TEXT), r.chunkingSettings)) 
.collect(Collectors.toList()); - ActionListener<List<ChunkedInference>> completionListener = new ActionListener<>() { - - @Override - public void onResponse(List<ChunkedInference> results) { - try (onFinish) { - var requestsIterator = requests.iterator(); - int success = 0; - for (ChunkedInference result : results) { - var request = requestsIterator.next(); - var acc = inferenceResults.get(request.bulkItemIndex); - if (result instanceof ChunkedInferenceError error) { - recordRequestCountMetrics(inferenceProvider.model, 1, error.exception()); - acc.addFailure( - new InferenceException( - "Exception when running inference id [{}] on field [{}]", - error.exception(), - inferenceProvider.model.getInferenceEntityId(), - request.field - ) - ); - } else { - success++; - acc.addOrUpdateResponse( - new FieldInferenceResponse( - request.field(), - request.sourceField(), - useLegacyFormat ? request.input() : null, - request.inputOrder(), - request.offsetAdjustment(), - inferenceProvider.model, - result - ) - ); - } - } - if (success > 0) { - recordRequestCountMetrics(inferenceProvider.model, success, null); - } - } - } - - @Override - public void onFailure(Exception exc) { - try (onFinish) { - recordRequestCountMetrics(inferenceProvider.model, requests.size(), exc); - for (FieldInferenceRequest request : requests) { - addInferenceResponseFailure( - request.bulkItemIndex, + ActionListener<List<ChunkedInference>> completionListener = ActionListener.wrap(results -> { + try (onFinish) { + var requestsIterator = requests.iterator(); + int success = 0; + for (ChunkedInference result : results) { + var request = requestsIterator.next(); + var acc = inferenceResults.get(request.bulkItemIndex); + if (result instanceof ChunkedInferenceError error) { + recordRequestCountMetrics(inferenceProvider.model, 1, error.exception()); + acc.addFailure( new InferenceException( "Exception when running inference id [{}] on field [{}]", - exc, + error.exception(), inferenceProvider.model.getInferenceEntityId(), request.field ) ); + } else { + success++; 
acc.addOrUpdateResponse( + new FieldInferenceResponse( + request.field(), + request.sourceField(), + useLegacyFormat ? request.input() : null, + request.inputOrder(), + request.offsetAdjustment(), + inferenceProvider.model, + result + ) + ); } } + if (success > 0) { + recordRequestCountMetrics(inferenceProvider.model, success, null); + } } - }; + }, exc -> { + try (onFinish) { + recordRequestCountMetrics(inferenceProvider.model, requests.size(), exc); + for (FieldInferenceRequest request : requests) { + addInferenceResponseFailure( + request.bulkItemIndex, + new InferenceException( + "Exception when running inference id [{}] on field [{}]", + exc, + inferenceProvider.model.getInferenceEntityId(), + request.field + ) + ); + } + } + }); + inferenceProvider.service() .chunkedInfer( inferenceProvider.model(), From 084932c75e64759bc5e0d9472f58352492f38707 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 24 Nov 2025 10:57:42 -0500 Subject: [PATCH 2/3] Log server-side errors --- .../ShardBulkInferenceActionFilter.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index e2792dbdc94a5..e7772fd08eca5 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -50,6 +50,8 @@ import org.elasticsearch.inference.UnparsedModel; import org.elasticsearch.inference.telemetry.InferenceStats; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; 
import org.elasticsearch.xcontent.XContent; @@ -92,6 +94,8 @@ * */ public class ShardBulkInferenceActionFilter implements MappedActionFilter { + private static final Logger logger = LogManager.getLogger(ShardBulkInferenceActionFilter.class); + private static final ByteSizeValue DEFAULT_BATCH_SIZE = ByteSizeValue.ofMb(1); /** @@ -372,6 +376,11 @@ private void executeChunkedInferenceAsync( } inferenceResults.get(request.bulkItemIndex).failures.add(failure); } + + if (ExceptionsHelper.status(exc).getStatus() >= 500) { + List fields = requests.stream().map(FieldInferenceRequest::field).distinct().toList(); + logger.warn("Error loading inference for inference id [" + inferenceId + "] on fields " + fields, exc); + } } }); modelRegistry.getModelWithSecrets(inferenceId, modelLoadingListener); @@ -442,6 +451,17 @@ private void executeChunkedInferenceAsync( ) ); } + + if (ExceptionsHelper.status(exc).getStatus() >= 500) { + List fields = requests.stream().map(FieldInferenceRequest::field).distinct().toList(); + logger.warn( + "Exception when running inference id [" + + inferenceProvider.model.getInferenceEntityId() + + "] on fields " + + fields, + exc + ); + } } }); From 2cbba3a0bc084344e044f43ff3e1e9e5b9e833dc Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 24 Nov 2025 11:56:11 -0500 Subject: [PATCH 3/3] Use error log level --- .../action/filter/ShardBulkInferenceActionFilter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index e7772fd08eca5..cc3e0f2a9efa8 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ 
b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -379,7 +379,7 @@ private void executeChunkedInferenceAsync( if (ExceptionsHelper.status(exc).getStatus() >= 500) { List fields = requests.stream().map(FieldInferenceRequest::field).distinct().toList(); - logger.warn("Error loading inference for inference id [" + inferenceId + "] on fields " + fields, exc); + logger.error("Error loading inference for inference id [" + inferenceId + "] on fields " + fields, exc); } } }); @@ -454,7 +454,7 @@ private void executeChunkedInferenceAsync( if (ExceptionsHelper.status(exc).getStatus() >= 500) { List fields = requests.stream().map(FieldInferenceRequest::field).distinct().toList(); - logger.warn( + logger.error( "Exception when running inference id [" + inferenceProvider.model.getInferenceEntityId() + "] on fields "