Skip to content

Commit

Permalink
[7.14] Add logging of shard failures (#75275) (#75357)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Jul 15, 2021
1 parent eb28a49 commit b5a1412
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* An exception indicating that a failure occurred performing an operation on the shard.
*
*/
public abstract class ShardOperationFailedException implements Writeable, ToXContentObject {
public abstract class ShardOperationFailedException extends Exception implements Writeable, ToXContentObject {

protected String index;
protected int shardId = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
import org.elasticsearch.ElasticsearchException;

class CheckpointException extends ElasticsearchException {
CheckpointException(String msg, Object... params) {
super(msg, null, params);
}

CheckpointException(String msg, Throwable cause, Object... params) {
super(msg, cause, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,19 @@ private static void getCheckpointsFromOneCluster(
new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(response -> {
if (response.getFailedShards() != 0) {
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
for (int i = 0; i < response.getShardFailures().length; ++i) {
logger.warn(
new ParameterizedMessage(
"Source has [{}] failed shards, shard failure [{}]",
response.getFailedShards(), i).getFormattedMessage(),
response.getShardFailures()[i]);
}
listener.onFailure(
new CheckpointException(
"Source has [{}] failed shards, first shard failure: {}",
response.getShardFailures()[0],
response.getFailedShards(),
response.getShardFailures()[0].toString()));
return;
}
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RemoteClusterResolver extends RemoteClusterAware {

private final CopyOnWriteArraySet<String> clusters;

class ResolvedIndices {
static class ResolvedIndices {
private final Map<String, List<String>> remoteIndicesPerClusterAlias;
private final List<String> localIndices;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", getJobId()), createCheckpointException);
listener.onFailure(
new RuntimeException(
"Failed to create checkpoint due to " + createCheckpointException.getMessage(),
"Failed to create checkpoint due to: " + createCheckpointException.getMessage(),
createCheckpointException
)
);
Expand All @@ -237,7 +237,7 @@ protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", getJobId()), getCheckPointException);
listener.onFailure(
new RuntimeException(
"Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
"Failed to retrieve checkpoint due to: " + getCheckPointException.getMessage(),
getCheckPointException
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,44 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor.AuditExpectation;
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
import org.junit.Before;
import org.mockito.stubbing.Answer;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DefaultCheckpointProviderTests extends ESTestCase {

Expand All @@ -40,7 +59,10 @@ public class DefaultCheckpointProviderTests extends ESTestCase {

@Before
public void setUpMocks() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
transformConfigManager = mock(IndexBasedTransformConfigManager.class);
transformAuditor = MockTransformAuditor.createMockAuditor();
}
Expand Down Expand Up @@ -188,6 +210,55 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
);
}

public void testHandlingShardFailures() throws Exception {
String transformId = getTestName();
String indexName = "some-index";
TransformConfig transformConfig =
new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId))
.setSource(new SourceConfig(indexName))
.build();

RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName)))
.when(remoteClusterResolver).resolve(transformConfig.getSource().getIndex());

GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());

IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
doReturn(7).when(indicesStatsResponse).getFailedShards();
doReturn(
new DefaultShardOperationFailedException[] {
new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong"))
}).when(indicesStatsResponse).getShardFailures();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
client,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);

CountDownLatch latch = new CountDownLatch(1);
provider.createNextCheckpoint(
null,
new LatchedActionListener<>(
ActionListener.wrap(
response -> fail("This test case must fail"),
e -> assertThat(
e.getMessage(),
startsWith(
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
+ "reason [Exception[something's wrong]]"))
),
latch
)
);
latch.await(10, TimeUnit.SECONDS);
}

private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock)
throws IllegalAccessException {
MockLogAppender mockLogAppender = new MockLogAppender();
Expand All @@ -210,4 +281,12 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec
}
}

@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
}

0 comments on commit b5a1412

Please sign in to comment.