-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mysql-cdc] add fallback with "desc table" when parsing ddl failed #949
[mysql-cdc] add fallback with "desc table" when parsing ddl failed #949
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Cleverdada Thanks for your contribution, left some comments
} | ||
} | ||
|
||
private String readSchemaByDescTable( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private String readSchemaByDescTable( | |
private String buildSchemaByDescTable( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
String showCreateTable = readSchemaByShowCreateTable(jdbc, tableId, tableChangeMap); | ||
if (!tableChangeMap.containsKey(tableId)) { | ||
// fallback to desc table | ||
String descTable = readSchemaByDescTable(jdbc, tableId, tableChangeMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return a command string is wired with the function name, we can define private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE "
and private static final String DESC_TABLE = "DESC "
and then construct the showCreateTable
and descTable
sql.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank, have fixed based on your suggestion .
/** used to get filed info by desc table. */ | ||
class MySqlFieldDesc { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import static org.junit.Assert.assertNotNull; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
/** tests for {@link MySqlSource}. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given a meaningful note
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's my fault, fixed
private static final String PASSWORD = "123456"; | ||
private static final String DATABASE = "polardbx_ddl_test"; | ||
private static final DockerImageName POLARDBX_IMAGE = | ||
DockerImageName.parse("polardbx/polardb-x:latest"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest we use a fixed version for test stability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for your review, we provide a stable version(2.0.1)
* Executes a JDBC statement using the default jdbc config without autocommitting the | ||
* connection. | ||
*/ | ||
protected static void initializePolarxTables(String databaseName) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor:
protected static void initializePolarxTables(String databaseName) throws InterruptedException { | |
protected static void initializePolardbxTables(String databaseName) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's "initializePolardbxTables" now
new String[] { | ||
"+I[4, 1002, 2, 106, 2022-01-16T00:00]", | ||
"+I[5, 1003, 1, 107, 2022-01-16T00:00]", | ||
"+I[2, 1002, 2, 105, 2022-01-16T00:00]", | ||
"+I[3, 1004, 3, 109, 2022-01-16T00:00]", | ||
"+I[1, 1001, 1, 102, 2022-01-16T00:00]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: keep the record in id order as we use assertEqualsInAnyOrder
next
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the order has been fixed
TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types"); | ||
CloseableIterator<Row> iterator = tableResult.collect(); | ||
List<String> realSnapshotData = fetchRows(iterator, 1); | ||
assertEquals(1, realSnapshotData.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's compare the result instead of the size of result
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks and done
assertEqualsInAnyOrder( | ||
expectedSnapshotData, TestValuesTableFactory.getRawResults("multi_key_sink")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In above three cases we can validate binlog reading as well just like other ConnectorITCase, we can make sure the polardbx works well under snapshot phase and binlog phase in this way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for suggestion, I have added some binlog reading cases, it works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Cleverdada Thanks for the update, looks generally good to me, I only left one minor comment
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote; | ||
|
||
/** used to generate table define in ddl with "desc table". */ | ||
public class MySqlTableDefine { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: definition
instead of define
as it's a verb
} | ||
|
||
/** used to generate field define in ddl with "desc table". */ | ||
class MySqlFieldDefine { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Cleverdada for the great work, LGTM
Add fallback with "desc table" when parsing ddl failed to support the databases who supports MySQL protocol