Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -170,7 +171,7 @@ public void testTimeRanges() throws Exception {
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
expectThrows(IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class, () -> client().index(indexRequest).actionGet());
}

// Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle");
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");

@Override
public Map<NodeFeature, Version> getHistoricalFeatures() {
Expand All @@ -41,4 +42,9 @@ public Set<NodeFeature> getFeatures() {
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
);
}

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,107 @@ index without timestamp:
body:
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'

---
TSDB failures go to failure store:
- requires:
cluster_features: ["data_stream.failure_store.tsdb_fix"]
reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."

- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
indices.put_index_template:
name: my-template2
body:
index_patterns: [ "fs-k8s*" ]
data_stream:
failure_store: true
template:
settings:
index:
mode: time_series
number_of_replicas: 1
number_of_shards: 2
routing_path: [ metricset, time_series_dimension ]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
properties:
uid:
type: keyword
time_series_dimension: true
name:
type: keyword
ip:
type: ip
network:
properties:
tx:
type: long
rx:
type: long
- do:
index:
index: fs-k8s
body:
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- match: { result : "created"}
- match: { failure_store : "used"}

- do:
bulk:
refresh: true
body:
- '{ "create": { "_index": "fs-k8s"} }'
- '{"@timestamp":"2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{ "@timestamp": "2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "fs-k8s"} }'
- '{ "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "fs-k8s"} }'
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- is_true: errors

# Successfully indexed to backing index
- match: { items.0.create._index: '/\.ds-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.0.create.status: 201 }
- is_false: items.0.create.failure_store
- match: { items.1.create._index: '/\.ds-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.1.create.status: 201 }
- is_false: items.1.create.failure_store

# Successfully indexed to failure store
- match: { items.2.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- match: { items.2.create.status: 201 }
- match: { items.2.create.failure_store: used }
- match: { items.3.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- match: { items.3.create.status: 201 }
- match: { items.3.create.failure_store: used }

# Rejected, eligible to go to failure store, but failure store not enabled
- match: { items.4.create._index: 'k8s' }
- match: { items.4.create.status: 400 }
- match: { items.4.create.error.type: timestamp_error }
- match: { items.4.create.failure_store: not_enabled }
- match: { items.4.create._index: 'k8s' }
- match: { items.4.create.status: 400 }
- match: { items.4.create.error.type: timestamp_error }
- match: { items.4.create.failure_store: not_enabled }

---
index without timestamp with pipeline:
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ teardown:
# Successfully indexed to backing index
- match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.0.create.status: 201 }
- is_false: items.1.create.failure_store
- is_false: items.0.create.failure_store

# Rejected but not eligible to go to failure store
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CreateIndexClusterStateUpdateRequest {
private ResizeType resizeType;
private boolean copySettings;
private SystemDataStreamDescriptor systemDataStreamDescriptor;
private boolean isFailureIndex = false;

private Settings settings = Settings.EMPTY;

Expand Down Expand Up @@ -102,6 +103,11 @@ public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDat
return this;
}

public CreateIndexClusterStateUpdateRequest isFailureIndex(boolean isFailureIndex) {
this.isFailureIndex = isFailureIndex;
return this;
}

public String cause() {
return cause;
}
Expand Down Expand Up @@ -168,6 +174,10 @@ public String dataStreamName() {
return dataStreamName;
}

public boolean isFailureIndex() {
return isFailureIndex;
}

public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName) {
this.dataStreamName = dataStreamName;
return this;
Expand Down Expand Up @@ -228,6 +238,8 @@ public String toString() {
+ systemDataStreamDescriptor
+ ", matchingTemplate="
+ matchingTemplate
+ ", isFailureIndex="
+ isFailureIndex
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
shard -> new ArrayList<>()
);
shardRequests.add(bulkItemRequest);
} catch (DataStream.TimestampError timestampError) {
IndexDocFailureStoreStatus failureStoreStatus = processFailure(bulkItemRequest, clusterState, timestampError);
if (IndexDocFailureStoreStatus.USED.equals(failureStoreStatus) == false) {
String name = ia != null ? ia.getName() : docWriteRequest.index();
addFailureAndDiscardRequest(docWriteRequest, bulkItemRequest.id(), name, timestampError, failureStoreStatus);
}
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
String name = ia != null ? ia.getName() : docWriteRequest.index();
var failureStoreStatus = isFailureStoreRequest(docWriteRequest)
Expand Down Expand Up @@ -545,6 +551,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
boolean added = addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());
if (added) {
failureStoreMetrics.incrementFailureStore(bulkItemRequest.index(), errorType, FailureStoreMetrics.ErrorLocation.SHARD);
return IndexDocFailureStoreStatus.USED;
} else {
failureStoreMetrics.incrementRejected(
bulkItemRequest.index(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) {
+ "]"
)
.collect(Collectors.joining());
throw new IllegalArgumentException(
throw new TimestampError(
"the document timestamp ["
+ timestampAsString
+ "] is outside of ranges of currently writable indices ["
Expand Down Expand Up @@ -1405,10 +1405,10 @@ private static Instant getTimeStampFromRaw(Object rawTimestamp) {
} else if (rawTimestamp instanceof String sTimestamp) {
return DateFormatters.from(TIMESTAMP_FORMATTER.parse(sTimestamp), TIMESTAMP_FORMATTER.locale()).toInstant();
} else {
throw new IllegalArgumentException("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
throw new TimestampError("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
}
} catch (Exception e) {
throw new IllegalArgumentException("Error get data stream timestamp field: " + e.getMessage(), e);
throw new TimestampError("Error get data stream timestamp field: " + e.getMessage(), e);
}
}

Expand All @@ -1432,7 +1432,7 @@ private static Instant getTimestampFromParser(BytesReference source, XContentTyp
);
};
} catch (Exception e) {
throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e);
throw new TimestampError("Error extracting data stream timestamp field: " + e.getMessage(), e);
}
}

Expand Down Expand Up @@ -1741,4 +1741,20 @@ public DataStream build() {
);
}
}

/**
* This is a specialised error to capture that a document does not have a valid timestamp
* to index a document. It is mainly applicable for TSDS data streams because they need the timestamp
* to determine the write index.
*/
public static class TimestampError extends IllegalArgumentException {

public TimestampError(String message, Exception cause) {
super(message, cause);
}

public TimestampError(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,19 +20,39 @@
import org.elasticsearch.index.mapper.RoutingFieldMapper;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
* A utility class that contains the mappings and settings logic for failure store indices that are a part of data streams.
*/
public class DataStreamFailureStoreDefinition {

public static final String FAILURE_STORE_REFRESH_INTERVAL_SETTING_NAME = "data_streams.failure_store.refresh_interval";
public static final String INDEX_FAILURE_STORE_VERSION_SETTING_NAME = "index.failure_store.version";
public static final Settings DATA_STREAM_FAILURE_STORE_SETTINGS;
// Only a subset of user configurable settings is applicable for a failure index. Here we have an
// allowlist that will filter all other settings out.
public static final Set<String> SUPPORTED_USER_SETTINGS = Set.of(
DataTier.TIER_PREFERENCE,
IndexMetadata.SETTING_INDEX_HIDDEN,
INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
IndexMetadata.SETTING_NUMBER_OF_SHARDS,
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS,
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(),
IndexMetadata.LIFECYCLE_NAME
);
public static final Set<String> SUPPORTED_USER_SETTINGS_PREFIXES = Set.of(
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".",
IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".",
IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "."
);
public static final CompressedXContent DATA_STREAM_FAILURE_STORE_MAPPING;

public static final int FAILURE_STORE_DEFINITION_VERSION = 1;
public static final Setting<Integer> FAILURE_STORE_DEFINITION_VERSION_SETTING = Setting.intSetting(
"index.failure_store.version",
INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
0,
Setting.Property.IndexScope
);
Expand All @@ -40,11 +61,6 @@ public class DataStreamFailureStoreDefinition {
DATA_STREAM_FAILURE_STORE_SETTINGS = Settings.builder()
// Always start with the hidden settings for a backing index.
.put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
// Override any pipeline settings on the failure store to not use any
// specified by the data stream template. Default pipelines are very much
// meant for the backing indices only.
.putNull(IndexSettings.DEFAULT_PIPELINE.getKey())
.putNull(IndexSettings.FINAL_PIPELINE.getKey())
.put(FAILURE_STORE_DEFINITION_VERSION_SETTING.getKey(), FAILURE_STORE_DEFINITION_VERSION)
.build();

Expand Down Expand Up @@ -199,4 +215,23 @@ public static Settings.Builder applyFailureStoreSettings(Settings nodeSettings,
}
return builder;
}

/**
* Removes the unsupported by the failure store settings from the settings provided.
* ATTENTION: This method should be applied BEFORE we set the necessary settings for an index
* @param builder the settings builder that is going to be updated
* @return the original settings builder, with the unsupported settings removed.
*/
public static Settings.Builder filterUserDefinedSettings(Settings.Builder builder) {
if (builder.keys().isEmpty() == false) {
Set<String> existingKeys = new HashSet<>(builder.keys());
for (String setting : existingKeys) {
if (SUPPORTED_USER_SETTINGS.contains(setting) == false
&& SUPPORTED_USER_SETTINGS_PREFIXES.stream().anyMatch(setting::startsWith) == false) {
builder.remove(setting);
}
}
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ public static ClusterState createFailureStoreIndex(
.nameResolvedInstant(nameResolvedInstant)
.performReroute(false)
.setMatchingTemplate(template)
.settings(indexSettings);
.settings(indexSettings)
.isFailureIndex(true);

try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ static Settings aggregateIndexSettings(
final Settings templateAndRequestSettings = Settings.builder().put(combinedTemplateSettings).put(request.settings()).build();

final IndexMode templateIndexMode = Optional.of(request)
.filter(r -> r.isFailureIndex() == false)
.map(CreateIndexClusterStateUpdateRequest::matchingTemplate)
.map(metadata::retrieveIndexModeFromTemplate)
.orElse(null);
Expand Down Expand Up @@ -1038,11 +1039,13 @@ static Settings aggregateIndexSettings(

// Finally, we actually add the explicit defaults prior to the template settings and the
// request settings, so that the precedence goes:
// Explicit Defaults -> Template -> Request -> Necessary Settings (# of shards, uuid, etc)
// Explicit Defaults -> Template -> Request -> Filter out failure store settings -> Necessary Settings (# of shards, uuid, etc)
indexSettingsBuilder.put(additionalIndexSettings.build());
indexSettingsBuilder.put(templateSettings.build());
}

if (request.isFailureIndex()) {
DataStreamFailureStoreDefinition.filterUserDefinedSettings(indexSettingsBuilder);
}
// now, put the request settings, so they override templates
indexSettingsBuilder.put(requestSettings.build());

Expand Down
Loading