From 584f65a196da4d9a847d135329af41bda514d99a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 17 Nov 2014 09:44:56 +0100 Subject: [PATCH] [BWC] BWC layer for GetIndex should not block in a listener Today we execute BWC calls against nodes that have not GetIndex API in a action listeners #onFailure method. These calls are blocking today and might be executed on a bounded thread-pool which might deadlock the call depending on how many threads are in the pool and how the pool is setup. These calls should run async as well. --- .../support/AbstractIndicesAdminClient.java | 63 +++++++++++++++---- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java b/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java index b692d42e63010..a48ae0859067b 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java @@ -144,8 +144,12 @@ import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.transport.ActionNotFoundTransportException; +import java.util.ArrayList; +import java.util.List; + /** * */ @@ -255,36 +259,71 @@ public void onFailure(Throwable e) { Throwable rootCause = ExceptionsHelper.unwrapCause(e); if (rootCause instanceof ActionNotFoundTransportException) { String[] features = request.features(); - GetAliasesResponse aliasResponse = null; - GetMappingsResponse mappingResponse = null; - GetSettingsResponse settingsResponse = null; - GetWarmersResponse warmerResponse = null; + if (features == null || features.length == 0) { + throw new ElasticsearchIllegalStateException("no features selected for GetIndex"); + } try { + final List builderList = new ArrayList<>(); for (String feature : features) { switch (feature) { case "_alias": case "_aliases": - aliasResponse = prepareGetAliases(new String[0]).addIndices(request.indices()) - .setIndicesOptions(request.indicesOptions()).get(); + builderList.add(prepareGetAliases(new String[0]).addIndices(request.indices()) + .setIndicesOptions(request.indicesOptions())); break; case "_mapping": case "_mappings": - mappingResponse = prepareGetMappings(request.indices()).setIndicesOptions(request.indicesOptions()).get(); + builderList.add(prepareGetMappings(request.indices()).setIndicesOptions(request.indicesOptions())); break; case "_settings": - settingsResponse = prepareGetSettings(request.indices()).setIndicesOptions(request.indicesOptions()).get(); + builderList.add(prepareGetSettings(request.indices()).setIndicesOptions(request.indicesOptions())); break; case "_warmer": case "_warmers": - warmerResponse = prepareGetWarmers(request.indices()).setIndicesOptions(request.indicesOptions()).get(); + builderList.add(prepareGetWarmers(request.indices()).setIndicesOptions(request.indicesOptions())); break; default: throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid"); } } - GetIndexResponse getIndexResponse = GetIndexResponse.convertResponses(aliasResponse, mappingResponse, - settingsResponse, warmerResponse); - onResponse(getIndexResponse); + final ActionListener actionListener = new ActionListener() { + final CountDown countDown = new CountDown(builderList.size()); + volatile GetAliasesResponse aliasResponse = null; + volatile GetMappingsResponse mappingResponse = null; + volatile GetSettingsResponse settingsResponse = null; + volatile GetWarmersResponse warmerResponse = null; + + @Override + public void onResponse(Object o) { + if (o instanceof GetAliasesResponse) { + aliasResponse = (GetAliasesResponse) o; + } else if (o instanceof GetMappingsResponse) { + mappingResponse = (GetMappingsResponse) o; + } else if (o instanceof GetSettingsResponse) { + settingsResponse = (GetSettingsResponse) o; + } else if (o instanceof GetWarmersResponse) { + warmerResponse = (GetWarmersResponse) o; + } else { + assert false : "Unexpected response type: " + o.getClass(); + } + if (countDown.countDown()) { + GetIndexResponse response = GetIndexResponse.convertResponses(aliasResponse, mappingResponse, + settingsResponse, warmerResponse); + listener.onResponse(response); + } + } + + @Override + public void onFailure(Throwable e) { + if (countDown.fastForward()) { + listener.onFailure(e); + } + } + }; + + for (ActionRequestBuilder builder : builderList) { + builder.execute(actionListener); + } } catch (Throwable e1) { listener.onFailure(e1); }