[7.x] [ML] allow data streams to be expanded for analytics and transforms (#58280) (#58455)

This commit allows data streams to be a valid source for analytics and transforms.

Data streams are fairly transparent to search: our `_search` and `_reindex` actions work against them without error.

For transforms, checkpointing works as desired as well: a data stream is effectively treated as an `alias`, and the names of its backing indices are stored in the checkpoint information.
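
For orientation before the diff: the mechanism that makes this work is asking the index-name resolver to expand data stream names into their concrete backing indices, which is exactly what the first hunk below enables. A minimal sketch of that resolution step, assuming you already hold a resolver and the cluster state (the helper name and the `lenientExpandOpen()` options are illustrative stand-ins, not code from this commit):

```java
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

// Hypothetical helper: with includeDataStreams=true, a data stream name
// such as "logs-app" resolves to its backing indices (e.g.
// ".ds-logs-app-000001") instead of failing source validation.
static String[] resolveSourceIndices(IndexNameExpressionResolver resolver,
                                     ClusterState state,
                                     String... sourceExpressions) {
    return resolver.concreteIndexNames(
        state,
        IndicesOptions.lenientExpandOpen(), // stand-in for DEFAULT_INDICES_OPTIONS_FOR_VALIDATION
        true,                               // includeDataStreams
        sourceExpressions);
}
```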
benwtrent committed Jun 23, 2020
1 parent 0cc84d3 commit a9b868b
Showing 10 changed files with 353 additions and 45 deletions.
@@ -214,6 +214,7 @@ private void resolveLocalAndRemoteSource() {
             indexNameExpressionResolver.concreteIndexNames(
                 state,
                 DEFAULT_INDICES_OPTIONS_FOR_VALIDATION,
+                true,
                 resolvedSource.toArray(new String[0])
             )
         )
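
The single added argument is the resolver's `includeDataStreams` flag; left `false`, a data stream named as an analytics or transform source would not resolve to its backing indices and validation would fail. The remaining hunks extend the ML integration tests to exercise this path end to end.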
@@ -8,6 +8,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -18,6 +19,7 @@
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
@@ -35,6 +37,7 @@
 import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Recall;
 import org.junit.After;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -141,6 +144,61 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws Exception {
         assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
     }
 
+    public void testWithDatastreams() throws Exception {
+        initialize("classification_with_datastreams", true);
+        String predictedClassField = KEYWORD_FIELD + "_prediction";
+        indexData(sourceIndex, 300, 50, KEYWORD_FIELD);
+
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null,
+            new Classification(
+                KEYWORD_FIELD,
+                BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
+                null,
+                null,
+                null,
+                null,
+                null));
+        putAnalytics(config);
+
+        assertIsStopped(jobId);
+        assertProgressIsZero(jobId);
+
+        startAnalytics(jobId);
+        waitUntilAnalyticsIsStopped(jobId);
+
+        client().admin().indices().refresh(new RefreshRequest(destIndex));
+        SearchResponse sourceData = client().prepareSearch(sourceIndex).setTrackTotalHits(true).setSize(1000).get();
+        for (SearchHit hit : sourceData.getHits()) {
+            Map<String, Object> destDoc = getDestDoc(config, hit);
+            Map<String, Object> resultsObject = getFieldValue(destDoc, "ml");
+            assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES)));
+            assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD)));
+            assertTopClasses(resultsObject, 2, KEYWORD_FIELD, KEYWORD_FIELD_VALUES);
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance");
+            assertThat(importanceArray, hasSize(greaterThan(0)));
+        }
+
+        assertProgressComplete(jobId);
+        assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
+        assertModelStatePersisted(stateDocId());
+        assertInferenceModelPersisted(jobId);
+        assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword");
+        assertThatAuditMessagesMatch(jobId,
+            "Created analytics with analysis type [classification]",
+            "Estimated memory usage for this analytics to be",
+            "Starting analytics on node",
+            "Started analytics",
+            expectedDestIndexAuditMessage(),
+            "Started reindexing to destination index [" + destIndex + "]",
+            "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
+            "Finished analysis");
+        assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
+    }
+
     public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception {
         initialize("classification_only_training_data_and_training_percent_is_100");
         String predictedClassField = KEYWORD_FIELD + "_prediction";
@@ -455,7 +513,7 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
         String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source";
         String dependentVariable = KEYWORD_FIELD;
 
-        createIndex(sourceIndex);
+        createIndex(sourceIndex, false);
         // We use 100 rows as we can't set this too low. If too low it is possible
         // we only train with rows of one of the two classes which leads to a failure.
         indexData(sourceIndex, 100, 0, dependentVariable);
@@ -595,28 +653,65 @@ public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
     }
 
     private void initialize(String jobId) {
+        initialize(jobId, false);
+    }
+
+    private void initialize(String jobId, boolean isDatastream) {
         this.jobId = jobId;
         this.sourceIndex = jobId + "_source_index";
         this.destIndex = sourceIndex + "_results";
         this.analysisUsesExistingDestIndex = randomBoolean();
-        createIndex(sourceIndex);
+        createIndex(sourceIndex, isDatastream);
         if (analysisUsesExistingDestIndex) {
-            createIndex(destIndex);
+            createIndex(destIndex, false);
         }
     }
 
-    private static void createIndex(String index) {
-        client().admin().indices().prepareCreate(index)
-            .addMapping("_doc",
-                BOOLEAN_FIELD, "type=boolean",
-                NUMERICAL_FIELD, "type=double",
-                DISCRETE_NUMERICAL_FIELD, "type=integer",
-                TEXT_FIELD, "type=text",
-                KEYWORD_FIELD, "type=keyword",
-                NESTED_FIELD, "type=keyword",
-                ALIAS_TO_KEYWORD_FIELD, "type=alias,path=" + KEYWORD_FIELD,
-                ALIAS_TO_NESTED_FIELD, "type=alias,path=" + NESTED_FIELD)
-            .get();
+    private static void createIndex(String index, boolean isDatastream) {
+        String mapping = "{\n" +
+            " \"properties\": {\n" +
+            " \"time\": {\n" +
+            " \"type\": \"date\"\n" +
+            " }," +
+            " \""+ BOOLEAN_FIELD + "\": {\n" +
+            " \"type\": \"boolean\"\n" +
+            " }," +
+            " \""+ NUMERICAL_FIELD + "\": {\n" +
+            " \"type\": \"double\"\n" +
+            " }," +
+            " \""+ DISCRETE_NUMERICAL_FIELD + "\": {\n" +
+            " \"type\": \"integer\"\n" +
+            " }," +
+            " \""+ TEXT_FIELD + "\": {\n" +
+            " \"type\": \"text\"\n" +
+            " }," +
+            " \""+ KEYWORD_FIELD + "\": {\n" +
+            " \"type\": \"keyword\"\n" +
+            " }," +
+            " \""+ NESTED_FIELD + "\": {\n" +
+            " \"type\": \"keyword\"\n" +
+            " }," +
+            " \""+ ALIAS_TO_KEYWORD_FIELD + "\": {\n" +
+            " \"type\": \"alias\",\n" +
+            " \"path\": \"" + KEYWORD_FIELD + "\"\n" +
+            " }," +
+            " \""+ ALIAS_TO_NESTED_FIELD + "\": {\n" +
+            " \"type\": \"alias\",\n" +
+            " \"path\": \"" + NESTED_FIELD + "\"\n" +
+            " }" +
+            " }\n" +
+            " }";
+        if (isDatastream) {
+            try {
+                createDataStreamAndTemplate(index, "time", mapping);
+            } catch (IOException ex) {
+                throw new ElasticsearchException(ex);
+            }
+        } else {
+            client().admin().indices().prepareCreate(index)
+                .addMapping("_doc", mapping, XContentType.JSON)
+                .get();
+        }
     }
 
     private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) {
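
The string-concatenated mapping above is what the commit merged; for comparison, an equivalent construction with `XContentBuilder` reads more cleanly (a sketch only, with two representative fields rather than the test's full constant list):

```java
import java.io.IOException;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

// Sketch: build the same kind of "properties" mapping programmatically.
// "time" matches the test; the other field names are placeholders.
static String buildMapping() throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder()
        .startObject()
            .startObject("properties")
                .startObject("time").field("type", "date").endObject()
                .startObject("boolean-field").field("type", "boolean").endObject()
                .startObject("keyword-field").field("type", "keyword").endObject()
            .endObject()
        .endObject();
    return Strings.toString(builder);
}
```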
@@ -630,7 +725,7 @@ private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) {
                 TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()),
                 KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()),
                 NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()));
-            IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
+            IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
             bulkRequestBuilder.add(indexRequest);
         }
         for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) {
@@ -655,7 +750,7 @@ private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) {
             if (NESTED_FIELD.equals(dependentVariable) == false) {
                 source.addAll(Arrays.asList(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size())));
             }
-            IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
+            IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
             bulkRequestBuilder.add(indexRequest);
         }
         BulkResponse bulkResponse = bulkRequestBuilder.get();
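
The `.opType(DocWriteRequest.OpType.CREATE)` added to both bulk paths above is required rather than cosmetic: data streams are append-only and reject write requests whose op type is the default `index`, so every document must be explicitly created. The same requests still work unchanged against a plain index, which is why the one `indexData` helper serves both cases. The next file adds an equivalent lookback test for datafeeds.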
@@ -101,6 +101,50 @@ public void testLookbackOnly() throws Exception {
         waitUntilJobIsClosed(job.getId());
     }
 
+    public void testLookbackOnlyDataStream() throws Exception {
+        String mapping = "{\n" +
+            " \"properties\": {\n" +
+            " \"time\": {\n" +
+            " \"type\": \"date\"\n" +
+            " }" +
+            " }\n" +
+            " }";
+        createDataStreamAndTemplate("datafeed_data_stream", "time", mapping);
+        long numDocs = randomIntBetween(32, 2048);
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - 604800000;
+        long twoWeeksAgo = oneWeekAgo - 604800000;
+        indexDocs(logger, "datafeed_data_stream", "_doc", numDocs, twoWeeksAgo, oneWeekAgo);
+
+        client().admin().cluster().prepareHealth("datafeed_data_stream").setWaitForYellowStatus().get();
+
+        Job.Builder job = createScheduledJob("lookback-data-stream-job");
+        registerJob(job);
+        PutJobAction.Response putJobResponse = putJob(job);
+        assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
+        openJob(job.getId());
+        assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
+
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed",
+            job.getId(),
+            Collections.singletonList("datafeed_data_stream"));
+        registerDatafeed(datafeedConfig);
+        putDatafeed(datafeedConfig);
+
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+
+            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
+            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
+            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
+        }, 60, TimeUnit.SECONDS);
+
+        waitUntilJobIsClosed(job.getId());
+    }
+
     public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
         client().admin().indices().prepareCreate("data")
             .addMapping("type", "time", "type=date")
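The datafeed test follows the existing `testLookbackOnly` shape: it indexes a random number of documents with timestamps spread over the week starting two weeks ago (604800000 ms is seven days), runs a lookback-only datafeed over [0, now), and asserts that every record is processed, none are out of order, and the datafeed stops on its own. The final file adds the shared `createDataStreamAndTemplate` helper that both test suites call.
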
@@ -6,11 +6,16 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkModule;
@@ -270,6 +275,20 @@ protected void ensureClusterStateConsistency() throws IOException {
         }
     }
 
+    protected static void createDataStreamAndTemplate(String dataStreamName, String timeField, String mapping) throws IOException {
+        client().execute(PutComposableIndexTemplateAction.INSTANCE,
+            new PutComposableIndexTemplateAction.Request(dataStreamName + "_template")
+                .indexTemplate(new ComposableIndexTemplate(Collections.singletonList(dataStreamName),
+                    new Template(null, new CompressedXContent("{\"_doc\":" + mapping + "}"), null),
+                    null,
+                    null,
+                    null,
+                    null,
+                    new ComposableIndexTemplate.DataStreamTemplate(timeField))))
+            .actionGet();
+        client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet();
+    }
+
     public static class MockPainlessScriptEngine extends MockScriptEngine {
 
         public static final String NAME = "painless";
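For test authors picking up the new helper, a minimal usage sketch from inside a test extending this base class (the data stream name and `category` field are illustrative):

```java
// Sketch: create a data stream whose composable template maps "time" as
// the timestamp field plus one keyword field, then index into it.
String mapping = "{\n" +
    "  \"properties\": {\n" +
    "    \"time\": { \"type\": \"date\" },\n" +
    "    \"category\": { \"type\": \"keyword\" }\n" +
    "  }\n" +
    "}";
createDataStreamAndTemplate("my-test-data-stream", "time", mapping);

// As with any data stream, writes must use op type CREATE.
client().index(new IndexRequest("my-test-data-stream")
    .source("{\"time\":1592870400000,\"category\":\"a\"}", XContentType.JSON)
    .opType(DocWriteRequest.OpType.CREATE))
    .actionGet();
```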
