Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private boolean applyPipelines(
}
});
return true;
} else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured()) {
} else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured(project)) {
/*
* Else ample only if this request has not passed through IngestService::executeBulkRequest. Otherwise, some request within the
* bulk had pipelines and we sampled in IngestService already.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ private void attemptToSampleData(
IngestDocument ingestDocument,
Metadata originalDocumentMetadata
) {
if (samplingService != null && samplingService.atLeastOneSampleConfigured()) {
if (samplingService != null && samplingService.atLeastOneSampleConfigured(projectMetadata)) {
/*
* We need both the original document and the fully updated document for sampling, so we make a copy of the original
* before overwriting it here. We can discard it after sampling.
Expand Down
26 changes: 5 additions & 21 deletions server/src/main/java/org/elasticsearch/ingest/SamplingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -95,7 +94,6 @@ public class SamplingService extends AbstractLifecycleComponent implements Clust
private static final String TTL_JOB_ID = "sampling_ttl";
private final ScriptService scriptService;
private final ClusterService clusterService;
private final ProjectResolver projectResolver;
private final LongSupplier statsTimeSupplier = System::nanoTime;
private final MasterServiceTaskQueue<UpdateSamplingConfigurationTask> updateSamplingConfigurationTaskQueue;
private final MasterServiceTaskQueue<DeleteSampleConfigurationTask> deleteSamplingConfigurationTaskQueue;
Expand All @@ -122,26 +120,15 @@ public class SamplingService extends AbstractLifecycleComponent implements Clust
/*
* This creates a new SamplingService, and configures various listeners on it.
*/
public static SamplingService create(
ScriptService scriptService,
ClusterService clusterService,
ProjectResolver projectResolver,
Settings settings
) {
SamplingService samplingService = new SamplingService(scriptService, clusterService, projectResolver, settings);
public static SamplingService create(ScriptService scriptService, ClusterService clusterService, Settings settings) {
SamplingService samplingService = new SamplingService(scriptService, clusterService, settings);
samplingService.configureListeners();
return samplingService;
}

private SamplingService(
ScriptService scriptService,
ClusterService clusterService,
ProjectResolver projectResolver,
Settings settings
) {
private SamplingService(ScriptService scriptService, ClusterService clusterService, Settings settings) {
this.scriptService = scriptService;
this.clusterService = clusterService;
this.projectResolver = projectResolver;
this.updateSamplingConfigurationTaskQueue = clusterService.createTaskQueue(
"update-sampling-configuration",
Priority.NORMAL,
Expand Down Expand Up @@ -340,12 +327,9 @@ public SampleStats getLocalSampleStats(ProjectId projectId, String index) {
return sampleInfo == null ? new SampleStats() : sampleInfo.stats;
}

public boolean atLeastOneSampleConfigured() {
public boolean atLeastOneSampleConfigured(ProjectMetadata projectMetadata) {
if (RANDOM_SAMPLING_FEATURE_FLAG) {
SamplingMetadata samplingMetadata = clusterService.state()
.projectState(projectResolver.getProjectId())
.metadata()
.custom(SamplingMetadata.TYPE);
SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE);
return samplingMetadata != null && samplingMetadata.getIndexToSamplingConfigMap().isEmpty() == false;
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ private void construct(

FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));

SamplingService samplingService = SamplingService.create(scriptService, clusterService, projectResolver, settings);
SamplingService samplingService = SamplingService.create(scriptService, clusterService, settings);
modules.bindToInstance(SamplingService.class, samplingService);
clusterService.addListener(samplingService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {

private static SamplingService initializeSamplingService() {
SamplingService samplingService = mock(SamplingService.class);
when(samplingService.atLeastOneSampleConfigured()).thenReturn(true);
when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true);
return samplingService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {

private static SamplingService initializeSamplingService() {
SamplingService samplingService = mock(SamplingService.class);
when(samplingService.atLeastOneSampleConfigured()).thenReturn(true);
when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true);
return samplingService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3357,7 +3357,7 @@ public void testSampling() {
Predicates.never(),
samplingService
);
when(samplingService.atLeastOneSampleConfigured()).thenReturn(true);
when(samplingService.atLeastOneSampleConfigured(any())).thenReturn(true);
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}");
var projectId = randomProjectIdOrDefault();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -352,8 +351,6 @@ private SamplingService getTestSamplingService() {
TestProjectResolvers.singleProject(randomProjectIdOrDefault())
);
ClusterService clusterService = ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool());
final ProjectId projectId = ProjectId.DEFAULT;
final ProjectResolver projectResolver = TestProjectResolvers.singleProject(projectId);
return SamplingService.create(scriptService, clusterService, projectResolver, Settings.EMPTY);
return SamplingService.create(scriptService, clusterService, Settings.EMPTY);
}
}