Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,49 +44,17 @@ final class TransportHandshaker {
* ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
* messages.
*
* This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
* as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
* to any v6080099 or v8800000 requests that it receives.
* This version supports two handshake protocols, v7170099 and v8800000, which respectively have the same message structure as the
* transport protocols of v7.17.0, and v8.18.0. This node only sends v8800000 requests, but it can send a valid response to any v7170099
* requests that it receives.
*
* Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did rely
* on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
* true.
*
* Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests.
*
* ## v6080099 Request:
*
* 45 53 -- 'ES' marker
* 00 00 00 34 -- total message length
* 00 00 00 00 00 00 00 01 -- request ID
* 08 -- status flags (0b1000 == handshake request)
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099)
* 00 -- no request headers [1]
* 00 -- no response headers [1]
* 01 -- one feature [2]
* 06 -- feature name length
* 78 2d 70 61 63 6b -- feature name 'x-pack'
* 16 -- action string size
* 69 6e 74 65 72 6e 61 6c }
* 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME
* 64 73 68 61 6b 65 }
* 00 -- no parent task ID [3]
* 04 -- payload length
* 8b d5 b5 03 -- max acceptable protocol version (vInt: 00000011 10110101 11010101 10001011 == 7170699)
*
* ## v6080099 Response:
*
* 45 53 -- 'ES' marker
* 00 00 00 13 -- total message length
* 00 00 00 00 00 00 00 01 -- request ID (copied from request)
* 09 -- status flags (0b1001 == handshake response)
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099, copied from request)
* 00 -- no request headers [1]
* 00 -- no response headers [1]
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
*
*
* ## v7170099 Requests:
*
* 45 53 -- 'ES' marker
Expand Down Expand Up @@ -158,14 +126,9 @@ final class TransportHandshaker {
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
*/

static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
V7_HANDSHAKE_VERSION,
V8_HANDSHAKE_VERSION,
V9_HANDSHAKE_VERSION
);
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION);

static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -203,7 +166,7 @@ void sendHandshake(
);
boolean success = false;
try {
handshakeRequestSender.sendRequest(node, channel, requestId, V8_HANDSHAKE_VERSION);
handshakeRequestSender.sendRequest(node, channel, requestId, V9_HANDSHAKE_VERSION);

threadPool.schedule(
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.transport.InboundDecoder.ChannelType;
Expand Down Expand Up @@ -126,105 +125,6 @@ public void testDecode() throws IOException {

}

@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // can delete test in v9
public void testDecodePreHeaderSizeVariableInt() throws IOException {
Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null);
String action = "test-request";
long requestId = randomNonNegativeLong();
final TransportVersion preHeaderVariableInt = TransportHandshaker.V7_HANDSHAKE_VERSION;
final String contentValue = randomAlphaOfLength(100);
// 8.0 is only compatible with handshakes on a pre-variable int version
final OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(contentValue),
preHeaderVariableInt,
action,
requestId,
true,
compressionScheme
);

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference totalBytes = message.serialize(os);
int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = wrapAsReleasable(totalBytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(partialHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());

final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
assertEquals(preHeaderVariableInt, header.getVersion());
if (compressionScheme == null) {
assertFalse(header.isCompressed());
} else {
assertTrue(header.isCompressed());
}
assertTrue(header.isHandshake());
assertTrue(header.isRequest());
assertTrue(header.needsToReadVariableHeader());
fragments.clear();

final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
if (compressionScheme == null) {
assertEquals(2, fragments.size());
} else {
assertEquals(3, fragments.size());
final Object body = fragments.get(1);
assertThat(body, instanceOf(ReleasableBytesReference.class));
((ReleasableBytesReference) body).close();
}
assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1));
assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2);
}
}

public void testDecodeHandshakeV7Compatibility() throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
threadContext.putHeader(headerKey, headerValue);
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
handshakeCompat,
action,
requestId,
true,
null
);

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference bytes = message.serialize(os);
int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());

final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
assertEquals(handshakeCompat, header.getVersion());
assertFalse(header.isCompressed());
assertTrue(header.isHandshake());
assertTrue(header.isRequest());
// TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
assertTrue(header.needsToReadVariableHeader());
fragments.clear();
}

}

public void testDecodeHandshakeV8Compatibility() throws IOException {
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, null);
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
Expand Down Expand Up @@ -453,46 +353,6 @@ public void testCompressedDecode() throws IOException {

}

public void testCompressedDecodeHandshakeCompatibility() throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
final String headerKey = randomAlphaOfLength(10);
final String headerValue = randomAlphaOfLength(20);
threadContext.putHeader(headerKey, headerValue);
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
OutboundMessage message = new OutboundMessage.Request(
threadContext,
new TestRequest(randomAlphaOfLength(100)),
handshakeCompat,
action,
requestId,
true,
Compression.Scheme.DEFLATE
);

try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
final BytesReference bytes = message.serialize(os);
int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);

InboundDecoder decoder = new InboundDecoder(recycler);
final ArrayList<Object> fragments = new ArrayList<>();
final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
int bytesConsumed = decoder.decode(releasable1, fragments::add);
assertEquals(totalHeaderSize, bytesConsumed);
assertTrue(releasable1.hasReferences());

final Header header = (Header) fragments.get(0);
assertEquals(requestId, header.getRequestId());
assertEquals(handshakeCompat, header.getVersion());
assertTrue(header.isCompressed());
assertTrue(header.isHandshake());
assertTrue(header.isRequest());
// TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
assertTrue(header.needsToReadVariableHeader());
fragments.clear();
}
}

public void testVersionIncompatibilityDecodeException() throws IOException {
String action = "test-request";
long requestId = randomNonNegativeLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.TransportVersionUtils;

Expand All @@ -38,56 +37,6 @@

public class TransportHandshakerRawMessageTests extends ESSingleNodeTestCase {

@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // remove support for v7 handshakes in v9
public void testV7Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
final var requestId = randomNonNegativeLong();
try (var outputStream = new BytesStreamOutput()) {
outputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
outputStream.writeLong(requestId);
outputStream.writeByte(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)));
outputStream.writeInt(TransportHandshaker.V7_HANDSHAKE_VERSION.id());
outputStream.writeByte((byte) 0); // no request headers;
outputStream.writeByte((byte) 0); // no response headers;
outputStream.writeStringArray(new String[] { "x-pack" }); // one feature
outputStream.writeString("internal:tcp/handshake");
outputStream.writeByte((byte) 0); // no parent task ID;

final var requestNodeTransportVersionId = TransportVersionUtils.randomCompatibleVersion(random()).id();
assertThat(requestNodeTransportVersionId, allOf(greaterThanOrEqualTo(1 << 22), lessThan(1 << 28))); // 4-byte vInt
outputStream.writeByte((byte) 4); // payload length
outputStream.writeVInt(requestNodeTransportVersionId);

handshakeRequestBytes = outputStream.bytes().toBytesRef();
}

final BytesRef handshakeResponseBytes;
try (var socket = openTransportConnection()) {
var streamOutput = new OutputStreamStreamOutput(socket.getOutputStream());
streamOutput.write("ES".getBytes(StandardCharsets.US_ASCII));
streamOutput.writeInt(handshakeRequestBytes.length);
streamOutput.writeBytes(handshakeRequestBytes.bytes, handshakeRequestBytes.offset, handshakeRequestBytes.length);
streamOutput.flush();

var streamInput = new InputStreamStreamInput(socket.getInputStream());
assertEquals((byte) 'E', streamInput.readByte());
assertEquals((byte) 'S', streamInput.readByte());
var responseLength = streamInput.readInt();
handshakeResponseBytes = streamInput.readBytesRef(responseLength);
}

try (var inputStream = new BytesArray(handshakeResponseBytes).streamInput()) {
assertEquals(requestId, inputStream.readLong());
assertEquals(TransportStatus.setResponse(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
assertEquals(TransportHandshaker.V7_HANDSHAKE_VERSION.id(), inputStream.readInt());
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
inputStream.setTransportVersion(TransportHandshaker.V7_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(-1, inputStream.read());
}
}

@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // remove support for v8 handshakes in v10
public void testV8Handshake() throws Exception {
final BytesRef handshakeRequestBytes;
Expand Down Expand Up @@ -223,11 +172,10 @@ public void testOutboundHandshake() throws Exception {
try (var inputStream = new BytesArray(handshakeRequestBytes).streamInput()) {
assertThat(inputStream.readLong(), greaterThan(0L));
assertEquals(TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), inputStream.readByte());
assertEquals(TransportHandshaker.V8_HANDSHAKE_VERSION.id(), inputStream.readInt());
assertEquals(0x1a, inputStream.readInt()); // length of variable-length header, always 0x1a
assertEquals(TransportHandshaker.V9_HANDSHAKE_VERSION.id(), inputStream.readInt());
assertEquals(0x19, inputStream.readInt()); // length of variable-length header, always 0x19
assertEquals((byte) 0, inputStream.readByte()); // no request headers
assertEquals((byte) 0, inputStream.readByte()); // no response headers
assertEquals((byte) 0, inputStream.readByte()); // no features
assertEquals("internal:tcp/handshake", inputStream.readString());
assertEquals((byte) 0, inputStream.readByte()); // no parent task
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
Expand All @@ -236,8 +184,9 @@ public void testOutboundHandshake() throws Exception {
}

try (var inputStream = new BytesArray(payloadBytes).streamInput()) {
inputStream.setTransportVersion(TransportHandshaker.V8_HANDSHAKE_VERSION);
inputStream.setTransportVersion(TransportHandshaker.V9_HANDSHAKE_VERSION);
assertEquals(TransportVersion.current().id(), inputStream.readVInt());
assertEquals(Build.current().version(), inputStream.readString());
assertEquals(-1, inputStream.read());
}
}
Expand Down
Loading