Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update persistent state document in the index the document belongs to #51751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,20 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand All @@ -22,10 +32,24 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import java.util.Map;
import java.util.Objects;

/**
* Reads state documents of a stream, splits them and persists to an index via a bulk request
* Reads state documents of a stream, splits them and persists to an index via a bulk request.
*
* Some types of state, for example data frame analytics state and categorizer state, are written multiple times with the same document id.
* The code needs to make sure that even after .ml-state index rollover there are no duplicate documents across the .ml-state*
* indices. Such duplicates are undesirable for at least two reasons:
* 1. We deliberately have no mappings on the state index so we cannot sort and filter in a search
* 2. The state documents are large, so having dead documents with duplicate IDs is suboptimal from a disk usage perspective
*
* In order to avoid duplicates the following sequence of steps is executed every time the document is about to get persisted:
* 1. The first non-blank line is extracted from the given bytes. Lines are delimited by the new line character ('\n')
* 2. Document id is extracted from this line.
* 3. Document with this id is searched for in .ml-state* indices
* 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
* Otherwise, it is written to the index pointed to by the current write alias, i.e. .ml-state-write
*/
public class IndexingStateProcessor implements StateProcessor {

Expand Down Expand Up @@ -88,7 +112,7 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
// Ignore completely empty chunks
if (nextZeroByte > splitFrom) {
// No validation - assume the native process has formatted the state correctly
persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
findAppropriateIndexOrAliasAndPersist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
}
splitFrom = nextZeroByte + 1;
}
Expand All @@ -98,11 +122,25 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
}

void persist(BytesReference bytes) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), XContentType.JSON);
/**
 * Works out which index (or alias) the given state document belongs in and persists it there.
 * The document id is read from the first non-blank line of {@code bytes}; the target is then
 * resolved via {@link #getConcreteIndexOrWriteAlias(String)}.
 * If {@code bytes} contains no non-blank line, nothing is persisted.
 *
 * @param bytes a single state document (action line followed by the document source)
 * @throws IOException if the document id cannot be parsed or persisting fails
 */
void findAppropriateIndexOrAliasAndPersist(BytesReference bytes) throws IOException {
    final String headerLine = extractFirstNonBlankLine(bytes);
    if (headerLine != null) {
        final String docId = extractDocId(headerLine);
        final String targetIndexOrAlias = getConcreteIndexOrWriteAlias(docId);
        persist(targetIndexOrAlias, bytes);
    }
}

void persist(String indexOrAlias, BytesReference bytes) throws IOException {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequest.add(bytes, indexOrAlias, XContentType.JSON);
if (bulkRequest.numberOfActions() > 0) {
LOGGER.trace("[{}] Persisting job state document", jobId);
LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length());
try {
resultsPersisterService.bulkIndexWithRetry(bulkRequest,
jobId,
Expand All @@ -117,12 +155,79 @@ void persist(BytesReference bytes) throws IOException {
}

private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
if (bytesRef.get(i) == 0) {
return i;
return bytesRef.indexOf((byte)0, Math.max(searchFrom, splitFrom));
}

@SuppressWarnings("unchecked")
/**
* Extracts the document id from the given {@code firstNonBlankLine}.
* The line is parsed as JSON and the document id is assumed to be a nested "index._id" field of type String.
*/
static String extractDocId(String firstNonBlankLine) throws IOException {
try (XContentParser parser =
JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, firstNonBlankLine)) {
Map<String, Object> map = parser.map();
if ((map.get("index") instanceof Map) == false) {
throw new IllegalStateException("Could not extract \"index\" field out of [" + firstNonBlankLine + "]");
}
map = (Map<String, Object>)map.get("index");
if ((map.get("_id") instanceof String) == false) {
throw new IllegalStateException("Could not extract \"index._id\" field out of [" + firstNonBlankLine + "]");
}
return (String)map.get("_id");
}
return -1;
}

/**
 * Returns the first line of {@code bytesRef} that is not blank, decoded as UTF-8.
 * Lines are delimited by the new line character ('\n'); the delimiter is not part of the result.
 * A line counts as blank when it consists solely of space characters (' '), which includes the empty line.
 *
 * @return the first non-blank line, or {@code null} if every line is blank
 */
private static String extractFirstNonBlankLine(BytesReference bytesRef) {
    final int totalLength = bytesRef.length();
    int lineStart = 0;
    while (lineStart < totalLength) {
        final int newLineAt = bytesRef.indexOf((byte) '\n', lineStart);
        // The last line may not be terminated, in which case it runs to the end of the bytes
        final int lineEnd = (newLineAt == -1) ? totalLength : newLineAt;
        if (isBlank(bytesRef, lineStart, lineEnd) == false) {
            return bytesRef.slice(lineStart, lineEnd - lineStart).utf8ToString();
        }
        // Step over the delimiter; when there was none this moves past the end and terminates the loop
        lineStart = lineEnd + 1;
    }
    return null;
}

/**
 * Tells whether the byte range {@code [from, to)} of {@code bytesRef} is blank.
 * A range is blank when every byte in it is the space character (' '); an empty range is therefore blank.
 *
 * @param from index of the first byte to inspect (inclusive)
 * @param to   index at which inspection stops (exclusive)
 */
private static boolean isBlank(BytesReference bytesRef, int from, int to) {
    int pos = from;
    // Advance over leading spaces; stops at the first non-space byte or at the end of the range
    while (pos < to && bytesRef.get(pos) == ((byte) ' ')) {
        ++pos;
    }
    return pos >= to;
}

/**
 * Resolves where a state document with the given id should be written.
 * Searches the job state index pattern for a document with this id (partial search results are not allowed).
 *
 * @param documentId id of the state document, must not be {@code null}
 * @return the index of the first hit if such a document exists, otherwise the job state write alias
 */
private String getConcreteIndexOrWriteAlias(String documentId) {
    Objects.requireNonNull(documentId);

    SearchRequest request = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
    request.allowPartialSearchResults(false);

    // Only the index of a single matching document is needed, so fetch one hit and skip hit counting
    SearchSourceBuilder source = new SearchSourceBuilder();
    source.size(1);
    source.trackTotalHits(false);
    source.query(new BoolQueryBuilder().filter(new IdsQueryBuilder().addIds(documentId)));
    request.source(source);

    // NOTE(review): the "() -> true" supplier presumably means "always keep retrying" - confirm against searchWithRetry
    SearchResponse response =
        resultsPersisterService.searchWithRetry(
            request,
            jobId,
            () -> true,
            (msg) -> auditor.warning(jobId, documentId + " " + msg));

    if (response.getHits().getHits().length == 0) {
        return AnomalyDetectorsIndex.jobStateIndexWriteAlias();
    }
    return response.getHits().getHits()[0].getIndex();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import com.carrotsearch.randomizedtesting.annotations.Timeout;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.junit.After;
Expand All @@ -22,15 +23,23 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

/**
Expand All @@ -39,6 +48,7 @@
public class IndexingStateProcessorTests extends ESTestCase {

private static final String STATE_SAMPLE = ""
+ " \n"
+ "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
+ "{ \"field\" : \"value1\" }\n"
+ "\0"
Expand All @@ -56,54 +66,99 @@ public class IndexingStateProcessorTests extends ESTestCase {

private IndexingStateProcessor stateProcessor;
private ResultsPersisterService resultsPersisterService;
private SearchResponse searchResponse;

@Before
public void initialize() {
searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
resultsPersisterService = mock(ResultsPersisterService.class);
doReturn(searchResponse).when(resultsPersisterService).searchWithRetry(any(SearchRequest.class), any(), any(), any());
doReturn(mock(BulkResponse.class)).when(resultsPersisterService).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
stateProcessor = spy(new IndexingStateProcessor(JOB_ID, resultsPersisterService, auditor));
when(resultsPersisterService.bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any())).thenReturn(mock(BulkResponse.class));
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
}

@After
public void verifyNoMoreClientInteractions() {
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verifyNoMoreInteractions(resultsPersisterService);
}

public void testStateRead() throws IOException {
// Checks that the "_id" nested under "index" is returned, with and without an "_index" sibling field.
public void testExtractDocId() throws IOException {
    String actionLineWithIndexAndId = "{ \"index\": {\"_index\": \"test\", \"_id\": \"1\" } }\n";
    assertThat(IndexingStateProcessor.extractDocId(actionLineWithIndexAndId), equalTo("1"));

    String actionLineWithIdOnly = "{ \"index\": {\"_id\": \"2\" } }\n";
    assertThat(IndexingStateProcessor.extractDocId(actionLineWithIdOnly), equalTo("2"));
}

private void testStateRead(SearchHits searchHits, String expectedIndexOrAlias) throws IOException {
when(searchResponse.getHits()).thenReturn(searchHits);

ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
stateProcessor.process(stream);
ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture());
verify(stateProcessor, times(3)).persist(eq(expectedIndexOrAlias), bytesRefCaptor.capture());

String[] threeStates = STATE_SAMPLE.split("\0");
List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
}

// When the search finds no existing document, state is expected to be persisted via the write alias.
public void testStateRead_StateDocumentCreated() throws IOException {
    SearchHits noHits = SearchHits.empty();
    testStateRead(noHits, ".ml-state-write");
}

// When the search finds an existing document, state is expected to be persisted to that document's index.
public void testStateRead_StateDocumentUpdated() throws IOException {
    SearchHit existingDocument = SearchHit.createFromMap(Map.of("_index", ".ml-state-dummy"));
    SearchHits singleHit = new SearchHits(new SearchHit[]{ existingDocument }, null, 0.0f);
    testStateRead(singleHit, ".ml-state-dummy");
}

public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
String zeroBytes = "\0\0\0\0\0\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));

stateProcessor.process(stream);

verify(stateProcessor, never()).persist(any());
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verify(stateProcessor, never()).persist(any(), any());
}

public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
String zeroBytes = " \n\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
public void testStateReadGivenSpacesAndNewLineCharactersFollowedByZeroByte() throws IOException {
Function<String, InputStream> stringToInputStream = s -> new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));

stateProcessor.process(stream);
stateProcessor.process(stringToInputStream.apply("\0"));
stateProcessor.process(stringToInputStream.apply(" \0"));
stateProcessor.process(stringToInputStream.apply("\n\0"));
stateProcessor.process(stringToInputStream.apply(" \0"));
stateProcessor.process(stringToInputStream.apply(" \n \0"));
stateProcessor.process(stringToInputStream.apply(" \n\n \0"));
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
stateProcessor.process(stringToInputStream.apply("\n \n \0"));

verify(stateProcessor, times(1)).persist(any());
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verify(stateProcessor, never()).persist(any(), any());
}

// A first non-blank line without an "index" object must be rejected and nothing persisted.
public void testStateReadGivenNoIndexField() throws IOException {
    byte[] rawState = " \n \n \n \n\n {}\0".getBytes(StandardCharsets.UTF_8);
    InputStream input = new ByteArrayInputStream(rawState);

    IllegalStateException thrown = expectThrows(IllegalStateException.class, () -> stateProcessor.process(input));

    assertThat(thrown.getMessage(), containsString("Could not extract \"index\" field"));
    verify(stateProcessor, never()).persist(any(), any());
}

// An "index" object without a String "_id" must be rejected and nothing persisted.
public void testStateReadGivenNoIdField() throws IOException {
    byte[] rawState = " \n \n \n {\"index\": {}}\0".getBytes(StandardCharsets.UTF_8);
    InputStream input = new ByteArrayInputStream(rawState);

    IllegalStateException thrown = expectThrows(IllegalStateException.class, () -> stateProcessor.process(input));

    assertThat(thrown.getMessage(), containsString("Could not extract \"index._id\" field"));
    verify(stateProcessor, never()).persist(any(), any());
}

/**
Expand All @@ -113,9 +168,11 @@ public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOExc
*/
@Timeout(millis = 10 * 1000)
public void testLargeStateRead() throws Exception {
when(searchResponse.getHits()).thenReturn(SearchHits.empty());

StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\"}}\n");
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_id\":\"doc").append(docNum).append("\"}}\n");
for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
builder.append("data");
}
Expand All @@ -124,7 +181,8 @@ public void testLargeStateRead() throws Exception {

ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
stateProcessor.process(stream);
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any());
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
}
}