Skip to content

Commit

Permalink
[mongodb] Add support for mongodb+srv connection protocol. (apache#2203)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiabao-Sun committed Jun 13, 2023
1 parent 7c4c407 commit d9ea355
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 23 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/mongodb-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为
<td>String</td>
<td>指定要使用的连接器,此处应为 <code>mongodb-cdc</code>.</td>
</tr>
<tr>
<td>scheme</td>
<td>optional</td>
<td style="word-wrap: break-word;">mongodb</td>
<td>String</td>
<td>指定 MongoDB 连接协议。 eg. <code>mongodb or mongodb+srv.</code></td>
</tr>
<tr>
<td>hosts</td>
<td>required</td>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/connectors/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ Connector Options
<td>String</td>
<td>Specify what connector to use, here should be <code>mongodb-cdc</code>.</td>
</tr>
<tr>
<td>scheme</td>
<td>optional</td>
<td style="word-wrap: break-word;">mongodb</td>
<td>String</td>
<td>The protocol connected to MongoDB. eg. <code>mongodb or mongodb+srv.</code></td>
</tr>
<tr>
<td>hosts</td>
<td>required</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
import static org.apache.flink.util.Preconditions.checkArgument;

Expand All @@ -63,7 +66,7 @@ public static <T> Builder<T> builder() {

/** Builder class of {@link MongoDBSource}. */
public static class Builder<T> {

private String scheme = SCHEME.defaultValue();
private String hosts;
private String username;
private String password;
Expand All @@ -81,6 +84,17 @@ public static class Builder<T> {
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public Builder<T> scheme(String scheme) {
checkArgument(
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
String.format(
"The scheme should either be %s or %s",
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
this.scheme = scheme;
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
this.hosts = hosts;
Expand Down Expand Up @@ -260,8 +274,7 @@ public DebeziumSourceFunction<T> build() {

props.setProperty(
MongoSourceConfig.CONNECTION_URI_CONFIG,
String.valueOf(
buildConnectionString(username, password, hosts, connectionOptions)));
buildConnectionString(username, password, scheme, hosts, connectionOptions));

if (databaseList != null) {
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class MongoDBEnvelope {

public static final String MONGODB_SCHEME = "mongodb";

public static final String MONGODB_SRV_SCHEME = "mongodb+srv";

public static final String ID_FIELD = "_id";

public static final String DATA_FIELD = "_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public class MongoDBSourceBuilder<T> {
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceBuilder<T> scheme(String scheme) {
this.configFactory.scheme(scheme);
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder<T> hosts(String hosts) {
this.configFactory.hosts(hosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig {

private static final long serialVersionUID = 1L;

private final String scheme;
private final String hosts;
@Nullable private final String username;
@Nullable private final String password;
Expand All @@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int splitSizeMB;

MongoDBSourceConfig(
String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
Expand All @@ -63,14 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig {
int heartbeatIntervalMillis,
int splitMetaGroupSize,
int splitSizeMB) {
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
this.databaseList = databaseList;
this.collectionList = collectionList;
this.connectionString =
buildConnectionString(username, password, hosts, connectionOptions)
.getConnectionString();
buildConnectionString(username, password, scheme, hosts, connectionOptions);
this.batchSize = batchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
this.pollMaxBatchSize = pollMaxBatchSize;
Expand All @@ -81,6 +83,10 @@ public class MongoDBSourceConfig implements SourceConfig {
this.splitSizeMB = splitSizeMB;
}

public String getScheme() {
return scheme;
}

public String getHosts() {
return hosts;
}
Expand Down Expand Up @@ -166,6 +172,7 @@ public boolean equals(Object o) {
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
&& splitMetaGroupSize == that.splitMetaGroupSize
&& splitSizeMB == that.splitSizeMB
&& Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
Expand All @@ -177,6 +184,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
scheme,
hosts,
username,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import java.util.List;

import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>

private static final long serialVersionUID = 1L;

private String scheme = SCHEME.defaultValue();
private String hosts;
private String username;
private String password;
Expand All @@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceConfigFactory scheme(String scheme) {
checkArgument(
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
String.format(
"The scheme should either be %s or %s",
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
this.scheme = scheme;
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceConfigFactory hosts(String hosts) {
this.hosts = hosts;
Expand Down Expand Up @@ -196,6 +211,7 @@ public void validate() {
@Override
public MongoDBSourceConfig create(int subtaskId) {
return new MongoDBSourceConfig(
scheme,
hosts,
username,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;

/** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */
public class MongoDBSourceOptions {

public static final ConfigOption<String> SCHEME =
ConfigOptions.key("scheme")
.stringType()
.defaultValue(MONGODB_SCHEME)
.withDescription(
"The protocol connected to MongoDB. eg. mongodb or mongodb+srv. "
+ "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb.");

public static final ConfigOption<String> HOSTS =
ConfigOptions.key("hosts")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void execute(Context context) throws Exception {
SourceRecord snapshotRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
collectionId.catalog(),
collectionId.table()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void execute(Context context) throws Exception {
changeRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
namespace.getDatabaseName(),
namespace.getCollectionName()),
Expand Down Expand Up @@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded(
changeStreamCursor,
sourceConfig.getHeartbeatIntervalMillis(),
HEARTBEAT_TOPIC_NAME,
createHeartbeatPartitionMap(sourceConfig.getHosts()));
createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts()));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public static Map<String, String> createSourceOffsetMap(
}

public static Map<String, String> createPartitionMap(
String hosts, String database, String collection) {
String scheme, String hosts, String database, String collection) {
StringBuilder builder = new StringBuilder();
builder.append("mongodb://");
builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
if (StringUtils.isNotEmpty(database)) {
Expand All @@ -190,9 +190,9 @@ public static Map<String, String> createPartitionMap(
return singletonMap(NAMESPACE_FIELD, builder.toString());
}

public static Map<String, Object> createHeartbeatPartitionMap(String hosts) {
public static Map<String, Object> createHeartbeatPartitionMap(String scheme, String hosts) {
StringBuilder builder = new StringBuilder();
builder.append("mongodb://");
builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
builder.append(HEARTBEAT_TOPIC_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.ververica.cdc.connectors.mongodb.source.utils;

import com.mongodb.ConnectionString;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
Expand Down Expand Up @@ -56,7 +55,6 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.DROPPED_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.ID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.KEY_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.UUID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
Expand Down Expand Up @@ -345,12 +343,13 @@ public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) {
return MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig);
}

public static ConnectionString buildConnectionString(
public static String buildConnectionString(
@Nullable String username,
@Nullable String password,
String scheme,
String hosts,
@Nullable String connectionOptions) {
StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
StringBuilder sb = new StringBuilder(scheme).append("://");

if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
Expand All @@ -362,6 +361,6 @@ public static ConnectionString buildConnectionString(
sb.append("/?").append(connectionOptions);
}

return new ConnectionString(sb.toString());
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private static final Logger LOG = LoggerFactory.getLogger(MongoDBTableSource.class);

private final ResolvedSchema physicalSchema;
private final String scheme;
private final String hosts;
private final String connectionOptions;
private final String username;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad

public MongoDBTableSource(
ResolvedSchema physicalSchema,
String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
Expand All @@ -108,6 +110,7 @@ public MongoDBTableSource(
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB) {
this.physicalSchema = physicalSchema;
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
Expand Down Expand Up @@ -172,7 +175,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {

if (enableParallelRead) {
MongoDBSourceBuilder<RowData> builder =
MongoDBSource.<RowData>builder().hosts(hosts).deserializer(deserializer);
MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.deserializer(deserializer);

Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Expand All @@ -192,6 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
} else {
com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder =
com.ververica.cdc.connectors.mongodb.MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.deserializer(deserializer);

Expand Down Expand Up @@ -248,6 +255,7 @@ public DynamicTableSource copy() {
MongoDBTableSource source =
new MongoDBTableSource(
physicalSchema,
scheme,
hosts,
username,
password,
Expand Down Expand Up @@ -279,6 +287,7 @@ public boolean equals(Object o) {
}
MongoDBTableSource that = (MongoDBTableSource) o;
return Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
Expand All @@ -303,6 +312,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(
physicalSchema,
scheme,
hosts,
username,
password,
Expand Down
Loading

0 comments on commit d9ea355

Please sign in to comment.