[7.x][ML] Add log_time to AD data_counts and decide current based on it (#66343) (#66384)

This commit fixes a potential bug that would surface if we add support for
anomaly detection results index rollover in the future.

In particular, we currently determine the current `data_counts` by sorting on the
latest record time. However, this is not correct if the job reverts
to an older model snapshot. To fix this we add `log_time` to `data_counts`
(similarly to `model_size_stats`) and sort on `log_time` to determine
the current counts for the job.

Backport of #66343
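Below is a minimal sketch, not part of this commit, of the selection logic described above: given several `data_counts` documents for a job, the current one is the one with the latest `log_time`. The fallback to the latest record timestamp for documents written before this change is an assumption, and names such as `CurrentCountsSelector` and the package path of `DataCounts` are hypothetical.

import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;

import org.elasticsearch.client.ml.job.process.DataCounts; // assumed package of the class changed below

// Hypothetical helper, not part of this commit: prefer log_time when present and
// fall back to the latest record timestamp for documents that predate this change.
final class CurrentCountsSelector {

    static DataCounts selectCurrentCounts(Collection<DataCounts> candidates) {
        return candidates.stream()
            .max(Comparator.comparing(CurrentCountsSelector::sortKey))
            .orElse(null);
    }

    private static Instant sortKey(DataCounts counts) {
        Instant logTime = counts.getLogTime();
        if (logTime != null) {
            return logTime;
        }
        Date latestRecord = counts.getLatestRecordTimeStamp();
        return latestRecord == null ? Instant.EPOCH : latestRecord.toInstant();
    }
}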
dimitris-athanasiou committed Dec 16, 2020
1 parent 38de88d commit 97cfc8f
Showing 12 changed files with 140 additions and 160 deletions.
@@ -27,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;

@@ -60,11 +61,12 @@ public class DataCounts implements ToXContentObject {
public static final ParseField LAST_DATA_TIME = new ParseField("last_data_time");
public static final ParseField LATEST_EMPTY_BUCKET_TIME = new ParseField("latest_empty_bucket_timestamp");
public static final ParseField LATEST_SPARSE_BUCKET_TIME = new ParseField("latest_sparse_bucket_timestamp");
public static final ParseField LOG_TIME = new ParseField("log_time");

public static final ConstructingObjectParser<DataCounts, Void> PARSER = new ConstructingObjectParser<>("data_counts", true,
a -> new DataCounts((String) a[0], (long) a[1], (long) a[2], (long) a[3], (long) a[4], (long) a[5], (long) a[6],
(long) a[7], (long) a[8], (long) a[9], (long) a[10], (Date) a[11], (Date) a[12], (Date) a[13], (Date) a[14],
(Date) a[15]));
(Date) a[15], (Instant) a[16]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
@@ -98,6 +100,10 @@ public class DataCounts implements ToXContentObject {
(p) -> TimeUtil.parseTimeField(p, LATEST_SPARSE_BUCKET_TIME.getPreferredName()),
LATEST_SPARSE_BUCKET_TIME,
ValueType.VALUE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, LOG_TIME.getPreferredName()),
LOG_TIME,
ValueType.VALUE);
}

private final String jobId;
@@ -116,12 +122,13 @@ public class DataCounts implements ToXContentObject {
private Date lastDataTimeStamp;
private Date latestEmptyBucketTimeStamp;
private Date latestSparseBucketTimeStamp;
private Instant logTime;

public DataCounts(String jobId, long processedRecordCount, long processedFieldCount, long inputBytes,
long inputFieldCount, long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount,
long emptyBucketCount, long sparseBucketCount, long bucketCount,
Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp,
Date latestEmptyBucketTimeStamp, Date latestSparseBucketTimeStamp) {
Date latestEmptyBucketTimeStamp, Date latestSparseBucketTimeStamp, Instant logTime) {
this.jobId = jobId;
this.processedRecordCount = processedRecordCount;
this.processedFieldCount = processedFieldCount;
@@ -138,31 +145,13 @@ public DataCounts(String jobId, long processedRecordCount, long processedFieldCo
this.lastDataTimeStamp = lastDataTimeStamp;
this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp;
this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp;
this.logTime = logTime == null ? null : Instant.ofEpochMilli(logTime.toEpochMilli());
}

DataCounts(String jobId) {
this.jobId = jobId;
}

public DataCounts(DataCounts lhs) {
jobId = lhs.jobId;
processedRecordCount = lhs.processedRecordCount;
processedFieldCount = lhs.processedFieldCount;
inputBytes = lhs.inputBytes;
inputFieldCount = lhs.inputFieldCount;
invalidDateCount = lhs.invalidDateCount;
missingFieldCount = lhs.missingFieldCount;
outOfOrderTimeStampCount = lhs.outOfOrderTimeStampCount;
emptyBucketCount = lhs.emptyBucketCount;
sparseBucketCount = lhs.sparseBucketCount;
bucketCount = lhs.bucketCount;
latestRecordTimeStamp = lhs.latestRecordTimeStamp;
earliestRecordTimeStamp = lhs.earliestRecordTimeStamp;
lastDataTimeStamp = lhs.lastDataTimeStamp;
latestEmptyBucketTimeStamp = lhs.latestEmptyBucketTimeStamp;
latestSparseBucketTimeStamp = lhs.latestSparseBucketTimeStamp;
}

public String getJobId() {
return jobId;
}
@@ -331,6 +320,15 @@ public Date getLatestSparseBucketTimeStamp() {
return latestSparseBucketTimeStamp;
}

/**
* The wall clock time at the point when this instance was created.
*
* @return The wall clock time
*/
public Instant getLogTime() {
return logTime;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -367,6 +365,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
latestSparseBucketTimeStamp.getTime());
}
builder.field(INPUT_RECORD_COUNT.getPreferredName(), getInputRecordCount());
if (logTime != null) {
builder.timeField(LOG_TIME.getPreferredName(), LOG_TIME.getPreferredName() + "_string", logTime.toEpochMilli());
}

builder.endObject();
return builder;
@@ -402,14 +403,15 @@ public boolean equals(Object other) {
Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp) &&
Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp) &&
Objects.equals(this.latestEmptyBucketTimeStamp, that.latestEmptyBucketTimeStamp) &&
Objects.equals(this.latestSparseBucketTimeStamp, that.latestSparseBucketTimeStamp);
Objects.equals(this.latestSparseBucketTimeStamp, that.latestSparseBucketTimeStamp) &&
Objects.equals(this.logTime, that.logTime);
}

@Override
public int hashCode() {
return Objects.hash(jobId, processedRecordCount, processedFieldCount,
inputBytes, inputFieldCount, invalidDateCount, missingFieldCount,
outOfOrderTimeStampCount, lastDataTimeStamp, emptyBucketCount, sparseBucketCount, bucketCount,
latestRecordTimeStamp, earliestRecordTimeStamp, latestEmptyBucketTimeStamp, latestSparseBucketTimeStamp);
latestRecordTimeStamp, earliestRecordTimeStamp, latestEmptyBucketTimeStamp, latestSparseBucketTimeStamp, logTime);
}
}
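As a hedged illustration of the `toXContent` change above, the sketch below serializes a `DataCounts` instance and prints the JSON; with `humanReadable(true)` the new field is expected to appear as `log_time` (epoch millis) alongside a `log_time_string` rendering. The constructor argument values and the `DataCounts` package path are assumptions for illustration only.

import java.time.Instant;
import java.util.Date;

import org.elasticsearch.client.ml.job.process.DataCounts; // assumed package
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class DataCountsXContentDemo {
    public static void main(String[] args) throws Exception {
        // Arbitrary example values; only log_time (the last argument) matters here.
        DataCounts counts = new DataCounts("my-job", 100, 200, 1024, 300, 0, 0, 0, 0, 0, 10,
            new Date(0), new Date(0), new Date(0), new Date(0), new Date(0), Instant.now());

        XContentBuilder builder = XContentFactory.jsonBuilder().humanReadable(true);
        counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
        System.out.println(Strings.toString(builder));
    }
}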
@@ -22,9 +22,7 @@
import org.elasticsearch.test.AbstractXContentTestCase;
import org.joda.time.DateTime;

import java.util.Date;

import static org.hamcrest.Matchers.greaterThan;
import java.time.Instant;

public class DataCountsTests extends AbstractXContentTestCase<DataCounts> {

@@ -35,7 +33,7 @@ public static DataCounts createTestInstance(String jobId) {
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(),
new DateTime(randomDateTimeZone()).toDate());
new DateTime(randomDateTimeZone()).toDate(), randomBoolean() ? null : Instant.now());
}

@Override
@@ -53,78 +51,4 @@ protected boolean supportsUnknownFields() {
return true;
}

public void testCountsEquals_GivenEqualCounts() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);

assertTrue(counts1.equals(counts2));
assertTrue(counts2.equals(counts1));
}

public void testCountsHashCode_GivenEqualCounts() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
assertEquals(counts1.hashCode(), counts2.hashCode());
}

public void testCountsCopyConstructor() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
DataCounts counts2 = new DataCounts(counts1);

assertEquals(counts1.hashCode(), counts2.hashCode());
}

public void testCountCreatedZero() throws Exception {
DataCounts counts = new DataCounts(randomAlphaOfLength(16));
assertAllFieldsEqualZero(counts);
}

public void testCountCopyCreatedFieldsNotZero() throws Exception {
DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 7, 8, 9, 1479211200000L, 1479384000000L, 13, 14, 15);
assertAllFieldsGreaterThanZero(counts1);

DataCounts counts2 = new DataCounts(counts1);
assertAllFieldsGreaterThanZero(counts2);
}

private void assertAllFieldsEqualZero(DataCounts stats) throws Exception {
assertEquals(0L, stats.getProcessedRecordCount());
assertEquals(0L, stats.getProcessedFieldCount());
assertEquals(0L, stats.getInputBytes());
assertEquals(0L, stats.getInputFieldCount());
assertEquals(0L, stats.getInputRecordCount());
assertEquals(0L, stats.getInvalidDateCount());
assertEquals(0L, stats.getMissingFieldCount());
assertEquals(0L, stats.getOutOfOrderTimeStampCount());
}

private void assertAllFieldsGreaterThanZero(DataCounts stats) throws Exception {
assertThat(stats.getProcessedRecordCount(), greaterThan(0L));
assertThat(stats.getProcessedFieldCount(), greaterThan(0L));
assertThat(stats.getInputBytes(), greaterThan(0L));
assertThat(stats.getInputFieldCount(), greaterThan(0L));
assertThat(stats.getInputRecordCount(), greaterThan(0L));
assertThat(stats.getInputRecordCount(), greaterThan(0L));
assertThat(stats.getInvalidDateCount(), greaterThan(0L));
assertThat(stats.getMissingFieldCount(), greaterThan(0L));
assertThat(stats.getOutOfOrderTimeStampCount(), greaterThan(0L));
assertThat(stats.getLatestRecordTimeStamp().getTime(), greaterThan(0L));
}

private static DataCounts createCounts(
long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount,
long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount,
long emptyBucketCount, long sparseBucketCount, long bucketCount,
long earliestRecordTime, long latestRecordTime, long lastDataTimeStamp, long latestEmptyBucketTimeStamp,
long latestSparseBucketTimeStamp) {

DataCounts counts = new DataCounts("foo", processedRecordCount, processedFieldCount, inputBytes,
inputFieldCount, invalidDateCount, missingFieldCount, outOfOrderTimeStampCount,
emptyBucketCount, sparseBucketCount, bucketCount,
new Date(earliestRecordTime), new Date(latestRecordTime),
new Date(lastDataTimeStamp), new Date(latestEmptyBucketTimeStamp), new Date(latestSparseBucketTimeStamp));

return counts;
}

}
@@ -119,6 +119,9 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=latest-record-timestamp]
(date)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=latest-sparse-record-timestamp]
`log_time`:::
(date) The timestamp of the `data_counts` according to server time.
`missing_field_count`:::
(long)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=missing-field-count]
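A hedged usage sketch to go with the new documentation field: reading `log_time` from a job's `data_counts` through the 7.x high-level REST client. The request/response class names, package paths, and accessors other than the `getLogTime()` method added in this commit are assumptions based on the usual client layout.

import java.time.Instant;

import org.elasticsearch.client.MachineLearningClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetJobStatsResponse;
import org.elasticsearch.client.ml.job.stats.JobStats;

public class LogTimeUsageExample {

    // mlClient would typically come from RestHighLevelClient#machineLearning().
    static void printDataCountsLogTime(MachineLearningClient mlClient, String jobId) throws Exception {
        GetJobStatsRequest request = new GetJobStatsRequest(jobId);
        GetJobStatsResponse response = mlClient.getJobStats(request, RequestOptions.DEFAULT);
        for (JobStats stats : response.jobStats()) {
            Instant logTime = stats.getDataCounts().getLogTime(); // accessor added in this commit
            System.out.println(stats.getJobId() + " -> data_counts.log_time = " + logTime);
        }
    }
}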
