[mongodb] Support specific timestamp startup mode.
Jiabao-Sun committed Jun 13, 2023
1 parent afefe40 commit 68a62bd
Showing 17 changed files with 300 additions and 75 deletions.
62 changes: 41 additions & 21 deletions docs/content/connectors/mongodb-cdc(ZH).md
@@ -201,30 +201,32 @@ An upsert stream requires a unique key, so we must declare `_id` as
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The ampersand-separated <a href="https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options">connection options</a> of MongoDB. eg. <br>
<td><a href="https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options">MongoDB connection options</a>. For example: <br>
<code>replicaSet=test&connectTimeoutMS=300000</code>
</td>
</tr>
<tr>
<td>copy.existing</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to copy existing data from the source collections.</td>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for the MongoDB CDC consumer;
valid enumerations are "initial", "latest-offset" and "timestamp".
See the <a href="#a-name-id-002-a">Startup Mode</a> section for more details.</td>
</tr>
<tr>
<td>copy.existing.queue.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">10240</td>
<td>Integer</td>
<td>Maximum size of the queue to use when copying data.</td>
<td>scan.startup.timestamp-millis</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Start timestamp in milliseconds; only applies to the <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
<td>batch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>Cursor batch size.</td>
</tr>
<tr>
<td>poll.max.batch.size</td>
@@ -245,7 +247,7 @@ An upsert stream requires a unique key, so we must declare `_id` as
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The length of time in milliseconds between heartbeat messages. Use 0 to disable.</td>
<td>Heartbeat interval in milliseconds. Use 0 to disable.</td>
</tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
@@ -330,20 +332,38 @@ CREATE TABLE products (

The MongoDB CDC connector is a Flink Source connector which first reads a database snapshot and then continues to read change stream events with **exactly-once processing**, even when failures happen.

### Snapshot When Startup Or Not
### Startup Mode<a name="启动模式" id="002" ></a>

The config option `copy.existing` specifies whether to take a snapshot when the MongoDB CDC consumer starts. <br>Defaults to `true`.
The config option `scan.startup.mode` specifies the startup mode of the MongoDB CDC consumer. The valid enumerations are:

### Snapshot Data Filters
- `initial` (default): Performs an initial snapshot of the monitored database tables on first startup, then continues to read the latest oplog.
- `latest-offset`: Never performs a snapshot of the monitored database tables on first startup; the connector starts reading from the end of the oplog, which means it only captures changes made after the connector started.
- `timestamp`: Skips the snapshot phase and starts reading oplog events from the specified timestamp.

The config option `copy.existing.pipeline` describes the filters applied when copying existing data.<br>
This can restrict copying to only the required data and improve the copy manager's use of indexes.
For example, with the DataStream API:
```java
MongoDBSource.builder()
    .startupOptions(StartupOptions.latest()) // Start from the latest offset
    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from a timestamp
    .build()
```
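The two `.startupOptions(...)` calls above show the two modes side by side; in a real builder chain only one should be kept, since a later call overrides the earlier one.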

In the following example, the `$match` aggregation operator ensures that only documents in which the `closed` field is set to `false` are copied.
and with SQL:

```SQL
CREATE TABLE mongodb_source (...) WITH (
    'connector' = 'mongodb-cdc',
    'scan.startup.mode' = 'latest-offset', -- Start from the latest offset
    ...
    'scan.incremental.snapshot.enabled' = 'true', -- Timestamp startup mode requires incremental snapshot reading
    'scan.startup.mode' = 'timestamp', -- Timestamp startup mode
    'scan.startup.timestamp-millis' = '1667232000000' -- Start timestamp in milliseconds
    ...
)
```
```SQL
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
```

**Notes:**
- The 'timestamp' startup mode requires incremental snapshot reading to be enabled.

### Change Streams

54 changes: 41 additions & 13 deletions docs/content/connectors/mongodb-cdc.md
@@ -206,11 +206,19 @@ Connector Options
</td>
</tr>
<tr>
<td>copy.existing</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether copy existing data from source collections.</td>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for the MongoDB CDC consumer; valid enumerations are "initial", "latest-offset" and "timestamp".
Please see the <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
<td>scan.startup.timestamp-millis</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Timestamp in millis of the start point, only used for <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
<td>copy.existing.queue.size</td>
@@ -330,20 +338,40 @@ Features

The MongoDB CDC connector is a Flink Source connector which first reads a database snapshot and then continues to read change stream events with **exactly-once processing**, even when failures happen.

### Snapshot When Startup Or Not
### Startup Reading Position

The config option `copy.existing` specifies whether to take a snapshot when the MongoDB CDC consumer starts. <br>Defaults to `true`.
The config option `scan.startup.mode` specifies the startup mode for the MongoDB CDC consumer. The valid enumerations are:

### Snapshot Data Filters
- `initial` (default): Performs an initial snapshot of the monitored database tables upon first startup, then continues to read the latest oplog.
- `latest-offset`: Never performs a snapshot of the monitored database tables upon first startup; the connector reads from the end of the oplog, which means it only captures changes made after it was started.
- `timestamp`: Skips the snapshot phase and starts reading oplog events from a specific timestamp.

The config option `copy.existing.pipeline` describes the filters applied when copying existing data.<br>
This can restrict copying to only the required data and improve the copy manager's use of indexes.
For example, with the DataStream API:
```java
MongoDBSource.builder()
    .startupOptions(StartupOptions.latest()) // Start from the latest offset
    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from a timestamp
    .build()
```
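Note that the two `.startupOptions(...)` calls are alternatives: keep whichever mode you need, since the later call overrides the earlier one.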

In the following example, the `$match` aggregation operator ensures that only documents in which the `closed` field is set to `false` are copied.
and with SQL:

```SQL
CREATE TABLE mongodb_source (...) WITH (
    'connector' = 'mongodb-cdc',
    'scan.startup.mode' = 'latest-offset', -- Start from the latest offset
    ...
    'scan.incremental.snapshot.enabled' = 'true', -- Timestamp startup mode requires incremental snapshot
    'scan.startup.mode' = 'timestamp', -- Start from a timestamp
    'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp in millis for the timestamp startup mode
    ...
)
```
```SQL
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
```

**Notes:**
- The 'timestamp' startup mode is not supported by the legacy source. To use the timestamp startup mode, you need to enable incremental snapshot.
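
For reference, a minimal end-to-end DataStream job using the timestamp startup mode might look like the sketch below. The host, database, collection, and the choice of `JsonDebeziumDeserializationSchema` are illustrative assumptions rather than part of this change:

```java
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBTimestampStartupExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings -- replace with your own deployment.
        MongoDBSource<String> source =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .databaseList("mydb")
                        .collectionList("mydb.orders")
                        // Skip the snapshot phase and read oplog events from this epoch-millis timestamp.
                        .startupOptions(StartupOptions.timestamp(1667232000000L))
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source").print();
        env.execute("mongodb-timestamp-startup");
    }
}
```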


### Change Streams

Expand Down
@@ -18,6 +18,7 @@

import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
@@ -128,10 +129,34 @@ public void close() {}
// ------------------------------------------------------------------------------------------

    public StreamSplit createStreamSplit() {
        StartupOptions startupOptions = sourceConfig.getStartupOptions();

        Offset startingOffset;
        switch (startupOptions.startupMode) {
            case LATEST_OFFSET:
                startingOffset = dialect.displayCurrentOffset(sourceConfig);
                break;
            case EARLIEST_OFFSET:
                startingOffset = offsetFactory.createInitialOffset();
                break;
            case TIMESTAMP:
                startingOffset =
                        offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
                break;
            case SPECIFIC_OFFSETS:
                startingOffset =
                        offsetFactory.newOffset(
                                startupOptions.specificOffsetFile,
                                startupOptions.specificOffsetPos.longValue());
                break;
            default:
                throw new IllegalStateException(
                        "Unsupported startup mode " + startupOptions.startupMode);
        }

        return new StreamSplit(
                STREAM_SPLIT_ID,
                dialect.displayCurrentOffset(sourceConfig),
                startingOffset,
                offsetFactory.createNoStoppingOffset(),
                new ArrayList<>(),
                new HashMap<>(),
@@ -31,6 +31,8 @@ public OffsetFactory() {}

    public abstract Offset newOffset(Long position);

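    /** Creates an offset positioned at the given timestamp in epoch milliseconds. */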
    public abstract Offset createTimestampOffset(long timestampMillis);

    public abstract Offset createInitialOffset();

    public abstract Offset createNoStoppingOffset();
@@ -57,6 +57,10 @@ public BinlogOffset(String filename, long position) {
        this(filename, position, 0L, 0L, 0L, null, null);
    }

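    /** Creates an offset identified only by its binlog timestamp in epoch seconds; file name and position are left unset. */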
    public BinlogOffset(long binlogEpochSecs) {
        this(null, 0L, 0L, 0L, binlogEpochSecs, null, null);
    }

    public BinlogOffset(
            String filename,
            long position,
@@ -43,6 +43,11 @@ public Offset newOffset(Long position) {
        throw new FlinkRuntimeException("not supported create new Offset by Long position.");
    }

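    /** Creates a binlog offset from epoch milliseconds; binlog timestamps are tracked in whole seconds. */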
    @Override
    public Offset createTimestampOffset(long timestampMillis) {
        return new BinlogOffset(timestampMillis / 1000);
    }

    @Override
    public Offset createInitialOffset() {
        return BinlogOffset.INITIAL_OFFSET;
6 changes: 6 additions & 0 deletions flink-connector-mongodb-cdc/pom.xml
@@ -144,6 +144,12 @@ under the License.
            <artifactId>testcontainers</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

<dependency>
@@ -40,7 +40,6 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
@@ -77,7 +76,7 @@ public static class Builder<T> {
        private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
        private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
        private Boolean updateLookup = true;
        private Boolean copyExisting = COPY_EXISTING.defaultValue();
        private Boolean copyExisting = true;
        private Integer copyExistingMaxThreads;
        private Integer copyExistingQueueSize;
        private String copyExistingPipeline;
@@ -136,18 +136,13 @@ public MongoDBSourceBuilder<T> pollMaxBatchSize(int pollMaxBatchSize) {
    }

    /**
     * copy.existing
     * scan.startup.mode
     *
     * <p>Copy existing data from source collections and convert them to Change Stream events on
     * their respective topics. Any changes to the data that occur during the copy process are
     * applied once the copy is completed.
     * <p>Optional startup mode for MongoDB CDC consumer, valid enumerations are initial,
     * latest-offset, timestamp. Default: initial
     */
    public MongoDBSourceBuilder<T> copyExisting(boolean copyExisting) {
        if (copyExisting) {
            this.configFactory.startupOptions(StartupOptions.initial());
        } else {
            this.configFactory.startupOptions(StartupOptions.latest());
        }
    public MongoDBSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
        this.configFactory.startupOptions(startupOptions);
        return this;
    }

@@ -78,15 +78,6 @@ public class MongoDBSourceOptions {
"The ampersand-separated MongoDB connection options. "
+ "eg. replicaSet=test&connectTimeoutMS=300000");

    public static final ConfigOption<Boolean> COPY_EXISTING =
            ConfigOptions.key("copy.existing")
                    .booleanType()
                    .defaultValue(Boolean.TRUE)
                    .withDescription(
                            "Copy existing data from source collections and convert them "
                                    + "to Change Stream events on their respective topics. Any changes to the data "
                                    + "that occur during the copy process are applied once the copy is completed.");

    public static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
            ConfigOptions.key("copy.existing.queue.size")
                    .intType()
@@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.mongodb.source.offset;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;

import java.util.Map;
@@ -39,7 +40,12 @@ public ChangeStreamOffset newOffset(String filename, Long position) {

    @Override
    public ChangeStreamOffset newOffset(Long position) {
        return new ChangeStreamOffset(bsonTimestampFromEpochMillis(position));
        throw new UnsupportedOperationException("not supported create new Offset by position.");
    }

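    /** Creates a change stream offset positioned at the given epoch-millis timestamp. */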
    @Override
    public Offset createTimestampOffset(long timestampMillis) {
        return new ChangeStreamOffset(bsonTimestampFromEpochMillis(timestampMillis));
    }

@Override
Expand Down
Loading
