Skip to content

Commit

Permalink
[ML] Fix failure on datafeed preview with date_nanos time field (#77109) (#77121)
Browse files Browse the repository at this point in the history

Preview datafeed currently fails when the time field is of type
`date_nanos`. The failure contains the error message:

```
date [...] is after 2262-04-11T23:47:16.854775807 and cannot be
stored in nanosecond resolution
```

This commit fixes this failure. The cause of the issue was that
preview generates a search with a range query on the time field
whose upper bound is `Long.MAX_VALUE` in order to include all
available data in the preview. However, that value is parsed
with `DateUtils.toLong` when the time field is `date_nanos` and
it hits the limitation that values can't be larger than
`DateUtils.MAX_NANOSECOND_INSTANT`. The fix checks whether the
time field is `date_nanos` and uses `DateUtils.MAX_NANOSECOND_INSTANT`
as the upper bound instead of `Long.MAX_VALUE`.
  • Loading branch information
dimitris-athanasiou committed Sep 1, 2021
1 parent 39a2786 commit 57e3a7c
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -35,9 +40,12 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
import static org.elasticsearch.xpack.core.ClientHelper.filterSecurityHeaders;
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;

Expand Down Expand Up @@ -93,22 +101,27 @@ private void previewDatafeed(
Job job,
ActionListener<PreviewDatafeedAction.Response> listener
) {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
DatafeedConfig.Builder previewDatafeedBuilder = buildPreviewDatafeed(datafeedConfig);
useSecondaryAuthIfAvailable(securityContext, () -> {
previewDatafeed.setHeaders(filterSecurityHeaders(threadPool.getThreadContext().getHeaders()));
previewDatafeedBuilder.setHeaders(filterSecurityHeaders(threadPool.getThreadContext().getHeaders()));
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DatafeedConfig previewDatafeedConfig = previewDatafeedBuilder.build();
DataExtractorFactory.create(
client,
previewDatafeed.build(),
previewDatafeedConfig,
job,
xContentRegistry,
// Fake DatafeedTimingStatsReporter that does not have access to results index
new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> {}),
listener.delegateFailure((l, dataExtractorFactory) -> {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, l));
isDateNanos(previewDatafeedConfig.getHeaders(), job.getDataDescription().getTimeField(),
listener.delegateFailure((l2, isDateNanos) -> {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0,
isDateNanos ? DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli() : Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, l));
}));
}));
});
}
Expand All @@ -130,6 +143,23 @@ static DatafeedConfig.Builder buildPreviewDatafeed(DatafeedConfig datafeed) {
return previewDatafeed;
}

/**
 * Asynchronously determines whether {@code timeField} is mapped as {@code date_nanos}
 * in any index, via the field capabilities API executed with the datafeed's headers.
 *
 * @param headers   security headers of the datafeed, applied so the check runs with
 *                  the same permissions as the preview search
 * @param timeField the name of the job's time field
 * @param listener  receives {@code true} if any index maps the field as {@code date_nanos}
 */
private void isDateNanos(Map<String, String> headers, String timeField, ActionListener<Boolean> listener) {
    // NOTE(review): the request does not restrict indices, so field caps are resolved
    // across all indices rather than just the datafeed's — confirm whether the
    // datafeed's indices should be passed here.
    executeWithHeadersAsync(
        headers,
        ML_ORIGIN,
        client,
        FieldCapabilitiesAction.INSTANCE,
        new FieldCapabilitiesRequest().fields(timeField),
        ActionListener.wrap(
            fieldCapsResponse -> {
                Map<String, FieldCapabilities> timeFieldCaps = fieldCapsResponse.getField(timeField);
                // getField may yield null when the field exists in no index; treat that
                // as "not date_nanos" rather than throwing NPE. containsKey is the
                // idiomatic form of keySet().contains().
                listener.onResponse(timeFieldCaps != null
                    && timeFieldCaps.containsKey(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
            },
            listener::onFailure
        )
    );
}

/** Visible for testing */
static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewDatafeedAction.Response> listener) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,104 @@ setup:
- match: { 2.airline: foo }
- match: { 2.responsetime_x_2: 84.0 }
- match: { 2.doc_count: 1 }

---
# Verifies that datafeed preview works when the job's time field is mapped as
# date_nanos (regression test: preview used to fail with "date [...] is after
# 2262-04-11T23:47:16.854775807 and cannot be stored in nanosecond resolution").
"Test preview datafeed where time field is date_nanos":

  - do:
      indices.create:
        index: index-airline-data-date-nanos
        body:
          mappings:
            properties:
              time:
                type: date_nanos
              airline:
                type: keyword
              responsetime:
                type: float
              event_rate:
                type: integer

  - do:
      index:
        index: index-airline-data-date-nanos
        id: 1
        body: >
          {
            "time": "2017-02-18T00:00:00Z",
            "airline": "foo",
            "responsetime": 1.0,
            "event_rate": 5
          }
  - do:
      index:
        index: index-airline-data-date-nanos
        id: 2
        body: >
          {
            "time": "2017-02-18T00:30:00Z",
            "airline": "foo",
            "responsetime": 1.0,
            "event_rate": 6
          }
  - do:
      index:
        index: index-airline-data-date-nanos
        id: 3
        body: >
          {
            "time": "2017-02-18T01:00:00Z",
            "airline": "bar",
            "responsetime": 42.0,
            "event_rate": 8
          }
  - do:
      index:
        index: index-airline-data-date-nanos
        id: 4
        body: >
          {
            "time": "2017-02-18T01:01:00Z",
            "airline": "foo",
            "responsetime": 42.0,
            "event_rate": 7
          }
  - do:
      indices.refresh:
        index: index-airline-data-date-nanos

  - do:
      ml.preview_datafeed:
        body: >
          {
            "datafeed_config": {
              "indexes":"index-airline-data-date-nanos"
            },
            "job_config": {
              "analysis_config": {
                "bucket_span": "1h",
                "detectors": [{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
              },
              "data_description": {
                "time_field":"time"
              }
            }
          }
  # All four docs must appear, with times as epoch millis.
  - length: { $body: 4 }
  - match: { 0.time: 1487376000000 }
  - match: { 0.airline: foo }
  - match: { 0.responsetime: 1.0 }
  - match: { 1.time: 1487377800000 }
  - match: { 1.airline: foo }
  - match: { 1.responsetime: 1.0 }
  - match: { 2.time: 1487379600000 }
  - match: { 2.airline: bar }
  - match: { 2.responsetime: 42.0 }
  - match: { 3.time: 1487379660000 }
  - match: { 3.airline: foo }
  - match: { 3.responsetime: 42.0 }

0 comments on commit 57e3a7c

Please sign in to comment.