Add an error store to record the errors encountered by DLM (#94391)
DLM can encounter errors when attempting to execute the operations needed to implement the lifecycle configuration for a data stream.

This adds an in-memory error store to record these errors.
andreidan committed Mar 10, 2023
1 parent bea0468 commit f5b94eb
Showing 6 changed files with 607 additions and 24 deletions.
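
The change enables a record-on-failure / clear-on-success pattern: when a DLM-issued operation against a backing index fails, the exception is recorded against that index, and a later successful run removes the entry. Below is a minimal sketch of that pattern using the DataLifecycleErrorStore API added in this commit; the ActionListener wiring and the listenerFor helper are illustrative assumptions, not code from the commit.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.dlm.DataLifecycleErrorStore;

public class ErrorStoreUsageSketch {

    private final DataLifecycleErrorStore errorStore = new DataLifecycleErrorStore();

    // Hypothetical helper: builds the callback DLM could attach to an operation
    // (e.g. a rollover or delete request) issued for the given target index.
    <Response> ActionListener<Response> listenerFor(String targetIndex) {
        return ActionListener.wrap(
            // on success: drop any stale error previously recorded for this target
            response -> errorStore.clearRecordedError(targetIndex),
            // on failure: remember the latest error so it can be surfaced later
            e -> errorStore.recordError(targetIndex, e)
        );
    }
}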
@@ -9,17 +9,23 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
@@ -32,6 +38,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;

import java.io.IOException;
import java.util.Collection;
@@ -40,8 +47,13 @@
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

@@ -64,6 +76,12 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return settings.build();
}

@After
public void cleanup() {
// we change SETTING_CLUSTER_MAX_SHARDS_PER_NODE in a test so let's make sure we clean it up even when the test fails
updateClusterSettings(Settings.builder().putNull("*"));
}

public void testRolloverLifecycle() throws Exception {
// empty lifecycle contains the default rollover
DataLifecycle lifecycle = new DataLifecycle();
@@ -116,6 +134,217 @@ public void testRolloverAndRetention() throws Exception {
});
}

public void testErrorRecordingOnRollover() throws Exception {
// empty lifecycle contains the default rollover
DataLifecycle lifecycle = new DataLifecycle();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStreamName, 1);

// let's allow one rollover to go through
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

// prevent new indices from being created (i.e. future rollovers)
updateClusterSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1));

indexDocs(dataStreamName, 1);

assertBusy(() -> {
String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2);
String writeIndexRolloverError = null;
Iterable<DataLifecycleService> lifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService lifecycleService : lifecycleServices) {
writeIndexRolloverError = lifecycleService.getErrorStore().getError(writeIndexName);
if (writeIndexRolloverError != null) {
break;
}
}

assertThat(writeIndexRolloverError, is(notNullValue()));
assertThat(writeIndexRolloverError, containsString("maximum normal shards open"));
});

// let's reset the cluster max shards per node limit to allow rollover to proceed and check the error store is empty
updateClusterSettings(Settings.builder().putNull("*"));

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(3));
String writeIndex = backingIndices.get(2).getName();
// rollover was successful and we got to generation 3
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 3));

// we recorded the error against the previous write index (generation 2)
// let's check there's no error recorded against it anymore
String previousWriteIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 2);
Iterable<DataLifecycleService> lifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(previousWriteIndex), nullValue());
}
});
}

public void testErrorRecordingOnRetention() throws Exception {
// starting with a lifecycle without retention so we can roll over the data stream and manipulate the first generation index such
// that its retention execution fails
DataLifecycle lifecycle = new DataLifecycle();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
indexDocs(dataStreamName, 1);

// let's allow one rollover to go through
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1L);

// mark the first generation index as read-only so deletion fails when we enable the retention configuration
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(firstGenerationIndex);
updateSettingsRequest.settings(Settings.builder().put(READ_ONLY.settingName(), true));
try {
client().admin().indices().updateSettings(updateSettingsRequest).actionGet();

// TODO replace this with an API call to update the lifecycle for the data stream once available
PlainActionFuture.get(
fut -> internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitUnbatchedStateUpdateTask("update the data stream retention", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState state) {
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
assert dataStream != null : "data stream must exist";
Metadata.Builder builder = Metadata.builder(state.metadata());
DataStream updatedDataStream = new DataStream(
dataStreamName,
dataStream.getIndices(),
dataStream.getGeneration(),
dataStream.getMetadata(),
dataStream.isHidden(),
dataStream.isReplicated(),
dataStream.isSystem(),
dataStream.isAllowCustomRouting(),
dataStream.getIndexMode(),
new DataLifecycle(TimeValue.timeValueSeconds(1))
);
builder.put(updatedDataStream);
return ClusterState.builder(state).metadata(builder).build();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage(), e);
fail(
"unable to update the retention policy for data stream ["
+ dataStreamName
+ "] due to ["
+ e.getMessage()
+ "]"
);
}

@Override
public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
fut.onResponse(null);
}
})
);

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));

String recordedRetentionExecutionError = null;
Iterable<DataLifecycleService> lifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService lifecycleService : lifecycleServices) {
recordedRetentionExecutionError = lifecycleService.getErrorStore().getError(firstGenerationIndex);
if (recordedRetentionExecutionError != null) {
break;
}
}

assertThat(recordedRetentionExecutionError, is(notNullValue()));
assertThat(recordedRetentionExecutionError, containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
});

// let's mark the index as writeable and make sure it's deleted and the error store is empty
updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex);

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
// data stream only has one index now
assertThat(backingIndices.size(), equalTo(1));

// error stores don't contain anything for the first generation index anymore
Iterable<DataLifecycleService> lifecycleServices = internalCluster().getInstances(DataLifecycleService.class);
for (DataLifecycleService lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(firstGenerationIndex), nullValue());
}
});
} finally {
// when the test executes successfully this will not be needed; otherwise we need to make sure the index is
// deletable so the test cleanup can remove it
try {
updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), false), firstGenerationIndex);
} catch (Exception e) {
// index would be deleted if the test is successful
}
}
}

static void indexDocs(String dataStream, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.dlm;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Nullable;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;

/**
* Provides a store for the errors DLM encounters.
* It offers the functionality to record, retrieve, and clear errors for a specified target.
* This class is thread safe.
*/
public class DataLifecycleErrorStore {

private final ConcurrentMap<String, String> targetToError = new ConcurrentHashMap<>();

/**
* Records a string representation of the provided exception for the provided target.
* If an error was already recorded for the provided target this will override that error.
*/
public void recordError(String target, Exception e) {
targetToError.put(target, org.elasticsearch.common.Strings.toString(((builder, params) -> {
ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
return builder;
})));
}

/**
* Clears the recorded error for the provided target (if any exists)
*/
public void clearRecordedError(String target) {
targetToError.remove(target);
}

/**
* Clears all the errors recorded in the store.
*/
public void clearStore() {
targetToError.clear();
}

/**
* Retrieves the recorded error for the provided target.
*/
@Nullable
public String getError(String target) {
return targetToError.get(target);
}
}
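
For reference, a short usage sketch of the new store's API; the backing index name below is illustrative, not taken from the commit.

DataLifecycleErrorStore errorStore = new DataLifecycleErrorStore();
String target = ".ds-metrics-foo-2023.03.10-000002"; // illustrative backing index name

// record the failure of a DLM operation against the target; a later recordError
// for the same target overwrites the previous entry
errorStore.recordError(target, new IllegalStateException("rollover failed"));

// retrieve the serialized exception, or null if nothing is recorded for the target
String lastError = errorStore.getError(target);

// once the operation succeeds, drop the stale entry
errorStore.clearRecordedError(target);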
@@ -87,7 +87,8 @@ public Collection<Object> createComponents(
clusterService,
getClock(),
threadPool,
threadPool::absoluteTimeInMillis
threadPool::absoluteTimeInMillis,
new DataLifecycleErrorStore()
)
);
dataLifecycleInitialisationService.get().init();
