From 810e86be1a3be6136560127e3e3471defa693a19 Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Fri, 3 May 2024 16:58:04 +0100 Subject: [PATCH] Backport serialization fix of put/delete shutdown requests to 8.14 (#108251) Backport of #107862 to 8.14 --- .../org/elasticsearch/TransportVersions.java | 2 + .../shutdown/DeleteShutdownNodeAction.java | 13 ++ .../xpack/shutdown/PutShutdownNodeAction.java | 13 ++ .../shutdown/DeleteShutdownRequestTests.java | 87 ++++++++ .../shutdown/PutShutdownRequestTests.java | 196 ++++++++++++++++++ 5 files changed, 311 insertions(+) create mode 100644 x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/DeleteShutdownRequestTests.java create mode 100644 x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 175fbf9efbfe3..3c8a403775802 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -133,6 +133,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0); public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0); public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0); + public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13 = def(8_595_00_1); // 8.14.0+ public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0); public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0); @@ -175,6 +176,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_AZURE_OPENAI_EMBEDDINGS = def(8_634_00_0); public static final TransportVersion ILM_SHRINK_ENABLE_WRITE = def(8_635_00_0); public static final TransportVersion GEOIP_CACHE_STATS = def(8_636_00_0); + public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14 = def(8_636_00_1); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/DeleteShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/DeleteShutdownNodeAction.java index 75c36f063f805..d7674ebb251ec 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/DeleteShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/DeleteShutdownNodeAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.shutdown; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -14,6 +15,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -35,11 +37,22 @@ public Request(String nodeId) { } public Request(StreamInput in) throws IOException { + if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13) + || in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) { + // effectively super(in): + setParentTask(TaskId.readFromStream(in)); + masterNodeTimeout(in.readTimeValue()); + timeout(in.readTimeValue()); + } this.nodeId = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13) + || out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) { + super.writeTo(out); + } out.writeString(this.nodeId); } diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index d05b60cd947f5..ddf35d3110427 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.shutdown; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -17,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; @@ -96,6 +98,13 @@ public Request( } public Request(StreamInput in) throws IOException { + if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13) + || in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) { + // effectively super(in): + setParentTask(TaskId.readFromStream(in)); + masterNodeTimeout(in.readTimeValue()); + timeout(in.readTimeValue()); + } this.nodeId = in.readString(); this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class); this.reason = in.readString(); @@ -114,6 +123,10 @@ public Request(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13) + || out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) { + super.writeTo(out); + } out.writeString(nodeId); if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION) && this.type == SingleNodeShutdownMetadata.Type.REPLACE) { diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/DeleteShutdownRequestTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/DeleteShutdownRequestTests.java new file mode 100644 index 0000000000000..ed12970008bc3 --- /dev/null +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/DeleteShutdownRequestTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class DeleteShutdownRequestTests extends AbstractWireSerializingTestCase { + + /** + * Wraps a {@link DeleteShutdownNodeAction.Request} to add proper equality checks + */ + record RequestWrapper(String nodeId, TaskId parentTask, TimeValue masterNodeTimeout, TimeValue ackTimeout) implements Writeable { + @Override + public void writeTo(StreamOutput out) throws IOException { + final var request = new DeleteShutdownNodeAction.Request(nodeId); + request.setParentTask(parentTask); + request.timeout(ackTimeout); + request.masterNodeTimeout(masterNodeTimeout); + request.writeTo(out); + } + } + + @Override + protected Writeable.Reader instanceReader() { + return in -> { + final var request = new DeleteShutdownNodeAction.Request(in); + return new RequestWrapper(request.getNodeId(), request.getParentTask(), request.masterNodeTimeout(), request.ackTimeout()); + }; + } + + @Override + protected RequestWrapper createTestInstance() { + return new RequestWrapper( + randomIdentifier(), + randomTaskId(), + TimeValue.parseTimeValue(randomTimeValue(), getTestName()), + TimeValue.parseTimeValue(randomTimeValue(), getTestName()) + ); + } + + private static TaskId randomTaskId() { + return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong()); + } + + @Override + protected RequestWrapper mutateInstance(RequestWrapper instance) { + return switch (between(1, 4)) { + case 1 -> new RequestWrapper( + randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier), + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 2 -> new RequestWrapper( + instance.nodeId, + randomValueOtherThan(instance.parentTask, DeleteShutdownRequestTests::randomTaskId), + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 3 -> new RequestWrapper( + instance.nodeId, + instance.parentTask, + randomValueOtherThan(instance.ackTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())), + instance.masterNodeTimeout + ); + case 4 -> new RequestWrapper( + instance.nodeId, + instance.parentTask, + instance.ackTimeout, + randomValueOtherThan(instance.masterNodeTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())) + ); + default -> throw new AssertionError("impossible"); + }; + } +} diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java new file mode 100644 index 0000000000000..a2039aad39ebf --- /dev/null +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class PutShutdownRequestTests extends AbstractWireSerializingTestCase { + + /** + * Wraps a {@link org.elasticsearch.xpack.shutdown.PutShutdownNodeAction.Request} to add proper equality checks + */ + record RequestWrapper( + String nodeId, + SingleNodeShutdownMetadata.Type type, + String reason, + TimeValue allocationDelay, + String targetNodeName, + TimeValue gracePeriod, + TaskId parentTask, + TimeValue masterNodeTimeout, + TimeValue ackTimeout + ) implements Writeable { + @Override + public void writeTo(StreamOutput out) throws IOException { + final var request = new PutShutdownNodeAction.Request(nodeId, type, reason, allocationDelay, targetNodeName, gracePeriod); + request.setParentTask(parentTask); + request.timeout(ackTimeout); + request.masterNodeTimeout(masterNodeTimeout); + request.writeTo(out); + } + } + + @Override + protected Writeable.Reader instanceReader() { + return in -> { + final var request = new PutShutdownNodeAction.Request(in); + return new RequestWrapper( + request.getNodeId(), + request.getType(), + request.getReason(), + request.getAllocationDelay(), + request.getTargetNodeName(), + request.getGracePeriod(), + request.getParentTask(), + request.masterNodeTimeout(), + request.ackTimeout() + ); + }; + } + + @Override + protected RequestWrapper createTestInstance() { + return new RequestWrapper( + randomIdentifier(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + randomIdentifier(), + randomOptionalTimeValue(), + randomOptionalIdentifier(), + randomOptionalTimeValue(), + randomTaskId(), + TimeValue.parseTimeValue(randomTimeValue(), getTestName()), + TimeValue.parseTimeValue(randomTimeValue(), getTestName()) + ); + } + + private static String randomOptionalIdentifier() { + return randomBoolean() ? null : randomIdentifier(); + } + + private TimeValue randomOptionalTimeValue() { + return randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), getTestName()); + } + + private static TaskId randomTaskId() { + return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong()); + } + + @Override + protected RequestWrapper mutateInstance(RequestWrapper instance) { + return switch (between(1, 9)) { + case 1 -> new RequestWrapper( + randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier), + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 2 -> new RequestWrapper( + instance.nodeId, + randomValueOtherThan(instance.type, () -> randomFrom(SingleNodeShutdownMetadata.Type.values())), + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 3 -> new RequestWrapper( + instance.nodeId, + instance.type, + randomValueOtherThan(instance.reason, ESTestCase::randomIdentifier), + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 4 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + randomValueOtherThan(instance.allocationDelay, this::randomOptionalTimeValue), + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 5 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + randomValueOtherThan(instance.targetNodeName, PutShutdownRequestTests::randomOptionalIdentifier), + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 6 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + randomValueOtherThan(instance.gracePeriod, this::randomOptionalTimeValue), + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 7 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + randomValueOtherThan(instance.parentTask, PutShutdownRequestTests::randomTaskId), + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 8 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + randomValueOtherThan(instance.ackTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())), + instance.masterNodeTimeout + ); + case 9 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + randomValueOtherThan(instance.masterNodeTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())) + ); + default -> throw new AssertionError("impossible"); + }; + } +}