Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Make datafeeds work with nanosecond time fields #51180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.yaml.snakeyaml.util.UriEncoder;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -36,7 +37,7 @@ public void testMachineLearningInstalled() throws Exception {
assertTrue((Boolean) ml.get("enabled"));
}

public void testInvalidJob() throws Exception {
public void testInvalidJob() {
// The job name is invalid because it contains a space
String jobId = "invalid job";
ResponseException e = expectThrows(ResponseException.class, () -> createFarequoteJob(jobId));
Expand Down Expand Up @@ -103,22 +104,26 @@ public void testMiniFarequote() throws Exception {
}

public void testMiniFarequoteWithDatafeeder() throws Exception {
boolean datesHaveNanoSecondResolution = randomBoolean();
String dateMappingType = datesHaveNanoSecondResolution ? "date_nanos" : "date";
String dateFormat = datesHaveNanoSecondResolution ? "strict_date_optional_time_nanos" : "strict_date_optional_time";
String randomNanos = datesHaveNanoSecondResolution ? "," + randomIntBetween(100000000, 999999999) : "";
Request createAirlineDataRequest = new Request("PUT", "/airline-data");
createAirlineDataRequest.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"properties\": {"
+ " \"time\": { \"type\":\"date\"},"
+ " \"time\": { \"type\":\"" + dateMappingType + "\", \"format\":\"" + dateFormat + "\"},"
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
+ "}");
client().performRequest(createAirlineDataRequest);
Request airlineData1 = new Request("PUT", "/airline-data/_doc/1");
airlineData1.setJsonEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}");
airlineData1.setJsonEntity("{\"time\":\"2016-06-01T00:00:00" + randomNanos + "Z\",\"airline\":\"AAA\",\"responsetime\":135.22}");
client().performRequest(airlineData1);
Request airlineData2 = new Request("PUT", "/airline-data/_doc/2");
airlineData2.setJsonEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}");
airlineData2.setJsonEntity("{\"time\":\"2016-06-01T01:59:00" + randomNanos + "Z\",\"airline\":\"AAA\",\"responsetime\":541.76}");
client().performRequest(airlineData2);

// Ensure all data is searchable
Expand Down Expand Up @@ -147,7 +152,7 @@ public void testMiniFarequoteWithDatafeeder() throws Exception {
assertEquals(2, dataCountsDoc.get("input_record_count"));
assertEquals(2, dataCountsDoc.get("processed_record_count"));
} catch (IOException e) {
throw new RuntimeException(e);
throw new UncheckedIOException(e);
}
});

Expand Down Expand Up @@ -233,7 +238,7 @@ public void testMiniFarequoteReopen() throws Exception {
assertEquals(1000, responseBody2.get("bucket_count"));

// unintuitive: should return the earliest record timestamp of this feed???
assertEquals(null, responseBody2.get("earliest_record_timestamp"));
assertNull(responseBody2.get("earliest_record_timestamp"));
assertEquals(1407082000000L, responseBody2.get("latest_record_timestamp"));

assertEquals(Collections.singletonMap("closed", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static TimeBasedExtractedFields build(Job job, DatafeedConfig datafeed, F
List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
allExtractedFields.add(timeExtractedField);
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
remainingFields.forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));

return new TimeBasedExtractedFields(timeExtractedField, allExtractedFields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ExtractedField detect(String field) {
}

private ExtractedField detectNonScriptField(String field) {
if (isFieldOfType(field, TimeField.TYPE) && isAggregatable(field)) {
if (isFieldOfTypes(field, TimeField.TYPES) && isAggregatable(field)) {
return new TimeField(field, ExtractedField.Method.DOC_VALUE);
}
if (isFieldOfType(field, GeoPointField.TYPE)) {
Expand Down Expand Up @@ -129,9 +129,14 @@ public boolean isAggregatable(String field) {
}

private boolean isFieldOfType(String field, String type) {
return isFieldOfTypes(field, Collections.singleton(type));
}

private boolean isFieldOfTypes(String field, Set<String> types) {
assert types.isEmpty() == false;
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps != null && fieldCaps.size() == 1) {
return fieldCaps.containsKey(type);
if (fieldCaps != null && fieldCaps.isEmpty() == false) {
return types.containsAll(fieldCaps.keySet());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.extractor;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.search.SearchHit;

import java.util.Collections;
Expand All @@ -13,15 +14,17 @@

public class TimeField extends AbstractField {

static final String TYPE = "date";

private static final Set<String> TYPES = Collections.singleton(TYPE);
static final Set<String> TYPES = Collections.unmodifiableSet(Sets.newHashSet("date", "date_nanos"));

private static final String EPOCH_MILLIS_FORMAT = "epoch_millis";

private final Method method;

public TimeField(String name, Method method) {
// This class intentionally reports the possible types rather than the types reported by
// field caps at the point of construction. This means that it will continue to work if,
// for example, a newly created index has a "date_nanos" time field when in all the indices
// that matched the pattern when this constructor was called the field had type "date".
super(name, TYPES);
if (method == Method.SOURCE) {
throw new IllegalArgumentException("time field [" + name + "] cannot be extracted from source");
Expand All @@ -41,7 +44,23 @@ public Object[] value(SearchHit hit) {
return value;
}
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
value[0] = Long.parseLong((String) value[0]);
// Since nanosecond support was added epoch_millis timestamps may have a fractional component.
// We discard this, taking just whole milliseconds. Arguably it would be better to retain the
// precision here and let the downstream component decide whether it wants the accuracy, but
// that makes it hard to pass around the value as a number. The double type doesn't have
// enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
// work, but that isn't supported by the JSON parser if the number gets round-tripped through
// JSON. So String is really the only format that could be used, but the ML consumers of time
// are expecting a number.
String strVal0 = (String) value[0];
int dotPos = strVal0.indexOf('.');
if (dotPos == -1) {
value[0] = Long.parseLong(strVal0);
} else if (dotPos > 0) {
value[0] = Long.parseLong(strVal0.substring(0, dotPos));
} else {
value[0] = 0L;
}
} else if (value[0] instanceof Long == false) { // pre-6.0 field
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,61 @@
*/
package org.elasticsearch.xpack.ml.extractor;

import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;

import static org.hamcrest.Matchers.contains;
import java.time.Instant;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

public class TimeFieldTests extends ESTestCase {

public void testDocValueWithStringValue() {
long millis = randomLong();
SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build();
public void testDocValueWithWholeMillisecondStringValue() {
long millis = randomNonNegativeLong();
Instant time = Instant.ofEpochMilli(millis);
DateFormatter formatter = DateFormatter.forPattern("epoch_millis");
String timeAsString = formatter.format(time);
SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", timeAsString).build();

ExtractedField timeField = new TimeField("time", ExtractedField.Method.DOC_VALUE);

assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
assertThat(timeField.getName(), equalTo("time"));
assertThat(timeField.getSearchField(), equalTo("time"));
assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.DOC_VALUE));
assertThat(timeField.getDocValueFormat(), equalTo("epoch_millis"));
assertThat(timeField.supportsFromSource(), is(false));
expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
assertThat(timeField.isMultiField(), is(false));
expectThrows(UnsupportedOperationException.class, timeField::getParentField);
}

public void testDocValueWithFractionalMillisecondStringValue() {
long millis = randomNonNegativeLong();
int extraNanos = randomIntBetween(1, 999999);
Instant time = Instant.ofEpochMilli(millis).plusNanos(extraNanos);
DateFormatter formatter = DateFormatter.forPattern("epoch_millis");
String timeAsString = formatter.format(time);
SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", timeAsString).build();

ExtractedField timeField = new TimeField("time", ExtractedField.Method.DOC_VALUE);

assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
assertThat(timeField.getName(), equalTo("time"));
assertThat(timeField.getSearchField(), equalTo("time"));
assertThat(timeField.getTypes(), contains("date"));
assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.DOC_VALUE));
assertThat(timeField.getDocValueFormat(), equalTo("epoch_millis"));
assertThat(timeField.supportsFromSource(), is(false));
expectThrows(UnsupportedOperationException.class, () -> timeField.newFromSource());
expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
assertThat(timeField.isMultiField(), is(false));
expectThrows(UnsupportedOperationException.class, () -> timeField.getParentField());
expectThrows(UnsupportedOperationException.class, timeField::getParentField);
}

public void testScriptWithLongValue() {
Expand All @@ -43,13 +71,13 @@ public void testScriptWithLongValue() {
assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
assertThat(timeField.getName(), equalTo("time"));
assertThat(timeField.getSearchField(), equalTo("time"));
assertThat(timeField.getTypes(), contains("date"));
assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.SCRIPT_FIELD));
expectThrows(UnsupportedOperationException.class, () -> timeField.getDocValueFormat());
expectThrows(UnsupportedOperationException.class, timeField::getDocValueFormat);
assertThat(timeField.supportsFromSource(), is(false));
expectThrows(UnsupportedOperationException.class, () -> timeField.newFromSource());
expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
assertThat(timeField.isMultiField(), is(false));
expectThrows(UnsupportedOperationException.class, () -> timeField.getParentField());
expectThrows(UnsupportedOperationException.class, timeField::getParentField);
}

public void testUnknownFormat() {
Expand Down