Skip to content

Commit

Permalink
fix failpoints tests
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Mar 6, 2020
1 parent c2cf0e1 commit 1765a5b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion components/cdc/src/delegate.rs
Expand Up @@ -270,7 +270,7 @@ impl Delegate {
"region_id" => self.region_id, "min_ts" => min_ts);
return;
}
info!("try to advance ts"; "region_id" => self.region_id);
info!("try to advance ts"; "region_id" => self.region_id, "min_ts" => min_ts);
let resolver = self.resolver.as_mut().unwrap();
let resolved_ts = match resolver.resolve(min_ts) {
Some(rts) => rts,
Expand Down
6 changes: 6 additions & 0 deletions components/cdc/src/endpoint.rs
Expand Up @@ -312,6 +312,9 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
}
if has_failed {
self.capture_regions.remove(&region_id);
self.connections
.iter_mut()
.for_each(|(_, conn)| conn.unsubscribe(region_id));
}
}
}
Expand All @@ -335,6 +338,9 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
// Delegate may fail during handling pending batch.
if delegate.has_failed() {
self.capture_regions.remove(&region_id);
self.connections
.iter_mut()
.for_each(|(_, conn)| conn.unsubscribe(region_id));
}
} else {
warn!("region not found on region ready (finish building resolver)"; "region_id" => region_id);
Expand Down
6 changes: 2 additions & 4 deletions components/cdc/tests/failpoints/test_cdc.rs
Expand Up @@ -29,7 +29,7 @@ fn test_failed_pending_batch() {
req.region_id = region.get_id();
req.set_region_epoch(region.get_region_epoch().clone());
let (req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = req_tx
let req_tx = req_tx
.send((req.clone(), WriteFlags::default()))
.wait()
.unwrap();
Expand Down Expand Up @@ -61,11 +61,9 @@ fn test_failed_pending_batch() {

// Try to subscribe region again.
let region = suite.cluster.get_region(b"k0");
// Ensure it is old region.
// Ensure it is the previous region.
assert_eq!(req.get_region_id(), region.get_id());
req.set_region_epoch(region.get_region_epoch().clone());
let (req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap();
event_feed_wrap.as_ref().replace(Some(resp_rx));
let _req_tx = req_tx.send((req, WriteFlags::default())).wait().unwrap();
let mut events = receive_event(false);
assert_eq!(events.len(), 1, "{:?}", events);
Expand Down
7 changes: 2 additions & 5 deletions components/cdc/tests/integrations/test_cdc.rs
Expand Up @@ -523,7 +523,7 @@ fn test_region_split() {
req.region_id = region.get_id();
req.set_region_epoch(region.get_region_epoch().clone());
let (req_tx, event_feed_wrap, receive_event) = new_event_feed(suite.get_region_cdc_client(1));
let _req_tx = req_tx
let req_tx = req_tx
.send((req.clone(), WriteFlags::default()))
.wait()
.unwrap();
Expand Down Expand Up @@ -552,12 +552,9 @@ fn test_region_split() {
}
// Try to subscribe region again.
let region = suite.cluster.get_region(b"k0");
// Ensure it is old region.
// Ensure it is the previous region.
assert_eq!(req.get_region_id(), region.get_id());
req.set_region_epoch(region.get_region_epoch().clone());

let (req_tx, resp_rx) = suite.get_region_cdc_client(1).event_feed().unwrap();
event_feed_wrap.as_ref().replace(Some(resp_rx));
let _req_tx = req_tx.send((req, WriteFlags::default())).wait().unwrap();
let mut events = receive_event(false);
assert_eq!(events.len(), 1);
Expand Down

0 comments on commit 1765a5b

Please sign in to comment.