forked from apache/flink-cdc
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[mysql-cdc] Supports MYSQL_TYPE_TYPED_ARRAY column type when parsing …
…the table map event This closes apache#2001
- Loading branch information
1 parent
892f2d6
commit bb60bfe
Showing
3 changed files
with
354 additions
and
0 deletions.
There are no files selected for viewing
92 changes: 92 additions & 0 deletions
92
...ql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright 2022 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.github.shyiko.mysql.binlog.event.deserialization; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY. | ||
* | ||
* <p>Line 57: Add support for mysql data type: MYSQL_TYPE_TYPED_ARRAY. Its type code is changed to | ||
* 20 in <a | ||
* href="https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855">MySql | ||
* Bug#29948925</a> since mysql 8.0.18+. | ||
* | ||
* <p>Remove this file once <a | ||
* href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a> | ||
* fixed. | ||
*/ | ||
public enum ColumnType { | ||
DECIMAL(0), | ||
TINY(1), | ||
SHORT(2), | ||
LONG(3), | ||
FLOAT(4), | ||
DOUBLE(5), | ||
NULL(6), | ||
TIMESTAMP(7), | ||
LONGLONG(8), | ||
INT24(9), | ||
DATE(10), | ||
TIME(11), | ||
DATETIME(12), | ||
YEAR(13), | ||
NEWDATE(14), | ||
VARCHAR(15), | ||
BIT(16), | ||
// (TIMESTAMP|DATETIME|TIME)_V2 data types appeared in MySQL 5.6.4 | ||
// @see http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html | ||
TIMESTAMP_V2(17), | ||
DATETIME_V2(18), | ||
TIME_V2(19), | ||
TYPED_ARRAY(20), | ||
JSON(245), | ||
NEWDECIMAL(246), | ||
ENUM(247), | ||
SET(248), | ||
TINY_BLOB(249), | ||
MEDIUM_BLOB(250), | ||
LONG_BLOB(251), | ||
BLOB(252), | ||
VAR_STRING(253), | ||
STRING(254), | ||
GEOMETRY(255); | ||
|
||
private int code; | ||
|
||
private ColumnType(int code) { | ||
this.code = code; | ||
} | ||
|
||
public int getCode() { | ||
return code; | ||
} | ||
|
||
private static final Map<Integer, ColumnType> INDEX_BY_CODE; | ||
|
||
static { | ||
INDEX_BY_CODE = new HashMap<Integer, ColumnType>(); | ||
for (ColumnType columnType : values()) { | ||
INDEX_BY_CODE.put(columnType.code, columnType); | ||
} | ||
} | ||
|
||
public static ColumnType byCode(int code) { | ||
return INDEX_BY_CODE.get(code); | ||
} | ||
} |
136 changes: 136 additions & 0 deletions
136
...a/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright 2022 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.github.shyiko.mysql.binlog.event.deserialization; | ||
|
||
import com.github.shyiko.mysql.binlog.event.TableMapEventData; | ||
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata; | ||
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; | ||
|
||
import java.io.IOException; | ||
|
||
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY; | ||
|
||
/** | ||
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY. | ||
* | ||
* <p>Line 93 ~ 98: process MYSQL_TYPE_TYPED_ARRAY metadata, imitated the code in canal <a | ||
* href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>. | ||
* | ||
* <p>Remove this file once <a | ||
* href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a> | ||
* fixed. | ||
*/ | ||
public class TableMapEventDataDeserializer implements EventDataDeserializer<TableMapEventData> { | ||
|
||
private final TableMapEventMetadataDeserializer metadataDeserializer = | ||
new TableMapEventMetadataDeserializer(); | ||
|
||
@Override | ||
public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IOException { | ||
TableMapEventData eventData = new TableMapEventData(); | ||
eventData.setTableId(inputStream.readLong(6)); | ||
inputStream.skip(3); // 2 bytes reserved for future use + 1 for the length of database name | ||
eventData.setDatabase(inputStream.readZeroTerminatedString()); | ||
inputStream.skip(1); // table name | ||
eventData.setTable(inputStream.readZeroTerminatedString()); | ||
int numberOfColumns = inputStream.readPackedInteger(); | ||
eventData.setColumnTypes(inputStream.read(numberOfColumns)); | ||
inputStream.readPackedInteger(); // metadata length | ||
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes())); | ||
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true)); | ||
int metadataLength = inputStream.available(); | ||
TableMapEventMetadata metadata = null; | ||
if (metadataLength > 0) { | ||
metadata = | ||
metadataDeserializer.deserialize( | ||
new ByteArrayInputStream(inputStream.read(metadataLength)), | ||
eventData.getColumnTypes().length, | ||
numericColumnCount(eventData.getColumnTypes())); | ||
} | ||
eventData.setEventMetadata(metadata); | ||
return eventData; | ||
} | ||
|
||
private int numericColumnCount(byte[] types) { | ||
int count = 0; | ||
for (int i = 0; i < types.length; i++) { | ||
switch (ColumnType.byCode(types[i] & 0xff)) { | ||
case TINY: | ||
case SHORT: | ||
case INT24: | ||
case LONG: | ||
case LONGLONG: | ||
case NEWDECIMAL: | ||
case FLOAT: | ||
case DOUBLE: | ||
count++; | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
return count; | ||
} | ||
|
||
private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes) | ||
throws IOException { | ||
int[] metadata = new int[columnTypes.length]; | ||
for (int i = 0; i < columnTypes.length; i++) { | ||
ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF); | ||
if (columnType == TYPED_ARRAY) { | ||
byte[] arrayType = inputStream.read(1); | ||
columnType = ColumnType.byCode(arrayType[0] & 0xFF); | ||
} | ||
switch (columnType) { | ||
case FLOAT: | ||
case DOUBLE: | ||
case BLOB: | ||
case JSON: | ||
case GEOMETRY: | ||
metadata[i] = inputStream.readInteger(1); | ||
break; | ||
case BIT: | ||
case VARCHAR: | ||
case NEWDECIMAL: | ||
metadata[i] = inputStream.readInteger(2); | ||
break; | ||
case SET: | ||
case ENUM: | ||
case STRING: | ||
metadata[i] = bigEndianInteger(inputStream.read(2), 0, 2); | ||
break; | ||
case TIME_V2: | ||
case DATETIME_V2: | ||
case TIMESTAMP_V2: | ||
metadata[i] = inputStream.readInteger(1); // fsp (@see {@link ColumnType}) | ||
break; | ||
default: | ||
metadata[i] = 0; | ||
} | ||
} | ||
return metadata; | ||
} | ||
|
||
private static int bigEndianInteger(byte[] bytes, int offset, int length) { | ||
int result = 0; | ||
for (int i = offset; i < (offset + length); i++) { | ||
byte b = bytes[i]; | ||
result = (result << 8) | (b >= 0 ? (int) b : (b + 256)); | ||
} | ||
return result; | ||
} | ||
} |
126 changes: 126 additions & 0 deletions
126
...m/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright 2022 Ververica Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.github.shyiko.mysql.binlog.event.deserialization; | ||
|
||
import com.github.shyiko.mysql.binlog.event.TableMapEventData; | ||
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata; | ||
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; | ||
import org.junit.Test; | ||
|
||
import java.io.IOException; | ||
import java.util.BitSet; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
/** Tests for the copied class {@link TableMapEventDataDeserializer}. */ | ||
public class TableMapEventDataDeserializerTest { | ||
@Test | ||
public void testDeserialize() throws IOException { | ||
TableMapEventDataDeserializer deserializer = new TableMapEventDataDeserializer(); | ||
// The Table_map_event data. See its format at | ||
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html | ||
byte[] data = { | ||
// table_id : 6 bytes | ||
1, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
// flags : 2 bytes | ||
1, | ||
0, | ||
// database_name string length : 1 byte | ||
6, | ||
// database_name null-terminated string, end with 0 | ||
116, | ||
101, | ||
115, | ||
116, | ||
68, | ||
98, | ||
0, | ||
// table_name string length : 1 byte | ||
9, | ||
// table_name null-terminated string, end with 0 | ||
116, | ||
101, | ||
115, | ||
116, | ||
84, | ||
97, | ||
98, | ||
108, | ||
101, | ||
0, | ||
// column_count | ||
3, | ||
// column_type list | ||
8, | ||
1, | ||
20, | ||
// metadata_length | ||
1, | ||
// metadata | ||
8, | ||
// null_bits | ||
80, | ||
// optional metadata fields stored in Type, Length, Value(TLV) format. | ||
// Type takes 1 byte. Length is a packed integer value. Values takes Length bytes. | ||
|
||
// SIGNEDNESS | ||
1, | ||
1, | ||
0, | ||
// DEFAULT_CHARSET | ||
2, | ||
1, | ||
45 | ||
}; | ||
TableMapEventData eventData = deserializer.deserialize(new ByteArrayInputStream(data)); | ||
assertThat(eventData.toString()).isEqualTo(getExpectedEventData().toString()); | ||
} | ||
|
||
private TableMapEventData getExpectedEventData() { | ||
TableMapEventData eventData = new TableMapEventData(); | ||
// table_id | ||
eventData.setTableId(1); | ||
// database_name | ||
eventData.setDatabase("testDb"); | ||
// table_name | ||
eventData.setTable("testTable"); | ||
|
||
// column_type | ||
// 3 column types: MYSQL_TYPE_LONGLONG, MYSQL_TYPE_TINY, MYSQL_TYPE_TYPED_ARRAY<LONGLONG> | ||
eventData.setColumnTypes(new byte[] {8, 1, 20}); | ||
|
||
// metadata of the column types | ||
eventData.setColumnMetadata(new int[] {0, 0, 0}); | ||
|
||
// null_bits | ||
eventData.setColumnNullability(new BitSet()); | ||
|
||
// optional metadata fields | ||
TableMapEventMetadata metadata = new TableMapEventMetadata(); | ||
metadata.setSignedness(new BitSet()); | ||
TableMapEventMetadata.DefaultCharset charset = new TableMapEventMetadata.DefaultCharset(); | ||
charset.setDefaultCharsetCollation(45); | ||
metadata.setDefaultCharset(charset); | ||
eventData.setEventMetadata(metadata); | ||
return eventData; | ||
} | ||
} |