diff --git a/components/cdc/src/delegate.rs b/components/cdc/src/delegate.rs index c220970bf9a..afaa98ff4db 100644 --- a/components/cdc/src/delegate.rs +++ b/components/cdc/src/delegate.rs @@ -270,7 +270,7 @@ impl Delegate { "region_id" => self.region_id, "min_ts" => min_ts); return; } - info!("try to advance ts"; "region_id" => self.region_id); + info!("try to advance ts"; "region_id" => self.region_id, "min_ts" => min_ts); let resolver = self.resolver.as_mut().unwrap(); let resolved_ts = match resolver.resolve(min_ts) { Some(rts) => rts, diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 9ef83292a86..0a76619119a 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -312,6 +312,9 @@ impl> Endpoint { } if has_failed { self.capture_regions.remove(®ion_id); + self.connections + .iter_mut() + .for_each(|(_, conn)| conn.unsubscribe(region_id)); } } } @@ -335,6 +338,9 @@ impl> Endpoint { // Delegate may fail during handling pending batch. if delegate.has_failed() { self.capture_regions.remove(®ion_id); + self.connections + .iter_mut() + .for_each(|(_, conn)| conn.unsubscribe(region_id)); } } else { warn!("region not found on region ready (finish building resolver)"; "region_id" => region_id); diff --git a/components/cdc/tests/failpoints/test_cdc.rs b/components/cdc/tests/failpoints/test_cdc.rs index 9bb78460be6..01ce2e0b744 100644 --- a/components/cdc/tests/failpoints/test_cdc.rs +++ b/components/cdc/tests/failpoints/test_cdc.rs @@ -29,7 +29,7 @@ fn test_failed_pending_batch() { req.region_id = region.get_id(); req.set_region_epoch(region.get_region_epoch().clone()); let (req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = req_tx + let req_tx = req_tx .send((req.clone(), WriteFlags::default())) .wait() .unwrap(); @@ -61,11 +61,9 @@ fn test_failed_pending_batch() { // Try to subscribe region again. let region = suite.cluster.get_region(b"k0"); - // Ensure it is old region. + // Ensure it is the previous region. assert_eq!(req.get_region_id(), region.get_id()); req.set_region_epoch(region.get_region_epoch().clone()); - let (req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap(); - event_feed_wrap.as_ref().replace(Some(resp_rx)); let _req_tx = req_tx.send((req, WriteFlags::default())).wait().unwrap(); let mut events = receive_event(false); assert_eq!(events.len(), 1, "{:?}", events); diff --git a/components/cdc/tests/integrations/test_cdc.rs b/components/cdc/tests/integrations/test_cdc.rs index 14df917bfae..4cc605f393b 100644 --- a/components/cdc/tests/integrations/test_cdc.rs +++ b/components/cdc/tests/integrations/test_cdc.rs @@ -523,7 +523,7 @@ fn test_region_split() { req.region_id = region.get_id(); req.set_region_epoch(region.get_region_epoch().clone()); let (req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); - let _req_tx = req_tx + let req_tx = req_tx .send((req.clone(), WriteFlags::default())) .wait() .unwrap(); @@ -552,12 +552,9 @@ fn test_region_split() { } // Try to subscribe region again. let region = suite.cluster.get_region(b"k0"); - // Ensure it is old region. + // Ensure it is the previous region. assert_eq!(req.get_region_id(), region.get_id()); req.set_region_epoch(region.get_region_epoch().clone()); - - let (req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap(); - event_feed_wrap.as_ref().replace(Some(resp_rx)); let _req_tx = req_tx.send((req, WriteFlags::default())).wait().unwrap(); let mut events = receive_event(false); assert_eq!(events.len(), 1);