Skip to content

Commit

Permalink
feat: frontend and datanode info
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Apr 7, 2024
1 parent a76ba70 commit c396c54
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 14 deletions.
15 changes: 13 additions & 2 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,24 @@ pub enum NodeStatus {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {}
pub struct DatanodeStatus {
/// The read capacity units during this period.
pub rcus: i64,
/// The write capacity units during this period.
pub wcus: i64,
/// How many leader regions on this node.
pub leader_regions: usize,
/// How many follower regions on this node.
pub follower_regions: usize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}

#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {}
pub struct MetasrvStatus {
pub is_leader: bool,
}

impl FromStr for NodeInfoKey {
type Err = Error;
Expand Down
4 changes: 0 additions & 4 deletions src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use std::sync::Arc;

use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{ResponseHeader, Role};
use async_trait::async_trait;
use common_grpc::channel_manager::ChannelManager;
use common_meta::cluster;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::peer::Peer;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
Expand All @@ -33,7 +30,6 @@ use tonic::Status;

use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
use crate::error;
use crate::error::{
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, Result,
RetryTimesExceededSnafu,
Expand Down
6 changes: 3 additions & 3 deletions src/meta-client/src/client/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ use std::time::Duration;

use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, ErrorCode, MigrateRegionRequest, MigrateRegionResponse,
ProcedureId, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureId,
ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::{Code, Status};
use tonic::Status;

use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
Expand Down
7 changes: 7 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,12 @@ pub enum Error {
err_msg: String,
source: common_meta::error::Error,
},

#[snafu(display("Failed to save cluster info"))]
SaveClusterInfo {
location: Location,
source: common_meta::error::Error,
},
}

impl Error {
Expand Down Expand Up @@ -744,6 +750,7 @@ impl ErrorExt for Error {
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. }
| Error::SaveClusterInfo { .. }
| Error::MigrationRunning { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::service::mailbox::{

pub mod check_leader_handler;
pub mod collect_datanode_cluster_info_handler;
pub mod collect_frontend_cluster_info_handler;
pub mod collect_stats_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;
Expand Down
55 changes: 50 additions & 5 deletions src/meta-srv/src/handler/collect_datanode_cluster_info_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,29 @@
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::cluster;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;

use super::node_stat::Stat;
use crate::error::Result;
use crate::error::{Result, SaveClusterInfoSnafu};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct CollectStatsHandler;
pub struct CollectDatanodeClusterInfoHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for CollectStatsHandler {
impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
Expand All @@ -41,6 +45,47 @@ impl HeartbeatHandler for CollectStatsHandler {
let Some(peer) = &peer else {
return Ok(HandleControl::Continue);
};
let Some(stat) = &acc.stat else {
return Ok(HandleControl::Continue);
};

let key = NodeInfoKey {
cluster_id: header.cluster_id,
role: cluster::Role::Datanode,
node_id: peer.id,
};

let leader_regions = stat
.region_stats
.iter()
.filter(|s| s.role == RegionRole::Leader)
.count();
let follower_regions = stat.region_stats.len() - leader_regions;

let value = NodeInfo {
peer: peer.clone().into(),
last_activity_ts: stat.timestamp_millis,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: stat.rcus,
wcus: stat.wcus,
leader_regions,
follower_regions,
}),
};

let key = key.try_into().context(SaveClusterInfoSnafu)?;
let value = value.try_into().context(SaveClusterInfoSnafu)?;
let put_req = PutRequest {
key,
value,
..Default::default()
};

let res = ctx.in_memory.put(put_req).await;

if let Err(err) = res {
warn!("Failed to save datanode's cluster info, peer: {peer:?}, {err}");
}

Ok(HandleControl::Continue)
}
Expand Down
76 changes: 76 additions & 0 deletions src/meta-srv/src/handler/collect_frontend_cluster_info_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::cluster;
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use snafu::ResultExt;

use crate::error::{Result, SaveClusterInfoSnafu};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct CollectFrontendClusterInfoHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Frontend
}

async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(header) = &header else {
return Ok(HandleControl::Continue);
};
let Some(peer) = &peer else {
return Ok(HandleControl::Continue);
};

let key = NodeInfoKey {
cluster_id: header.cluster_id,
role: cluster::Role::Frontend,
node_id: peer.id,
};

let value = NodeInfo {
peer: peer.clone().into(),
last_activity_ts: common_time::util::current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
};

let key = key.try_into().context(SaveClusterInfoSnafu)?;
let value = value.try_into().context(SaveClusterInfoSnafu)?;
let put_req = PutRequest {
key,
value,
..Default::default()
};

let res = ctx.in_memory.put(put_req).await;

if let Err(err) = res {
warn!("Failed to save datanode's cluster info, peer: {peer:?}, {err}");
}

Ok(HandleControl::Continue)
}
}
4 changes: 4 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_datanode_cluster_info_handler::CollectDatanodeClusterInfoHandler;
use crate::handler::collect_frontend_cluster_info_handler::CollectFrontendClusterInfoHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
Expand Down Expand Up @@ -298,6 +300,8 @@ impl MetaSrvBuilder {
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(region_lease_handler).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
Expand Down

0 comments on commit c396c54

Please sign in to comment.