From 13f787ecc59677c95e3d6ab62ddc7184bca68b0e Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Tue, 22 Oct 2024 10:36:06 +0300 Subject: [PATCH] Adjust failure store to work with TSDS (#114307) In this PR we add a test and we fix the issues we encountered when we enabled the failure store for TSDS and logsdb. **Logsdb** Logsdb worked out of the box, so we just added the test that indexes with a bulk request a couple of documents and tests how they are ingested. **TSDS** Here it was a bit trickier. We encountered the following issues: - TSDS requires a timestamp to determine the write index of the data stream meaning the failure happens earlier than we have anticipated so far. We added a special exception to detect this case and we treat it accordingly. - The template of a TSDS data stream sets certain settings that we do not want to have in the failure store index. We added an allowlist that gets applied before we add the necessary index settings. Furthermore, we added a test case to capture this. --- .../datastreams/TSDBIndexingIT.java | 3 +- .../datastreams/DataStreamFeatures.java | 6 ++ .../test/data_stream/150_tsdb.yml | 101 ++++++++++++++++++ .../190_failure_store_redirection.yml | 2 +- .../CreateIndexClusterStateUpdateRequest.java | 12 +++ .../action/bulk/BulkOperation.java | 7 ++ .../cluster/metadata/DataStream.java | 24 ++++- .../DataStreamFailureStoreDefinition.java | 47 ++++++-- .../MetadataCreateDataStreamService.java | 3 +- .../metadata/MetadataCreateIndexService.java | 7 +- ...DataStreamFailureStoreDefinitionTests.java | 73 +++++++++++++ .../rest-api-spec/test/20_failure_store.yml | 99 +++++++++++++++++ 12 files changed, 369 insertions(+), 15 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinitionTests.java create mode 100644 x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java index 686e253d1d173..c404029fa4095 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -170,7 +171,7 @@ public void testTimeRanges() throws Exception { var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE); time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99)); indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON); - expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet()); + expectThrows(IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class, () -> client().index(indexRequest).actionGet()); } // Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index: diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java index ab7e590b1631e..f60a3e5c47a7f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java @@ -26,6 +26,7 @@ public class DataStreamFeatures implements FeatureSpecification { public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle"); + public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix"); @Override public Map getHistoricalFeatures() { @@ -41,4 +42,9 @@ public Set getFeatures() { DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14 ); } + + @Override + public Set getTestFeatures() { + return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX); + } } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml index 56f387c016261..de5cf3baa744e 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml @@ -182,6 +182,107 @@ index without timestamp: body: - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' +--- +TSDB failures go to failure store: + - requires: + cluster_features: ["data_stream.failure_store.tsdb_fix"] + reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store." + + - do: + allowed_warnings: + - "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation" + indices.put_index_template: + name: my-template2 + body: + index_patterns: [ "fs-k8s*" ] + data_stream: + failure_store: true + template: + settings: + index: + mode: time_series + number_of_replicas: 1 + number_of_shards: 2 + routing_path: [ metricset, time_series_dimension ] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + ip: + type: ip + network: + properties: + tx: + type: long + rx: + type: long + - do: + index: + index: fs-k8s + body: + - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - match: { result : "created"} + - match: { failure_store : "used"} + + - do: + bulk: + refresh: true + body: + - '{ "create": { "_index": "fs-k8s"} }' + - '{"@timestamp":"2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{ "create": { "_index": "k8s"} }' + - '{ "@timestamp": "2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{ "create": { "_index": "fs-k8s"} }' + - '{ "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{ "create": { "_index": "fs-k8s"} }' + - '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{ "create": { "_index": "k8s"} }' + - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{ "create": { "_index": "k8s"} }' + - '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - is_true: errors + + # Successfully indexed to backing index + - match: { items.0.create._index: '/\.ds-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.0.create.status: 201 } + - is_false: items.0.create.failure_store + - match: { items.1.create._index: '/\.ds-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.1.create.status: 201 } + - is_false: items.1.create.failure_store + + # Successfully indexed to failure store + - match: { items.2.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' } + - match: { items.2.create.status: 201 } + - match: { items.2.create.failure_store: used } + - match: { items.3.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' } + - match: { items.3.create.status: 201 } + - match: { items.3.create.failure_store: used } + + # Rejected, eligible to go to failure store, but failure store not enabled + - match: { items.4.create._index: 'k8s' } + - match: { items.4.create.status: 400 } + - match: { items.4.create.error.type: timestamp_error } + - match: { items.4.create.failure_store: not_enabled } + - match: { items.4.create._index: 'k8s' } + - match: { items.4.create.status: 400 } + - match: { items.4.create.error.type: timestamp_error } + - match: { items.4.create.failure_store: not_enabled } + --- index without timestamp with pipeline: - do: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml index cb5578a282dc9..9b5a9dae8bc0a 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -879,7 +879,7 @@ teardown: # Successfully indexed to backing index - match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } - match: { items.0.create.status: 201 } - - is_false: items.1.create.failure_store + - is_false: items.0.create.failure_store # Rejected but not eligible to go to failure store - match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 080ebb5951a7a..553f784d23a87 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -35,6 +35,7 @@ public class CreateIndexClusterStateUpdateRequest { private ResizeType resizeType; private boolean copySettings; private SystemDataStreamDescriptor systemDataStreamDescriptor; + private boolean isFailureIndex = false; private Settings settings = Settings.EMPTY; @@ -102,6 +103,11 @@ public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDat return this; } + public CreateIndexClusterStateUpdateRequest isFailureIndex(boolean isFailureIndex) { + this.isFailureIndex = isFailureIndex; + return this; + } + public String cause() { return cause; } @@ -168,6 +174,10 @@ public String dataStreamName() { return dataStreamName; } + public boolean isFailureIndex() { + return isFailureIndex; + } + public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName) { this.dataStreamName = dataStreamName; return this; @@ -228,6 +238,8 @@ public String toString() { + systemDataStreamDescriptor + ", matchingTemplate=" + matchingTemplate + + ", isFailureIndex=" + + isFailureIndex + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 007f274d7f493..130d6286f7e02 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -320,6 +320,12 @@ private Map> groupRequestsByShards( shard -> new ArrayList<>() ); shardRequests.add(bulkItemRequest); + } catch (DataStream.TimestampError timestampError) { + IndexDocFailureStoreStatus failureStoreStatus = processFailure(bulkItemRequest, clusterState, timestampError); + if (IndexDocFailureStoreStatus.USED.equals(failureStoreStatus) == false) { + String name = ia != null ? ia.getName() : docWriteRequest.index(); + addFailureAndDiscardRequest(docWriteRequest, bulkItemRequest.id(), name, timestampError, failureStoreStatus); + } } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) { String name = ia != null ? ia.getName() : docWriteRequest.index(); var failureStoreStatus = isFailureStoreRequest(docWriteRequest) @@ -545,6 +551,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques boolean added = addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName()); if (added) { failureStoreMetrics.incrementFailureStore(bulkItemRequest.index(), errorType, FailureStoreMetrics.ErrorLocation.SHARD); + return IndexDocFailureStoreStatus.USED; } else { failureStoreMetrics.incrementRejected( bulkItemRequest.index(), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index bedf65e1a9c8b..4dcc7c73c280e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -1343,7 +1343,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { + "]" ) .collect(Collectors.joining()); - throw new IllegalArgumentException( + throw new TimestampError( "the document timestamp [" + timestampAsString + "] is outside of ranges of currently writable indices [" @@ -1405,10 +1405,10 @@ private static Instant getTimeStampFromRaw(Object rawTimestamp) { } else if (rawTimestamp instanceof String sTimestamp) { return DateFormatters.from(TIMESTAMP_FORMATTER.parse(sTimestamp), TIMESTAMP_FORMATTER.locale()).toInstant(); } else { - throw new IllegalArgumentException("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error"); + throw new TimestampError("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error"); } } catch (Exception e) { - throw new IllegalArgumentException("Error get data stream timestamp field: " + e.getMessage(), e); + throw new TimestampError("Error get data stream timestamp field: " + e.getMessage(), e); } } @@ -1432,7 +1432,7 @@ private static Instant getTimestampFromParser(BytesReference source, XContentTyp ); }; } catch (Exception e) { - throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e); + throw new TimestampError("Error extracting data stream timestamp field: " + e.getMessage(), e); } } @@ -1741,4 +1741,20 @@ public DataStream build() { ); } } + + /** + * This is a specialised error to capture that a document does not have a valid timestamp + * to index a document. It is mainly applicable for TSDS data streams because they need the timestamp + * to determine the write index. + */ + public static class TimestampError extends IllegalArgumentException { + + public TimestampError(String message, Exception cause) { + super(message, cause); + } + + public TimestampError(String message) { + super(message); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java index fd3fc1a732acb..7315e9f7a51d3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -19,6 +20,8 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; /** * A utility class that contains the mappings and settings logic for failure store indices that are a part of data streams. @@ -26,12 +29,30 @@ public class DataStreamFailureStoreDefinition { public static final String FAILURE_STORE_REFRESH_INTERVAL_SETTING_NAME = "data_streams.failure_store.refresh_interval"; + public static final String INDEX_FAILURE_STORE_VERSION_SETTING_NAME = "index.failure_store.version"; public static final Settings DATA_STREAM_FAILURE_STORE_SETTINGS; + // Only a subset of user configurable settings is applicable for a failure index. Here we have an + // allowlist that will filter all other settings out. + public static final Set SUPPORTED_USER_SETTINGS = Set.of( + DataTier.TIER_PREFERENCE, + IndexMetadata.SETTING_INDEX_HIDDEN, + INDEX_FAILURE_STORE_VERSION_SETTING_NAME, + IndexMetadata.SETTING_NUMBER_OF_SHARDS, + IndexMetadata.SETTING_NUMBER_OF_REPLICAS, + IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, + IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), + IndexMetadata.LIFECYCLE_NAME + ); + public static final Set SUPPORTED_USER_SETTINGS_PREFIXES = Set.of( + IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".", + IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".", + IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "." + ); public static final CompressedXContent DATA_STREAM_FAILURE_STORE_MAPPING; public static final int FAILURE_STORE_DEFINITION_VERSION = 1; public static final Setting FAILURE_STORE_DEFINITION_VERSION_SETTING = Setting.intSetting( - "index.failure_store.version", + INDEX_FAILURE_STORE_VERSION_SETTING_NAME, 0, Setting.Property.IndexScope ); @@ -40,11 +61,6 @@ public class DataStreamFailureStoreDefinition { DATA_STREAM_FAILURE_STORE_SETTINGS = Settings.builder() // Always start with the hidden settings for a backing index. .put(IndexMetadata.SETTING_INDEX_HIDDEN, true) - // Override any pipeline settings on the failure store to not use any - // specified by the data stream template. Default pipelines are very much - // meant for the backing indices only. - .putNull(IndexSettings.DEFAULT_PIPELINE.getKey()) - .putNull(IndexSettings.FINAL_PIPELINE.getKey()) .put(FAILURE_STORE_DEFINITION_VERSION_SETTING.getKey(), FAILURE_STORE_DEFINITION_VERSION) .build(); @@ -199,4 +215,23 @@ public static Settings.Builder applyFailureStoreSettings(Settings nodeSettings, } return builder; } + + /** + * Removes the unsupported by the failure store settings from the settings provided. + * ATTENTION: This method should be applied BEFORE we set the necessary settings for an index + * @param builder the settings builder that is going to be updated + * @return the original settings builder, with the unsupported settings removed. + */ + public static Settings.Builder filterUserDefinedSettings(Settings.Builder builder) { + if (builder.keys().isEmpty() == false) { + Set existingKeys = new HashSet<>(builder.keys()); + for (String setting : existingKeys) { + if (SUPPORTED_USER_SETTINGS.contains(setting) == false + && SUPPORTED_USER_SETTINGS_PREFIXES.stream().anyMatch(setting::startsWith) == false) { + builder.remove(setting); + } + } + } + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 2df9cf706d892..5dbf4da6f376f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -425,7 +425,8 @@ public static ClusterState createFailureStoreIndex( .nameResolvedInstant(nameResolvedInstant) .performReroute(false) .setMatchingTemplate(template) - .settings(indexSettings); + .settings(indexSettings) + .isFailureIndex(true); try { currentState = metadataCreateIndexService.applyCreateIndexRequest( diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 321719475c1f8..3accdd3881c6d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -983,6 +983,7 @@ static Settings aggregateIndexSettings( final Settings templateAndRequestSettings = Settings.builder().put(combinedTemplateSettings).put(request.settings()).build(); final IndexMode templateIndexMode = Optional.of(request) + .filter(r -> r.isFailureIndex() == false) .map(CreateIndexClusterStateUpdateRequest::matchingTemplate) .map(metadata::retrieveIndexModeFromTemplate) .orElse(null); @@ -1038,11 +1039,13 @@ static Settings aggregateIndexSettings( // Finally, we actually add the explicit defaults prior to the template settings and the // request settings, so that the precedence goes: - // Explicit Defaults -> Template -> Request -> Necessary Settings (# of shards, uuid, etc) + // Explicit Defaults -> Template -> Request -> Filter out failure store settings -> Necessary Settings (# of shards, uuid, etc) indexSettingsBuilder.put(additionalIndexSettings.build()); indexSettingsBuilder.put(templateSettings.build()); } - + if (request.isFailureIndex()) { + DataStreamFailureStoreDefinition.filterUserDefinedSettings(indexSettingsBuilder); + } // now, put the request settings, so they override templates indexSettingsBuilder.put(requestSettings.build()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinitionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinitionTests.java new file mode 100644 index 0000000000000..38d4031755a55 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinitionTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.cluster.metadata.DataStreamFailureStoreDefinition.INDEX_FAILURE_STORE_VERSION_SETTING_NAME; +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamFailureStoreDefinitionTests extends ESTestCase { + + public void testSettingsFiltering() { + // Empty + Settings.Builder builder = Settings.builder(); + Settings.Builder expectedBuilder = Settings.builder(); + assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys(), equalTo(expectedBuilder.keys())); + + // All supported settings + builder.put(INDEX_FAILURE_STORE_VERSION_SETTING_NAME, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(DataTier.TIER_PREFERENCE, "data_cold") + .put(IndexMetadata.SETTING_INDEX_HIDDEN, true) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-10") + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") + .put(IndexMetadata.LIFECYCLE_NAME, "my-policy") + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)) + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)) + .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)); + // We expect no changes + expectedBuilder = Settings.builder().put(builder.build()); + assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys(), equalTo(expectedBuilder.keys())); + + // Remove unsupported settings + String randomSetting = randomAlphaOfLength(10); + builder.put(INDEX_FAILURE_STORE_VERSION_SETTING_NAME, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(DataTier.TIER_PREFERENCE, "data_cold") + .put(IndexMetadata.SETTING_INDEX_HIDDEN, true) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-10") + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") + .put(IndexMetadata.LIFECYCLE_NAME, "my-policy") + .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)) + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)) + .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4)) + .put(IndexSettings.MODE.getKey(), randomFrom(IndexMode.values())) + .put(randomSetting, randomAlphaOfLength(10)); + // We expect no changes + expectedBuilder = Settings.builder().put(builder.build()); + assertThat( + DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().size(), + equalTo(expectedBuilder.keys().size() - 2) + ); + assertThat( + DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().contains(IndexSettings.MODE.getKey()), + equalTo(false) + ); + assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().contains(randomSetting), equalTo(false)); + } + +} diff --git a/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml new file mode 100644 index 0000000000000..21e4f49fe7af5 --- /dev/null +++ b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml @@ -0,0 +1,99 @@ +--- +teardown: + - do: + indices.delete_data_stream: + name: my-logs-fs + ignore: 404 + + - do: + indices.delete_index_template: + name: template + ignore: 404 + + - do: + indices.delete_data_stream: + name: my-logs-db + ignore: 404 + - do: + indices.delete_index_template: + name: template1 + ignore: 404 + +--- +Test failure store with logsdb: + - requires: + test_runner_features: [ capabilities, allowed_warnings ] + capabilities: + - method: PUT + path: /{index} + capabilities: [ logsdb_index_mode ] + - method: POST + path: /_bulk + capabilities: [ 'failure_store_status' ] + - method: PUT + path: /_bulk + capabilities: [ 'failure_store_status' ] + reason: "Support for 'logsdb' index mode & failure status capability required" + + - do: + allowed_warnings: + - "index template [my-template] has index patterns [my-logs-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: ["my-logs-fs*"] + data_stream: + failure_store: true + template: + settings: + index: + mode: logsdb + number_of_replicas: 1 + number_of_shards: 2 + - do: + allowed_warnings: + - "index template [my-template2] has index patterns [my-logs-db*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation" + indices.put_index_template: + name: my-template2 + body: + index_patterns: [ "my-logs-db*" ] + data_stream: {} + template: + settings: + index: + mode: logsdb + number_of_replicas: 1 + number_of_shards: 2 + + - do: + bulk: + refresh: true + body: + - '{ "create": { "_index": "my-logs-fs"} }' + - '{"@timestamp":"2019-08-06T12:09:12.375Z", "log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}' + - '{ "create": { "_index": "my-logs-db"} }' + - '{ "@timestamp": "2022-01-01", "log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer" }' + - '{ "create": { "_index": "my-logs-fs"} }' + - '{"log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}' + - '{ "create": { "_index": "my-logs-db"} }' + - '{"log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}' + - is_true: errors + + # Successfully indexed to backing index + - match: { items.0.create._index: '/\.ds-my-logs-fs-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.0.create.status: 201 } + - is_false: items.0.create.failure_store + - match: { items.1.create._index: '/\.ds-my-logs-db-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.1.create.status: 201 } + - is_false: items.1.create.failure_store + + # Successfully indexed to failure store + - match: { items.2.create._index: '/\.fs-my-logs-fs-(\d{4}\.\d{2}\.\d{2}-)?000002/' } + - match: { items.2.create.status: 201 } + - match: { items.2.create.failure_store: used } + + # Rejected, eligible to go to failure store, but failure store not enabled + - match: { items.3.create._index: '/\.ds-my-logs-db-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { items.3.create.status: 400 } + - match: { items.3.create.error.type: document_parsing_exception } + - match: { items.3.create.failure_store: not_enabled }