Skip to content

Commit

Permalink
br/cdcutil: don't skip failed changefeeds (pingcap#53482)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored and RidRisR committed May 23, 2024
1 parent 8d636b0 commit 9702a4a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
2 changes: 1 addition & 1 deletion lightning/pkg/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
)
checkEtcdPut(
"/tidb/cdc/default/default/changefeed/info/test-1",
`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`,
`{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"finished","error":null,"creator-version":"v6.5.0-master-dirty"}`,
)
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test")
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1")
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/cdcutil/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin
}
switch info.State {
// https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview
case "failed", "finished":
case "finished":
return invalidTs, nil
case "running", "warning", "normal", "stopped", "error":
case "failed", "running", "warning", "normal", "stopped", "error":
cts, err := c.fetchCheckpointTSFromStatus(ctx, cf)
if err != nil {
return 0, err
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/cdcutil/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) {

putChangefeed("st-ok", "normal", 1, 43)
putChangefeed("st-fail", "normal", 1, 41)
putLameChangefeed("skipped", "failed", 1)
putLameChangefeed("skipped", "finished", 1)
putChangefeed("not-skipped", "failed", 1, 41)
putLameChangefeed("nost-ok", "normal", 43)
putLameChangefeed("nost-fail", "normal", 41)

names, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 42)
require.NoError(t, err)
require.ElementsMatch(t, names.TESTGetChangefeedNames(), []string{
"default/default/not-skipped",
"default/default/nost-fail",
"default/default/st-fail",
})
Expand All @@ -83,6 +85,7 @@ func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) {
names3, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 48)
require.NoError(t, err)
require.ElementsMatch(t, names3.TESTGetChangefeedNames(), []string{
"default/default/not-skipped",
"default/default/nost-fail",
"default/default/st-fail",
"default/default/nost-ok",
Expand Down Expand Up @@ -116,7 +119,7 @@ func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) {
)
checkEtcdPut(
"/tidb/cdc/default/default/changefeed/info/test-1",
`{"state":"failed"}`,
`{"state":"finished"}`,
)
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1")
checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1")
Expand Down

0 comments on commit 9702a4a

Please sign in to comment.