Skip to content

Commit

Permalink
fix deregister log
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Mar 6, 2020
1 parent ec7159a commit c2cf0e1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
28 changes: 17 additions & 11 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum Task {
conn_id: ConnID,
},
Deregister {
region_id: u64,
region_id: Option<u64>,
downstream_id: Option<DownstreamID>,
conn_id: Option<ConnID>,
err: Option<Error>,
Expand Down Expand Up @@ -161,19 +161,21 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {

fn on_deregister(
&mut self,
region_id: u64,
region_id: Option<u64>,
id: Option<DownstreamID>,
conn_id: Option<ConnID>,
err: Option<Error>,
) {
info!("cdc deregister region";
"region_id" => region_id,
"id" => ?id,
"downstream_id" => ?id,
"conn_id" => ?conn_id,
"error" => ?err);
let mut is_last = false;
match (id, err, conn_id) {
(Some(id), err, Some(conn_id)) => {
// The peer wants to deregister
let region_id = region_id.unwrap();
if let Some(delegate) = self.capture_regions.get_mut(&region_id) {
is_last = delegate.unsubscribe(id, err);
}
Expand All @@ -182,17 +184,24 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
}
if is_last {
self.capture_regions.remove(&region_id);
// Do not continue to observe the events of the region
self.observer.unsubscribe_region(region_id);
}
}
(None, Some(err), None) => {
// Something went wrong, deregister all downstreams.
// Something went wrong, deregister all downstreams of the region.
let region_id = region_id.unwrap();
if let Some(mut delegate) = self.capture_regions.remove(&region_id) {
delegate.fail(err);
is_last = true;
}
self.connections
.iter_mut()
.for_each(|(_, conn)| conn.unsubscribe(region_id));
if is_last {
// Do not continue to observe the events of the region
self.observer.unsubscribe_region(region_id);
}
}
(None, None, Some(conn_id)) => {
if let Some(conn) = self.connections.remove(&conn_id) {
Expand All @@ -202,6 +211,7 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
if let Some(delegate) = self.capture_regions.get_mut(&region_id) {
if delegate.unsubscribe(downstream_id, None) {
self.capture_regions.remove(&region_id);
// Do not continue to observe the events of the region
self.observer.unsubscribe_region(region_id);
}
}
Expand All @@ -210,10 +220,6 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
}
_ => unreachable!(),
}
if is_last {
// Unsubscribe region role change events.
self.observer.unsubscribe_region(region_id);
}
}

pub fn on_register(
Expand Down Expand Up @@ -287,7 +293,7 @@ impl<T: CasualRouter<RocksEngine>> Endpoint<T> {
) {
error!("cdc send capture change cmd failed"; "region_id" => region_id, "error" => ?e);
let deregister = Task::Deregister {
region_id,
region_id: Some(region_id),
downstream_id: Some(downstream_id),
conn_id: Some(conn_id),
err: Some(Error::Request(e.into())),
Expand Down Expand Up @@ -400,7 +406,7 @@ impl Initializer {
);
let err = resp.response.take_header().take_error();
let deregister = Task::Deregister {
region_id: self.region_id,
region_id: Some(self.region_id),
downstream_id: Some(self.downstream_id),
conn_id: Some(self.conn_id),
err: Some(Error::Request(err)),
Expand Down Expand Up @@ -449,7 +455,7 @@ impl Initializer {
error!("cdc scan entries failed"; "error" => ?e);
// TODO: record in metrics.
if let Err(e) = sched.schedule(Task::Deregister {
region_id,
region_id: Some(region_id),
downstream_id: Some(downstream_id),
conn_id: Some(conn_id),
err: Some(e),
Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl RoleObserver for CdcObserver {
// Unregister all downstreams.
let store_err = RaftStoreError::NotLeader(region_id, None);
if let Err(e) = self.sched.schedule(Task::Deregister {
region_id,
region_id: Some(region_id),
downstream_id: None,
conn_id: None,
err: Some(CdcError::Request(store_err.into())),
Expand Down Expand Up @@ -155,7 +155,7 @@ mod tests {
conn_id,
err,
} => {
assert_eq!(region_id, 1);
assert_eq!(region_id.unwrap(), 1);
assert!(downstream_id.is_none(), "{:?}", downstream_id);
assert!(conn_id.is_none(), "{:?}", conn_id);
assert!(err.is_some(), "{:?}", err);
Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl ChangeData for Service {
ctx.spawn(recv_req.then(move |res| {
// Unregister this downstream only.
if let Err(e) = scheduler.schedule(Task::Deregister {
region_id: 0,
region_id: None,
downstream_id: None,
conn_id: Some(conn_id),
err: None,
Expand All @@ -180,7 +180,7 @@ impl ChangeData for Service {
ctx.spawn(send_resp.then(move |res| {
// Unregister this downstream only.
if let Err(e) = scheduler.schedule(Task::Deregister {
region_id: 0,
region_id: None,
downstream_id: None,
conn_id: Some(conn_id),
err: None,
Expand Down

0 comments on commit c2cf0e1

Please sign in to comment.