Skip to content

Commit

Permalink
[mongodb][hotfix] Fix regex inference of database or collection only …
Browse files Browse the repository at this point in the history
…contains dash.
  • Loading branch information
Jiabao-Sun committed Jun 14, 2023
1 parent a95659e commit 8902607
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/** Utilities to discovery matched collections. */
public class CollectionDiscoveryUtils {

public static final String REGEX_META_CHARACTERS = ".$|()[]{}<>^?*+-=!\\";
public static final String REGEX_META_CHARACTERS = ".$|()[]{}<>^?*+=!\\";

public static final String ADD_NS_FIELD_NAME = "_ns_";

Expand All @@ -46,6 +46,10 @@ public class CollectionDiscoveryUtils {
"{'$addFields': {'%s': {'$concat': ['$ns.db', '.', '$ns.coll']}}}",
ADD_NS_FIELD_NAME));

private static final Pattern RANGE_PATTERN =
Pattern.compile(
"\\[(([a-z]-[a-z])|([A-Z]-[A-Z])|((0|[1-9][0-9]*)-(0|[1-9][0-9]*)))+\\]");

private CollectionDiscoveryUtils() {}

public static List<String> databaseNames(
Expand Down Expand Up @@ -194,6 +198,14 @@ public static boolean containsRegexMetaCharacters(String literal) {
return false;
}

public static boolean containsRegexRange(String literal) {
return RANGE_PATTERN.matcher(literal).find();
}

public static boolean inferIsRegularExpression(String literal) {
return containsRegexMetaCharacters(literal) || containsRegexRange(literal);
}

public static Pattern completionPattern(String pattern) {
if (pattern.startsWith("^") && pattern.endsWith("$")) {
return Pattern.compile(pattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
import static com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
import static com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.inferIsRegularExpression;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -158,8 +158,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
String collectionList = null;
if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
// explicitly specified database and collection.
if (!containsRegexMetaCharacters(database)
&& !containsRegexMetaCharacters(collection)) {
if (!inferIsRegularExpression(database) && !inferIsRegularExpression(collection)) {
checkDatabaseNameValidity(database);
checkCollectionNameValidity(collection);
databaseList = database;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,26 @@ public void testMatchSingleDatabaseWithCollectionPattern() throws Exception {
result.getJobClient().get().cancel().get();
}

@Test
public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
// 1. Given collections:
// db0: [coll-a1, coll-a2, coll-b1, coll-b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns-regex");

TableResult result = submitTestCase(db0, "coll-a1");

// 2. Wait change stream records come
waitForSinkSize("mongodb_sink", 1);

// 3. Check results
String[] expected = new String[] {String.format("+I[%s, coll-a1, A101]", db0)};

List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));

result.getJobClient().get().cancel().get();
}

private TableResult submitTestCase(String database, String collection) throws Exception {
String sourceDDL =
"CREATE TABLE mongodb_source ("
Expand Down
17 changes: 17 additions & 0 deletions flink-connector-mongodb-cdc/src/test/resources/ddl/ns-regex.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2022 Ververica Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

db.getCollection('coll-a1').insertOne({"seq": "A101"});
db.getCollection('coll-a2').insertOne({"seq": "A201"});
db.getCollection('coll-b1').insertOne({"seq": "B101"});
db.getCollection('coll-b2').insertOne({"seq": "B201"});

0 comments on commit 8902607

Please sign in to comment.