Skip to content

Commit

Permalink
Fix circuit breaker leak in MultiTerms aggregation (#79362) (#79422)
Browse files Browse the repository at this point in the history
The MultiTermsAggregator creates a BytesKeyedBucketOrds that never gets closed and therefore it might leak the
memory allocated into the circuit breaker.
  • Loading branch information
iverase committed Oct 19, 2021
1 parent ef6b6c8 commit b4b20bd
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
indexSettings,
query,
breakerService,
builder.bytesToPreallocate(),
randomBoolean() ? 0 : builder.bytesToPreallocate(),
maxBucket,
fieldTypes
);
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/analytics/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'

esplugin {
name 'x-pack-analytics'
description 'Elasticsearch Expanded Pack Plugin - Analytics'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.xpack.analytics.multiterms;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;

import java.util.Collection;
import java.util.stream.IntStream;

/**
* test forked from CardinalityWithRequestBreakerIT
*/
public class MultiTermsWithRequestBreakerIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return org.elasticsearch.core.List.of(AnalyticsPlugin.class);
}

@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return org.elasticsearch.core.List.of(AnalyticsPlugin.class);
}

/**
* Test that searches using multiterms aggregations returns all request breaker memory.
*/
public void testRequestBreaker() throws Exception {
final String requestBreaker = randomIntBetween(1, 10000) + "kb";
logger.info("--> Using request breaker setting: {}", requestBreaker);

indexRandom(
true,
IntStream.range(0, randomIntBetween(10, 1000))
.mapToObj(
i -> client().prepareIndex("test", "_doc")
.setId("id_" + i)
.setSource(org.elasticsearch.core.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5)))
)
.toArray(IndexRequestBuilder[]::new)
);

client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), requestBreaker)
)
.get();

try {
client().prepareSearch("test")
.addAggregation(
new MultiTermsAggregationBuilder("xxx").terms(
org.elasticsearch.core.List.of(
new MultiValuesSourceFieldConfig.Builder().setFieldName("field0.keyword").build(),
new MultiValuesSourceFieldConfig.Builder().setFieldName("field1.keyword").build()
)
)
)
.get();
} catch (ElasticsearchException e) {
if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) {
throw e;
}
}

client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
)
.get();

// validation done by InternalTestCluster.ensureEstimatedStats()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
Expand Down Expand Up @@ -220,6 +221,11 @@ public void accept(Integer start) throws IOException {
};
}

@Override
protected void doClose() {
Releasables.close(bucketOrds);
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];
Expand Down

0 comments on commit b4b20bd

Please sign in to comment.