Skip to content

Commit

Permalink
Fix leak in TransformIndexerFailureHandlingTests (#108541)
Browse files Browse the repository at this point in the history
Uses `ActionListener#respondAndRelease` to release the search response
properly. Also cleans up a few other warnings.

Closes #108530
  • Loading branch information
DaveCTurner committed May 13, 2024
1 parent 5fbeff1 commit a46cef4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -39,7 +38,6 @@
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.indexing.IndexerState;
Expand Down Expand Up @@ -105,7 +103,6 @@
*/
public class TransformIndexerFailureHandlingTests extends ESTestCase {

private Client client;
private ThreadPool threadPool;
private static final Function<BulkRequest, BulkResponse> EMPTY_BULK_RESPONSE = bulkRequest -> new BulkResponse(
new BulkItemResponse[0],
Expand All @@ -127,7 +124,6 @@ static class MockedTransformIndexer extends ClientTransformIndexer {
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformExtension transformExtension,
String executorName,
IndexBasedTransformConfigManager transformsConfigManager,
CheckpointProvider checkpointProvider,
TransformConfig transformConfig,
Expand Down Expand Up @@ -195,12 +191,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
throw new IllegalStateException(e);
}

try {
SearchResponse response = searchFunction.apply(buildSearchRequest().v2());
nextPhase.onResponse(response);
} catch (Exception e) {
nextPhase.onFailure(e);
}
ActionListener.run(nextPhase, l -> ActionListener.respondAndRelease(l, searchFunction.apply(buildSearchRequest().v2())));
}

@Override
Expand Down Expand Up @@ -307,7 +298,6 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
@Before
public void setUpMocks() {
threadPool = createThreadPool();
client = new NoOpClient(threadPool);
}

@After
Expand Down Expand Up @@ -349,17 +339,7 @@ public void testPageSizeAdapt() throws Exception {
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);
final CountDownLatch latch = indexer.newLatch(1);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
Expand Down Expand Up @@ -439,7 +419,6 @@ public void testDoProcessAggNullCheck() {
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -500,17 +479,7 @@ public void testScriptError() throws Exception {
TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

final CountDownLatch latch = indexer.newLatch(1);

Expand Down Expand Up @@ -566,7 +535,10 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
);
try {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
searchResponse.mustIncRef();
return searchResponse;
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Expand Down Expand Up @@ -595,7 +567,6 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
bulkFunction,
deleteByQueryFunction,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -659,7 +630,10 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
);
try {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
searchResponse.mustIncRef();
return searchResponse;
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Expand Down Expand Up @@ -694,7 +668,6 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
bulkFunction,
deleteByQueryFunction,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -768,6 +741,7 @@ public SearchResponse apply(SearchRequest searchRequest) {
new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
);
}
searchResponse.mustIncRef();
return searchResponse;
}
};
Expand All @@ -788,7 +762,6 @@ public SearchResponse apply(SearchRequest searchRequest) {
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -889,17 +862,7 @@ public void testHandleFailureAuditing() {
)
);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

indexer.handleFailure(
new SearchPhaseExecutionException(
Expand Down Expand Up @@ -1056,7 +1019,6 @@ private MockedTransformIndexer createMockIndexer(
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
mock(TransformAuditor.class),
new TransformContext(TransformTaskState.STARTED, "", 0, listener),
1
Expand Down Expand Up @@ -1166,17 +1128,7 @@ private void testHandleFailure(
)
);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

for (int i = 0; i < expectedEffectiveNumFailureRetries; ++i) {
indexer.handleFailure(new Exception("exception no. " + (i + 1)));
Expand Down Expand Up @@ -1209,22 +1161,10 @@ private MockedTransformIndexer createMockIndexer(
Function<BulkRequest, BulkResponse> bulkFunction,
Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
ThreadPool threadPool,
String executorName,
TransformAuditor auditor,
TransformContext context
) {
return createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
deleteByQueryFunction,
threadPool,
executorName,
auditor,
context,
0
);
return createMockIndexer(config, state, searchFunction, bulkFunction, deleteByQueryFunction, threadPool, auditor, context, 0);
}

private MockedTransformIndexer createMockIndexer(
Expand All @@ -1234,7 +1174,6 @@ private MockedTransformIndexer createMockIndexer(
Function<BulkRequest, BulkResponse> bulkFunction,
Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
ThreadPool threadPool,
String executorName,
TransformAuditor auditor,
TransformContext context,
int doProcessCount
Expand All @@ -1250,7 +1189,6 @@ private MockedTransformIndexer createMockIndexer(
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
executorName,
transformConfigManager,
mock(CheckpointProvider.class),
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.transform.transforms.pivot;

import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
Expand All @@ -28,7 +27,6 @@
import static org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSourceTests.randomTermsGroupSource;
import static org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSourceTests.randomTermsGroupSourceNoScript;

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108530")
public class GroupByOptimizerTests extends ESTestCase {

public void testOneGroupBy() {
Expand Down

0 comments on commit a46cef4

Please sign in to comment.