Skip to content

Commit

Permalink
status_server: support dumping region meta by http (tikv#7650)
Browse files Browse the repository at this point in the history
Signed-off-by: Hexilee <i@hexilee.me>
  • Loading branch information
Hexilee committed May 11, 2020
1 parent d10871f commit 817445d
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 157 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions cmd/src/server.rs
Expand Up @@ -97,8 +97,8 @@ pub fn run_tikv(config: TiKvConfig) {
let server_config = tikv.init_servers(&gc_worker);
tikv.register_services();
tikv.init_metrics_flusher();

tikv.run_server(server_config);
tikv.run_status_server();

signal_handler::wait_for_signal(Some(tikv.engines.take().unwrap().engines));

Expand Down Expand Up @@ -759,7 +759,9 @@ impl TiKVServer {
.server
.start(server_config, self.security_mgr.clone())
.unwrap_or_else(|e| fatal!("failed to start server: {}", e));
}

fn run_status_server(&mut self) {
// Create a status server.
let status_enabled =
self.config.metric.address.is_empty() && !self.config.server.status_addr.is_empty();
Expand All @@ -768,6 +770,7 @@ impl TiKVServer {
self.config.server.status_thread_pool_size,
Some(self.pd_client.clone()),
self.cfg_controller.take().unwrap(),
self.router.clone(),
));
// Start the status server.
if let Err(e) = status_server.start(
Expand Down Expand Up @@ -897,7 +900,11 @@ trait Stop {
fn stop(self: Box<Self>);
}

impl Stop for StatusServer {
impl<E, R> Stop for StatusServer<E, R>
where
E: 'static,
R: 'static + Send,
{
fn stop(self: Box<Self>) {
(*self).stop()
}
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/peer.rs
Expand Up @@ -511,7 +511,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.fsm.peer.ping();
self.fsm.has_ready = true;
}
CasualMessage::Test(cb) => cb(self.fsm),
CasualMessage::AccessPeer(cb) => cb(self.fsm),
}
}

Expand Down
7 changes: 3 additions & 4 deletions components/raftstore/src/store/msg.rs
Expand Up @@ -253,9 +253,8 @@ pub enum CasualMessage<E: KvEngine> {
/// Notifies that a new snapshot has been generated.
SnapshotGenerated,

/// A test only message, it is useful when we want to access
/// peer's internal state.
Test(Box<dyn FnOnce(&mut PeerFsm<E>) + Send + 'static>),
/// A message to access peer's internal state.
AccessPeer(Box<dyn FnOnce(&mut PeerFsm<E>) + Send + 'static>),
}

impl<E: KvEngine> fmt::Debug for CasualMessage<E> {
Expand Down Expand Up @@ -293,7 +292,7 @@ impl<E: KvEngine> fmt::Debug for CasualMessage<E> {
},
CasualMessage::RegionOverlapped => write!(fmt, "RegionOverlapped"),
CasualMessage::SnapshotGenerated => write!(fmt, "SnapshotGenerated"),
CasualMessage::Test(_) => write!(fmt, "Test"),
CasualMessage::AccessPeer(_) => write!(fmt, "AccessPeer"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/src/cluster.rs
Expand Up @@ -1226,7 +1226,7 @@ impl<T: Simulator> Cluster<T> {
CasualRouter::send(
&router,
region_id,
CasualMessage::Test(Box::new(move |peer: &mut PeerFsm<RocksEngine>| {
CasualMessage::AccessPeer(Box::new(move |peer: &mut PeerFsm<RocksEngine>| {
let idx = peer.peer.raft_group.store().committed_index();
peer.peer.raft_group.request_snapshot(idx).unwrap();
debug!("{} request snapshot at {}", idx, peer.peer.tag);
Expand Down

0 comments on commit 817445d

Please sign in to comment.