Skip to content

Commit

Permalink
[mongodb] Add support for mongodb+srv connection protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiabao-Sun committed Jun 12, 2023
1 parent 7cdeb0d commit a2ce1a0
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 16 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ Connector Options
<td>String</td>
<td>Specify what connector to use, here should be <code>mongodb-cdc</code>.</td>
</tr>
<tr>
<td>scheme</td>
<td>optional</td>
<td style="word-wrap: break-word;">mongodb</td>
<td>String</td>
<td>The protocol connected to MongoDB. eg. <code>mongodb or mongodb+srv.</code></td>
</tr>
<tr>
<td>hosts</td>
<td>required</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
Expand Down Expand Up @@ -63,7 +65,7 @@ public static <T> Builder<T> builder() {

/** Builder class of {@link MongoDBSource}. */
public static class Builder<T> {

private String scheme;
private String hosts;
private String username;
private String password;
Expand All @@ -81,6 +83,17 @@ public static class Builder<T> {
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public Builder<T> scheme(String scheme) {
checkArgument(
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
String.format(
"The scheme should either be %s or %s",
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
this.scheme = scheme;
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
this.hosts = hosts;
Expand Down Expand Up @@ -261,7 +274,8 @@ public DebeziumSourceFunction<T> build() {
props.setProperty(
MongoSourceConfig.CONNECTION_URI_CONFIG,
String.valueOf(
buildConnectionString(username, password, hosts, connectionOptions)));
buildConnectionString(
username, password, scheme, hosts, connectionOptions)));

if (databaseList != null) {
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class MongoDBEnvelope {

public static final String MONGODB_SCHEME = "mongodb";

public static final String MONGODB_SRV_SCHEME = "mongodb+srv";

public static final String ID_FIELD = "_id";

public static final String DATA_FIELD = "_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public class MongoDBSourceBuilder<T> {
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceBuilder<T> scheme(String scheme) {
this.configFactory.scheme(scheme);
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder<T> hosts(String hosts) {
this.configFactory.hosts(hosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig {

private static final long serialVersionUID = 1L;

private final String scheme;
private final String hosts;
@Nullable private final String username;
@Nullable private final String password;
Expand All @@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int splitSizeMB;

MongoDBSourceConfig(
String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
Expand All @@ -63,13 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig {
int heartbeatIntervalMillis,
int splitMetaGroupSize,
int splitSizeMB) {
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
this.databaseList = databaseList;
this.collectionList = collectionList;
this.connectionString =
buildConnectionString(username, password, hosts, connectionOptions)
buildConnectionString(scheme, username, password, hosts, connectionOptions)
.getConnectionString();
this.batchSize = batchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
Expand All @@ -81,6 +84,10 @@ public class MongoDBSourceConfig implements SourceConfig {
this.splitSizeMB = splitSizeMB;
}

public String getScheme() {
return scheme;
}

public String getHosts() {
return hosts;
}
Expand Down Expand Up @@ -166,6 +173,7 @@ public boolean equals(Object o) {
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
&& splitMetaGroupSize == that.splitMetaGroupSize
&& splitSizeMB == that.splitSizeMB
&& Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
Expand All @@ -177,6 +185,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
scheme,
hosts,
username,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import java.util.List;

import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>

private static final long serialVersionUID = 1L;

private String scheme = SCHEME.defaultValue();
private String hosts;
private String username;
private String password;
Expand All @@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();

/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceConfigFactory scheme(String scheme) {
checkArgument(
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
String.format(
"The scheme should either be %s or %s",
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
this.scheme = scheme;
return this;
}

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceConfigFactory hosts(String hosts) {
this.hosts = hosts;
Expand Down Expand Up @@ -196,6 +211,7 @@ public void validate() {
@Override
public MongoDBSourceConfig create(int subtaskId) {
return new MongoDBSourceConfig(
scheme,
hosts,
username,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;

/** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */
public class MongoDBSourceOptions {

public static final ConfigOption<String> SCHEME =
ConfigOptions.key("scheme")
.stringType()
.defaultValue(MONGODB_SCHEME)
.withDescription(
"The protocol connected to MongoDB. eg. mongodb or mongodb+srv. "
+ "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb.");

public static final ConfigOption<String> HOSTS =
ConfigOptions.key("hosts")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void execute(Context context) throws Exception {
SourceRecord snapshotRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
collectionId.catalog(),
collectionId.table()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void execute(Context context) throws Exception {
changeRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
namespace.getDatabaseName(),
namespace.getCollectionName()),
Expand Down Expand Up @@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded(
changeStreamCursor,
sourceConfig.getHeartbeatIntervalMillis(),
HEARTBEAT_TOPIC_NAME,
createHeartbeatPartitionMap(sourceConfig.getHosts()));
createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts()));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public static Map<String, String> createSourceOffsetMap(
}

public static Map<String, String> createPartitionMap(
String hosts, String database, String collection) {
String scheme, String hosts, String database, String collection) {
StringBuilder builder = new StringBuilder();
builder.append("mongodb://");
builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
if (StringUtils.isNotEmpty(database)) {
Expand All @@ -190,9 +190,9 @@ public static Map<String, String> createPartitionMap(
return singletonMap(NAMESPACE_FIELD, builder.toString());
}

public static Map<String, Object> createHeartbeatPartitionMap(String hosts) {
public static Map<String, Object> createHeartbeatPartitionMap(String scheme, String hosts) {
StringBuilder builder = new StringBuilder();
builder.append("mongodb://");
builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
builder.append(HEARTBEAT_TOPIC_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.DROPPED_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.ID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.KEY_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.UUID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
Expand Down Expand Up @@ -348,9 +347,10 @@ public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) {
public static ConnectionString buildConnectionString(
@Nullable String username,
@Nullable String password,
String scheme,
String hosts,
@Nullable String connectionOptions) {
StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
StringBuilder sb = new StringBuilder(scheme).append("://");

if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private static final Logger LOG = LoggerFactory.getLogger(MongoDBTableSource.class);

private final ResolvedSchema physicalSchema;
private final String scheme;
private final String hosts;
private final String connectionOptions;
private final String username;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad

public MongoDBTableSource(
ResolvedSchema physicalSchema,
String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
Expand All @@ -108,6 +110,7 @@ public MongoDBTableSource(
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB) {
this.physicalSchema = physicalSchema;
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
Expand Down Expand Up @@ -172,7 +175,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {

if (enableParallelRead) {
MongoDBSourceBuilder<RowData> builder =
MongoDBSource.<RowData>builder().hosts(hosts).deserializer(deserializer);
MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.deserializer(deserializer);

Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Expand All @@ -192,6 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
} else {
com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder =
com.ververica.cdc.connectors.mongodb.MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.deserializer(deserializer);

Expand Down Expand Up @@ -248,6 +255,7 @@ public DynamicTableSource copy() {
MongoDBTableSource source =
new MongoDBTableSource(
physicalSchema,
scheme,
hosts,
username,
password,
Expand Down Expand Up @@ -279,6 +287,7 @@ public boolean equals(Object o) {
}
MongoDBTableSource that = (MongoDBTableSource) o;
return Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
Expand All @@ -303,6 +312,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(
physicalSchema,
scheme,
hosts,
username,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -65,6 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final ReadableConfig config = helper.getOptions();

String scheme = config.get(SCHEME);
String hosts = config.get(HOSTS);
String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);

Expand Down Expand Up @@ -103,6 +105,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

return new MongoDBTableSource(
physicalSchema,
scheme,
hosts,
username,
password,
Expand Down Expand Up @@ -142,6 +145,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCHEME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(CONNECTION_OPTIONS);
Expand Down
Loading

0 comments on commit a2ce1a0

Please sign in to comment.