Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,7 @@ public void testNoTimestampInDocument() throws Exception {

IndexRequest indexRequest = new IndexRequest(dataStreamName).opType("create").source("{}", XContentType.JSON);
Exception e = expectThrows(Exception.class, client().index(indexRequest));
assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
assertThat(e.getCause().getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
}

public void testMultipleTimestampValuesInDocument() throws Exception {
Expand All @@ -1431,7 +1431,7 @@ public void testMultipleTimestampValuesInDocument() throws Exception {
IndexRequest indexRequest = new IndexRequest(dataStreamName).opType("create")
.source("{\"@timestamp\": [\"2020-12-12\",\"2022-12-12\"]}", XContentType.JSON);
Exception e = expectThrows(Exception.class, client().index(indexRequest));
assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
assertThat(e.getCause().getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
}

public void testMixedAutoCreate() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ teardown:
---
"Redirect ingest failure in data stream to failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [allowed_warnings, contains]

reason: "Failure store status was added in 8.16+"
test_runner_features: [capabilities, allowed_warnings, contains]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
Expand Down Expand Up @@ -92,6 +94,8 @@ teardown:
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used}
- match: { _index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/'}

- do:
indices.get_data_stream:
Expand Down Expand Up @@ -144,9 +148,12 @@ teardown:
---
"Redirect shard failure in data stream to failure store":
- requires:
cluster_features: ["gte_v8.14.0"]
reason: "data stream failure stores only redirect shard failures in 8.14+"
test_runner_features: [allowed_warnings, contains]
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]

- do:
allowed_warnings:
Expand Down Expand Up @@ -176,6 +183,8 @@ teardown:
body:
'@timestamp': '2020-12-12'
count: 'invalid value'
- match: { _index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000002/'}
- match: { failure_store: used}

- do:
indices.get_data_stream:
Expand Down Expand Up @@ -222,14 +231,13 @@ teardown:

---
"Ensure failure is redirected to correct failure store after a reroute processor":
- skip:
known_issues:
- cluster_feature: "gte_v8.15.0"
fixed_by: "gte_v8.16.0"
reason: "Failure store documents contained the original index name rather than the rerouted one before v8.16.0"
- requires:
test_runner_features: [allowed_warnings]

test_runner_features: [allowed_warnings, capabilities]
reason: "Failure store status was added in 8.16+"
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
Expand Down Expand Up @@ -307,6 +315,7 @@ teardown:
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used}

- do:
search:
Expand Down Expand Up @@ -422,9 +431,12 @@ teardown:
---
"Failure redirects to original failure store during index change if final pipeline changes target":
- requires:
cluster_features: [ "gte_v8.15.0" ]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [ allowed_warnings, contains ]
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -466,6 +478,7 @@ teardown:
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used}

- do:
indices.get_data_stream:
Expand Down Expand Up @@ -514,9 +527,12 @@ teardown:
---
"Failure redirects to correct failure store when index loop is detected":
- requires:
cluster_features: [ "gte_v8.15.0" ]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [ allowed_warnings, contains ]
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -591,6 +607,8 @@ teardown:
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { _index: '/\.fs-destination-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { failure_store: used}


- do:
Expand Down Expand Up @@ -640,9 +658,12 @@ teardown:
---
"Failure redirects to correct failure store when pipeline loop is detected":
- requires:
cluster_features: [ "gte_v8.15.0" ]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [ allowed_warnings, contains ]
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -701,6 +722,7 @@ teardown:
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used}

- do:
indices.get_data_stream:
Expand Down Expand Up @@ -752,9 +774,7 @@ teardown:
---
"Version conflicts are not redirected to failure store":
- requires:
cluster_features: ["gte_v8.16.0"]
reason: "Redirecting version conflicts to the failure store is considered a bug fixed in 8.16"
test_runner_features: [allowed_warnings, contains]
test_runner_features: [ allowed_warnings]

- do:
allowed_warnings:
Expand Down Expand Up @@ -788,3 +808,92 @@ teardown:
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.1.create.status: 409 }
- match: { items.1.create.error.type: version_conflict_engine_exception}
- is_false: items.1.create.failure_store

---
"Test failure store status with bulk request":
- requires:
test_runner_features: [ allowed_warnings, capabilities ]
reason: "Failure store status was added in 8.16+"
capabilities:
- method: POST
path: /_bulk
capabilities: [ 'failure_store_status' ]
- method: PUT
path: /_bulk
capabilities: [ 'failure_store_status' ]

- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
template:
settings:
number_of_shards: 1
number_of_replicas: 1
mappings:
properties:
'@timestamp':
type: date
count:
type: long
- do:
allowed_warnings:
- "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation"
indices.put_index_template:
name: no-fs
body:
index_patterns: no-fs*
data_stream:
failure_store: false
template:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
'@timestamp':
type: date
count:
type: long


- do:
bulk:
refresh: true
body:
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox" }'
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "baz": "lazy", "a": "dog" }'
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "count": "invalid" }'
- '{ "create": { "_index": "no-fs", "_id": "1" } }'
- '{ "@timestamp": "2022-01-01", "count": "invalid" }'
- is_true: errors
# Successfully indexed to backing index
- match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.0.create.status: 201 }
- is_false: items.1.create.failure_store

# Rejected but not eligible to go to failure store
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.1.create.status: 409 }
- match: { items.1.create.error.type: version_conflict_engine_exception}
- is_false: items.1.create.failure_store

# Successfully indexed to failure store
- match: { items.2.create._index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- match: { items.2.create.status: 201 }
- match: { items.2.create.failure_store: used }

# Rejected, eligible to go to failure store, but failure store not enabled
- match: { items.3.create._index: '/\.ds-no-fs-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.3.create.status: 400 }
- match: { items.3.create.error.type: document_parsing_exception }
- match: { items.3.create.failure_store: not_enabled }
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
Expand Down Expand Up @@ -1929,6 +1930,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.ingest.IngestPipelineException::new,
182,
TransportVersions.INGEST_PIPELINE_EXCEPTION_ADDED
),
INDEX_RESPONSE_WRAPPER_EXCEPTION(
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class,
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
183,
TransportVersions.FAILURE_STORE_STATUS_IN_INDEX_RESPONSE
);

final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS = def(8_743_00_0);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_EMBEDDINGS_ADDED = def(8_744_00_0);
public static final TransportVersion BULK_INCREMENTAL_STATE = def(8_745_00_0);
public static final TransportVersion FAILURE_STORE_STATUS_IN_INDEX_RESPONSE = def(8_746_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
Expand Down Expand Up @@ -249,6 +250,10 @@ public String getLocation(@Nullable String routing) {
return location.toString();
}

public IndexDocFailureStoreStatus getFailureStoreStatus() {
return IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN;
}

public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);
writeWithoutShardId(out);
Expand Down
Loading