Skip to content

Commit

Permalink
Redirect shard-level bulk failures to a failure store if applicable (#105362)
Browse files Browse the repository at this point in the history

This PR expands upon previous work in the failure store by inspecting failed shard-level bulk operations 
and possibly redirecting them to a failure store.
  • Loading branch information
jbaiera committed Mar 26, 2024
1 parent cba9e5b commit a49f8b8
Show file tree
Hide file tree
Showing 8 changed files with 1,467 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,83 @@ teardown:
indices.delete:
index: .fs-logs-foobar-*
- is_true: acknowledged

---
"Redirect shard failure in data stream to failure store":
  # Only run on versions where shard-level bulk failures are redirected to the failure store (8.14+).
  - skip:
      version: " - 8.13.99"
      reason: "data stream failure stores only redirect shard failures in 8.14+"
      features: [allowed_warnings, contains]

  # Create an index template for logs-* data streams with the failure store enabled.
  # `count` is mapped as a long so a non-numeric value will fail to parse at the shard level.
  - do:
      allowed_warnings:
        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
      indices.put_index_template:
        name: generic_logs_template
        body:
          index_patterns: logs-*
          data_stream:
            failure_store: true
          template:
            settings:
              number_of_shards: 1
              number_of_replicas: 1
            mappings:
              properties:
                '@timestamp':
                  type: date
                count:
                  type: long

  # Index a document whose `count` value cannot be parsed as a long; the resulting shard-level
  # failure should be redirected to the data stream's failure store rather than surfaced as an error.
  - do:
      index:
        index: logs-foobar
        refresh: true
        body:
          '@timestamp': '2020-12-12'
          count: 'invalid value'

  # The data stream should now have one backing index and one failure store index.
  - do:
      indices.get_data_stream:
        name: logs-foobar
  - match: { data_streams.0.name: logs-foobar }
  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
  - match: { data_streams.0.failure_store: true }
  - length: { data_streams.0.failure_indices: 1 }
  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

  # The malformed document must not appear in the data stream's backing indices.
  - do:
      search:
        index: logs-foobar
        body: { query: { match_all: {} } }
  - length: { hits.hits: 0 }

  # The failure store should contain the failed document, the original source, and error details
  # including the parse exception type, message, and stack trace.
  - do:
      search:
        index: .fs-logs-foobar-*
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
  - exists: hits.hits.0._source.@timestamp
  - not_exists: hits.hits.0._source.count
  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
  - match: { hits.hits.0._source.document.source.count: 'invalid value' }
  - match: { hits.hits.0._source.error.type: 'document_parsing_exception' }
  - contains: { hits.hits.0._source.error.message: "failed to parse field [count] of type [long] in document with id " }
  - contains: { hits.hits.0._source.error.message: "Preview of field's value: 'invalid value'" }
  - contains: { hits.hits.0._source.error.stack_trace: "org.elasticsearch.index.mapper.DocumentParsingException: " }
  - contains: { hits.hits.0._source.error.stack_trace: "failed to parse field [count] of type [long] in document with id" }
  - contains: { hits.hits.0._source.error.stack_trace: "Preview of field's value: 'invalid value'" }

  # Cleanup: remove the data stream and its failure store indices.
  - do:
      indices.delete_data_stream:
        name: logs-foobar
  - is_true: acknowledged

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
  - is_true: acknowledged
444 changes: 378 additions & 66 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
final SparseFixedBitSet failedSlots;
final List<BulkItemResponse> itemResponses;
final AtomicIntegerArray originalSlots;
final FailureStoreDocumentConverter failureStoreDocumentConverter;

volatile int currentSlot = -1;

Expand All @@ -61,6 +62,7 @@ final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
this.failureStoreDocumentConverter = new FailureStoreDocumentConverter();
}

@Override
Expand Down Expand Up @@ -243,7 +245,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
);
} else {
try {
IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName);
IndexRequest errorDocument = failureStoreDocumentConverter.transformFailedRequest(indexRequest, e, targetIndexName);
// This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to
// the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again.
errorDocument.isPipelineResolved(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
/**
* Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store.
*/
public final class FailureStoreDocument {

private FailureStoreDocument() {}
public class FailureStoreDocumentConverter {

/**
* Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
Expand All @@ -35,7 +33,7 @@ private FailureStoreDocument() {}
* @return A new {@link IndexRequest} with a failure store compliant structure
* @throws IOException If there is a problem when the document's new source is serialized
*/
public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException {
public IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException {
return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis);
}

Expand All @@ -49,7 +47,7 @@ public static IndexRequest transformFailedRequest(IndexRequest source, Exception
* @return A new {@link IndexRequest} with a failure store compliant structure
* @throws IOException If there is a problem when the document's new source is serialized
*/
public static IndexRequest transformFailedRequest(
public IndexRequest transformFailedRequest(
IndexRequest source,
Exception exception,
String targetIndexName,
Expand Down
101 changes: 100 additions & 1 deletion server/src/main/java/org/elasticsearch/common/collect/Iterators.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

public class Iterators {
Expand Down Expand Up @@ -56,7 +58,7 @@ public static <T> Iterator<T> concat(Iterator<? extends T>... iterators) {
for (int i = 0; i < iterators.length; i++) {
if (iterators[i].hasNext()) {
// explicit generic type argument needed for type inference
return new ConcatenatedIterator<T>(iterators, i);
return new ConcatenatedIterator<>(iterators, i);
}
}

Expand Down Expand Up @@ -258,6 +260,103 @@ public T next() {
}
}

    /**
     * Enumerates the elements of an iterator together with their index, using a function to combine the pair together into the final items
     * produced by the iterator.
     * <p>
     * An example of its usage to enumerate a list of names together with their positional index in the list:
     * </p>
     * <pre><code>
     * Iterator&lt;String&gt; nameIterator = ...;
     * Iterator&lt;Tuple&lt;Integer, String&gt;&gt; enumeratedNames = Iterators.enumerate(nameIterator, Tuple::new);
     * enumeratedNames.forEachRemaining(tuple -> System.out.println("Index: " + tuple.v1() + ", Name: " + tuple.v2()));
     * </code></pre>
     *
     * @param input The iterator to wrap
     * @param fn A function that takes the index for an entry and the entry itself, returning an item that combines them together
     * @return An iterator that combines elements together with their indices in the underlying collection
     * @param <T> The object type contained in the original iterator
     * @param <U> The object type that results from combining the original entry with its index in the iterator
     */
    public static <T, U> Iterator<U> enumerate(Iterator<? extends T> input, BiFunction<Integer, T, ? extends U> fn) {
        return new EnumeratingIterator<>(Objects.requireNonNull(input), Objects.requireNonNull(fn));
    }

private static class EnumeratingIterator<T, U> implements Iterator<U> {
private final Iterator<? extends T> input;
private final BiFunction<Integer, T, ? extends U> fn;

private int idx = 0;

EnumeratingIterator(Iterator<? extends T> input, BiFunction<Integer, T, ? extends U> fn) {
this.input = input;
this.fn = fn;
}

@Override
public boolean hasNext() {
return input.hasNext();
}

@Override
public U next() {
return fn.apply(idx++, input.next());
}

@Override
public void forEachRemaining(Consumer<? super U> action) {
input.forEachRemaining(t -> action.accept(fn.apply(idx++, t)));
}
}

    /**
     * Adapts a {@link Supplier} object into an iterator. The resulting iterator will return values from the delegate Supplier until the
     * delegate returns a <code>null</code> value. Once the delegate returns <code>null</code>, the iterator will claim to be empty.
     * <p>
     * Note that the first value is read from the supplier eagerly, when the iterator is constructed, rather than on the first call to
     * {@link Iterator#next()}.
     * </p>
     * <p>
     * An example of its usage to iterate over a queue while draining it at the same time:
     * </p>
     * <pre><code>
     * LinkedList&lt;String&gt; names = ...;
     * assert names.size() != 0;
     *
     * Iterator&lt;String&gt; nameIterator = Iterators.fromSupplier(names::pollFirst);
     * nameIterator.forEachRemaining(System.out::println);
     * assert names.size() == 0;
     * </code></pre>
     *
     * @param input A {@link Supplier} that returns null when no more elements should be returned from the iterator
     * @return An iterator that returns elements by calling the supplier until a null value is returned
     * @param <T> The object type returned from the supplier function
     */
    public static <T> Iterator<T> fromSupplier(Supplier<? extends T> input) {
        return new SupplierIterator<>(Objects.requireNonNull(input));
    }

private static final class SupplierIterator<T> implements Iterator<T> {
private final Supplier<? extends T> fn;
private T head;

SupplierIterator(Supplier<? extends T> fn) {
this.fn = fn;
this.head = fn.get();
}

@Override
public boolean hasNext() {
return head != null;
}

@Override
public T next() {
if (head == null) {
throw new NoSuchElementException();
}
T next = head;
head = fn.get();
return next;
}
}

public static <T> boolean equals(Iterator<? extends T> iterator1, Iterator<? extends T> iterator2, BiPredicate<T, T> itemComparer) {
if (iterator1 == null) {
return iterator2 == null;
Expand Down

0 comments on commit a49f8b8

Please sign in to comment.