diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index d77603744c703..b06821658f85c 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -280,6 +280,42 @@ private void iterateBeforeIndexShardRecovery( outerListener.onResponse(null); } + private void iterateAfterIndexShardRecovery( + final IndexShard indexShard, + final Iterator iterator, + final ActionListener outerListener + ) { + while (iterator.hasNext()) { + final var nextListener = iterator.next(); + final var future = new ListenableFuture(); + try { + nextListener.afterIndexShardRecovery(indexShard, future); + if (future.isDone()) { + // common case, not actually async, so just check for an exception and continue on the same thread + future.result(); + continue; + } + } catch (Exception e) { + outerListener.onFailure(e); + return; + } + + // future was not completed straight away, but might be done by now, so continue on a fresh thread to avoid stack overflow + future.addListener( + outerListener.delegateFailure( + (delegate, v) -> indexShard.getThreadPool() + .executor(ThreadPool.Names.GENERIC) + .execute( + ActionRunnable.wrap(delegate, l -> iterateAfterIndexShardRecovery(indexShard, iterator, l)) + ) + ) + ); + return; + } + + outerListener.onResponse(null); + } + @Override public void beforeIndexShardRecovery( final IndexShard indexShard, @@ -292,6 +328,14 @@ public void beforeIndexShardRecovery( })); } + @Override + public void afterIndexShardRecovery(IndexShard indexShard, ActionListener outerListener) { + iterateAfterIndexShardRecovery(indexShard, listeners.iterator(), outerListener.delegateResponse((l, e) -> { + logger.warn(() -> format("failed to invoke the listener after the shard recovery for %s", indexShard.shardId()), e); + l.onFailure(e); + })); + } + @Override public void afterFilesRestoredFromRepository(IndexShard indexShard) { for (IndexEventListener listener : listeners) {