Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD");

private static class CurrentHolder {
private static final TransportVersion CURRENT = findCurrent(V_8_500_011);
private static final TransportVersion CURRENT = findCurrent(V_8_500_012);

// finds the pluggable current version, or uses the given fallback
private static TransportVersion findCurrent(TransportVersion fallback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -62,6 +63,7 @@ public static Request parseRequest(String jobId, XContentParser parser) {

private boolean calcInterim = false;
private boolean waitForNormalization = true;
private boolean refreshRequired = true;
private String start;
private String end;
private String advanceTime;
Expand All @@ -77,6 +79,9 @@ public Request(StreamInput in) throws IOException {
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
waitForNormalization = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
refreshRequired = in.readBoolean();
}
}

@Override
Expand All @@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(advanceTime);
out.writeOptionalString(skipTime);
out.writeBoolean(waitForNormalization);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
out.writeBoolean(refreshRequired);
}
}

public Request(String jobId) {
Expand Down Expand Up @@ -138,18 +146,32 @@ public boolean isWaitForNormalization() {
return waitForNormalization;
}

public boolean isRefreshRequired() {
return refreshRequired;
}

/**
* Used internally. Datafeeds do not need to wait renormalization to complete before continuing.
* Used internally. Datafeeds do not need to wait for renormalization to complete before continuing.
*
* For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution.
*/
public void setWaitForNormalization(boolean waitForNormalization) {
this.waitForNormalization = waitForNormalization;
}

/**
* Used internally. For datafeeds, there is no need for the results to be searchable after the flush,
* as the datafeed itself does not search them immediately.
*
* Particularly for short bucket spans these refreshes could be a significant cost.
**/
public void setRefreshRequired(boolean refreshRequired) {
this.refreshRequired = refreshRequired;
}

@Override
public int hashCode() {
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization);
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, refreshRequired);
}

@Override
Expand All @@ -164,6 +186,7 @@ public boolean equals(Object obj) {
return Objects.equals(jobId, other.jobId)
&& calcInterim == other.calcInterim
&& waitForNormalization == other.waitForNormalization
&& refreshRequired == other.refreshRequired
&& Objects.equals(start, other.start)
&& Objects.equals(end, other.end)
&& Objects.equals(advanceTime, other.advanceTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.output;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -28,44 +29,57 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
public static final ParseField TYPE = new ParseField("flush");
public static final ParseField ID = new ParseField("id");
public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required");

public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
TYPE.getPreferredName(),
a -> new FlushAcknowledgement((String) a[0], (Long) a[1])
a -> new FlushAcknowledgement((String) a[0], (Long) a[1], (Boolean) a[2])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REFRESH_REQUIRED);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it is correct to include the internal field in the parser, as this is for internal communications with our C++ code.

}

private final String id;
private final Instant lastFinalizedBucketEnd;
private final boolean refreshRequired;

public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) {
public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) {
this.id = id;
// The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null
this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0)
? Instant.ofEpochMilli(lastFinalizedBucketEndMs)
: null;
this.refreshRequired = refreshRequired == null || refreshRequired;
}

public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) {
public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) {
this.id = id;
// Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object
long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
this.refreshRequired = refreshRequired == null || refreshRequired;
}

public FlushAcknowledgement(StreamInput in) throws IOException {
id = in.readString();
lastFinalizedBucketEnd = in.readOptionalInstant();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
refreshRequired = in.readBoolean();
} else {
refreshRequired = true;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalInstant(lastFinalizedBucketEnd);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
out.writeBoolean(refreshRequired);
}
}

public String getId() {
Expand All @@ -76,6 +90,10 @@ public Instant getLastFinalizedBucketEnd() {
return lastFinalizedBucketEnd;
}

public boolean getRefreshRequired() {
return refreshRequired;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -87,13 +105,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
lastFinalizedBucketEnd.toEpochMilli()
);
}
builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(id, lastFinalizedBucketEnd);
return Objects.hash(id, lastFinalizedBucketEnd, refreshRequired);
}

@Override
Expand All @@ -105,6 +124,8 @@ public boolean equals(Object obj) {
return false;
}
FlushAcknowledgement other = (FlushAcknowledgement) obj;
return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
return Objects.equals(id, other.id)
&& Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd)
&& refreshRequired == other.refreshRequired;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ protected Request createTestInstance() {
if (randomBoolean()) {
request.setWaitForNormalization(randomBoolean());
}
if (randomBoolean()) {
request.setRefreshRequired(randomBoolean());
}
if (randomBoolean()) {
request.setCalcInterim(randomBoolean());
}
Expand Down Expand Up @@ -49,6 +52,9 @@ protected Writeable.Reader<Request> instanceReader() {

@Override
protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
if (version.before(TransportVersion.V_8_500_012)) {
instance.setRefreshRequired(true);
}
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ private static Quantiles createQuantiles() {
}

private static FlushAcknowledgement createFlushAcknowledgement() {
return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant());
return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), true);
}

private static class ResultsBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ protected void taskOperation(
FlushJobParams.Builder paramsBuilder = FlushJobParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
paramsBuilder.waitForNormalization(request.isWaitForNormalization());
paramsBuilder.refreshRequired(request.isRefreshRequired());
if (request.getAdvanceTime() != null) {
paramsBuilder.advanceTime(request.getAdvanceTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ Long runLookBack(long startTime, Long endTime) throws Exception {

FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
request.setRefreshRequired(false);
run(lookbackStartTimeMs, lookbackEnd, request);
if (shouldPersistAfterLookback(isLookbackOnly)) {
sendPersistRequest();
Expand Down Expand Up @@ -205,6 +206,7 @@ private long skipToStartTime(long startTime) {
// start time is after last checkpoint, thus we need to skip time
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
request.setRefreshRequired(false);
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli());
return flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
Expand All @@ -218,6 +220,7 @@ long runRealtime() throws Exception {
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setWaitForNormalization(false);
request.setRefreshRequired(false);
request.setCalcInterim(true);
request.setAdvanceTime(String.valueOf(end));
run(start, end, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeV
*/
@Override
public String flushJob(FlushJobParams params) {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L);
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L, true);
AutodetectResult result = new AutodetectResult(
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,12 @@ void processResult(AutodetectResult result) {
try {
bulkResultsPersister.executeRequest();
bulkAnnotationsPersister.executeRequest();
persister.commitWrites(
jobId,
EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
);
if (flushAcknowledgement.getRefreshRequired()) {
persister.commitWrites(
jobId,
EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
);
}
} catch (Exception e) {
logger.error(
"["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private static class FlushAcknowledgementHolder {
private volatile Exception flushException;

private FlushAcknowledgementHolder(String flushId) {
this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L);
this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L, true);
this.latch = new CountDownLatch(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,25 @@ public class FlushJobParams {
*/
private final boolean waitForNormalization;

/**
* Should the flush request trigger a refresh or not.
*/
private final boolean refreshRequired;

private FlushJobParams(
boolean calcInterim,
TimeRange timeRange,
Long advanceTimeSeconds,
Long skipTimeSeconds,
boolean waitForNormalization
boolean waitForNormalization,
boolean refreshRequired
) {
this.calcInterim = calcInterim;
this.timeRange = Objects.requireNonNull(timeRange);
this.advanceTimeSeconds = advanceTimeSeconds;
this.skipTimeSeconds = skipTimeSeconds;
this.waitForNormalization = waitForNormalization;
this.refreshRequired = refreshRequired;
}

public boolean shouldCalculateInterim() {
Expand Down Expand Up @@ -93,6 +100,10 @@ public boolean isWaitForNormalization() {
return waitForNormalization;
}

public boolean isRefreshRequired() {
return refreshRequired;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -105,12 +116,14 @@ public boolean equals(Object o) {
return calcInterim == that.calcInterim
&& Objects.equals(timeRange, that.timeRange)
&& Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds)
&& Objects.equals(skipTimeSeconds, that.skipTimeSeconds);
&& Objects.equals(skipTimeSeconds, that.skipTimeSeconds)
&& waitForNormalization == that.waitForNormalization
&& refreshRequired == that.refreshRequired;
}

@Override
public int hashCode() {
return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired);
}

public static class Builder {
Expand All @@ -119,6 +132,7 @@ public static class Builder {
private String advanceTime;
private String skipTime;
private boolean waitForNormalization = true;
private boolean refreshRequired = true;

public Builder calcInterim(boolean value) {
calcInterim = value;
Expand All @@ -145,6 +159,11 @@ public Builder waitForNormalization(boolean waitForNormalization) {
return this;
}

public Builder refreshRequired(boolean refreshRequired) {
this.refreshRequired = refreshRequired;
return this;
}

public FlushJobParams build() {
checkValidFlushArgumentsCombination();
Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime);
Expand All @@ -154,7 +173,7 @@ public FlushJobParams build() {
"advance_time [" + advanceTime + "] must be later than skip_time [" + skipTime + "]"
);
}
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization);
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired);
}

private void checkValidFlushArgumentsCombination() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class AutodetectControlMsgWriter extends AbstractControlMsgWriter {
*/
public static final String BACKGROUND_PERSIST_MESSAGE_CODE = "w";

/**
* This must match the code defined in the api::CAnomalyJob C++ class.
*/
public static final String REFRESH_REQUIRED_MESSAGE_CODE = "z";

/**
* An number to uniquely identify each flush so that subsequent code can
* wait for acknowledgement of the correct flush.
Expand Down Expand Up @@ -143,6 +148,7 @@ public void writeFlushControlMessage(FlushJobParams params) throws IOException {
if (params.shouldCalculateInterim()) {
writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd());
}
writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.isRefreshRequired());
}

/**
Expand Down
Loading