Skip to content

Commit

Permalink
[BWC] BWC layer for GetIndex should not block in a listener
Browse files Browse the repository at this point in the history
Today we execute BWC calls against nodes that have not GetIndex API
in a action listeners #onFailure method. These calls are blocking today
and might be executed on a bounded thread-pool which might deadlock the
call depending on how many threads are in the pool and how the pool is
setup. These calls should run async as well.
  • Loading branch information
s1monw committed Nov 17, 2014
1 parent c6a570f commit 584f65a
Showing 1 changed file with 51 additions and 12 deletions.
Expand Up @@ -144,8 +144,12 @@
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.transport.ActionNotFoundTransportException;

import java.util.ArrayList;
import java.util.List;

/**
*
*/
Expand Down Expand Up @@ -255,36 +259,71 @@ public void onFailure(Throwable e) {
Throwable rootCause = ExceptionsHelper.unwrapCause(e);
if (rootCause instanceof ActionNotFoundTransportException) {
String[] features = request.features();
GetAliasesResponse aliasResponse = null;
GetMappingsResponse mappingResponse = null;
GetSettingsResponse settingsResponse = null;
GetWarmersResponse warmerResponse = null;
if (features == null || features.length == 0) {
throw new ElasticsearchIllegalStateException("no features selected for GetIndex");
}
try {
final List<ActionRequestBuilder> builderList = new ArrayList<>();
for (String feature : features) {
switch (feature) {
case "_alias":
case "_aliases":
aliasResponse = prepareGetAliases(new String[0]).addIndices(request.indices())
.setIndicesOptions(request.indicesOptions()).get();
builderList.add(prepareGetAliases(new String[0]).addIndices(request.indices())
.setIndicesOptions(request.indicesOptions()));
break;
case "_mapping":
case "_mappings":
mappingResponse = prepareGetMappings(request.indices()).setIndicesOptions(request.indicesOptions()).get();
builderList.add(prepareGetMappings(request.indices()).setIndicesOptions(request.indicesOptions()));
break;
case "_settings":
settingsResponse = prepareGetSettings(request.indices()).setIndicesOptions(request.indicesOptions()).get();
builderList.add(prepareGetSettings(request.indices()).setIndicesOptions(request.indicesOptions()));
break;
case "_warmer":
case "_warmers":
warmerResponse = prepareGetWarmers(request.indices()).setIndicesOptions(request.indicesOptions()).get();
builderList.add(prepareGetWarmers(request.indices()).setIndicesOptions(request.indicesOptions()));
break;
default:
throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid");
}
}
GetIndexResponse getIndexResponse = GetIndexResponse.convertResponses(aliasResponse, mappingResponse,
settingsResponse, warmerResponse);
onResponse(getIndexResponse);
final ActionListener<?> actionListener = new ActionListener<Object>() {
final CountDown countDown = new CountDown(builderList.size());
volatile GetAliasesResponse aliasResponse = null;
volatile GetMappingsResponse mappingResponse = null;
volatile GetSettingsResponse settingsResponse = null;
volatile GetWarmersResponse warmerResponse = null;

@Override
public void onResponse(Object o) {
if (o instanceof GetAliasesResponse) {
aliasResponse = (GetAliasesResponse) o;
} else if (o instanceof GetMappingsResponse) {
mappingResponse = (GetMappingsResponse) o;
} else if (o instanceof GetSettingsResponse) {
settingsResponse = (GetSettingsResponse) o;
} else if (o instanceof GetWarmersResponse) {
warmerResponse = (GetWarmersResponse) o;
} else {
assert false : "Unexpected response type: " + o.getClass();
}
if (countDown.countDown()) {
GetIndexResponse response = GetIndexResponse.convertResponses(aliasResponse, mappingResponse,
settingsResponse, warmerResponse);
listener.onResponse(response);
}
}

@Override
public void onFailure(Throwable e) {
if (countDown.fastForward()) {
listener.onFailure(e);
}
}
};

for (ActionRequestBuilder builder : builderList) {
builder.execute(actionListener);
}
} catch (Throwable e1) {
listener.onFailure(e1);
}
Expand Down

0 comments on commit 584f65a

Please sign in to comment.