Skip to content
Permalink
Browse files

Better exception handling in actions when forking to a thread pool

An execution on a thread pool might be rejected due to its settings, have better handling in those cases across the actions we have.
closes #3524
  • Loading branch information...
kimchy committed Aug 16, 2013
1 parent b11f81d commit ad0eeef8593abf5d14fe43cf02ed10d93a5a0361
Showing with 377 additions and 285 deletions.
  1. +25 −17 src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java
  2. +50 −34 src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java
  3. +25 −17 src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java
  4. +26 −18 src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java
  5. +25 −17 src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java
  6. +26 −18 src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java
  7. +10 −6 src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java
  8. +40 −40 src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java
  9. +13 −9 src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java
  10. +47 −43 src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java
  11. +39 −23 ...n/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
  12. +16 −16 ...ain/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java
  13. +19 −11 ...java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java
  14. +16 −16 src/main/java/org/elasticsearch/action/support/single/shard/TransportShardSingleOperationAction.java
@@ -115,15 +115,19 @@ public void run() {
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
} catch (Throwable t) {
onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter);
}
}
}
@@ -144,18 +148,22 @@ public void onResult(QueryFetchSearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
}
});
}

void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

void finishHim() {
try {
innerFinishHim();
@@ -124,15 +124,19 @@ public void run() {
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
});
} else {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
});
} else {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
} catch (Throwable t) {
onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter);
}
}
}
@@ -153,18 +157,22 @@ public void onResult(QuerySearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
}
});
}

void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}

void executeFetchPhase() {
try {
innerExecuteFetchPhase();
@@ -217,15 +225,19 @@ public void run() {
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
}
}
}
@@ -246,18 +258,22 @@ public void onResult(FetchSearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}

void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

void finishHim() {
try {
innerFinishHim();
@@ -126,15 +126,19 @@ public void run() {
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
}
}
}
@@ -155,18 +159,22 @@ public void onResult(FetchSearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}

void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

void finishHim() {
try {
innerFinishHim();
@@ -170,15 +170,19 @@ public void run() {
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardIndex, node, target.v2());
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executePhase(shardIndex, node, target.v2());
}
});
} else {
executePhase(shardIndex, node, target.v2());
}
} catch (Throwable t) {
onPhaseFailure(t, target.v2(), shardIndex);
}
}
}
@@ -200,7 +204,7 @@ public void run() {
}
}

private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override
public void onResult(QueryFetchSearchResult result) {
@@ -212,18 +216,22 @@ public void onResult(QueryFetchSearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
onPhaseFailure(t, searchId, shardIndex);
}
});
}

private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
try {
innerFinishHim();
@@ -176,15 +176,19 @@ public void run() {
final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
});
} else {
executeQueryPhase(shardIndex, counter, node, target.v2());
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
});
} else {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
} catch (Throwable t) {
onQueryPhaseFailure(shardIndex, counter, target.v2(), t);
}
}
}
@@ -204,18 +208,22 @@ public void onResult(QuerySearchResult result) {

@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
onQueryPhaseFailure(shardIndex, counter, searchId, t);
}
});
}

void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}

private void executeFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults);
AtomicArray<ExtTIntArrayList> docIdsToLoad = new AtomicArray<ExtTIntArrayList>(queryResults.length());
Oops, something went wrong.

0 comments on commit ad0eeef

Please sign in to comment.
You can’t perform that action at this time.