Support filtering oplog by op type with filter.cmds#949
Conversation
TOO-1 support filter.cmds oplog filtering
|
|
|
在这个配置中,我们是否可以把语义定义为所有的delete操作都做过滤, 这样的话,就可以对新版TTL的处理逻辑做过滤了。 |
感谢回复。嗯嗯,的确在【正常库->降冷库】的场景下是合理的。此时的源和目标端数据不一致也是合理且ByDesign的结果。 |
我也是这样考虑的,或者增加对自定义 |
在MongoDB 6.1 开始,用户主动发起的“多文档删除”本身就可能走 batched delete 路径,也就是op=="c",内层是 op=="d"。所以如果按这个语义来说的话,我们应该只走 如果这个思路可以,我就按这个思路开始修改代码 |
嗯,我觉得没问题。 |
…lter feat: support filter.cmds for inner applyOps DML
|
我们之前虽然对"d"这个语义做了澄清,但是其他的"i"/"u"没在之前的讨论范围内。查了下MongoDB 官方文档和官方测试,会和"d"存在同样的情况。
目前的PR中,我对于i/u/d都做了兼容。 引用: Docs: applyOps (database command) Source: apply_ops1.js#L265-L279 Source: apply_ops1.js#L283-L286 Source: apply_ops1.js#L430-L470 Source: batched_multi_deletes_oplog.js#L41-L51 |
|
好的感谢,我check下PR内容,没问题就处理掉。再次感谢贡献~ |
| } | ||
| default: | ||
| _ = LOG.Error("unknown applyOps type, filter can't handle. log:%+v", log.Object) | ||
| LOG.Error("unknown applyOps type, filter can't handle. log:%+v", log.Object) |
There was a problem hiding this comment.
维持原有的写法好了,不然IDE会有错误未处理的提示。
| return logs | ||
| } | ||
|
|
||
| // extractApplyOpsForTransform normalizes applyOps into []bson.D so namespace |
There was a problem hiding this comment.
oplog_filter.go中也有针对o.applyOps的case处理,新增函数的话,可以考虑统一下
collector/filter/oplog_filter.go 中新增了 extractApplyOps,executor/executor.go 中新增了 extractApplyOpsForTransform,两者几乎完全一样,但存在微妙差异——filter 版本遇到非法元素时会打 LOG.Error 日志,executor 版本则静默返回 false。这违反了 DRY 原则,更危险的是未来修一处忘了另一处会导致行为分裂。
建议:将 extractApplyOps 提取到 oplog 公共包中作为统一工具函数,两处共用。如果 executor 侧确实不需要日志,可以让公共函数返回 error,调用方自行决定是否打日志:
| } | ||
| ops[i] = transSubLog.Dump(keys, false) | ||
| } | ||
| oplog.SetFiled(partialLog.Object, "applyOps", ops) |
There was a problem hiding this comment.
竟然是一个transform功能一直遗留的bug。。
|
|
||
| // extractApplyOps extracts applyOps inner operations from the supported BSON | ||
| // representations already used in the project. | ||
| func extractApplyOps(logObject bson.D) ([]bson.D, bool) { |
There was a problem hiding this comment.
这个函数transform侧也有一个,有办法合并么。
| return nil | ||
| } | ||
|
|
||
| func normalizeFilterCmds(cmds []string) ([]string, error) { |
There was a problem hiding this comment.
字符串规范化做了三遍
normalizeFilterCmds(sanitize.go)在启动时做了 TrimSpace + ToLower。NewCmdFilter(oplog_filter.go)构造时又做了一遍。Filter 方法在运行时对每条 oplog 的 log.Operation 又做了 TrimSpace(ToLower(...))。
对于配置值,校验后应该保证已经是规范形式,构造函数不需要再处理。对于 log.Operation,MongoDB 的 oplog op 字段在正常情况下不会有空格或大写,这里的防御性处理反而带来了不必要的性能开销——每条 oplog 都要走一次字符串分配。
建议:
NewCmdFilter 中去掉 TrimSpace/ToLower,信任上游 normalizeFilterCmds 的校验结果
Filter 中对 log.Operation 的处理改为直接使用,或用一个简单的 == 判断(oplog 的 op 字段是 MongoDB 内部保证的单字符小写)
同理 filterApplyOpsDML 中对 inner op 也在做 TrimSpace(ToLower(...)),可以去掉
|
|
||
| innerOperation = strings.TrimSpace(strings.ToLower(innerOperation)) | ||
| if filter.shouldFilterApplyOpsInnerOp(innerOperation) { | ||
| LOG.Info("CmdFilter filter inner %s op in applyOps: %v", innerOperation, ele) |
There was a problem hiding this comment.
日志级别太高了。
在高吞吐增量同步场景下,如果 applyOps 中有大量匹配的内层操作(比如批量 TTL 删除),这行 Info 级别日志会产生海量输出,严重影响性能和日志可读性。
建议:将逐条过滤的日志降为 Debug 级别,只保留最终汇总的 Info 日志(即 "CmdFilter applyOps DML filter?..." 那行)。或者改为计数方式,只在结束时打印 "filtered N inner ops"。
| continue | ||
| } | ||
| if _, ok := validFilterCmds[cmd]; !ok { | ||
| return nil, fmt.Errorf("filter.cmds[%s] should in {i, u, d, c, n}", cmd) |
There was a problem hiding this comment.
建议:改为 "filter.cmds: unknown op type %q, must be one of {i, u, d, c, n}"。
单元测试同理
| return false | ||
| } | ||
|
|
||
| type CmdFilter struct { |
There was a problem hiding this comment.
"Cmd" 在 MongoDB 语境中通常指 command(即 op=c 的 oplog),但这个 filter 实际过滤的是所有 op 类型(i/u/d/c/n)。叫 CmdFilter 容易让人误以为只过滤 command 类型。配置项 filter.cmds 也有同样的问题。
建议:考虑改名为 OpTypeFilter,配置项改为 filter.op_types 或 filter.oplog_ops。如果改名成本太高,至少在结构体和配置项旁加上详细注释说明 "cmds" 此处的含义。
| "u": {}, | ||
| "d": {}, | ||
| "c": {}, | ||
| "n": {}, |
There was a problem hiding this comment.
当前 chain 的顺序是 AutologousFilter → NoopFilter → CmdFilter → GidFilter,CmdFilter 排在 NoopFilter 之后。这意味着 op=n 的 oplog 已经被 NoopFilter 过滤了,CmdFilter 中配置 n 实际上永远不会生效。虽然从功能上无害,但配置 filter.cmds=n 不报错也不生效,会让用户困惑。
建议: 要么在 normalizeFilterCmds 中遇到 n 时打一个 warn 日志提示 "noop is already filtered by NoopFilter",要么从 validFilterCmds 中移除 n。
|
好的,这些意见我综合下,看下应该怎么修正 |
|
后续我想把LOG这个sdk换掉,现在日志的sdk是原来的作者自己开发的,如果能替换成更流行的zap日志sdk。会尽量将当前的日志行为保持一致 |
好的,感激不尽。有一些评论是大模型给出的,我看了下也觉得合理就copy过来了。或许还是可以多借助AI的能力来辅助开发。 |
可以的,其实我之前也有过类似的想法。一些组件的确太古早了(有的都archived了),可以演进到更好更现代化的库。 |
TOO-9 clean up filter.cmds semantics and hot path
TOO-8 unify applyOps normalization paths
|
OK 再review下呢 |
|
再次感谢你的贡献!下次版本release的时候我会更新到 贡献者名单 中~ |
| utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, "") | ||
| assert.Equal(t, nil, err, "should be equal") | ||
|
|
||
| err = conn.Client.Database("zz").Drop(nil) |
There was a problem hiding this comment.
这里的tset会有点小问题,我处理下哈~
"codeName" : "NamespaceNotFound",
"errmsg" : "cannot apply insert or update operation on a non-existent namespace zz.y: { ns: \"zz.y\", op: \"i\", o: { _id: \"bson-d-1\", hello: \"worlod\" } }",
背景
在源端 MongoDB 开启 TTL、目标端作为冷库且不希望删除同步下发的场景下,源端因 TTL 触发的过期删除不应影响目标端数据。
为支持这一需求,本次改动新增了按 oplog 顶层
op类型进行过滤的能力,用户可通过如下配置过滤 delete oplog:filter.cmds = d变更内容
collector配置中新增filter.cmdscollector/filter中新增CmdFilterCmdFilter接入增量同步的 oplog filter chainfilter.cmds增加配置校验,非法值启动时直接报错,避免静默失效conf/collector.conf中的配置说明与示例行为说明
filter.cmds时,行为与当前版本保持一致filter.cmds=d时,顶层op=d的 oplog 会在下游处理前被过滤op类型过滤,不扩展到更复杂的表达式或嵌套语义验证情况
go test ./cmd/collector -count=1go build ./cmd/collectorgo build ./collector/filter适用场景
兼容性
该改动默认兼容现有行为:
filter.cmds时启用对应过滤能力