Skip to content

Commit

Permalink
Fix AbstractClient#execute Listener Leak (#65415)
Browse files Browse the repository at this point in the history
This was observed in #65405 due to trying to locally execute a
task whose parent was already cancelled but is a general issue.

We should not throw from APIs that consume a listener, as doing so may,
as in this case, leak the listener.
Rather than fixing the specific case of #65405 this fixes the
abstract client overall to avoid other such leaks.

Closes #65405
  • Loading branch information
original-brownbear committed Nov 30, 2020
1 parent b6d761e commit 8046899
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,42 @@ protected void masterOperation(Task task, final ResizeRequest resizeRequest, fin
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());

final IndexMetadata sourceMetadata = state.metadata().index(sourceIndex);
if (sourceMetadata == null) {
listener.onFailure(new IndexNotFoundException(sourceIndex));
return;
}

IndicesStatsRequestBuilder statsRequestBuilder = client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true);
IndicesStatsRequest statsRequest = statsRequestBuilder.request();
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
// TODO: only fetch indices stats for shrink type resize requests
client.execute(IndicesStatsAction.INSTANCE, statsRequest,
ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
i -> {
final CreateIndexClusterStateUpdateRequest updateRequest;
try {
updateRequest = prepareCreateIndexRequest(resizeRequest, sourceMetadata, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
}, targetIndex);
} catch (Exception e) {
delegatedListener.onFailure(e);
return;
}
createIndexService.createIndex(
updateRequest, ActionListener.map(delegatedListener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}));

}

// static for unittesting this method
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest, final ClusterState state
, final IntFunction<DocsStats> perShardDocStats, String sourceIndexName, String targetIndexName) {
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest,
final IndexMetadata sourceMetadata,
final IntFunction<DocsStats> perShardDocStats,
final String targetIndexName) {
final CreateIndexRequest targetIndex = resizeRequest.getTargetIndexRequest();
final IndexMetadata metadata = state.metadata().index(sourceIndexName);
if (metadata == null) {
throw new IndexNotFoundException(sourceIndexName);
}
final Settings.Builder targetIndexSettingsBuilder = Settings.builder().put(targetIndex.settings())
.normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX);
targetIndexSettingsBuilder.remove(IndexMetadata.SETTING_HISTORY_UUID);
Expand All @@ -127,13 +137,13 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi
numShards = 1;
} else {
assert resizeRequest.getResizeType() == ResizeType.CLONE;
numShards = metadata.getNumberOfShards();
numShards = sourceMetadata.getNumberOfShards();
}
}

for (int i = 0; i < numShards; i++) {
if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
Set<ShardId> shardIds = IndexMetadata.selectShrinkShards(i, metadata, numShards);
Set<ShardId> shardIds = IndexMetadata.selectShrinkShards(i, sourceMetadata, numShards);
long count = 0;
for (ShardId id : shardIds) {
DocsStats docsStats = perShardDocStats.apply(id.id());
Expand All @@ -146,10 +156,10 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi
}
}
} else if (resizeRequest.getResizeType() == ResizeType.SPLIT) {
Objects.requireNonNull(IndexMetadata.selectSplitShard(i, metadata, numShards));
Objects.requireNonNull(IndexMetadata.selectSplitShard(i, sourceMetadata, numShards));
// we just execute this to ensure we get the right exceptions if the number of shards is wrong or less then etc.
} else {
Objects.requireNonNull(IndexMetadata.selectCloneShard(i, metadata, numShards));
Objects.requireNonNull(IndexMetadata.selectCloneShard(i, sourceMetadata, numShards));
// we just execute this to ensure we get the right exceptions if the number of shards is wrong etc.
}
}
Expand All @@ -159,12 +169,13 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi
}
if (IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
// if we have a source index with 1 shards it's legal to set this
final boolean splitFromSingleShards = resizeRequest.getResizeType() == ResizeType.SPLIT && metadata.getNumberOfShards() == 1;
final boolean splitFromSingleShards =
resizeRequest.getResizeType() == ResizeType.SPLIT && sourceMetadata.getNumberOfShards() == 1;
if (splitFromSingleShards == false) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
}
}
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(metadata.getSettings()) &&
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(sourceMetadata.getSettings()) &&
IndexSettings.INDEX_SOFT_DELETES_SETTING.exists(targetIndexSettings) &&
IndexSettings.INDEX_SOFT_DELETES_SETTING.get(targetIndexSettings) == false) {
throw new IllegalArgumentException("Can't disable [index.soft_deletes.enabled] setting on resize");
Expand All @@ -184,7 +195,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi
.settings(targetIndex.settings())
.aliases(targetIndex.aliases())
.waitForActiveShards(targetIndex.waitForActiveShards())
.recoverFrom(metadata.getIndex())
.recoverFrom(sourceMetadata.getIndex())
.resizeType(resizeRequest.getResizeType())
.copySettings(resizeRequest.getCopySettings() == null ? false : resizeRequest.getCopySettings());
}
Expand Down
32 changes: 30 additions & 2 deletions server/src/main/java/org/elasticsearch/client/node/NodeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -79,24 +80,51 @@ public void close() {
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    // Discard the task because the Client interface doesn't use it.
    try {
        executeLocally(action, request, listener);
    } catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
        // #executeLocally returns the task and throws TaskCancelledException if it fails to register the task because the parent
        // task has been cancelled, IllegalStateException if the client was not in a state to execute the request because it was
        // not yet properly initialized, or IllegalArgumentException if header validation fails. Forward these to the listener
        // instead of throwing so that the listener is never leaked by this API.
        listener.onFailure(e);
    }
}

/**
* Execute an {@link ActionType} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}.
* Prefer this method if you don't need access to the task when listening for the response. This is the method used to
* implement the {@link Client} interface.
*
* @throws TaskCancelledException if the request's parent task has been cancelled already
*/
public < Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
return taskManager.registerAndExecute("transport", transportAction(action), request,
(t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
(t, r) -> {
try {
listener.onResponse(r);
} catch (Exception e) {
assert false : new AssertionError("callback must handle its own exceptions", e);
throw e;
}
}, (t, e) -> {
try {
listener.onFailure(e);
} catch (Exception ex) {
ex.addSuppressed(e);
assert false : new AssertionError("callback must handle its own exceptions", ex);
throw ex;
}
});
}

/**
* Execute an {@link ActionType} locally, returning that {@link Task} used to track it, and linking an {@link TaskListener}.
* Prefer this method if you need access to the task when listening for the response.
*
* @throws TaskCancelledException if the request's parent task has been cancelled already
*/
public < Request extends ActionRequest,
Response extends ActionResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,12 @@ public final <Request extends ActionRequest, Response extends ActionResponse> Ac
@Override
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
    ActionType<Response> action, Request request, ActionListener<Response> listener) {
    // Never throw from an API that consumes a listener: a thrown exception would leave the listener
    // neither completed nor failed, i.e. leaked. Route every failure through onFailure instead.
    try {
        doExecute(action, request, listener);
    } catch (Exception e) {
        assert false : new AssertionError(e);
        listener.onFailure(e);
    }
}

protected abstract <Request extends ActionRequest, Response extends ActionResponse>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
* will invoke the listener immediately.
*/
void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
    // Resolve the connection before wiring up the listener so that an unknown cluster alias is reported
    // through listener.onFailure rather than thrown, which would leak the listener.
    final RemoteClusterConnection remoteClusterConnection;
    try {
        remoteClusterConnection = getRemoteClusterConnection(clusterAlias);
    } catch (NoSuchRemoteClusterException e) {
        listener.onFailure(e);
        return;
    }
    remoteClusterConnection.ensureConnected(listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,36 @@ private ClusterState createClusterState(String name, int numShards, int numRepli
}

public void testErrorCondition() {
ClusterState state = createClusterState("source", randomIntBetween(2, 42), randomIntBetween(0, 10),
Settings.builder().put("index.blocks.write", true).build());
IndexMetadata state = createClusterState("source", randomIntBetween(2, 42), randomIntBetween(0, 10),
Settings.builder().put("index.blocks.write", true).build()).metadata().index("source");
assertTrue(
expectThrows(IllegalStateException.class, () ->
TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), state,
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "source", "target")
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "target")
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));


assertTrue(
expectThrows(IllegalStateException.class, () -> {
ResizeRequest req = new ResizeRequest("target", "source");
req.getTargetIndexRequest().settings(Settings.builder().put("index.number_of_shards", 4));
ClusterState clusterState = createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).build());
TransportResizeAction.prepareCreateIndexRequest(req, clusterState,
TransportResizeAction.prepareCreateIndexRequest(req,
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).build()).metadata().index("source"),
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null
, "source", "target");
, "target");
}
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));


IllegalArgumentException softDeletesError = expectThrows(IllegalArgumentException.class, () -> {
ResizeRequest req = new ResizeRequest("target", "source");
req.getTargetIndexRequest().settings(Settings.builder().put("index.soft_deletes.enabled", false));
ClusterState clusterState = createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).put("index.soft_deletes.enabled", true).build());
TransportResizeAction.prepareCreateIndexRequest(req, clusterState,
(i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), "source", "target");
TransportResizeAction.prepareCreateIndexRequest(req,
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).put("index.soft_deletes.enabled", true).build())
.metadata().index("source"),
(i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), "target");
});
assertThat(softDeletesError.getMessage(), equalTo("Can't disable [index.soft_deletes.enabled] setting on resize"));

Expand All @@ -119,8 +120,8 @@ public void testErrorCondition() {
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState,
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), "source", "target");
TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState.metadata().index("source"),
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), "target");
}

public void testPassNumRoutingShards() {
Expand All @@ -142,14 +143,15 @@ public void testPassNumRoutingShards() {
resizeRequest.setResizeType(ResizeType.SPLIT);
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", 2).build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target");
IndexMetadata indexMetadata = clusterState.metadata().index("source");
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
.put("index.number_of_routing_shards", randomIntBetween(2, 10))
.put("index.number_of_shards", 2)
.build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target");
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, null, "target");
}

public void testPassNumRoutingShardsAndFail() {
Expand All @@ -172,15 +174,15 @@ public void testPassNumRoutingShardsAndFail() {
resizeRequest.setResizeType(ResizeType.SPLIT);
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", numShards * 2).build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target");
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState.metadata().index("source"), null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
.put("index.number_of_shards", numShards * 2)
.put("index.number_of_routing_shards", numShards * 2).build());
ClusterState finalState = clusterState;
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState, null, "source", "target"));
() -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState.metadata().index("source"), null, "target"));
assertEquals("cannot provide index.number_of_routing_shards on resize", iae.getMessage());
}

Expand Down Expand Up @@ -208,7 +210,7 @@ public void testShrinkIndexSettings() {
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
target.setWaitForActiveShards(activeShardCount);
CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest(
target, clusterState, (i) -> stats, indexName, "target");
target, clusterState.metadata().index(indexName), (i) -> stats, "target");
assertNotNull(request.recoverFrom());
assertEquals(indexName, request.recoverFrom().getName());
assertEquals("1", request.settings().get("index.number_of_shards"));
Expand Down

0 comments on commit 8046899

Please sign in to comment.