From b14c392816c5b81c463059daae7d6ca8799da2a9 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 13 Jun 2023 14:51:24 +0100 Subject: [PATCH 1/8] [ML] Add internal flag to flush api determining whether to refresh indices or not. When datafeeds send flush requests they don't require the indices to be refreshed. First working version. --- .../xpack/core/ml/action/FlushJobAction.java | 25 +++++++++++-- .../output/FlushAcknowledgement.java | 24 ++++++++++--- .../ml/action/FlushJobActionRequestTests.java | 3 ++ .../AutodetectResultProcessorIT.java | 2 +- .../ml/action/TransportFlushJobAction.java | 1 + .../xpack/ml/datafeed/DatafeedJob.java | 5 ++- .../BlackHoleAutodetectProcess.java | 2 +- .../output/AutodetectResultProcessor.java | 10 +++--- .../autodetect/output/FlushListener.java | 2 +- .../autodetect/params/FlushJobParams.java | 21 +++++++++-- .../writer/AutodetectControlMsgWriter.java | 6 ++++ .../xpack/ml/datafeed/DatafeedJobTests.java | 1 + .../AutodetectResultProcessorTests.java | 23 +++++++++++- .../output/FlushAcknowledgementTests.java | 12 +++++-- .../autodetect/output/FlushListenerTests.java | 4 +-- .../AutodetectControlMsgWriterTests.java | 36 +++++++++++++++++++ .../ml/job/results/AutodetectResultTests.java | 2 +- 17 files changed, 156 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index 2bb8e3363b3e4..f62b778f5decf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -40,6 +40,7 @@ public static class Request extends JobTaskRequest implements ToXConten public static final ParseField END = new ParseField("end"); public static final ParseField ADVANCE_TIME = new ParseField("advance_time"); public static final ParseField SKIP_TIME = new ParseField("skip_time"); + public static final ParseField SHOULD_REFRESH = new ParseField("should_refresh"); private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -50,6 +51,7 @@ public static class Request extends JobTaskRequest implements ToXConten PARSER.declareString(Request::setEnd, END); PARSER.declareString(Request::setAdvanceTime, ADVANCE_TIME); PARSER.declareString(Request::setSkipTime, SKIP_TIME); + PARSER.declareBoolean(Request::setShouldRefresh, SHOULD_REFRESH); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -62,6 +64,7 @@ public static Request parseRequest(String jobId, XContentParser parser) { private boolean calcInterim = false; private boolean waitForNormalization = true; + private boolean shouldRefresh = true; private String start; private String end; private String advanceTime; @@ -77,6 +80,7 @@ public Request(StreamInput in) throws IOException { advanceTime = in.readOptionalString(); skipTime = in.readOptionalString(); waitForNormalization = in.readBoolean(); + shouldRefresh = in.readBoolean(); } @Override @@ -88,6 +92,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(advanceTime); out.writeOptionalString(skipTime); out.writeBoolean(waitForNormalization); + out.writeBoolean(shouldRefresh); } public Request(String jobId) { @@ -138,8 +143,12 @@ public boolean isWaitForNormalization() { return waitForNormalization; } + public boolean isShouldRefresh() { + return shouldRefresh; + } + /** - * Used internally. Datafeeds do not need to wait renormalization to complete before continuing. + * Used internally. Datafeeds do not need to wait for renormalization to complete before continuing. * * For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution. */ @@ -147,9 +156,19 @@ public void setWaitForNormalization(boolean waitForNormalization) { this.waitForNormalization = waitForNormalization; } + /** + * Used internally. For datafeeds, there is no need for the results to be searchable after the flush, + * as the datafeed itself does not search them immediately. + * + * Particularly for short bucket spans these refreshes could be a significant cost. + **/ + public void setShouldRefresh(boolean shouldRefresh) { + this.shouldRefresh = shouldRefresh; + } + @Override public int hashCode() { - return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization); + return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, shouldRefresh); } @Override @@ -164,6 +183,7 @@ public boolean equals(Object obj) { return Objects.equals(jobId, other.jobId) && calcInterim == other.calcInterim && waitForNormalization == other.waitForNormalization + && shouldRefresh == other.shouldRefresh && Objects.equals(start, other.start) && Objects.equals(end, other.end) && Objects.equals(advanceTime, other.advanceTime) @@ -187,6 +207,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (skipTime != null) { builder.field(SKIP_TIME.getPreferredName(), skipTime); } + builder.field(SHOULD_REFRESH.getPreferredName(), shouldRefresh); builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java index c741138e1758e..f05845af28af4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -28,44 +28,51 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { public static final ParseField TYPE = new ParseField("flush"); public static final ParseField ID = new ParseField("id"); public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end"); + public static final ParseField SHOULD_REFRESH = new ParseField("should_refresh"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( TYPE.getPreferredName(), - a -> new FlushAcknowledgement((String) a[0], (Long) a[1]) + a -> new FlushAcknowledgement((String) a[0], (Long) a[1], (Boolean) a[2]) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SHOULD_REFRESH); } private final String id; private final Instant lastFinalizedBucketEnd; + private final Boolean shouldRefresh; - public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) { + public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean shouldRefresh) { this.id = id; // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null; + this.shouldRefresh = shouldRefresh == null || shouldRefresh; } - public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) { + public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean shouldRefresh) { this.id = id; // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; + this.shouldRefresh = shouldRefresh == null || shouldRefresh; } public FlushAcknowledgement(StreamInput in) throws IOException { id = in.readString(); lastFinalizedBucketEnd = in.readOptionalInstant(); + shouldRefresh = in.readOptionalBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeOptionalInstant(lastFinalizedBucketEnd); + out.writeOptionalBoolean(shouldRefresh); } public String getId() { @@ -76,6 +83,10 @@ public Instant getLastFinalizedBucketEnd() { return lastFinalizedBucketEnd; } + public Boolean getShouldRefresh() { + return shouldRefresh; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -87,13 +98,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws lastFinalizedBucketEnd.toEpochMilli() ); } + builder.field(SHOULD_REFRESH.getPreferredName(), shouldRefresh); builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(id, lastFinalizedBucketEnd); + return Objects.hash(id, lastFinalizedBucketEnd, shouldRefresh); } @Override @@ -105,6 +117,8 @@ public boolean equals(Object obj) { return false; } FlushAcknowledgement other = (FlushAcknowledgement) obj; - return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd); + return Objects.equals(id, other.id) + && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd) + && Objects.equals(shouldRefresh, other.shouldRefresh); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java index 538ce2ed33de4..da89fc1862212 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java @@ -19,6 +19,9 @@ protected Request createTestInstance() { if (randomBoolean()) { request.setWaitForNormalization(randomBoolean()); } + if (randomBoolean()) { + request.setShouldRefresh(randomBoolean()); + } if (randomBoolean()) { request.setCalcInterim(randomBoolean()); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f437a5ad7d52d..365ce4bbee389 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -575,7 +575,7 @@ private static Quantiles createQuantiles() { } private static FlushAcknowledgement createFlushAcknowledgement() { - return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant()); + return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), randomBoolean()); } private static class ResultsBuilder { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java index 8b2a3061871e7..6321cb16db734 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java @@ -51,6 +51,7 @@ protected void taskOperation( FlushJobParams.Builder paramsBuilder = FlushJobParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); paramsBuilder.waitForNormalization(request.isWaitForNormalization()); + paramsBuilder.shouldRefresh(request.isShouldRefresh()); if (request.getAdvanceTime() != null) { paramsBuilder.advanceTime(request.getAdvanceTime()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 132b982ba13c1..d99c403f7c78e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -175,6 +175,7 @@ Long runLookBack(long startTime, Long endTime) throws Exception { FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setCalcInterim(true); + request.setShouldRefresh(false); run(lookbackStartTimeMs, lookbackEnd, request); if (shouldPersistAfterLookback(isLookbackOnly)) { sendPersistRequest(); @@ -205,6 +206,7 @@ private long skipToStartTime(long startTime) { // start time is after last checkpoint, thus we need to skip time FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setSkipTime(String.valueOf(startTime)); + request.setShouldRefresh(false); FlushJobAction.Response flushResponse = flushJob(request); LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli()); return flushResponse.getLastFinalizedBucketEnd().toEpochMilli(); @@ -218,6 +220,7 @@ long runRealtime() throws Exception { long end = toIntervalStartEpochMs(nowMinusQueryDelay); FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setWaitForNormalization(false); + request.setShouldRefresh(false); request.setCalcInterim(true); request.setAdvanceTime(String.valueOf(end)); run(start, end, request); @@ -500,7 +503,7 @@ private long toIntervalStartEpochMs(long epochMs) { private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) { try { - LOGGER.trace("[" + jobId + "] Sending flush request"); + LOGGER.info("[" + jobId + "] Sending flush request"); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { return client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 03038c3ede6bf..0ebd6ab9b3e79 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -101,7 +101,7 @@ public void writeUpdateScheduledEventsMessage(List events, TimeV */ @Override public String flushJob(FlushJobParams params) { - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L, true); AutodetectResult result = new AutodetectResult( null, null, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 0843bac103674..390d56f6018f7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -394,10 +394,12 @@ void processResult(AutodetectResult result) { try { bulkResultsPersister.executeRequest(); bulkAnnotationsPersister.executeRequest(); - persister.commitWrites( - jobId, - EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS) - ); + if (flushAcknowledgement.getShouldRefresh()) { + persister.commitWrites( + jobId, + EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS) + ); + } } catch (Exception e) { logger.error( "[" diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 5f6fe747945ef..b911f7217870e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -70,7 +70,7 @@ private static class FlushAcknowledgementHolder { private volatile Exception flushException; private FlushAcknowledgementHolder(String flushId) { - this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L); + this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L, true); this.latch = new CountDownLatch(1); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index 1a81c6079532b..b3d6324fdd53f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -41,18 +41,25 @@ public class FlushJobParams { */ private final boolean waitForNormalization; + /** + * Should the flush request trigger a refresh or not. + */ + private final boolean shouldRefresh; + private FlushJobParams( boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds, - boolean waitForNormalization + boolean waitForNormalization, + boolean shouldRefresh ) { this.calcInterim = calcInterim; this.timeRange = Objects.requireNonNull(timeRange); this.advanceTimeSeconds = advanceTimeSeconds; this.skipTimeSeconds = skipTimeSeconds; this.waitForNormalization = waitForNormalization; + this.shouldRefresh = shouldRefresh; } public boolean shouldCalculateInterim() { @@ -93,6 +100,10 @@ public boolean isWaitForNormalization() { return waitForNormalization; } + public Boolean shouldRefresh() { + return shouldRefresh; + } + public static Builder builder() { return new Builder(); } @@ -119,6 +130,7 @@ public static class Builder { private String advanceTime; private String skipTime; private boolean waitForNormalization = true; + private boolean shouldRefresh = true; public Builder calcInterim(boolean value) { calcInterim = value; @@ -145,6 +157,11 @@ public Builder waitForNormalization(boolean waitForNormalization) { return this; } + public Builder shouldRefresh(boolean shouldRefresh) { + this.shouldRefresh = shouldRefresh; + return this; + } + public FlushJobParams build() { checkValidFlushArgumentsCombination(); Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime); @@ -154,7 +171,7 @@ public FlushJobParams build() { "advance_time [" + advanceTime + "] must be later than skip_time [" + skipTime + "]" ); } - return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization); + return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, shouldRefresh); } private void checkValidFlushArgumentsCombination() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java index 7ec932d50ff61..a46a9a9f09ecf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java @@ -78,6 +78,11 @@ public class AutodetectControlMsgWriter extends AbstractControlMsgWriter { */ public static final String BACKGROUND_PERSIST_MESSAGE_CODE = "w"; + /** + * This must match the code defined in the api::CAnomalyJob C++ class. + */ + public static final String SHOULD_REFRESH_MESSAGE_CODE = "z"; + /** * An number to uniquely identify each flush so that subsequent code can * wait for acknowledgement of the correct flush. @@ -143,6 +148,7 @@ public void writeFlushControlMessage(FlushJobParams params) throws IOException { if (params.shouldCalculateInterim()) { writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd()); } + writeMessage(SHOULD_REFRESH_MESSAGE_CODE + params.shouldRefresh().toString()); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index afaea47c8a6b3..8ad8df7b02b66 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -285,6 +285,7 @@ public void testRealtimeRun() throws Exception { flushRequest.setCalcInterim(true); flushRequest.setAdvanceTime("59000"); flushRequest.setWaitForNormalization(false); + flushRequest.setShouldRefresh(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 9a4489c4857d9..7a881caa33397 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -248,10 +248,11 @@ public void testProcessResult_categoryDefinition() { verify(bulkResultsPersister, never()).executeRequest(); } - public void testProcessResult_flushAcknowledgement() { + public void testProcessResult_flushAcknowledgementWithRefresh() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); + when(flushAcknowledgement.getShouldRefresh()).thenReturn(true); when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); processorUnderTest.setDeleteInterimRequired(false); @@ -267,10 +268,30 @@ public void testProcessResult_flushAcknowledgement() { verify(bulkResultsPersister).executeRequest(); } + public void testProcessResult_flushAcknowledgement() { + AutodetectResult result = mock(AutodetectResult.class); + FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); + when(flushAcknowledgement.getId()).thenReturn(JOB_ID); + when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); + + processorUnderTest.setDeleteInterimRequired(false); + processorUnderTest.processResult(result); + assertTrue(processorUnderTest.isDeleteInterimRequired()); + + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); + verify(persister, never()).commitWrites( + JOB_ID, + EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS) + ); + verify(bulkResultsPersister).executeRequest(); + } + public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(Integer.valueOf(randomInt(100)).toString()); + when(flushAcknowledgement.getShouldRefresh()).thenReturn(true); when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); when(categoryDefinition.getCategoryId()).thenReturn(1L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java index f9cf683d64a58..023361ed73b69 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java @@ -23,9 +23,17 @@ protected FlushAcknowledgement doParseInstance(XContentParser parser) { @Override protected FlushAcknowledgement createTestInstance() { if (randomBoolean()) { - return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomNonNegativeLong(), 0L, null)); + return new FlushAcknowledgement( + randomAlphaOfLengthBetween(1, 20), + randomFrom(randomNonNegativeLong(), 0L, null), + randomBoolean() + ); } else { - return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomInstant(), Instant.EPOCH, null)); + return new FlushAcknowledgement( + randomAlphaOfLengthBetween(1, 20), + randomFrom(randomInstant(), Instant.EPOCH, null), + randomBoolean() + ); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index a4a9fd2c012b1..e4ff8df0457fc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -35,7 +35,7 @@ public void testAcknowledgeFlush() throws Exception { }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushAcknowledgementHolder.get()); - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L, false); listener.acknowledgeFlush(flushAcknowledgement, null); assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get())); assertEquals(1, listener.awaitingFlushed.size()); @@ -59,7 +59,7 @@ public void testAcknowledgeFlushFailure() throws Exception { }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushExceptionHolder.get()); - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L)); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L), true); listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM")); assertBusy(() -> { assertNotNull(flushExceptionHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java index 978ea72b5d501..bbecfc6b176b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java @@ -58,6 +58,9 @@ public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException { inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField("t1234567890"); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); verifyNoMoreInteractions(lengthEncodedWriter); } @@ -71,6 +74,9 @@ public void testWriteFlushControlMessage_GivenSkipTime() throws IOException { inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField("s1234567890"); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); verifyNoMoreInteractions(lengthEncodedWriter); } @@ -95,6 +101,9 @@ public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField("i"); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); verifyNoMoreInteractions(lengthEncodedWriter); } @@ -104,6 +113,27 @@ public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException { writer.writeFlushControlMessage(flushJobParams); + // Even a plain flush message contains the "shouldRefresh" flag, which + // is set to "true" by default + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); + verifyNoMoreInteractions(lengthEncodedWriter); + } + + public void testWriteFlushControlMessage_GivenShouldRefreshFalse() throws IOException { + AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); + FlushJobParams flushJobParams = FlushJobParams.builder().shouldRefresh(false).build(); + + writer.writeFlushControlMessage(flushJobParams); + + // Even a plain flush message contains the "shouldRefresh" flag, which + // is set to "true" by default + InOrder inOrder = inOrder(lengthEncodedWriter); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("zfalse"); verifyNoMoreInteractions(lengthEncodedWriter); } @@ -120,6 +150,9 @@ public void testWriteFlushControlMessage_GivenCalcInterimResultsWithTimeParams() inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField("i120 180"); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); verifyNoMoreInteractions(lengthEncodedWriter); } @@ -140,6 +173,9 @@ public void testWriteFlushControlMessage_GivenCalcInterimAndAdvanceTime() throws inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); inOrder.verify(lengthEncodedWriter).writeField("i50 100"); + inOrder.verify(lengthEncodedWriter).writeNumFields(4); + inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); + inOrder.verify(lengthEncodedWriter).writeField("ztrue"); verifyNoMoreInteractions(lengthEncodedWriter); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java index 9d812ac018239..951b0ee572c4a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java @@ -129,7 +129,7 @@ protected AutodetectResult createTestInstance() { categorizerStats = null; } if (randomBoolean()) { - flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomInstant()); + flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomInstant(), randomBoolean()); } else { flushAcknowledgement = null; } From 1a781b6dba72b7df2430139c3c0a1a0040943128 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 13 Jun 2023 15:39:26 +0100 Subject: [PATCH 2/8] Tidy up * change naming * remove unnecessary logging --- .../xpack/core/ml/action/FlushJobAction.java | 24 ++++++++-------- .../output/FlushAcknowledgement.java | 28 +++++++++---------- .../ml/action/FlushJobActionRequestTests.java | 2 +- .../ml/action/TransportFlushJobAction.java | 2 +- .../xpack/ml/datafeed/DatafeedJob.java | 8 +++--- .../output/AutodetectResultProcessor.java | 2 +- .../autodetect/params/FlushJobParams.java | 18 ++++++------ .../writer/AutodetectControlMsgWriter.java | 4 +-- .../xpack/ml/datafeed/DatafeedJobTests.java | 5 +++- .../AutodetectResultProcessorTests.java | 4 +-- .../AutodetectControlMsgWriterTests.java | 6 ++-- 11 files changed, 53 insertions(+), 50 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index f62b778f5decf..73d130a27535b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -40,7 +40,7 @@ public static class Request extends JobTaskRequest implements ToXConten public static final ParseField END = new ParseField("end"); public static final ParseField ADVANCE_TIME = new ParseField("advance_time"); public static final ParseField SKIP_TIME = new ParseField("skip_time"); - public static final ParseField SHOULD_REFRESH = new ParseField("should_refresh"); + public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required"); private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -51,7 +51,7 @@ public static class Request extends JobTaskRequest implements ToXConten PARSER.declareString(Request::setEnd, END); PARSER.declareString(Request::setAdvanceTime, ADVANCE_TIME); PARSER.declareString(Request::setSkipTime, SKIP_TIME); - PARSER.declareBoolean(Request::setShouldRefresh, SHOULD_REFRESH); + PARSER.declareBoolean(Request::setRefreshRequired, REFRESH_REQUIRED); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -64,7 +64,7 @@ public static Request parseRequest(String jobId, XContentParser parser) { private boolean calcInterim = false; private boolean waitForNormalization = true; - private boolean shouldRefresh = true; + private boolean refreshRequired = true; private String start; private String end; private String advanceTime; @@ -80,7 +80,7 @@ public Request(StreamInput in) throws IOException { advanceTime = in.readOptionalString(); skipTime = in.readOptionalString(); waitForNormalization = in.readBoolean(); - shouldRefresh = in.readBoolean(); + refreshRequired = in.readBoolean(); } @Override @@ -92,7 +92,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(advanceTime); out.writeOptionalString(skipTime); out.writeBoolean(waitForNormalization); - out.writeBoolean(shouldRefresh); + out.writeBoolean(refreshRequired); } public Request(String jobId) { @@ -143,8 +143,8 @@ public boolean isWaitForNormalization() { return waitForNormalization; } - public boolean isShouldRefresh() { - return shouldRefresh; + public boolean isRefreshRequired() { + return refreshRequired; } /** @@ -162,13 +162,13 @@ public void setWaitForNormalization(boolean waitForNormalization) { * * Particularly for short bucket spans these refreshes could be a significant cost. **/ - public void setShouldRefresh(boolean shouldRefresh) { - this.shouldRefresh = shouldRefresh; + public void setRefreshRequired(boolean refreshRequired) { + this.refreshRequired = refreshRequired; } @Override public int hashCode() { - return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, shouldRefresh); + return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, refreshRequired); } @Override @@ -183,7 +183,7 @@ public boolean equals(Object obj) { return Objects.equals(jobId, other.jobId) && calcInterim == other.calcInterim && waitForNormalization == other.waitForNormalization - && shouldRefresh == other.shouldRefresh + && refreshRequired == other.refreshRequired && Objects.equals(start, other.start) && Objects.equals(end, other.end) && Objects.equals(advanceTime, other.advanceTime) @@ -207,7 +207,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (skipTime != null) { builder.field(SKIP_TIME.getPreferredName(), skipTime); } - builder.field(SHOULD_REFRESH.getPreferredName(), shouldRefresh); + builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired); builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java index f05845af28af4..43a32259901a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -28,7 +28,7 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { public static final ParseField TYPE = new ParseField("flush"); public static final ParseField ID = new ParseField("id"); public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end"); - public static final ParseField SHOULD_REFRESH = new ParseField("should_refresh"); + public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( TYPE.getPreferredName(), @@ -38,41 +38,41 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { static { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END); - PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SHOULD_REFRESH); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REFRESH_REQUIRED); } private final String id; private final Instant lastFinalizedBucketEnd; - private final Boolean shouldRefresh; + private final Boolean refreshRequired; - public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean shouldRefresh) { + public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) { this.id = id; // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null; - this.shouldRefresh = shouldRefresh == null || shouldRefresh; + this.refreshRequired = refreshRequired == null || refreshRequired; } - public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean shouldRefresh) { + public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) { this.id = id; // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; - this.shouldRefresh = shouldRefresh == null || shouldRefresh; + this.refreshRequired = refreshRequired == null || refreshRequired; } public FlushAcknowledgement(StreamInput in) throws IOException { id = in.readString(); lastFinalizedBucketEnd = in.readOptionalInstant(); - shouldRefresh = in.readOptionalBoolean(); + refreshRequired = in.readOptionalBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeOptionalInstant(lastFinalizedBucketEnd); - out.writeOptionalBoolean(shouldRefresh); + out.writeOptionalBoolean(refreshRequired); } public String getId() { @@ -83,8 +83,8 @@ public Instant getLastFinalizedBucketEnd() { return lastFinalizedBucketEnd; } - public Boolean getShouldRefresh() { - return shouldRefresh; + public Boolean getRefreshRequired() { + return refreshRequired; } @Override @@ -98,14 +98,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws lastFinalizedBucketEnd.toEpochMilli() ); } - builder.field(SHOULD_REFRESH.getPreferredName(), shouldRefresh); + builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired); builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(id, lastFinalizedBucketEnd, shouldRefresh); + return Objects.hash(id, lastFinalizedBucketEnd, refreshRequired); } @Override @@ -119,6 +119,6 @@ public boolean equals(Object obj) { FlushAcknowledgement other = (FlushAcknowledgement) obj; return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd) - && Objects.equals(shouldRefresh, other.shouldRefresh); + && Objects.equals(refreshRequired, other.refreshRequired); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java index da89fc1862212..46798c1cc47b8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java @@ -20,7 +20,7 @@ protected Request createTestInstance() { request.setWaitForNormalization(randomBoolean()); } if (randomBoolean()) { - request.setShouldRefresh(randomBoolean()); + request.setRefreshRequired(randomBoolean()); } if (randomBoolean()) { request.setCalcInterim(randomBoolean()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java index 6321cb16db734..5814f5a0e7922 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java @@ -51,7 +51,7 @@ protected void taskOperation( FlushJobParams.Builder paramsBuilder = FlushJobParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); paramsBuilder.waitForNormalization(request.isWaitForNormalization()); - paramsBuilder.shouldRefresh(request.isShouldRefresh()); + paramsBuilder.refreshRequired(request.isRefreshRequired()); if (request.getAdvanceTime() != null) { paramsBuilder.advanceTime(request.getAdvanceTime()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index d99c403f7c78e..d4757d5be9ce5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -175,7 +175,7 @@ Long runLookBack(long startTime, Long endTime) throws Exception { FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setCalcInterim(true); - request.setShouldRefresh(false); + request.setRefreshRequired(false); run(lookbackStartTimeMs, lookbackEnd, request); if (shouldPersistAfterLookback(isLookbackOnly)) { sendPersistRequest(); @@ -206,7 +206,7 @@ private long skipToStartTime(long startTime) { // start time is after last checkpoint, thus we need to skip time FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setSkipTime(String.valueOf(startTime)); - request.setShouldRefresh(false); + request.setRefreshRequired(false); FlushJobAction.Response flushResponse = flushJob(request); LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli()); return flushResponse.getLastFinalizedBucketEnd().toEpochMilli(); @@ -220,7 +220,7 @@ long runRealtime() throws Exception { long end = toIntervalStartEpochMs(nowMinusQueryDelay); FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setWaitForNormalization(false); - request.setShouldRefresh(false); + request.setRefreshRequired(false); request.setCalcInterim(true); request.setAdvanceTime(String.valueOf(end)); run(start, end, request); @@ -503,7 +503,7 @@ private long toIntervalStartEpochMs(long epochMs) { private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) { try { - LOGGER.info("[" + jobId + "] Sending flush request"); + LOGGER.trace("[" + jobId + "] Sending flush request"); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { return client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 390d56f6018f7..459e6f6dee4bd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -394,7 +394,7 @@ void processResult(AutodetectResult result) { try { bulkResultsPersister.executeRequest(); bulkAnnotationsPersister.executeRequest(); - if (flushAcknowledgement.getShouldRefresh()) { + if (flushAcknowledgement.getRefreshRequired()) { persister.commitWrites( jobId, EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index b3d6324fdd53f..bbf549710db0a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -44,7 +44,7 @@ public class FlushJobParams { /** * Should the flush request trigger a refresh or not. */ - private final boolean shouldRefresh; + private final boolean refreshRequired; private FlushJobParams( boolean calcInterim, @@ -52,14 +52,14 @@ private FlushJobParams( Long advanceTimeSeconds, Long skipTimeSeconds, boolean waitForNormalization, - boolean shouldRefresh + boolean refreshRequired ) { this.calcInterim = calcInterim; this.timeRange = Objects.requireNonNull(timeRange); this.advanceTimeSeconds = advanceTimeSeconds; this.skipTimeSeconds = skipTimeSeconds; this.waitForNormalization = waitForNormalization; - this.shouldRefresh = shouldRefresh; + this.refreshRequired = refreshRequired; } public boolean shouldCalculateInterim() { @@ -100,8 +100,8 @@ public boolean isWaitForNormalization() { return waitForNormalization; } - public Boolean shouldRefresh() { - return shouldRefresh; + public Boolean refreshRequired() { + return refreshRequired; } public static Builder builder() { @@ -130,7 +130,7 @@ public static class Builder { private String advanceTime; private String skipTime; private boolean waitForNormalization = true; - private boolean shouldRefresh = true; + private boolean refreshRequired = true; public Builder calcInterim(boolean value) { calcInterim = value; @@ -157,8 +157,8 @@ public Builder waitForNormalization(boolean waitForNormalization) { return this; } - public Builder shouldRefresh(boolean shouldRefresh) { - this.shouldRefresh = shouldRefresh; + public Builder refreshRequired(boolean refreshRequired) { + this.refreshRequired = refreshRequired; return this; } @@ -171,7 +171,7 @@ public FlushJobParams build() { "advance_time [" + advanceTime + "] must be later than skip_time [" + skipTime + "]" ); } - return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, shouldRefresh); + return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired); } private void checkValidFlushArgumentsCombination() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java index a46a9a9f09ecf..8389d3ab29fc9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java @@ -81,7 +81,7 @@ public class AutodetectControlMsgWriter extends AbstractControlMsgWriter { /** * This must match the code defined in the api::CAnomalyJob C++ class. */ - public static final String SHOULD_REFRESH_MESSAGE_CODE = "z"; + public static final String REFRESH_REQUIRED_MESSAGE_CODE = "z"; /** * An number to uniquely identify each flush so that subsequent code can @@ -148,7 +148,7 @@ public void writeFlushControlMessage(FlushJobParams params) throws IOException { if (params.shouldCalculateInterim()) { writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd()); } - writeMessage(SHOULD_REFRESH_MESSAGE_CODE + params.shouldRefresh().toString()); + writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.refreshRequired().toString()); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 8ad8df7b02b66..d4141c61a48d4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -176,6 +176,7 @@ public void testLookBackRunWithEndTime() throws Exception { verify(dataExtractorFactory).newExtractor(0L, 1000L); FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); + flushRequest.setRefreshRequired(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } @@ -202,6 +203,7 @@ public void testLookBackRunWithNoEndTime() throws Exception { verify(dataExtractorFactory).newExtractor(0L, 1500L); FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); + flushRequest.setRefreshRequired(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId))); } @@ -226,6 +228,7 @@ public void testLookBackRunWithStartTimeEarlierThanResumePoint() throws Exceptio assertThat(flushJobRequests.getAllValues().size(), equalTo(1)); FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); + flushRequest.setRefreshRequired(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId))); } @@ -285,7 +288,7 @@ public void testRealtimeRun() throws Exception { flushRequest.setCalcInterim(true); flushRequest.setAdvanceTime("59000"); flushRequest.setWaitForNormalization(false); - flushRequest.setShouldRefresh(false); + flushRequest.setRefreshRequired(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 7a881caa33397..49b362c97156a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -252,7 +252,7 @@ public void testProcessResult_flushAcknowledgementWithRefresh() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); - when(flushAcknowledgement.getShouldRefresh()).thenReturn(true); + when(flushAcknowledgement.getRefreshRequired()).thenReturn(true); when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); processorUnderTest.setDeleteInterimRequired(false); @@ -291,7 +291,7 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(Integer.valueOf(randomInt(100)).toString()); - when(flushAcknowledgement.getShouldRefresh()).thenReturn(true); + when(flushAcknowledgement.getRefreshRequired()).thenReturn(true); when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); when(categoryDefinition.getCategoryId()).thenReturn(1L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java index bbecfc6b176b2..a1f82b7993e8f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java @@ -113,7 +113,7 @@ public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException { writer.writeFlushControlMessage(flushJobParams); - // Even a plain flush message contains the "shouldRefresh" flag, which + // Even a plain flush message contains the "refreshRequired" flag, which // is set to "true" by default InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); @@ -124,11 +124,11 @@ public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException { public void testWriteFlushControlMessage_GivenShouldRefreshFalse() throws IOException { AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4); - FlushJobParams flushJobParams = FlushJobParams.builder().shouldRefresh(false).build(); + FlushJobParams flushJobParams = FlushJobParams.builder().refreshRequired(false).build(); writer.writeFlushControlMessage(flushJobParams); - // Even a plain flush message contains the "shouldRefresh" flag, which + // Even a plain flush message contains the "refreshRequired" flag, which // is set to "true" by default InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); From c41061bb4a8931299215d9e874b839a2aabc0d82 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Wed, 14 Jun 2023 10:55:20 +0100 Subject: [PATCH 3/8] AutodetectResultProcessorIT.testMultipleFlushesBetweenPersisting test fix Always refresh indices in FlushAcknowlegement instance created within integration tests. This is to ensure consistent behaviour across test runs. --- .../xpack/ml/integration/AutodetectResultProcessorIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 365ce4bbee389..5ef4754ec8544 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -575,7 +575,7 @@ private static Quantiles createQuantiles() { } private static FlushAcknowledgement createFlushAcknowledgement() { - return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), randomBoolean()); + return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), true); } private static class ResultsBuilder { From c276fc471e716571be2efca07e829517f9d08888 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 15 Jun 2023 10:56:44 +0100 Subject: [PATCH 4/8] Attend to review comments * use boolean in preference to Boolean * BWC guards --- .../org/elasticsearch/TransportVersion.java | 3 ++- .../xpack/core/ml/action/FlushJobAction.java | 9 +++++-- .../output/FlushAcknowledgement.java | 25 ++++++++++++------- .../autodetect/params/FlushJobParams.java | 2 +- .../writer/AutodetectControlMsgWriter.java | 2 +- 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersion.java b/server/src/main/java/org/elasticsearch/TransportVersion.java index 4ed6df1a9e30e..054bd1b05c13d 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersion.java +++ b/server/src/main/java/org/elasticsearch/TransportVersion.java @@ -135,8 +135,9 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1"); public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675"); public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E"); + public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD"); - private static final TransportVersion CURRENT = findCurrent(V_8_500_011); + private static final TransportVersion CURRENT = findCurrent(V_8_500_012); /** * Reference to the earliest compatible transport version to this version of the codebase. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index 73d130a27535b..0b9811c982237 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.common.io.stream.StreamInput; @@ -80,7 +81,9 @@ public Request(StreamInput in) throws IOException { advanceTime = in.readOptionalString(); skipTime = in.readOptionalString(); waitForNormalization = in.readBoolean(); - refreshRequired = in.readBoolean(); + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) { + refreshRequired = in.readBoolean(); + } } @Override @@ -92,7 +95,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(advanceTime); out.writeOptionalString(skipTime); out.writeBoolean(waitForNormalization); - out.writeBoolean(refreshRequired); + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) { + out.writeBoolean(refreshRequired); + } } public Request(String jobId) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java index 43a32259901a8..42d891696bb6e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.ml.job.process.autodetect.output; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -43,36 +44,42 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { private final String id; private final Instant lastFinalizedBucketEnd; - private final Boolean refreshRequired; + private final boolean refreshRequired; - public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) { + public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, boolean refreshRequired) { this.id = id; // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null; - this.refreshRequired = refreshRequired == null || refreshRequired; + this.refreshRequired = refreshRequired; } - public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) { + public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, boolean refreshRequired) { this.id = id; // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; - this.refreshRequired = refreshRequired == null || refreshRequired; + this.refreshRequired = refreshRequired; } public FlushAcknowledgement(StreamInput in) throws IOException { id = in.readString(); lastFinalizedBucketEnd = in.readOptionalInstant(); - refreshRequired = in.readOptionalBoolean(); + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) { + refreshRequired = in.readBoolean(); + } else { + refreshRequired = true; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeOptionalInstant(lastFinalizedBucketEnd); - out.writeOptionalBoolean(refreshRequired); + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) { + out.writeBoolean(refreshRequired); + } } public String getId() { @@ -83,7 +90,7 @@ public Instant getLastFinalizedBucketEnd() { return lastFinalizedBucketEnd; } - public Boolean getRefreshRequired() { + public boolean getRefreshRequired() { return refreshRequired; } @@ -119,6 +126,6 @@ public boolean equals(Object obj) { FlushAcknowledgement other = (FlushAcknowledgement) obj; return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd) - && Objects.equals(refreshRequired, other.refreshRequired); + && refreshRequired == refreshRequired; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index bbf549710db0a..42de2978e63d5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -100,7 +100,7 @@ public boolean isWaitForNormalization() { return waitForNormalization; } - public Boolean refreshRequired() { + public boolean refreshRequired() { return refreshRequired; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java index 8389d3ab29fc9..338b69273468c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java @@ -148,7 +148,7 @@ public void writeFlushControlMessage(FlushJobParams params) throws IOException { if (params.shouldCalculateInterim()) { writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd()); } - writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.refreshRequired().toString()); + writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.refreshRequired()); } /** From 304f30e1922c261f5c15ba603c2d5e5e7652bcbd Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 15 Jun 2023 11:23:46 +0100 Subject: [PATCH 5/8] Attend to further review comments * Do not parse "refreshRequired" flag in FlushJobAction --- .../org/elasticsearch/xpack/core/ml/action/FlushJobAction.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index 0b9811c982237..ff85727ef0c62 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -41,7 +41,6 @@ public static class Request extends JobTaskRequest implements ToXConten public static final ParseField END = new ParseField("end"); public static final ParseField ADVANCE_TIME = new ParseField("advance_time"); public static final ParseField SKIP_TIME = new ParseField("skip_time"); - public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required"); private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -52,7 +51,6 @@ public static class Request extends JobTaskRequest implements ToXConten PARSER.declareString(Request::setEnd, END); PARSER.declareString(Request::setAdvanceTime, ADVANCE_TIME); PARSER.declareString(Request::setSkipTime, SKIP_TIME); - PARSER.declareBoolean(Request::setRefreshRequired, REFRESH_REQUIRED); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -212,7 +210,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (skipTime != null) { builder.field(SKIP_TIME.getPreferredName(), skipTime); } - builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired); builder.endObject(); return builder; } From 583a440d605bee457ebd1e43a0e072489478e125 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 15 Jun 2023 14:14:46 +0100 Subject: [PATCH 6/8] Fix failing BWC tests * For backwards compatibility it must be possible to construct FlushAcknowledgement without refreshRequired, i.e. refreshRequired must be nullable. * Added missing comparisons in FlushParams equals and hashCode methods. --- .../autodetect/output/FlushAcknowledgement.java | 10 +++++----- .../job/process/autodetect/params/FlushJobParams.java | 8 +++++--- .../autodetect/writer/AutodetectControlMsgWriter.java | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java index 42d891696bb6e..06def9204686e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -46,21 +46,21 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { private final Instant lastFinalizedBucketEnd; private final boolean refreshRequired; - public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, boolean refreshRequired) { + public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) { this.id = id; // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null; - this.refreshRequired = refreshRequired; + this.refreshRequired = refreshRequired == null || refreshRequired; } - public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, boolean refreshRequired) { + public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) { this.id = id; // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; - this.refreshRequired = refreshRequired; + this.refreshRequired = refreshRequired == null || refreshRequired; } public FlushAcknowledgement(StreamInput in) throws IOException { @@ -126,6 +126,6 @@ public boolean equals(Object obj) { FlushAcknowledgement other = (FlushAcknowledgement) obj; return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd) - && refreshRequired == refreshRequired; + && refreshRequired == other.refreshRequired; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index 42de2978e63d5..df4ad9f1cac41 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -100,7 +100,7 @@ public boolean isWaitForNormalization() { return waitForNormalization; } - public boolean refreshRequired() { + public boolean isRefreshRequired() { return refreshRequired; } @@ -116,12 +116,14 @@ public boolean equals(Object o) { return calcInterim == that.calcInterim && Objects.equals(timeRange, that.timeRange) && Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds) - && Objects.equals(skipTimeSeconds, that.skipTimeSeconds); + && Objects.equals(skipTimeSeconds, that.skipTimeSeconds) + && waitForNormalization == that.waitForNormalization + && refreshRequired == that.refreshRequired; } @Override public int hashCode() { - return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds); + return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired); } public static class Builder { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java index 338b69273468c..8e8d828c8a8ea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java @@ -148,7 +148,7 @@ public void writeFlushControlMessage(FlushJobParams params) throws IOException { if (params.shouldCalculateInterim()) { writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd()); } - writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.refreshRequired()); + writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.isRefreshRequired()); } /** From 212ca28bccb4ab3a71b1416f89562b74f5966e91 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 15 Jun 2023 15:38:47 +0100 Subject: [PATCH 7/8] Fix failing BWC test For transport versions prior to V_8_500_012 refreshRequired in FlushJobAction.Request is always set to true. Update the tests to reflect that. --- .../xpack/core/ml/action/FlushJobActionRequestTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java index 46798c1cc47b8..2a64f7c5151c2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Request; +import org.elasticsearch.xpack.core.ml.inference.trainedmodel.BertJapaneseTokenization; public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTestCase { @@ -52,6 +53,9 @@ protected Writeable.Reader instanceReader() { @Override protected Request mutateInstanceForVersion(Request instance, TransportVersion version) { + if (version.before(TransportVersion.V_8_500_012)) { + instance.setRefreshRequired(true); + } return instance; } } From d3c6cc38d090afb1c08ba19e3fc17d82f4b6006a Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 15 Jun 2023 15:45:07 +0100 Subject: [PATCH 8/8] Fix format violation --- .../xpack/core/ml/action/FlushJobActionRequestTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java index 2a64f7c5151c2..be82f11b00260 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Request; -import org.elasticsearch.xpack.core.ml.inference.trainedmodel.BertJapaneseTokenization; public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTestCase {