diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java index 3b53333c0b..e23d43db07 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java @@ -98,15 +98,15 @@ public SelectorsBuilder includeTables(String tableInclusions) { Predicates.setOf( tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str); for (String tableSplit : tableSplitSet) { - Set tableIdSet = - Predicates.setOf( + List tableIdList = + Predicates.listOf( tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str); - Iterator iterator = tableIdSet.iterator(); - if (tableIdSet.size() == 1) { + Iterator iterator = tableIdList.iterator(); + if (tableIdList.size() == 1) { selectors.add(new Selector(null, null, iterator.next())); - } else if (tableIdSet.size() == 2) { + } else if (tableIdList.size() == 2) { selectors.add(new Selector(null, iterator.next(), iterator.next())); - } else if (tableIdSet.size() == 3) { + } else if (tableIdList.size() == 3) { selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next())); } else { throw new IllegalArgumentException( diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Predicates.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Predicates.java index 570e87418b..f7b8ba9515 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Predicates.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Predicates.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.Set; @@ -78,6 +79,21 @@ public static Set setOf( return matches; } + public static List listOf( + String input, Function splitter, Function factory) { + if (input == null) { + return Collections.emptyList(); + } + List matches = new LinkedList<>(); + for (String item : splitter.apply(input)) { + T obj = factory.apply(item); + if (obj != null) { + matches.add(obj); + } + } + return matches; + } + protected static Function> matchedByPattern( Collection patterns, Function conversion) { return (t) -> { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/schema/SelectorsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/schema/SelectorsTest.java index 88b41c897e..ff10366c61 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/schema/SelectorsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/schema/SelectorsTest.java @@ -32,9 +32,10 @@ public void testTableSelector() { // nameSpace, schemaName, tableName Selectors selectors = new Selectors.SelectorsBuilder() - .includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+") + .includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+,db.sc1.sc1") .build(); + assertAllowed(selectors, "db", "sc1", "sc1"); assertAllowed(selectors, "db", "sc1", "A1"); assertAllowed(selectors, "db", "sc1", "A2"); assertAllowed(selectors, "db", "sc2", "B0"); @@ -50,9 +51,12 @@ public void testTableSelector() { selectors = new Selectors.SelectorsBuilder() - .includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+") + .includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+,db\\..sc1.sc1,db.sc1.sc1") .build(); + assertAllowed(selectors, "db", "sc1", "sc1"); + assertAllowed(selectors, "db1", "sc1", "sc1"); + assertAllowed(selectors, "dba", "sc1", "sc1"); assertAllowed(selectors, "db1", "sc1", "A1"); assertAllowed(selectors, "dba", "sc1", "A2"); assertAllowed(selectors, "db", "sc2", "B0"); @@ -68,8 +72,11 @@ public void testTableSelector() { // schemaName, tableName selectors = - new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build(); + new Selectors.SelectorsBuilder() + .includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1") + .build(); + assertAllowed(selectors, null, "sc1", "sc1"); assertAllowed(selectors, null, "sc1", "A1"); assertAllowed(selectors, null, "sc1", "A2"); assertAllowed(selectors, null, "sc2", "B0"); @@ -82,8 +89,12 @@ public void testTableSelector() { assertNotAllowed(selectors, null, "sc1A", "A1"); // tableName - selectors = new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+").build(); + selectors = + new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+,sc1").build(); + assertAllowed(selectors, null, null, "sc1"); + assertNotAllowed(selectors, "db", "sc1", "sc1"); + assertNotAllowed(selectors, null, "sc1", "sc1"); assertAllowed(selectors, null, null, "1A1"); assertAllowed(selectors, null, null, "AA2"); assertAllowed(selectors, null, null, "B0"); @@ -94,8 +105,11 @@ public void testTableSelector() { assertNotAllowed(selectors, null, null, "2B"); selectors = - new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build(); + new Selectors.SelectorsBuilder() + .includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1") + .build(); + assertAllowed(selectors, null, "sc1", "sc1"); assertAllowed(selectors, null, "sc1", "A1"); assertAllowed(selectors, null, "sc1", "A2"); assertAllowed(selectors, null, "sc1", "A2"); @@ -107,6 +121,15 @@ public void testTableSelector() { assertNotAllowed(selectors, null, "sc2", "B2"); assertNotAllowed(selectors, null, "sc11", "A1"); assertNotAllowed(selectors, null, "sc1A", "A1"); + + selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc1").build(); + assertAllowed(selectors, null, "sc1", "sc1"); + + selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc[0-9]+").build(); + assertAllowed(selectors, null, "sc1", "sc1"); + + selectors = new Selectors.SelectorsBuilder().includeTables("sc1.\\.*").build(); + assertAllowed(selectors, null, "sc1", "sc1"); } protected void assertAllowed( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 59dcdb6cfc..68ea425a75 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -24,6 +24,9 @@ import org.junit.Test; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -124,6 +127,44 @@ public void testExcludeAllTable() { + tableExclude); } + @Test + public void testDatabaseAndTableWithTheSameName() throws SQLException { + inventoryDatabase.createAndInitialize(); + // create a table with the same name of database + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + String createSameNameTableSql = + String.format( + "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n" + + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512)\n" + + ");", + inventoryDatabase.getDatabaseName(), + inventoryDatabase.getDatabaseName()); + + statement.execute(createSameNameTableSql); + } + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put( + TABLES.key(), + inventoryDatabase.getDatabaseName() + "." + inventoryDatabase.getDatabaseName()); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + assertThat(dataSource.getSourceConfig().getTableList()) + .isEqualTo( + Arrays.asList( + inventoryDatabase.getDatabaseName() + + "." + + inventoryDatabase.getDatabaseName())); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration;