Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions server/src/main/java/org/elasticsearch/ingest/SamplingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,24 @@ private void maybeSample(
return;
}
long startTime = statsTimeSupplier.getAsLong();
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
if (samplingConfig == null) {
return;
}
SoftReference<SampleInfo> sampleInfoReference = samples.compute(
new ProjectIndex(projectMetadata.id(), indexName),
(k, v) -> v == null || v.get() == null ? new SoftReference<>(new SampleInfo(samplingConfig.maxSamples())) : v
);
SoftReference<SampleInfo> sampleInfoReference = samples.compute(new ProjectIndex(projectMetadata.id(), indexName), (k, v) -> {
if (v == null || v.get() == null) {
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
if (samplingConfig == null) {
/*
* Calls to getSamplingConfiguration() are relatively expensive. So we store the NONE object here to indicate that there
* was no sampling configuration. This way we don't have to do the lookup every single time for every index that has no
* sampling configuration. If a sampling configuration is added for this index, this NONE sample will be removed by
* the cluster state change listener.
*/
return new SoftReference<>(SampleInfo.NONE);
}
return new SoftReference<>(new SampleInfo(samplingConfig.maxSamples()));
}
return v;
});
SampleInfo sampleInfo = sampleInfoReference.get();
if (sampleInfo == null) {
if (sampleInfo == null || sampleInfo == SampleInfo.NONE) {
return;
}
SampleStats stats = sampleInfo.stats;
Expand All @@ -229,6 +237,10 @@ private void maybeSample(
stats.samplesRejectedForMaxSamplesExceeded.increment();
return;
}
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
if (samplingConfig == null) {
return; // it was not null above, but has since become null because the index was deleted asynchronously
}
if (sampleInfo.getSizeInBytes() + indexRequest.source().length() > samplingConfig.maxSize().getBytes()) {
stats.samplesRejectedForSize.increment();
return;
Expand Down Expand Up @@ -475,7 +487,12 @@ private void maybeRemoveStaleSamples(ClusterChangedEvent event, ProjectId projec
if (oldSampleConfigsMap.containsKey(indexName) && entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
samples.remove(new ProjectIndex(projectId, indexName));
}
} else if (oldSampleConfigsMap.containsKey(indexName) == false
&& samples.containsKey(new ProjectIndex(projectId, indexName))) {
// There had previously been a NONE sample here. There is a real config now, so delete the NONE sample
logger.debug("Removing sample info for {} because its configuration has been created", indexName);
samples.remove(new ProjectIndex(projectId, indexName));
}
}
}
}
Expand Down Expand Up @@ -1003,6 +1020,7 @@ public SampleStats adjustForMaxSize(int maxSize) {
* This is used internally to store information about a sample in the samples Map.
*/
private static final class SampleInfo {
public static final SampleInfo NONE = new SampleInfo(0);
private final RawDocument[] rawDocuments;
/*
* This stores the maximum index in rawDocuments that has data currently. This is incremented speculatively before writing data to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void testMaybeSample() {
ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(ProjectId.DEFAULT);
final ProjectId projectId = projectBuilder.getId();
ProjectMetadata projectMetadata = projectBuilder.build();
ClusterState originalClusterState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
Map<String, Object> inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource);
samplingService.maybeSample(projectMetadata, indexRequest);
Expand All @@ -74,6 +75,26 @@ public void testMaybeSample() {
)
);
projectMetadata = projectBuilder.build();
{
/*
* First we ingest some docs without notifying samplingService of the cluster state change. It will have cached the fact that
* there is no config for this index, and so it will not store any samples.
*/
int docsToSample = randomIntBetween(1, maxSize);
for (int i = 0; i < docsToSample; i++) {
samplingService.maybeSample(projectMetadata, indexRequest);
}
List<SamplingService.RawDocument> sample = samplingService.getLocalSample(projectId, indexName);
assertThat(sample, empty());
}
// Now we notify samplingService that the cluster state has changed, and it will pick up the new sampling config
samplingService.clusterChanged(
new ClusterChangedEvent(
"test",
ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(),
originalClusterState
)
);
int docsToSample = randomIntBetween(1, maxSize);
for (int i = 0; i < docsToSample; i++) {
samplingService.maybeSample(projectMetadata, indexRequest);
Expand Down