From ccbf9f7e434a34d360e666210b92432f67c93e30 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 23 Oct 2025 15:12:36 -0500 Subject: [PATCH] Improving random sampling performance very slightly --- .../bulk/TransportAbstractBulkAction.java | 2 +- .../elasticsearch/ingest/IngestService.java | 2 +- .../elasticsearch/ingest/SamplingService.java | 26 ++++--------------- .../elasticsearch/node/NodeConstruction.java | 2 +- .../bulk/TransportBulkActionIngestTests.java | 2 +- .../action/bulk/TransportBulkActionTests.java | 2 +- .../ingest/IngestServiceTests.java | 2 +- .../ingest/SamplingServiceTests.java | 5 +--- 8 files changed, 12 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index f7460dd3de47d..c1b1cbf01741a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -312,7 +312,7 @@ private boolean applyPipelines( } }); return true; - } else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured()) { + } else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured(project)) { /* * Else ample only if this request has not passed through IngestService::executeBulkRequest. Otherwise, some request within the * bulk had pipelines and we sampled in IngestService already. diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 82be81eed7c2f..ddd60dc522b3f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1385,7 +1385,7 @@ private void attemptToSampleData( IngestDocument ingestDocument, Metadata originalDocumentMetadata ) { - if (samplingService != null && samplingService.atLeastOneSampleConfigured()) { + if (samplingService != null && samplingService.atLeastOneSampleConfigured(projectMetadata)) { /* * We need both the original document and the fully updated document for sampling, so we make a copy of the original * before overwriting it here. We can discard it after sampling. diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 4a6407585b4e5..c11c5bb1b30d0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -95,7 +94,6 @@ public class SamplingService extends AbstractLifecycleComponent implements Clust private static final String TTL_JOB_ID = "sampling_ttl"; private final ScriptService scriptService; private final ClusterService clusterService; - private final ProjectResolver projectResolver; private final LongSupplier statsTimeSupplier = System::nanoTime; private final MasterServiceTaskQueue updateSamplingConfigurationTaskQueue; private final MasterServiceTaskQueue deleteSamplingConfigurationTaskQueue; @@ -122,26 +120,15 @@ public class SamplingService extends AbstractLifecycleComponent implements Clust /* * This creates a new SamplingService, and configures various listeners on it. */ - public static SamplingService create( - ScriptService scriptService, - ClusterService clusterService, - ProjectResolver projectResolver, - Settings settings - ) { - SamplingService samplingService = new SamplingService(scriptService, clusterService, projectResolver, settings); + public static SamplingService create(ScriptService scriptService, ClusterService clusterService, Settings settings) { + SamplingService samplingService = new SamplingService(scriptService, clusterService, settings); samplingService.configureListeners(); return samplingService; } - private SamplingService( - ScriptService scriptService, - ClusterService clusterService, - ProjectResolver projectResolver, - Settings settings - ) { + private SamplingService(ScriptService scriptService, ClusterService clusterService, Settings settings) { this.scriptService = scriptService; this.clusterService = clusterService; - this.projectResolver = projectResolver; this.updateSamplingConfigurationTaskQueue = clusterService.createTaskQueue( "update-sampling-configuration", Priority.NORMAL, @@ -340,12 +327,9 @@ public SampleStats getLocalSampleStats(ProjectId projectId, String index) { return sampleInfo == null ? new SampleStats() : sampleInfo.stats; } - public boolean atLeastOneSampleConfigured() { + public boolean atLeastOneSampleConfigured(ProjectMetadata projectMetadata) { if (RANDOM_SAMPLING_FEATURE_FLAG) { - SamplingMetadata samplingMetadata = clusterService.state() - .projectState(projectResolver.getProjectId()) - .metadata() - .custom(SamplingMetadata.TYPE); + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); return samplingMetadata != null && samplingMetadata.getIndexToSamplingConfigMap().isEmpty() == false; } else { return false; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 6ae1ea9c9c6c8..4366789342c12 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -728,7 +728,7 @@ private void construct( FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class)); - SamplingService samplingService = SamplingService.create(scriptService, clusterService, projectResolver, settings); + SamplingService samplingService = SamplingService.create(scriptService, clusterService, settings); modules.bindToInstance(SamplingService.class, samplingService); clusterService.addListener(samplingService); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 119385319e52f..9263d2e3d742a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -180,7 +180,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { private static SamplingService initializeSamplingService() { SamplingService samplingService = mock(SamplingService.class); - when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true); return samplingService; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 4242c44e29808..fed735a7ecd0f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -163,7 +163,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { private static SamplingService initializeSamplingService() { SamplingService samplingService = mock(SamplingService.class); - when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true); return samplingService; } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 395407c897a63..46a8baf541009 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -3357,7 +3357,7 @@ public void testSampling() { Predicates.never(), samplingService ); - when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true); PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); var projectId = randomProjectIdOrDefault(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java index bd24090d24a52..f49316a41804e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -352,8 +351,6 @@ private SamplingService getTestSamplingService() { TestProjectResolvers.singleProject(randomProjectIdOrDefault()) ); ClusterService clusterService = ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()); - final ProjectId projectId = ProjectId.DEFAULT; - final ProjectResolver projectResolver = TestProjectResolvers.singleProject(projectId); - return SamplingService.create(scriptService, clusterService, projectResolver, Settings.EMPTY); + return SamplingService.create(scriptService, clusterService, Settings.EMPTY); } }