Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metasrvs nodeinfo #3873

Merged
merged 4 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a191edaea1089362a86ebc7d8e98ee9a1bd522d1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ impl InformationSchemaClusterInfoBuilder {
return;
}

if peer_type == "FRONTEND" {
// Always set peer_id to be -1 for frontends
if peer_type == "FRONTEND" || peer_type == "METASRV" {
// Always set peer_id to be -1 for frontends and metasrvs
self.peer_ids.push(Some(-1));
} else {
self.peer_ids.push(Some(node_info.peer.id as i64));
Expand Down
20 changes: 8 additions & 12 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,25 @@ impl ClusterInfo for MetaClient {

let mut nodes = if get_metasrv_nodes {
let last_activity_ts = -1; // Metasrv does not provide this information.

// TODO(dennis): Get Metasrv node info
let git_commit = "unknown";
let version = "unknown";
let start_time_ms = 0;

let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
.into_iter()
.map(|peer| NodeInfo {
peer,
.map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: version.to_string(),
git_commit: git_commit.to_string(),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
})
.chain(leader.into_iter().map(|leader| NodeInfo {
peer: leader,
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: version.to_string(),
git_commit: git_commit.to_string(),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
}))
.collect::<Vec<_>>()
Expand Down
15 changes: 6 additions & 9 deletions src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use std::sync::Arc;

use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvPeersRequest, ResponseHeader, Role};
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::Peer;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -72,7 +71,9 @@ impl Client {
inner.batch_get(req).await
}

pub async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
pub async fn get_metasrv_peers(
&self,
) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
let inner = self.inner.read().await;
inner.get_metasrv_peers().await
}
Expand Down Expand Up @@ -225,7 +226,7 @@ impl Inner {
.context(ConvertMetaResponseSnafu)
}

async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
self.with_retry(
"get_metasrv_peers",
move |mut client| {
Expand All @@ -241,10 +242,6 @@ impl Inner {
|res| &res.header,
)
.await
.map(|res| {
let leader = res.leader.map(|x| x.into());
let peers = res.followers.into_iter().map(|x| x.into()).collect();
(leader, peers)
})
.map(|res| (res.leader, res.followers))
}
}
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ common-procedure.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
use crate::metasrv::LeaderValue;
use crate::metasrv::MetasrvNodeInfo;

pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
Expand Down Expand Up @@ -71,7 +71,7 @@ pub trait Election: Send + Sync {
async fn register_candidate(&self) -> Result<()>;

/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<LeaderValue>>;
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;

/// Campaign waits to acquire leadership in an election.
///
Expand Down
32 changes: 27 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::sync::broadcast::Receiver;
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

pub struct EtcdElection {
leader_value: String,
Expand Down Expand Up @@ -142,9 +142,19 @@ impl Election for EtcdElection {
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();

// The register info: key is the candidate key, value is its leader value.
// The register info: key is the candidate key, value is its node info(addr, version, git_commit).
let key = self.candidate_key().into_bytes();
let value = self.leader_value.clone().into_bytes();
let build_info = common_version::build_info();
let value = MetasrvNodeInfo {
addr: self.leader_value.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
};
let value = serde_json::to_string(&value)
.with_context(|_| error::SerializeToJsonSnafu {
input: format!("{value:?}"),
})?
.into_bytes();
// Puts with the lease id
self.client
.kv_client()
Expand Down Expand Up @@ -175,15 +185,27 @@ impl Election for EtcdElection {
Ok(())
}

async fn all_candidates(&self) -> Result<Vec<LeaderValue>> {
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key = self.candidate_root().into_bytes();
let res = self
.client
.kv_client()
.get(key, Some(GetOptions::new().with_prefix()))
.await
.context(error::EtcdFailedSnafu)?;
res.kvs().iter().map(|kv| Ok(kv.value().into())).collect()

let mut nodes = Vec::with_capacity(res.kvs().len());
for kv in res.kvs() {
let node =
serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
error::DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(kv.value()),
}
})?;
nodes.push(node);
}

Ok(nodes)
}

async fn campaign(&self) -> Result<()> {
Expand Down
24 changes: 24 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl Context {
}
}

/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
Expand All @@ -216,6 +217,29 @@ impl<T: AsRef<[u8]>> From<T> for LeaderValue {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
// The metasrv's address
pub addr: String,
// The node build version
pub version: String,
// The node build git commit hash
pub git_commit: String,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
fn from(node_info: MetasrvNodeInfo) -> Self {
Self {
peer: Some(api::v1::meta::Peer {
addr: node_info.addr,
..Default::default()
}),
version: node_info.version,
git_commit: node_info.git_commit,
}
}
}

#[derive(Clone)]
pub struct SelectorContext {
pub server_addr: String,
Expand Down
46 changes: 22 additions & 24 deletions src/meta-srv/src/service/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

use api::v1::meta::{
cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
Error, MetasrvPeersRequest, MetasrvPeersResponse, Peer, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse, ResponseHeader,
Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse,
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_telemetry::warn;
use snafu::ResultExt;
use tonic::{Request, Response};

use crate::error;
use crate::metasrv::Metasrv;
use crate::service::GrpcResult;
use crate::{error, metasrv};

#[async_trait::async_trait]
impl cluster_server::Cluster for Metasrv {
Expand Down Expand Up @@ -88,40 +88,38 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp));
}

fn make_node_info(addr: String) -> Option<MetasrvNodeInfo> {
let build_info = common_version::build_info();
Some(
metasrv::MetasrvNodeInfo {
addr,
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
}
.into(),
)
}

let (leader, followers) = match self.election() {
Some(election) => {
let leader = election.leader().await?;
let peers = election.all_candidates().await?;
let followers = peers
let nodes = election.all_candidates().await?;
let followers = nodes
.into_iter()
.filter(|peer| peer.0 != leader.0)
.map(|peer| Peer {
addr: peer.0,
..Default::default()
})
.filter(|node_info| node_info.addr != leader.0)
.map(api::v1::meta::MetasrvNodeInfo::from)
.collect();
(
Some(Peer {
addr: leader.0,
..Default::default()
}),
followers,
)
(make_node_info(leader.0.clone()), followers)
}
None => (
Some(Peer {
addr: self.options().server_addr.clone(),
..Default::default()
}),
vec![],
),
None => (make_node_info(self.options().server_addr.clone()), vec![]),
};

let resp = MetasrvPeersResponse {
header: Some(ResponseHeader::success(0)),
leader,
followers,
};

Ok(Response::new(resp))
}
}
Expand Down
11 changes: 3 additions & 8 deletions tests/cases/distributed/information_schema/cluster_info.result
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,26 @@ DESC TABLE CLUSTER_INFO;
+-------------+----------------------+-----+------+---------+---------------+

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
Expand All @@ -51,18 +48,16 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration|+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
Expand Down
Loading
Loading