Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-666 Supporting ordered snapshot using tables.whitelist config #513

Merged
merged 1 commit into from May 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,15 +13,16 @@
import java.sql.Statement;
import java.sql.Types;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -336,6 +337,20 @@ protected void execute() {
logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
}
}
/* To achieve an ordered snapshot, we would first get a list of Regex tables.whitelist regex patterns
+ and then sort the tableIds list based on the above list
+ */
List<Pattern> tableWhitelistPattern = Strings.listOfRegex(context.config().getString(MySqlConnectorConfig.TABLE_WHITELIST),Pattern.CASE_INSENSITIVE);
List<TableId> tableIdsSorted = new ArrayList<>();
tableWhitelistPattern.forEach(pattern -> {
List<TableId> tablesMatchedByPattern = tableIds.stream().filter(t -> pattern.asPredicate().test(t.toString()))
.collect(Collectors.toList());
tablesMatchedByPattern.forEach(t -> {
if (!tableIdsSorted.contains(t))
tableIdsSorted.add(t);
});
});
tableIds.sort(Comparator.comparing(tableIdsSorted::indexOf));
final Set<String> includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet());
logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);

Expand Down Expand Up @@ -386,7 +401,7 @@ protected void execute() {
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);

// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
List<TableId> allTableIds = new ArrayList<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
Expand Down
Expand Up @@ -6,14 +6,17 @@
package io.debezium.connector.mysql;

import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -446,6 +449,65 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
fail("failed to complete the snapshot within 10 seconds");
}
}

@Test
public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Exception{
config = simpleConfig()
.with(MySqlConnectorConfig.TABLE_WHITELIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")
.build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
// Start the snapshot ...
reader.start();
// Poll for records ...
List<SourceRecord> records;
LinkedHashSet<String> tablesInOrder = new LinkedHashSet<>();
LinkedHashSet<String> tablesInOrderExpected = getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
if (record.value() != null)
tablesInOrder.add(getTableNameFromSourceRecord.apply(record));
});
}
assertArrayEquals(tablesInOrder.toArray(), tablesInOrderExpected.toArray());
}
@Test
public void shouldSnapshotTablesInLexicographicalOrder() throws Exception{
config = simpleConfig()
.build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
// Start the snapshot ...
reader.start();
// Poll for records ...
// Testing.Print.enable();
List<SourceRecord> records;
LinkedHashSet<String> tablesInOrder = new LinkedHashSet<>();
LinkedHashSet<String> tablesInOrderExpected = getTableNamesInSpecifiedOrder("Products", "customers", "dbz_342_timetest", "orders", "products_on_hand");
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
if (record.value() != null)
tablesInOrder.add(getTableNameFromSourceRecord.apply(record));
});
}
assertArrayEquals(tablesInOrder.toArray(), tablesInOrderExpected.toArray());
}

private Function<SourceRecord, String> getTableNameFromSourceRecord = sourceRecord -> ((Struct) sourceRecord.value()).getStruct("source").getString("table");
private LinkedHashSet<String> getTableNamesInSpecifiedOrder(String ... tables){
LinkedHashSet<String> tablesInOrderExpected = new LinkedHashSet<>();
for (String table : tables)
tablesInOrderExpected.add(table);
return tablesInOrderExpected;
}

public void shouldCreateSnapshotSchemaOnly() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).build();
Expand Down
2 changes: 1 addition & 1 deletion debezium-core/src/main/java/io/debezium/config/Field.java
Expand Up @@ -1049,7 +1049,7 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut
int errors = 0;
if (value != null) {
try {
Strings.listOfRegex(value, Pattern.CASE_INSENSITIVE);
Strings.setOfRegex(value, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
problems.accept(field, value, "A comma-separated list of valid regular expressions is expected, but " + e.getMessage());
++errors;
Expand Down
Expand Up @@ -194,7 +194,7 @@ public static Predicate<String> excludes(String regexPatterns) {
* @throws PatternSyntaxException if the string includes an invalid regular expression
*/
public static <T> Predicate<T> includes(String regexPatterns, Function<T, String> conversion) {
Set<Pattern> patterns = Strings.listOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE);
Set<Pattern> patterns = Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE);
return includedInPatterns(patterns, conversion);
}

Expand All @@ -212,7 +212,7 @@ protected static <T> Predicate<T> includedInPatterns(Collection<Pattern> pattern
* @throws PatternSyntaxException if the string includes an invalid regular expression
*/
public static Function<String, Optional<Pattern>> matchedBy(String regexPatterns) {
return matchedByPattern(Strings.listOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE), Function.identity());
return matchedByPattern(Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE), Function.identity());
}

protected static <T> Function<T, Optional<Pattern>> matchedByPattern(Collection<Pattern> patterns, Function<T, String> conversion) {
Expand Down
Expand Up @@ -35,7 +35,7 @@ public final class ColumnId implements Comparable<ColumnId> {
* @return the predicate function; never null
*/
public static Map<TableId,Predicate<Column>> filter(String columnBlacklist) {
Set<ColumnId> columnExclusions = columnBlacklist == null ? null : Strings.listOf(columnBlacklist, ColumnId::parse);
Set<ColumnId> columnExclusions = columnBlacklist == null ? null : Strings.setOf(columnBlacklist, ColumnId::parse);
Map<TableId,Set<String>> excludedColumnNamesByTable = new HashMap<>();
columnExclusions.forEach(columnId->{
excludedColumnNamesByTable.compute(columnId.tableId(), (tableId,columns)->{
Expand Down
50 changes: 40 additions & 10 deletions debezium-core/src/main/java/io/debezium/util/Strings.java
Expand Up @@ -47,9 +47,9 @@ public final class Strings {
* @param input the input string
* @param splitter the function that splits the input into multiple items; may not be null
* @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null
* @return the set of objects included in the list; never null
*/
public static <T> Set<T> listOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
public static <T> Set<T> setOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) return Collections.emptySet();
Set<T> matches = new HashSet<>();
for (String item : splitter.apply(input)) {
Expand All @@ -59,27 +59,57 @@ public static <T> Set<T> listOf(String input, Function<String, String[]> splitte
return matches;
}

/**
* Generate the list of values that are included in the list.
*
* @param input the input string
* @param splitter the function that splits the input into multiple items; may not be null
* @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null
*/
public static <T> List<T> listOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) return Collections.emptyList();
List<T> matches = new ArrayList<T>();
for (String item : splitter.apply(input)) {
T obj = factory.apply(item);
if (obj != null) matches.add(obj);
}
return matches;
}

/**
* Generate the set of values that are included in the list delimited by the given delimiter.
*
* @param input the input string
* @param delimiter the character used to delimit the items in the input
* @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null
* @return the set of objects included in the list; never null
*/
public static <T> Set<T> listOf(String input, char delimiter, Function<String, T> factory) {
return listOf(input, (str) -> str.split("[" + delimiter + "]"), factory);
public static <T> Set<T> setOf(String input, char delimiter, Function<String, T> factory) {
return setOf(input, (str) -> str.split("[" + delimiter + "]"), factory);
}

/**
* Generate the set of values that are included in the list separated by commas.
*
* @param input the input string
* @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null
* @return the set of objects included in the list; never null
*/
public static <T> Set<T> setOf(String input, Function<String, T> factory) {
return setOf(input, ',', factory);
}

/**
* Generate the set of regular expression {@link Pattern}s that are specified in the string containing comma-separated
* regular expressions.
*
* @param input the input string with comma-separated regular expressions. Comma can be escaped with backslash.
* @return the set of regular expression {@link Pattern}s included within the given string; never null
* @throws PatternSyntaxException if the input includes an invalid regular expression
*/
public static <T> Set<T> listOf(String input, Function<String, T> factory) {
return listOf(input, ',', factory);
public static Set<Pattern> setOfRegex(String input, int regexFlags) {
return setOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags));
}

/**
Expand All @@ -91,7 +121,7 @@ public static <T> Set<T> listOf(String input, Function<String, T> factory) {
* @throws PatternSyntaxException if the input includes an invalid regular expression
*/
public static Set<Pattern> setOfRegex(String input) {
return listOf(input, RegExSplitter::split, Pattern::compile);
return setOf(input, RegExSplitter::split, Pattern::compile);
}

/**
Expand All @@ -105,7 +135,7 @@ public static Set<Pattern> setOfRegex(String input) {
* @throws IllegalArgumentException if bit values other than those corresponding to the defined
* match flags are set in {@code regexFlags}
*/
public static Set<Pattern> listOfRegex(String input, int regexFlags) {
public static List<Pattern> listOfRegex(String input, int regexFlags) {
return listOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags));
}

Expand Down
17 changes: 17 additions & 0 deletions debezium-core/src/test/java/io/debezium/util/StringsTest.java
Expand Up @@ -11,6 +11,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -276,6 +277,15 @@ public void regexSplit() {
+ "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*",
"DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/",
"INSERT INTO mysql.rds_heartbeat2\\(.*,.*\\) values \\(.*,.*\\) ON DUPLICATE KEY UPDATE value = .*");
assertRegexList("a,b", "a", "b");
assertRegexList("a\\,b", "a,b");
assertRegexList("a,b,", "a", "b");
assertRegexList("a,b\\,", "a", "b,");
assertRegexList("a\\\\\\,b", "a\\\\,b");
assertRegexList( "DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/,"
+ "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*",
"DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/",
"INSERT INTO mysql.rds_heartbeat2\\(.*,.*\\) values \\(.*,.*\\) ON DUPLICATE KEY UPDATE value = .*");
}

@Test(expected = ParsingException.class)
Expand Down Expand Up @@ -303,4 +313,11 @@ protected void assertRegexSet(String patterns, String... matches) {
.map(Pattern::pattern)
.collect(Collectors.toSet())).containsOnly((Object[])matches);
}

protected void assertRegexList(String patterns, String... matches) {
List<Pattern> regexList = Strings.listOfRegex(patterns, Pattern.CASE_INSENSITIVE);
assertThat(regexList.stream()
.map(Pattern::pattern)
.collect(Collectors.toList())).isEqualTo(Arrays.asList((Object[])matches));
}
}