Skip to content

Commit

Permalink
[7.x] Update persistent state document in the index the document belo…
Browse files Browse the repository at this point in the history
…ngs to (#51751) (#52145)
  • Loading branch information
przemekwitek committed Feb 10, 2020
1 parent c77b80f commit c7cc383
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,20 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand All @@ -22,10 +32,24 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import java.util.Map;
import java.util.Objects;

/**
* Reads state documents of a stream, splits them and persists to an index via a bulk request
* Reads state documents of a stream, splits them and persists to an index via a bulk request.
*
* Some types of state, for example data frame analytics state and categorizer state, are written multiple times with the same document id.
* The code needs to make sure that even after .ml-state index rollover there are no duplicate documents across the .ml-state*
* indices. Such duplicates are undesirable for at least two reasons:
* 1. We deliberately have no mappings on the state index so we cannot sort and filter in a search
* 2. The state documents are large, so having dead documents with duplicate IDs is suboptimal from a disk usage perspective
*
* In order to avoid duplicates the following sequence of steps is executed every time the document is about to get persisted:
* 1. The first non-blank line is extracted from the given bytes. Lines are delimited by the new line character ('\n')
* 2. Document id is extracted from this line.
* 3. Document with this id is searched for in .ml-state* indices
* 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
* Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-writei
*/
public class IndexingStateProcessor implements StateProcessor {

Expand Down Expand Up @@ -88,7 +112,7 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
// Ignore completely empty chunks
if (nextZeroByte > splitFrom) {
// No validation - assume the native process has formatted the state correctly
persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
findAppropriateIndexOrAliasAndPersist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
}
splitFrom = nextZeroByte + 1;
}
Expand All @@ -98,11 +122,25 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
}

void persist(BytesReference bytes) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), XContentType.JSON);
/**
* Finds an appropriate index the document should be put in and then persists the document in that index.
* For what is considered to be "appropriate" see the class documentation.
*/
void findAppropriateIndexOrAliasAndPersist(BytesReference bytes) throws IOException {
String firstNonBlankLine = extractFirstNonBlankLine(bytes);
if (firstNonBlankLine == null) {
return;
}
String stateDocId = extractDocId(firstNonBlankLine);
String indexOrAlias = getConcreteIndexOrWriteAlias(stateDocId);
persist(indexOrAlias, bytes);
}

void persist(String indexOrAlias, BytesReference bytes) throws IOException {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequest.add(bytes, indexOrAlias, XContentType.JSON);
if (bulkRequest.numberOfActions() > 0) {
LOGGER.trace("[{}] Persisting job state document", jobId);
LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length());
try {
resultsPersisterService.bulkIndexWithRetry(bulkRequest,
jobId,
Expand All @@ -117,12 +155,79 @@ void persist(BytesReference bytes) throws IOException {
}

private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
if (bytesRef.get(i) == 0) {
return i;
return bytesRef.indexOf((byte)0, Math.max(searchFrom, splitFrom));
}

@SuppressWarnings("unchecked")
/**
* Extracts document id from the given {@code bytesRef}.
* Only first non-blank line is parsed and document id is assumed to be a nested "index._id" field of type String.
*/
static String extractDocId(String firstNonBlankLine) throws IOException {
try (XContentParser parser =
JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, firstNonBlankLine)) {
Map<String, Object> map = parser.map();
if ((map.get("index") instanceof Map) == false) {
throw new IllegalStateException("Could not extract \"index\" field out of [" + firstNonBlankLine + "]");
}
map = (Map<String, Object>)map.get("index");
if ((map.get("_id") instanceof String) == false) {
throw new IllegalStateException("Could not extract \"index._id\" field out of [" + firstNonBlankLine + "]");
}
return (String)map.get("_id");
}
return -1;
}

/**
* Extracts the first non-blank line from the given {@code bytesRef}.
* Lines are separated by the new line character ('\n').
* A line is considered blank if it only consists of space characters (' ').
*/
private static String extractFirstNonBlankLine(BytesReference bytesRef) {
for (int searchFrom = 0; searchFrom < bytesRef.length();) {
int newLineMarkerIndex = bytesRef.indexOf((byte) '\n', searchFrom);
int searchTo = newLineMarkerIndex != -1 ? newLineMarkerIndex : bytesRef.length();
if (isBlank(bytesRef, searchFrom, searchTo) == false) {
return bytesRef.slice(searchFrom, searchTo - searchFrom).utf8ToString();
}
searchFrom = newLineMarkerIndex != -1 ? newLineMarkerIndex + 1 : bytesRef.length();
}
return null;
}

/**
* Checks whether the line pointed to by a pair of indexes: {@code from} (inclusive) and {@code to} (exclusive) is blank.
* A line is considered blank if it only consists of space characters (' ').
*/
private static boolean isBlank(BytesReference bytesRef, int from, int to) {
for (int i = from; i < to; ++i) {
if (bytesRef.get(i) != ((byte) ' ')) {
return false;
}
}
return true;
}

private String getConcreteIndexOrWriteAlias(String documentId) {
Objects.requireNonNull(documentId);
SearchRequest searchRequest =
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.allowPartialSearchResults(false)
.source(
new SearchSourceBuilder()
.size(1)
.trackTotalHits(false)
.query(new BoolQueryBuilder().filter(new IdsQueryBuilder().addIds(documentId))));
SearchResponse searchResponse =
resultsPersisterService.searchWithRetry(
searchRequest,
jobId,
() -> true,
(msg) -> auditor.warning(jobId, documentId + " " + msg));
return searchResponse.getHits().getHits().length > 0
? searchResponse.getHits().getHits()[0].getIndex()
: AnomalyDetectorsIndex.jobStateIndexWriteAlias();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import com.carrotsearch.randomizedtesting.annotations.Timeout;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.junit.After;
Expand All @@ -22,15 +23,23 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

/**
Expand All @@ -39,6 +48,7 @@
public class IndexingStateProcessorTests extends ESTestCase {

private static final String STATE_SAMPLE = ""
+ " \n"
+ "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
+ "{ \"field\" : \"value1\" }\n"
+ "\0"
Expand All @@ -56,54 +66,99 @@ public class IndexingStateProcessorTests extends ESTestCase {

private IndexingStateProcessor stateProcessor;
private ResultsPersisterService resultsPersisterService;
private SearchResponse searchResponse;

@Before
public void initialize() {
searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
resultsPersisterService = mock(ResultsPersisterService.class);
doReturn(searchResponse).when(resultsPersisterService).searchWithRetry(any(SearchRequest.class), any(), any(), any());
doReturn(mock(BulkResponse.class)).when(resultsPersisterService).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
stateProcessor = spy(new IndexingStateProcessor(JOB_ID, resultsPersisterService, auditor));
when(resultsPersisterService.bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any())).thenReturn(mock(BulkResponse.class));
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
}

@After
public void verifyNoMoreClientInteractions() {
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verifyNoMoreInteractions(resultsPersisterService);
}

public void testStateRead() throws IOException {
public void testExtractDocId() throws IOException {
assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_index\": \"test\", \"_id\": \"1\" } }\n"), equalTo("1"));
assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_id\": \"2\" } }\n"), equalTo("2"));
}

private void testStateRead(SearchHits searchHits, String expectedIndexOrAlias) throws IOException {
when(searchResponse.getHits()).thenReturn(searchHits);

ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
stateProcessor.process(stream);
ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture());
verify(stateProcessor, times(3)).persist(eq(expectedIndexOrAlias), bytesRefCaptor.capture());

String[] threeStates = STATE_SAMPLE.split("\0");
List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
}

public void testStateRead_StateDocumentCreated() throws IOException {
testStateRead(SearchHits.empty(), ".ml-state-write");
}

public void testStateRead_StateDocumentUpdated() throws IOException {
testStateRead(
new SearchHits(new SearchHit[]{ SearchHit.createFromMap(Collections.singletonMap("_index", ".ml-state-dummy")) }, null, 0.0f),
".ml-state-dummy");
}

public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
String zeroBytes = "\0\0\0\0\0\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));

stateProcessor.process(stream);

verify(stateProcessor, never()).persist(any());
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verify(stateProcessor, never()).persist(any(), any());
}

public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
String zeroBytes = " \n\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
public void testStateReadGivenSpacesAndNewLineCharactersFollowedByZeroByte() throws IOException {
Function<String, InputStream> stringToInputStream = s -> new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));

stateProcessor.process(stream);
stateProcessor.process(stringToInputStream.apply("\0"));
stateProcessor.process(stringToInputStream.apply(" \0"));
stateProcessor.process(stringToInputStream.apply("\n\0"));
stateProcessor.process(stringToInputStream.apply(" \0"));
stateProcessor.process(stringToInputStream.apply(" \n \0"));
stateProcessor.process(stringToInputStream.apply(" \n\n \0"));
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
stateProcessor.process(stringToInputStream.apply("\n \n \0"));

verify(stateProcessor, times(1)).persist(any());
Mockito.verifyNoMoreInteractions(resultsPersisterService);
verify(stateProcessor, never()).persist(any(), any());
}

public void testStateReadGivenNoIndexField() throws IOException {
String bytes = " \n \n \n \n\n {}\0";
ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));

Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
assertThat(e.getMessage(), containsString("Could not extract \"index\" field"));

verify(stateProcessor, never()).persist(any(), any());
}

public void testStateReadGivenNoIdField() throws IOException {
String bytes = " \n \n \n {\"index\": {}}\0";
ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));

Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
assertThat(e.getMessage(), containsString("Could not extract \"index._id\" field"));

verify(stateProcessor, never()).persist(any(), any());
}

/**
Expand All @@ -113,9 +168,11 @@ public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOExc
*/
@Timeout(millis = 10 * 1000)
public void testLargeStateRead() throws Exception {
when(searchResponse.getHits()).thenReturn(SearchHits.empty());

StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\"}}\n");
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_id\":\"doc").append(docNum).append("\"}}\n");
for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
builder.append("data");
}
Expand All @@ -124,7 +181,8 @@ public void testLargeStateRead() throws Exception {

ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
stateProcessor.process(stream);
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any());
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
}
}

0 comments on commit c7cc383

Please sign in to comment.