Skip to content

Commit

Permalink
Build matrix for MySQL and make client compatible with MySQL 5.6 (#356)
Browse files Browse the repository at this point in the history
* add build matrix support for MySQL

Signed-off-by: Billy Yuan <billy112487983@gmail.com>

* add support for disabling DEPRECATE_EOF flag in order to be compatible with MySQL whose version is below 5.7.5

Signed-off-by: Billy Yuan <billy112487983@gmail.com>

* tweak configuration to make test pass

Signed-off-by: Billy Yuan <billy112487983@gmail.com>

* skip tests for being incompatible with MySQL 5.6

Signed-off-by: Billy Yuan <billy112487983@gmail.com>

* tune innodb log file size

Signed-off-by: Billy Yuan <billy112487983@gmail.com>

* documentation minors

Signed-off-by: Billy Yuan <billy112487983@gmail.com>
  • Loading branch information
BillyYccc authored and vietj committed Jul 14, 2019
1 parent 7739297 commit b3b433e
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 54 deletions.
12 changes: 10 additions & 2 deletions .travis.yml
Expand Up @@ -12,14 +12,22 @@ before_script:
- sudo /etc/init.d/mysql stop
jobs:
include:
- stage: test
name: "MySQL 5.6"
jdk: openjdk8
script: mvn -q clean verify -B -Dembedded.mysql.version=5.6 --projects vertx-sql-client,vertx-mysql-client
- stage: test
name: "MySQL 5.7"
jdk: openjdk8
script: mvn -q clean verify -B -Dembedded.mysql.version=5.7 --projects vertx-sql-client,vertx-mysql-client
- stage: test
name: "Postgres 9"
jdk: openjdk8
script: mvn -q clean verify -B -Dembedded.postgres.version=9.6
script: mvn -q clean verify -B -Dembedded.postgres.version=9.6 --projects vertx-sql-client,vertx-pg-client
- stage: test
name: "Postgres 10"
jdk: openjdk8
script: mvn -q clean verify -B -Dembedded.postgres.version=10.6
script: mvn -q clean verify -B -Dembedded.postgres.version=10.6 --projects vertx-sql-client,vertx-pg-client
- stage: deploy
name: "Deploy to Sonatype's snapshots repository"
if: type != pull_request AND env(SONATYPE_NEXUS_USERNAME) IS present
Expand Down
1 change: 1 addition & 0 deletions vertx-mysql-client/README.adoc
Expand Up @@ -17,6 +17,7 @@
* [ ] TLS support
* [ ] Add support for more authentication methods(caching_sha2_password)
* [ ] Authentication methods switch
* [ ] Transaction management
* [ ] Text Protocol(Local INFILE Request)
* [ ] Text Protocol(All Utility Commands)
* [ ] Full data type support
Expand Down
1 change: 1 addition & 0 deletions vertx-mysql-client/src/main/asciidoc/index.adoc
Expand Up @@ -15,6 +15,7 @@ scalability and low overhead.
* Direct memory to object without unnecessary copies
* Java 8 Date and Time
* MySQL utilities commands support
* Compatible with MySQL 5.6 and 5.7
== Usage

Expand Down
Expand Up @@ -144,6 +144,13 @@ OkPacket decodeOkPacketPayload(ByteBuf payload, Charset charset) {
return new OkPacket(affectedRows, lastInsertId, serverStatusFlags, numberOfWarnings, statusInfo, sessionStateInfo);
}

EofPacket decodeEofPacketPayload(ByteBuf payload) {
payload.skipBytes(1); // skip EOF_Packet header
int numberOfWarnings = payload.readUnsignedShortLE();
int serverStatusFlags = payload.readUnsignedShortLE();
return new EofPacket(numberOfWarnings, serverStatusFlags);
}

String readRestOfPacketString(ByteBuf payload, Charset charset) {
return BufferUtils.readFixedLengthString(payload, payload.readableBytes(), charset);
}
Expand All @@ -163,4 +170,14 @@ ColumnDefinition decodeColumnDefinitionPacketPayload(ByteBuf payload) {
byte decimals = payload.readByte();
return new ColumnDefinition(catalog, schema, table, orgTable, name, orgName, characterSet, columnLength, type, flags, decimals);
}

void skipEofPacketIfNeeded(ByteBuf payload) {
if (!isDeprecatingEofFlagEnabled()) {
payload.skipBytes(5);
}
}

boolean isDeprecatingEofFlagEnabled() {
return (encoder.clientCapabilitiesFlag & CapabilitiesFlag.CLIENT_DEPRECATE_EOF) != 0;
}
}
Expand Up @@ -30,14 +30,14 @@ void encode(MySQLEncoder encoder) {
}

@Override
protected void handleSingleResultsetDecodingCompleted(ByteBuf payload) {
super.handleSingleResultsetDecodingCompleted(payload);
protected void handleSingleResultsetDecodingCompleted(int serverStatusFlags, int affectedRows) {
super.handleSingleResultsetDecodingCompleted(serverStatusFlags, affectedRows);
doExecuteBatch();
}

@Override
protected boolean isDecodingCompleted(OkPacket okPacket) {
return super.isDecodingCompleted(okPacket) && batchIdx == params.size();
protected boolean isDecodingCompleted(int serverStatusFlags) {
return super.isDecodingCompleted(serverStatusFlags) && batchIdx == params.size();
}

private void doExecuteBatch() {
Expand Down
Expand Up @@ -5,6 +5,7 @@
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.impl.command.ExtendedQueryCommandBase;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -28,7 +29,8 @@ protected void handleInitPacket(ByteBuf payload) {
// may receive ERR_Packet, OK_Packet, Binary Protocol Resultset
int firstByte = payload.getUnsignedByte(payload.readerIndex());
if (firstByte == OK_PACKET_HEADER) {
handleSingleResultsetDecodingCompleted(payload);
OkPacket okPacket = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
handleSingleResultsetDecodingCompleted(okPacket.serverStatusFlags(), (int) okPacket.affectedRows());
} else if (firstByte == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else {
Expand Down
Expand Up @@ -35,8 +35,8 @@ void encode(MySQLEncoder encoder) {
super.encode(encoder);

if (statement.isCursorOpen) {
sendStatementFetchCommand(statement.statementId, cmd.fetch());
decoder = new RowResultDecoder<>(cmd.collector(), false, statement.rowDesc);
sendStatementFetchCommand(statement.statementId, cmd.fetch());
} else {
if (cmd.fetch() > 0) {
//TODO Cursor_type is READ_ONLY?
Expand All @@ -51,41 +51,44 @@ void encode(MySQLEncoder encoder) {
@Override
void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
if (statement.isCursorOpen) {
// decoding COM_STMT_FETCH response
handleRows(payload, payloadLength, super::handleSingleRow);
int first = payload.getUnsignedByte(payload.readerIndex());
if (first == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else {
// decoding COM_STMT_FETCH response
handleRows(payload, payloadLength, super::handleSingleRow);
}
} else {
// decoding COM_STMT_EXECUTE response
if (cmd.fetch() > 0) {
switch (commandHandlerState) {
case INIT:
handleResultsetColumnCountPacketBody(payload);
int first = payload.getUnsignedByte(payload.readerIndex());
if (first == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else {
handleResultsetColumnCountPacketBody(payload);
}
break;
case HANDLING_COLUMN_DEFINITION:
handleResultsetColumnDefinitions(payload);
break;
case COLUMN_DEFINITIONS_DECODING_COMPLETED:
// accept an EOF_Packet when DEPRECATE_EOF is not enabled
skipEofPacketIfNeeded(payload);
case HANDLING_ROW_DATA_OR_END_PACKET:
/*
Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff)
To ensure that packets beginning with 0xfe correspond to the ending packet (EOF_Packet or OK_Packet with a 0xFE header),
the packet length must be checked and must be less than 0xffffff in length.
*/
int first = payload.getUnsignedByte(payload.readerIndex());
if (first == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
}
// enabling CLIENT_DEPRECATE_EOF capability will receive an OK_Packet with a EOF_Packet header here
// we need check this is not a row data by checking packet length < 0xFFFFFF
else if (first == EOF_PACKET_HEADER && payloadLength < 0xFFFFFF) {
// need to reset packet number so that we can send a fetch request
this.sequenceId = 0;
// send fetch after cursor opened
decoder = new RowResultDecoder<>(cmd.collector(), false, statement.rowDesc);
handleResultsetColumnDefinitionsDecodingCompleted();
// need to reset packet number so that we can send a fetch request
this.sequenceId = 0;
// send fetch after cursor opened
decoder = new RowResultDecoder<>(cmd.collector(), false, statement.rowDesc);

statement.isCursorOpen = true;
statement.isCursorOpen = true;

sendStatementFetchCommand(statement.statementId, cmd.fetch());
}
sendStatementFetchCommand(statement.statementId, cmd.fetch());
break;
default:
throw new IllegalStateException("Unexpected state for decoding COM_STMT_EXECUTE response with cursor opening");
}
} else {
super.decodePayload(payload, payloadLength, sequenceId);
Expand Down
Expand Up @@ -63,6 +63,17 @@ private void decodeInit0(MySQLEncoder encoder, InitCommand cmd, ByteBuf payload)
short protocolVersion = payload.readUnsignedByte();

String serverVersion = BufferUtils.readNullTerminatedString(payload, StandardCharsets.US_ASCII);
// we assume the server version follows ${major}.${minor}.${version} in https://dev.mysql.com/doc/refman/8.0/en/which-version.html
String[] versionNumbers = serverVersion.split("\\.");
int majorVersion = Integer.parseInt(versionNumbers[0]);
int minorVersion = Integer.parseInt(versionNumbers[1]);
int releaseNumber = Integer.parseInt(versionNumbers[2]); // we don't support dev version
if (majorVersion == 5 && (minorVersion < 7 || (minorVersion == 7 && releaseNumber < 5))) {
// EOF_HEADER is enabled
} else {
encoder.clientCapabilitiesFlag |= CLIENT_DEPRECATE_EOF;
}

long connectionId = payload.readUnsignedIntLE();

// read first part of scramble
Expand Down
Expand Up @@ -96,7 +96,6 @@ private void initSupportedCapabilitiesFlags() {
clientCapabilitiesFlag |= CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA;
clientCapabilitiesFlag |= CLIENT_SECURE_CONNECTION;
clientCapabilitiesFlag |= CLIENT_PROTOCOL_41;
clientCapabilitiesFlag |= CLIENT_DEPRECATE_EOF;
clientCapabilitiesFlag |= CLIENT_TRANSACTIONS;
clientCapabilitiesFlag |= CLIENT_MULTI_STATEMENTS;
clientCapabilitiesFlag |= CLIENT_MULTI_RESULTS;
Expand Down
Expand Up @@ -78,22 +78,34 @@ void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
case HANDLING_PARAM_COLUMN_DEFINITION:
paramDescs[processingIndex++] = decodeColumnDefinitionPacketPayload(payload);
if (processingIndex == paramDescs.length) {
if (columnDescs.length == 0) {
handleReadyForQuery();
resetIntermediaryResult();
if (isDeprecatingEofFlagEnabled()) {
// we enabled the DEPRECATED_EOF flag and don't need to accept an EOF_Packet
handleParamDefinitionsDecodingCompleted();
} else {
processingIndex = 0;
this.commandHandlerState = CommandHandlerState.HANDLING_COLUMN_COLUMN_DEFINITION;
// we need to decode an EOF_Packet before handling rows, to be compatible with MySQL version below 5.7.5
commandHandlerState = CommandHandlerState.PARAM_DEFINITIONS_DECODING_COMPLETED;
}
}
break;
case PARAM_DEFINITIONS_DECODING_COMPLETED:
skipEofPacketIfNeeded(payload);
handleParamDefinitionsDecodingCompleted();
break;
case HANDLING_COLUMN_COLUMN_DEFINITION:
columnDescs[processingIndex++] = decodeColumnDefinitionPacketPayload(payload);
if (processingIndex == columnDescs.length) {
handleReadyForQuery();
resetIntermediaryResult();
if (isDeprecatingEofFlagEnabled()) {
// we enabled the DEPRECATED_EOF flag and don't need to accept an EOF_Packet
handleColumnDefinitionsDecodingCompleted();
} else {
// we need to decode an EOF_Packet before handling rows, to be compatible with MySQL version below 5.7.5
commandHandlerState = CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED;
}
}
break;
case COLUMN_DEFINITIONS_DECODING_COMPLETED:
handleColumnDefinitionsDecodingCompleted();
break;
}
}

Expand Down Expand Up @@ -131,9 +143,26 @@ private void resetIntermediaryResult() {
columnDescs = null;
}

private void handleParamDefinitionsDecodingCompleted() {
if (columnDescs.length == 0) {
handleReadyForQuery();
resetIntermediaryResult();
} else {
processingIndex = 0;
this.commandHandlerState = CommandHandlerState.HANDLING_COLUMN_COLUMN_DEFINITION;
}
}

private void handleColumnDefinitionsDecodingCompleted() {
handleReadyForQuery();
resetIntermediaryResult();
}

private enum CommandHandlerState {
INIT,
HANDLING_PARAM_COLUMN_DEFINITION,
HANDLING_COLUMN_COLUMN_DEFINITION
PARAM_DEFINITIONS_DECODING_COMPLETED,
HANDLING_COLUMN_COLUMN_DEFINITION,
COLUMN_DEFINITIONS_DECODING_COMPLETED
}
}
Expand Up @@ -56,6 +56,10 @@ void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
case HANDLING_COLUMN_DEFINITION:
handleResultsetColumnDefinitions(payload);
break;
case COLUMN_DEFINITIONS_DECODING_COMPLETED:
skipEofPacketIfNeeded(payload);
handleResultsetColumnDefinitionsDecodingCompleted();
break;
case HANDLING_ROW_DATA_OR_END_PACKET:
handleRows(payload, payloadLength, this::handleSingleRow);
break;
Expand All @@ -74,12 +78,22 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) {
ColumnDefinition def = decodeColumnDefinitionPacketPayload(payload);
columnDefinitions[currentColumn++] = def;
if (currentColumn == columnDefinitions.length) {
// all column definitions have been handled, switch to row data handling
commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
decoder = new RowResultDecoder<>(cmd.collector(), false/*cmd.isSingleton()*/, new MySQLRowDesc(columnDefinitions, format));
// all column definitions have been decoded, switch to column definitions decoding completed state
if (isDeprecatingEofFlagEnabled()) {
// we enabled the DEPRECATED_EOF flag and don't need to accept an EOF_Packet
handleResultsetColumnDefinitionsDecodingCompleted();
} else {
// we need to decode an EOF_Packet before handling rows, to be compatible with MySQL version below 5.7.5
commandHandlerState = CommandHandlerState.COLUMN_DEFINITIONS_DECODING_COMPLETED;
}
}
}

protected void handleResultsetColumnDefinitionsDecodingCompleted() {
commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
decoder = new RowResultDecoder<>(cmd.collector(), false/*cmd.isSingleton()*/, new MySQLRowDesc(columnDefinitions, format));
}

protected void handleRows(ByteBuf payload, int payloadLength, Consumer<ByteBuf> singleRowHandler) {
/*
Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff)
Expand All @@ -93,7 +107,16 @@ protected void handleRows(ByteBuf payload, int payloadLength, Consumer<ByteBuf>
// enabling CLIENT_DEPRECATE_EOF capability will receive an OK_Packet with a EOF_Packet header here
// we need check this is not a row data by checking packet length < 0xFFFFFF
else if (first == EOF_PACKET_HEADER && payloadLength < 0xFFFFFF) {
handleSingleResultsetDecodingCompleted(payload);
int serverStatusFlags;
int affectedRows = 0;
if (isDeprecatingEofFlagEnabled()) {
OkPacket okPacket = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
serverStatusFlags = okPacket.serverStatusFlags();
affectedRows = (int) okPacket.affectedRows();
} else {
serverStatusFlags = decodeEofPacketPayload(payload).serverStatusFlags();
}
handleSingleResultsetDecodingCompleted(serverStatusFlags, affectedRows);
} else {
singleRowHandler.accept(payload);
}
Expand All @@ -104,22 +127,21 @@ protected void handleSingleRow(ByteBuf payload) {
decoder.decodeRow(columnDefinitions.length, payload);
}

protected void handleSingleResultsetDecodingCompleted(ByteBuf payload) {
OkPacket okPacket = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
handleSingleResultsetEndPacket(okPacket);
protected void handleSingleResultsetDecodingCompleted(int serverStatusFlags, int affectedRows) {
handleSingleResultsetEndPacket(serverStatusFlags, affectedRows);
resetIntermediaryResult();
if (isDecodingCompleted(okPacket)) {
if (isDecodingCompleted(serverStatusFlags)) {
// no more sql result
handleAllResultsetDecodingCompleted();
}
}

protected boolean isDecodingCompleted(OkPacket okPacket) {
return (okPacket.serverStatusFlags() & ServerStatusFlags.SERVER_MORE_RESULTS_EXISTS) == 0;
protected boolean isDecodingCompleted(int serverStatusFlags) {
return (serverStatusFlags & ServerStatusFlags.SERVER_MORE_RESULTS_EXISTS) == 0;
}

private void handleSingleResultsetEndPacket(OkPacket okPacket) {
this.result = (okPacket.serverStatusFlags() & ServerStatusFlags.SERVER_STATUS_LAST_ROW_SENT) == 0;
private void handleSingleResultsetEndPacket(int serverStatusFlags, int affectedRows) {
this.result = (serverStatusFlags & ServerStatusFlags.SERVER_STATUS_LAST_ROW_SENT) == 0;
T result;
int size;
RowDesc rowDesc;
Expand All @@ -133,7 +155,7 @@ private void handleSingleResultsetEndPacket(OkPacket okPacket) {
size = 0;
rowDesc = null;
}
cmd.resultHandler().handleResult((int) okPacket.affectedRows(), size, rowDesc, result);
cmd.resultHandler().handleResult(affectedRows, size, rowDesc, result);
}

private void handleAllResultsetDecodingCompleted() {
Expand All @@ -160,6 +182,7 @@ private void resetIntermediaryResult() {
protected enum CommandHandlerState {
INIT,
HANDLING_COLUMN_DEFINITION,
COLUMN_DEFINITIONS_DECODING_COMPLETED,
HANDLING_ROW_DATA_OR_END_PACKET
}
}
Expand Up @@ -41,7 +41,8 @@ protected void handleInitPacket(ByteBuf payload) {
// may receive ERR_Packet, OK_Packet, LOCAL INFILE Request, Text Resultset
int firstByte = payload.getUnsignedByte(payload.readerIndex());
if (firstByte == OK_PACKET_HEADER) {
handleSingleResultsetDecodingCompleted(payload);
OkPacket okPacket = decodeOkPacketPayload(payload, StandardCharsets.UTF_8);
handleSingleResultsetDecodingCompleted(okPacket.serverStatusFlags(), (int) okPacket.affectedRows());
} else if (firstByte == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else if (firstByte == 0xFB) {
Expand Down

0 comments on commit b3b433e

Please sign in to comment.