Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leak in TransformIndexerFailureHandlingTests #108541

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -39,7 +38,6 @@
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.indexing.IndexerState;
Expand Down Expand Up @@ -105,7 +103,6 @@
*/
public class TransformIndexerFailureHandlingTests extends ESTestCase {

private Client client;
private ThreadPool threadPool;
private static final Function<BulkRequest, BulkResponse> EMPTY_BULK_RESPONSE = bulkRequest -> new BulkResponse(
new BulkItemResponse[0],
Expand All @@ -127,7 +124,6 @@ static class MockedTransformIndexer extends ClientTransformIndexer {
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformExtension transformExtension,
String executorName,
IndexBasedTransformConfigManager transformsConfigManager,
CheckpointProvider checkpointProvider,
TransformConfig transformConfig,
Expand Down Expand Up @@ -195,12 +191,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
throw new IllegalStateException(e);
}

try {
SearchResponse response = searchFunction.apply(buildSearchRequest().v2());
nextPhase.onResponse(response);
} catch (Exception e) {
nextPhase.onFailure(e);
}
ActionListener.run(nextPhase, l -> ActionListener.respondAndRelease(l, searchFunction.apply(buildSearchRequest().v2())));
}

@Override
Expand Down Expand Up @@ -307,7 +298,6 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
@Before
public void setUpMocks() {
threadPool = createThreadPool();
client = new NoOpClient(threadPool);
}

@After
Expand Down Expand Up @@ -349,17 +339,7 @@ public void testPageSizeAdapt() throws Exception {
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);
final CountDownLatch latch = indexer.newLatch(1);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
Expand Down Expand Up @@ -439,7 +419,6 @@ public void testDoProcessAggNullCheck() {
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -500,17 +479,7 @@ public void testScriptError() throws Exception {
TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

final CountDownLatch latch = indexer.newLatch(1);

Expand Down Expand Up @@ -566,7 +535,10 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
);
try {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
searchResponse.mustIncRef();
return searchResponse;
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Expand Down Expand Up @@ -595,7 +567,6 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
bulkFunction,
deleteByQueryFunction,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -659,7 +630,10 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
);
try {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
searchResponse.mustIncRef();
return searchResponse;
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Expand Down Expand Up @@ -694,7 +668,6 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
bulkFunction,
deleteByQueryFunction,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -768,6 +741,7 @@ public SearchResponse apply(SearchRequest searchRequest) {
new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
);
}
searchResponse.mustIncRef();
return searchResponse;
}
};
Expand All @@ -788,7 +762,6 @@ public SearchResponse apply(SearchRequest searchRequest) {
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
Expand Down Expand Up @@ -889,17 +862,7 @@ public void testHandleFailureAuditing() {
)
);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

indexer.handleFailure(
new SearchPhaseExecutionException(
Expand Down Expand Up @@ -1056,7 +1019,6 @@ private MockedTransformIndexer createMockIndexer(
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
mock(TransformAuditor.class),
new TransformContext(TransformTaskState.STARTED, "", 0, listener),
1
Expand Down Expand Up @@ -1166,17 +1128,7 @@ private void testHandleFailure(
)
);

MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
MockedTransformIndexer indexer = createMockIndexer(config, state, searchFunction, bulkFunction, null, threadPool, auditor, context);

for (int i = 0; i < expectedEffectiveNumFailureRetries; ++i) {
indexer.handleFailure(new Exception("exception no. " + (i + 1)));
Expand Down Expand Up @@ -1209,22 +1161,10 @@ private MockedTransformIndexer createMockIndexer(
Function<BulkRequest, BulkResponse> bulkFunction,
Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
ThreadPool threadPool,
String executorName,
TransformAuditor auditor,
TransformContext context
) {
return createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
deleteByQueryFunction,
threadPool,
executorName,
auditor,
context,
0
);
return createMockIndexer(config, state, searchFunction, bulkFunction, deleteByQueryFunction, threadPool, auditor, context, 0);
}

private MockedTransformIndexer createMockIndexer(
Expand All @@ -1234,7 +1174,6 @@ private MockedTransformIndexer createMockIndexer(
Function<BulkRequest, BulkResponse> bulkFunction,
Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
ThreadPool threadPool,
String executorName,
TransformAuditor auditor,
TransformContext context,
int doProcessCount
Expand All @@ -1250,7 +1189,6 @@ private MockedTransformIndexer createMockIndexer(
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
executorName,
transformConfigManager,
mock(CheckpointProvider.class),
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.transform.transforms.pivot;

import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
Expand All @@ -28,7 +27,6 @@
import static org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSourceTests.randomTermsGroupSource;
import static org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSourceTests.randomTermsGroupSourceNoScript;

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108530")
public class GroupByOptimizerTests extends ESTestCase {

public void testOneGroupBy() {
Expand Down