Skip to content

Commit

Permalink
Remove uses of calculateMinimumCompatVersion from handshakes (#93376)
Browse files Browse the repository at this point in the history
This replaces the previous uses of Version.minimumCompatibleVersion with hard-coded constants.

At some point later, handshakes will be refactored to not use the same transport versioning as the rest of the transport code, and will use its own version numbers & send/ack mechanisms.
  • Loading branch information
thecoop committed Feb 13, 2023
1 parent b4712a5 commit fd7736f
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,19 @@ static Header readHeader(TransportVersion version, int networkMessageSize, Bytes
streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
long requestId = streamInput.readLong();
byte status = streamInput.readByte();
TransportVersion remoteVersion = TransportVersion.fromId(streamInput.readInt());
Header header = new Header(networkMessageSize, requestId, status, remoteVersion);
final IllegalStateException invalidVersion = ensureVersionCompatibility(remoteVersion, version, header.isHandshake());
if (invalidVersion != null) {
throw invalidVersion;
int remoteVersion = streamInput.readInt();

Header header = new Header(networkMessageSize, requestId, status, TransportVersion.fromId(remoteVersion));
if (header.isHandshake()) {
checkHandshakeVersionCompatibility(header.getVersion());
} else {
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
// Skip since we already have ensured enough data available
streamInput.readInt();
header.finishParsingHeader(streamInput);
}
checkVersionCompatibility(header.getVersion(), version);
}

if (header.getVersion().onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
// Skip since we already have ensured enough data available
streamInput.readInt();
header.finishParsingHeader(streamInput);
}
return header;
}
Expand All @@ -203,23 +205,22 @@ private void ensureOpen() {
}
}

static IllegalStateException ensureVersionCompatibility(
TransportVersion remoteVersion,
TransportVersion currentVersion,
boolean isHandshake
) {
// for handshakes we are compatible with N-2 since otherwise we can't figure out our initial version
// since we are compatible with N-1 and N+1 so we always send our minCompatVersion as the initial version in the
// handshake. This looks odd but it's required to establish the connection correctly we check for real compatibility
// once the connection is established
final TransportVersion compatibilityVersion = isHandshake ? currentVersion.calculateMinimumCompatVersion() : currentVersion;
if (remoteVersion.isCompatible(compatibilityVersion) == false) {
final TransportVersion minCompatibilityVersion = isHandshake
? compatibilityVersion
: compatibilityVersion.calculateMinimumCompatVersion();
String msg = "Received " + (isHandshake ? "handshake " : "") + "message from unsupported version: [";
return new IllegalStateException(msg + remoteVersion + "] minimal compatible version is: [" + minCompatibilityVersion + "]");
static void checkHandshakeVersionCompatibility(TransportVersion handshakeVersion) {
if (TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS.contains(handshakeVersion) == false) {
throw new IllegalStateException(
"Received message from unsupported version: ["
+ handshakeVersion
+ "] allowed versions are: "
+ TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS
);
}
}

static void checkVersionCompatibility(TransportVersion remoteVersion, TransportVersion currentVersion) {
if (remoteVersion.isCompatible(currentVersion) == false) {
throw new IllegalStateException(
"Received message from unsupported version: [" + remoteVersion + "] minimal compatible version is: [" + currentVersion + "]"
);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import static org.elasticsearch.core.Strings.format;

/**
* Handles inbound messages by first deserializing a {@link TransportMessage} from an {@link InboundMessage} and then passing
* it to the appropriate handler.
Expand Down Expand Up @@ -217,19 +215,8 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
try {
handshaker.handleHandshake(transportChannel, requestId, stream);
} catch (Exception e) {
if (TransportVersion.CURRENT.isCompatible(header.getVersion())) {
sendErrorResponse(action, transportChannel, e);
} else {
logger.warn(
() -> format(
"could not send error response to handshake received on [%s] using wire format version [%s], closing channel",
channel,
header.getVersion()
),
e
);
channel.close();
}
logger.warn(() -> "error processing handshake version [" + version + "] received on [" + channel + "], closing channel", e);
channel.close();
}
} else {
final TransportChannel transportChannel = new TcpTransportChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -112,6 +113,10 @@ final class TransportHandshaker {
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
*/

static final TransportVersion EARLIEST_HANDSHAKE_VERSION = TransportVersion.fromId(6080099);
static final TransportVersion REQUEST_HANDSHAKE_VERSION = TransportVersion.MINIMUM_COMPATIBLE;
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(EARLIEST_HANDSHAKE_VERSION, REQUEST_HANDSHAKE_VERSION);

static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();
Expand Down Expand Up @@ -148,11 +153,7 @@ void sendHandshake(
);
boolean success = false;
try {
// for the request we use the minCompatVersion since we don't know what's the version of the node we talk to
// we also have no payload on the request but the response will contain the actual version of the node we talk
// to as the payload.
TransportVersion minCompatVersion = version.calculateMinimumCompatVersion();
handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion);
handshakeRequestSender.sendRequest(node, channel, requestId, REQUEST_HANDSHAKE_VERSION);

threadPool.schedule(
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -125,7 +124,7 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {
Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null);
String action = "test-request";
long requestId = randomNonNegativeLong();
final TransportVersion preHeaderVariableInt = TransportVersion.V_7_5_0;
final TransportVersion preHeaderVariableInt = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
final String contentValue = randomAlphaOfLength(100);
// 8.0 is only compatible with handshakes on a pre-variable int version
final OutboundMessage message = new OutboundMessage.Request(
Expand Down Expand Up @@ -184,7 +183,7 @@ public void testDecodeHandshakeCompatibility() throws IOException {
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
threadContext.putHeader(headerKey, headerValue);
TransportVersion handshakeCompat = TransportVersion.CURRENT.calculateMinimumCompatVersion().calculateMinimumCompatVersion();
TransportVersion handshakeCompat = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
Expand Down Expand Up @@ -307,7 +306,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
threadContext.putHeader(headerKey, headerValue);
TransportVersion handshakeCompat = TransportVersion.CURRENT.calculateMinimumCompatVersion().calculateMinimumCompatVersion();
TransportVersion handshakeCompat = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
Expand Down Expand Up @@ -344,7 +343,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {
public void testVersionIncompatibilityDecodeException() throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
TransportVersion incompatibleVersion = TransportVersion.CURRENT.calculateMinimumCompatVersion().calculateMinimumCompatVersion();
TransportVersion incompatibleVersion = TransportVersionUtils.getPreviousVersion(TransportVersion.MINIMUM_COMPATIBLE);
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
Expand All @@ -370,40 +369,52 @@ public void testVersionIncompatibilityDecodeException() throws IOException {
assertFalse(releasable1.hasReferences());
}

public void testEnsureVersionCompatibility() throws IOException {
IllegalStateException ise = InboundDecoder.ensureVersionCompatibility(
TransportVersionUtils.randomVersionBetween(
random(),
TransportVersion.CURRENT.minimumCompatibilityVersion(),
public void testCheckVersionCompatibility() {
try {
InboundDecoder.checkVersionCompatibility(
TransportVersionUtils.randomVersionBetween(
random(),
TransportVersion.CURRENT.minimumCompatibilityVersion(),
TransportVersion.CURRENT
),
TransportVersion.CURRENT
),
TransportVersion.CURRENT,
randomBoolean()
);
assertNull(ise);

TransportVersion version = Version.fromString("7.0.0").transportVersion;
ise = InboundDecoder.ensureVersionCompatibility(Version.fromString("6.0.0").transportVersion, version, true);
assertNull(ise);

ise = InboundDecoder.ensureVersionCompatibility(Version.fromString("6.0.0").transportVersion, version, false);
assertEquals(
"Received message from unsupported version: [6000099] minimal compatible version is: ["
+ version.calculateMinimumCompatVersion()
+ "]",
ise.getMessage()
);
);
} catch (IllegalStateException e) {
throw new AssertionError(e);
}

TransportVersion version = TransportVersion.CURRENT;
TransportVersion invalid = TransportVersionUtils.getPreviousVersion(version.minimumCompatibilityVersion());
try {
InboundDecoder.checkVersionCompatibility(invalid, version);
fail();
} catch (IllegalStateException expected) {
assertEquals(
"Received message from unsupported version: [" + invalid + "] minimal compatible version is: [" + version + "]",
expected.getMessage()
);
}
}

// For handshake we are compatible with N-2
ise = InboundDecoder.ensureVersionCompatibility(Version.fromString("5.6.0").transportVersion, version, true);
assertNull(ise);
public void testCheckHandshakeCompatibility() {
try {
InboundDecoder.checkHandshakeVersionCompatibility(randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS));
} catch (IllegalStateException e) {
throw new AssertionError(e);
}

ise = InboundDecoder.ensureVersionCompatibility(Version.fromString("5.6.0").transportVersion, version, false);
assertEquals(
"Received message from unsupported version: [5060099] minimal compatible version is: ["
+ version.calculateMinimumCompatVersion()
+ "]",
ise.getMessage()
);
var invalid = TransportVersion.fromId(TransportHandshaker.EARLIEST_HANDSHAKE_VERSION.id - 1);
try {
InboundDecoder.checkHandshakeVersionCompatibility(invalid);
fail();
} catch (IllegalStateException expected) {
assertEquals(
"Received message from unsupported version: ["
+ invalid
+ "] allowed versions are: "
+ TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS,
expected.getMessage()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
Expand Down Expand Up @@ -209,32 +208,7 @@ public TestResponse read(StreamInput in) throws IOException {
}
}

public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exception {
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
// response. However if the two nodes are from the same major version then we do guarantee compatibility of error responses.

final TransportVersion remoteVersion = TransportVersionUtils.randomCompatibleVersion(random(), version);
final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header(
between(0, 100),
requestId,
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)),
remoteVersion
);
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.inboundMessage(channel, requestMessage);

final BytesReference responseBytesReference = channel.getMessageCaptor().get();
final Header responseHeader = InboundDecoder.readHeader(remoteVersion, responseBytesReference.length(), responseBytesReference);
assertTrue(responseHeader.isResponse());
assertTrue(responseHeader.isError());
}

public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws Exception {
public void testClosesChannelOnErrorInHandshake() throws Exception {
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
Expand All @@ -248,7 +222,7 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws
"expected message",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"could not send error response to handshake"
"error processing handshake received"
)
);
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Expand Down Expand Up @@ -320,7 +294,7 @@ public void testLogsSlowInboundProcessing() throws Exception {
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.inboundMessage(channel, requestMessage);
assertNotNull(channel.getMessageCaptor().get());
// expect no response - channel just closed on exception
mockAppender.assertAllExpectationsMatched();

mockAppender.addExpectation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -217,8 +218,7 @@ public void testDecodeExceptionIsPropagated() throws IOException {

try (RecyclerBytesStreamOutput streamOutput = new RecyclerBytesStreamOutput(recycler)) {
String actionName = "actionName";
final TransportVersion invalidVersion = TransportVersion.CURRENT.calculateMinimumCompatVersion()
.calculateMinimumCompatVersion();
final TransportVersion invalidVersion = TransportVersionUtils.getPreviousVersion(TransportVersion.MINIMUM_COMPATIBLE);
final String value = randomAlphaOfLength(1000);
final boolean isRequest = randomBoolean();
final long requestId = randomNonNegativeLong();
Expand Down

0 comments on commit fd7736f

Please sign in to comment.