32
32
import io .debezium .connector .mysql .MySqlConnectorConfig ;
33
33
import io .debezium .connector .mysql .MySqlDatabaseSchema ;
34
34
import io .debezium .connector .mysql .MySqlOffsetContext ;
35
+ import io .debezium .connector .mysql .MysqlTextProtocolFieldReader ;
35
36
import io .debezium .pipeline .EventDispatcher ;
36
37
import io .debezium .pipeline .source .AbstractSnapshotChangeEventSource ;
37
38
import io .debezium .pipeline .source .spi .ChangeEventSource ;
38
39
import io .debezium .pipeline .source .spi .SnapshotProgressListener ;
39
40
import io .debezium .pipeline .spi .ChangeRecordEmitter ;
40
41
import io .debezium .pipeline .spi .OffsetContext ;
41
42
import io .debezium .pipeline .spi .SnapshotResult ;
43
+ import io .debezium .relational .Column ;
42
44
import io .debezium .relational .RelationalSnapshotChangeEventSource ;
43
45
import io .debezium .relational .SnapshotChangeRecordEmitter ;
44
46
import io .debezium .relational .Table ;
50
52
51
53
import java .sql .PreparedStatement ;
52
54
import java .sql .ResultSet ;
53
- import java .sql .ResultSetMetaData ;
54
55
import java .sql .SQLException ;
55
- import java .sql .Types ;
56
56
import java .time .Duration ;
57
57
58
58
import static org .apache .seatunnel .connectors .seatunnel .cdc .mysql .utils .MySqlConnectionUtils .currentBinlogOffset ;
@@ -74,6 +74,8 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
74
74
private final SnapshotSplit snapshotSplit ;
75
75
private final MySqlOffsetContext offsetContext ;
76
76
private final SnapshotProgressListener snapshotProgressListener ;
77
+ private final MysqlTextProtocolFieldReader mysqlTextProtocolFieldReader =
78
+ new MysqlTextProtocolFieldReader ();
77
79
78
80
public MySqlSnapshotSplitReadTask (
79
81
MySqlConnectorConfig connectorConfig ,
@@ -217,7 +219,9 @@ private void createDataEventsForTable(
217
219
rows ++;
218
220
final Object [] row = new Object [columnArray .getGreatestColumnPosition ()];
219
221
for (int i = 0 ; i < columnArray .getColumns ().length ; i ++) {
220
- row [columnArray .getColumns ()[i ].position () - 1 ] = readField (rs , i + 1 );
222
+ Column actualColumn = table .columns ().get (i );
223
+ row [columnArray .getColumns ()[i ].position () - 1 ] =
224
+ mysqlTextProtocolFieldReader .readField (rs , i + 1 , actualColumn , table );
221
225
}
222
226
if (logTimer .expired ()) {
223
227
long stop = clock .currentTimeInMillis ();
@@ -256,16 +260,6 @@ private Threads.Timer getTableScanLogTimer() {
256
260
return Threads .timer (clock , LOG_INTERVAL );
257
261
}
258
262
259
- private Object readField (ResultSet rs , int columnIndex ) throws SQLException {
260
- final ResultSetMetaData metaData = rs .getMetaData ();
261
- final int columnType = metaData .getColumnType (columnIndex );
262
- if (columnType == Types .TIME ) {
263
- return rs .getTimestamp (columnIndex );
264
- } else {
265
- return rs .getObject (columnIndex );
266
- }
267
- }
268
-
269
263
private static class MySqlSnapshotContext
270
264
extends RelationalSnapshotChangeEventSource .RelationalSnapshotContext {
271
265
public MySqlSnapshotContext () throws SQLException {
0 commit comments