From d9ea355752bd64a81b2e605df58c1d31fc855c85 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 13 Jun 2023 16:08:31 +0800 Subject: [PATCH] [mongodb] Add support for mongodb+srv connection protocol. (#2203) --- docs/content/connectors/mongodb-cdc(ZH).md | 7 +++++++ docs/content/connectors/mongodb-cdc.md | 7 +++++++ .../cdc/connectors/mongodb/MongoDBSource.java | 19 ++++++++++++++++--- .../mongodb/internal/MongoDBEnvelope.java | 2 ++ .../mongodb/source/MongoDBSourceBuilder.java | 6 ++++++ .../source/config/MongoDBSourceConfig.java | 12 ++++++++++-- .../config/MongoDBSourceConfigFactory.java | 16 ++++++++++++++++ .../source/config/MongoDBSourceOptions.java | 10 ++++++++++ .../reader/fetch/MongoDBScanFetchTask.java | 1 + .../reader/fetch/MongoDBStreamFetchTask.java | 3 ++- .../source/utils/MongoRecordUtils.java | 8 ++++---- .../mongodb/source/utils/MongoUtils.java | 9 ++++----- .../mongodb/table/MongoDBTableSource.java | 12 +++++++++++- .../table/MongoDBTableSourceFactory.java | 4 ++++ .../mongodb/LegacyMongoDBSourceTest.java | 16 +++++++++------- .../table/MongoDBTableFactoryTest.java | 6 ++++++ 16 files changed, 115 insertions(+), 23 deletions(-) diff --git a/docs/content/connectors/mongodb-cdc(ZH).md b/docs/content/connectors/mongodb-cdc(ZH).md index e3282d10c9..62acfbe9f3 100644 --- a/docs/content/connectors/mongodb-cdc(ZH).md +++ b/docs/content/connectors/mongodb-cdc(ZH).md @@ -146,6 +146,13 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 String 指定要使用的连接器,此处应为 mongodb-cdc. + + scheme + optional + mongodb + String + 指定 MongoDB 连接协议。 eg. mongodb or mongodb+srv. + hosts required diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md index f1aaa3e63b..2eccc28891 100644 --- a/docs/content/connectors/mongodb-cdc.md +++ b/docs/content/connectors/mongodb-cdc.md @@ -146,6 +146,13 @@ Connector Options String Specify what connector to use, here should be mongodb-cdc. + + scheme + optional + mongodb + String + The protocol connected to MongoDB. eg. mongodb or mongodb+srv. + hosts required diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java index 187279194c..a98bd3b1c5 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java @@ -36,12 +36,15 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME; import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString; import static org.apache.flink.util.Preconditions.checkArgument; @@ -63,7 +66,7 @@ public static Builder builder() { /** Builder class of {@link MongoDBSource}. */ public static class Builder { - + private String scheme = SCHEME.defaultValue(); private String hosts; private String username; private String password; @@ -81,6 +84,17 @@ public static class Builder { private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); private DebeziumDeserializationSchema deserializer; + /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ + public Builder scheme(String scheme) { + checkArgument( + MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme), + String.format( + "The scheme should either be %s or %s", + MONGODB_SCHEME, MONGODB_SRV_SCHEME)); + this.scheme = scheme; + return this; + } + /** The comma-separated list of hostname and port pairs of mongodb servers. */ public Builder hosts(String hosts) { this.hosts = hosts; @@ -260,8 +274,7 @@ public DebeziumSourceFunction build() { props.setProperty( MongoSourceConfig.CONNECTION_URI_CONFIG, - String.valueOf( - buildConnectionString(username, password, hosts, connectionOptions))); + buildConnectionString(username, password, scheme, hosts, connectionOptions)); if (databaseList != null) { props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList)); diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java index 312018d998..a95bf9213d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java @@ -41,6 +41,8 @@ public class MongoDBEnvelope { public static final String MONGODB_SCHEME = "mongodb"; + public static final String MONGODB_SRV_SCHEME = "mongodb+srv"; + public static final String ID_FIELD = "_id"; public static final String DATA_FIELD = "_data"; diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index 54281e1408..8a2f76b6af 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -52,6 +52,12 @@ public class MongoDBSourceBuilder { private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory(); private DebeziumDeserializationSchema deserializer; + /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ + public MongoDBSourceBuilder scheme(String scheme) { + this.configFactory.scheme(scheme); + return this; + } + /** The comma-separated list of hostname and port pairs of mongodb servers. */ public MongoDBSourceBuilder hosts(String hosts) { this.configFactory.hosts(hosts); diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java index 6e8134204b..9f0e43dbcf 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java @@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig { private static final long serialVersionUID = 1L; + private final String scheme; private final String hosts; @Nullable private final String username; @Nullable private final String password; @@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig { private final int splitSizeMB; MongoDBSourceConfig( + String scheme, String hosts, @Nullable String username, @Nullable String password, @@ -63,14 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig { int heartbeatIntervalMillis, int splitMetaGroupSize, int splitSizeMB) { + this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); this.username = username; this.password = password; this.databaseList = databaseList; this.collectionList = collectionList; this.connectionString = - buildConnectionString(username, password, hosts, connectionOptions) - .getConnectionString(); + buildConnectionString(username, password, scheme, hosts, connectionOptions); this.batchSize = batchSize; this.pollAwaitTimeMillis = pollAwaitTimeMillis; this.pollMaxBatchSize = pollMaxBatchSize; @@ -81,6 +83,10 @@ public class MongoDBSourceConfig implements SourceConfig { this.splitSizeMB = splitSizeMB; } + public String getScheme() { + return scheme; + } + public String getHosts() { return hosts; } @@ -166,6 +172,7 @@ public boolean equals(Object o) { && heartbeatIntervalMillis == that.heartbeatIntervalMillis && splitMetaGroupSize == that.splitMetaGroupSize && splitSizeMB == that.splitSizeMB + && Objects.equals(scheme, that.scheme) && Objects.equals(hosts, that.hosts) && Objects.equals(username, that.username) && Objects.equals(password, that.password) @@ -177,6 +184,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( + scheme, hosts, username, password, diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index ec1cd08814..9ff96a6e09 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -25,11 +25,14 @@ import java.util.List; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory private static final long serialVersionUID = 1L; + private String scheme = SCHEME.defaultValue(); private String hosts; private String username; private String password; @@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue(); private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); + /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ + public MongoDBSourceConfigFactory scheme(String scheme) { + checkArgument( + MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme), + String.format( + "The scheme should either be %s or %s", + MONGODB_SCHEME, MONGODB_SRV_SCHEME)); + this.scheme = scheme; + return this; + } + /** The comma-separated list of hostname and port pairs of mongodb servers. */ public MongoDBSourceConfigFactory hosts(String hosts) { this.hosts = hosts; @@ -196,6 +211,7 @@ public void validate() { @Override public MongoDBSourceConfig create(int subtaskId) { return new MongoDBSourceConfig( + scheme, hosts, username, password, diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java index d1675d96c7..9032a6f71d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java @@ -20,9 +20,19 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; + /** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */ public class MongoDBSourceOptions { + public static final ConfigOption SCHEME = + ConfigOptions.key("scheme") + .stringType() + .defaultValue(MONGODB_SCHEME) + .withDescription( + "The protocol connected to MongoDB. eg. mongodb or mongodb+srv. " + + "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb."); + public static final ConfigOption HOSTS = ConfigOptions.key("hosts") .stringType() diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java index 46a6d60942..6a30ce96b0 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java @@ -130,6 +130,7 @@ public void execute(Context context) throws Exception { SourceRecord snapshotRecord = createSourceRecord( createPartitionMap( + sourceConfig.getScheme(), sourceConfig.getHosts(), collectionId.catalog(), collectionId.table()), diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index 1e20991fbd..c4b818ed0d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -145,6 +145,7 @@ public void execute(Context context) throws Exception { changeRecord = createSourceRecord( createPartitionMap( + sourceConfig.getScheme(), sourceConfig.getHosts(), namespace.getDatabaseName(), namespace.getCollectionName()), @@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded( changeStreamCursor, sourceConfig.getHeartbeatIntervalMillis(), HEARTBEAT_TOPIC_NAME, - createHeartbeatPartitionMap(sourceConfig.getHosts())); + createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts())); } return null; } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java index 225467526a..6c7ec7fbd3 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java @@ -175,9 +175,9 @@ public static Map createSourceOffsetMap( } public static Map createPartitionMap( - String hosts, String database, String collection) { + String scheme, String hosts, String database, String collection) { StringBuilder builder = new StringBuilder(); - builder.append("mongodb://"); + builder.append(String.format("%s://", scheme)); builder.append(hosts); builder.append("/"); if (StringUtils.isNotEmpty(database)) { @@ -190,9 +190,9 @@ public static Map createPartitionMap( return singletonMap(NAMESPACE_FIELD, builder.toString()); } - public static Map createHeartbeatPartitionMap(String hosts) { + public static Map createHeartbeatPartitionMap(String scheme, String hosts) { StringBuilder builder = new StringBuilder(); - builder.append("mongodb://"); + builder.append(String.format("%s://", scheme)); builder.append(hosts); builder.append("/"); builder.append(HEARTBEAT_TOPIC_NAME); diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java index 2f50fe8092..6a85b39333 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java @@ -16,7 +16,6 @@ package com.ververica.cdc.connectors.mongodb.source.utils; -import com.mongodb.ConnectionString; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoChangeStreamCursor; import com.mongodb.client.MongoClient; @@ -56,7 +55,6 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.DROPPED_FIELD; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.ID_FIELD; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.KEY_FIELD; -import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_FIELD; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.UUID_FIELD; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; @@ -345,12 +343,13 @@ public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) { return MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig); } - public static ConnectionString buildConnectionString( + public static String buildConnectionString( @Nullable String username, @Nullable String password, + String scheme, String hosts, @Nullable String connectionOptions) { - StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://"); + StringBuilder sb = new StringBuilder(scheme).append("://"); if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@"); @@ -362,6 +361,6 @@ public static ConnectionString buildConnectionString( sb.append("/?").append(connectionOptions); } - return new ConnectionString(sb.toString()); + return sb.toString(); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index 4be17dae5b..cfce1b8227 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -62,6 +62,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private static final Logger LOG = LoggerFactory.getLogger(MongoDBTableSource.class); private final ResolvedSchema physicalSchema; + private final String scheme; private final String hosts; private final String connectionOptions; private final String username; @@ -91,6 +92,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad public MongoDBTableSource( ResolvedSchema physicalSchema, + String scheme, String hosts, @Nullable String username, @Nullable String password, @@ -108,6 +110,7 @@ public MongoDBTableSource( @Nullable Integer splitMetaGroupSize, @Nullable Integer splitSizeMB) { this.physicalSchema = physicalSchema; + this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); this.username = username; this.password = password; @@ -172,7 +175,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { if (enableParallelRead) { MongoDBSourceBuilder builder = - MongoDBSource.builder().hosts(hosts).deserializer(deserializer); + MongoDBSource.builder() + .scheme(scheme) + .hosts(hosts) + .deserializer(deserializer); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); @@ -192,6 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } else { com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder builder = com.ververica.cdc.connectors.mongodb.MongoDBSource.builder() + .scheme(scheme) .hosts(hosts) .deserializer(deserializer); @@ -248,6 +255,7 @@ public DynamicTableSource copy() { MongoDBTableSource source = new MongoDBTableSource( physicalSchema, + scheme, hosts, username, password, @@ -279,6 +287,7 @@ public boolean equals(Object o) { } MongoDBTableSource that = (MongoDBTableSource) o; return Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(scheme, that.scheme) && Objects.equals(hosts, that.hosts) && Objects.equals(username, that.username) && Objects.equals(password, that.password) @@ -303,6 +312,7 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash( physicalSchema, + scheme, hosts, username, password, diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index 72f3d9dc9b..fd48a20005 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -46,6 +46,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME; import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; import static org.apache.flink.util.Preconditions.checkArgument; @@ -65,6 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final ReadableConfig config = helper.getOptions(); + String scheme = config.get(SCHEME); String hosts = config.get(HOSTS); String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null); @@ -103,6 +105,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { return new MongoDBTableSource( physicalSchema, + scheme, hosts, username, password, @@ -142,6 +145,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> options = new HashSet<>(); + options.add(SCHEME); options.add(USERNAME); options.add(PASSWORD); options.add(CONNECTION_OPTIONS); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java index 556b71a6ba..08b9487570 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java @@ -354,17 +354,19 @@ public void go() throws Exception { public void testConnectionUri() { String hosts = MONGODB_CONTAINER.getHostAndPort(); - ConnectionString case0 = buildConnectionString(null, null, hosts, null); - assertEquals(String.format("mongodb://%s", hosts), case0.toString()); + String case0 = buildConnectionString(null, null, "mongodb", hosts, null); + assertEquals(String.format("mongodb://%s", hosts), case0); - ConnectionString case1 = buildConnectionString("", null, hosts, null); - assertEquals(String.format("mongodb://%s", hosts), case1.toString()); + String case1 = buildConnectionString("", null, "mongodb", hosts, null); + assertEquals(String.format("mongodb://%s", hosts), case1); - ConnectionString case2 = buildConnectionString(null, "", hosts, null); - assertEquals(String.format("mongodb://%s", hosts), case2.toString()); + String case2 = buildConnectionString(null, "", "mongodb+srv", "localhost", null); + assertEquals("mongodb+srv://localhost", case2); ConnectionString case3 = - buildConnectionString(FLINK_USER, FLINK_USER_PASSWORD, hosts, null); + new ConnectionString( + buildConnectionString( + FLINK_USER, FLINK_USER_PASSWORD, "mongodb", hosts, null)); assertEquals(FLINK_USER, case3.getUsername()); assertEquals(FLINK_USER_PASSWORD, new String(case3.getPassword())); } diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index 654ace41c9..f7b84e193e 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -45,6 +45,7 @@ import java.util.Map; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; @@ -52,6 +53,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME; import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -111,6 +113,7 @@ public void testCommonProperties() { MongoDBTableSource expectedSource = new MongoDBTableSource( SCHEMA, + SCHEME.defaultValue(), MY_HOSTS, USER, PASSWORD, @@ -133,6 +136,7 @@ public void testCommonProperties() { @Test public void testOptionalProperties() { Map options = getAllOptions(); + options.put("scheme", MONGODB_SRV_SCHEME); options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000"); options.put("copy.existing", "false"); options.put("copy.existing.queue.size", "100"); @@ -148,6 +152,7 @@ public void testOptionalProperties() { MongoDBTableSource expectedSource = new MongoDBTableSource( SCHEMA, + MONGODB_SRV_SCHEME, MY_HOSTS, USER, PASSWORD, @@ -182,6 +187,7 @@ public void testMetadataColumns() { MongoDBTableSource expectedSource = new MongoDBTableSource( ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), + SCHEME.defaultValue(), MY_HOSTS, USER, PASSWORD,