Skip to content

Commit

Permalink
make lint happy
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Dec 23, 2021
1 parent 6343752 commit 69365ca
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 102 deletions.
4 changes: 4 additions & 0 deletions common/meta/raft-store/src/grpc/grpc_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum MetaGrpcWriteAction {
CreateTable(CreateTableReq),
DropTable(DropTableReq),
CommitTable(UpsertTableOptionReq),
UpsertKV(UpsertKVAction),
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
Expand All @@ -66,6 +67,9 @@ pub enum MetaGrpcGetAction {
GetTable(GetTableReq),
GetTableExt(GetTableExtReq),
ListTables(ListTableReq),
GetKV(GetKVAction),
MGetKV(MGetKVAction),
PrefixListKV(PrefixListReq),
}

/// Try convert tonic::Request<RaftRequest> to DoActionAction.
Expand Down
5 changes: 5 additions & 0 deletions common/meta/raft-store/src/grpc/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ impl MetaGrpcClient {
.await
}

#[tracing::instrument(level = "debug", skip(password))]
pub async fn try_create(addr: &str, username: &str, password: &str) -> Result<Self> {
Self::with_tls_conf(addr, username, password, None, None).await
}

#[tracing::instrument(level = "debug", skip(password))]
pub async fn with_tls_conf(
addr: &str,
Expand Down
1 change: 1 addition & 0 deletions common/meta/raft-store/src/grpc/grpc_client_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ use common_flight_rpc::GrpcClientConf;
#[derive(Clone, Debug, Default)]
pub struct MetaGrpcClientConf {
pub meta_service_config: GrpcClientConf,
pub kv_service_config: GrpcClientConf,
pub client_timeout_in_second: u64,
}
37 changes: 37 additions & 0 deletions common/meta/raft-store/src/grpc/kv_api_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use common_meta_api::KVApi;
use common_meta_types::GetKVActionReply;
use common_meta_types::MGetKVActionReply;
use common_meta_types::PrefixListReply;
use common_meta_types::UpsertKVAction;
use common_meta_types::UpsertKVActionReply;

use crate::grpc::grpc_action::GetKVAction;
use crate::grpc::grpc_action::MGetKVAction;
use crate::grpc::grpc_action::PrefixListReq;
use crate::MetaGrpcClient;

#[async_trait::async_trait]
impl KVApi for MetaGrpcClient {
async fn upsert_kv(
&self,
act: UpsertKVAction,
) -> common_exception::Result<UpsertKVActionReply> {
self.do_write(act).await
}

async fn get_kv(&self, key: &str) -> common_exception::Result<GetKVActionReply> {
self.do_get(GetKVAction {
key: key.to_string(),
})
.await
}

async fn mget_kv(&self, keys: &[String]) -> common_exception::Result<MGetKVActionReply> {
let keys = keys.to_vec();
self.do_get(MGetKVAction { keys }).await
}

async fn prefix_list_kv(&self, prefix: &str) -> common_exception::Result<PrefixListReply> {
self.do_get(PrefixListReq(prefix.to_string())).await
}
}
1 change: 1 addition & 0 deletions common/meta/raft-store/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod grpc_action;
mod grpc_client;
mod grpc_client_conf;
mod kv_api_impl;
mod meta_api_impl;

pub use grpc_client::*;
Expand Down
14 changes: 5 additions & 9 deletions metasrv/tests/it/flight/metasrv_flight_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
//! Test arrow-flight API of metasrv

use common_base::tokio;
use common_base::Stoppable;
use common_meta_api::KVApi;
use common_meta_flight::MetaFlightClient;
use common_meta_raft_store::MetaGrpcClient;
use common_meta_types::MatchSeq;
use common_meta_types::Operation;
use common_meta_types::SeqV;
Expand All @@ -44,7 +43,7 @@ async fn test_restart() -> anyhow::Result<()> {

let (mut tc, addr) = crate::tests::start_metasrv().await?;

let client = MetaFlightClient::try_create(addr.as_str(), "root", "xxx").await?;
let client = MetaGrpcClient::try_create(addr.as_str(), "root", "xxx").await?;

tracing::info!("--- upsert kv");
{
Expand Down Expand Up @@ -91,9 +90,6 @@ async fn test_restart() -> anyhow::Result<()> {

tracing::info!("--- stop metasrv");
{
let mut srv = tc.flight_srv.take().unwrap();
srv.stop(None).await?;

drop(client);

tokio::time::sleep(Duration::from_millis(1000)).await;
Expand All @@ -106,7 +102,7 @@ async fn test_restart() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_millis(10_000)).await;

// try to reconnect the restarted server.
let client = MetaFlightClient::try_create(addr.as_str(), "root", "xxx").await?;
let client = MetaGrpcClient::try_create(addr.as_str(), "root", "xxx").await?;

tracing::info!("--- get kv");
{
Expand Down Expand Up @@ -148,8 +144,8 @@ async fn test_join() -> anyhow::Result<()> {
let addr0 = tc0.config.grpc_api_address.clone();
let addr1 = tc1.config.grpc_api_address.clone();

let client0 = MetaFlightClient::try_create(addr0.as_str(), "root", "xxx").await?;
let client1 = MetaFlightClient::try_create(addr1.as_str(), "root", "xxx").await?;
let client0 = MetaGrpcClient::try_create(addr0.as_str(), "root", "xxx").await?;
let client1 = MetaGrpcClient::try_create(addr1.as_str(), "root", "xxx").await?;

let clients = vec![client0, client1];

Expand Down
4 changes: 2 additions & 2 deletions metasrv/tests/it/flight/metasrv_flight_kv_api_cross_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async fn test_kv_api_write_read_cross_nodes() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;

let follower1 = tcs[1].flight_client().await?;
let follower2 = tcs[2].flight_client().await?;
let follower1 = tcs[1].grpc_client().await?;
let follower2 = tcs[2].grpc_client().await?;

KVApiTestSuite {}
.kv_write_read_cross_nodes(&follower1, &follower2)
Expand Down
13 changes: 3 additions & 10 deletions metasrv/tests/it/flight/metasrv_flight_kv_api_restart_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::time::Duration;

use common_base::tokio;
use common_base::Stoppable;
use common_meta_api::KVApi;
use common_meta_types::MatchSeq;
use common_meta_types::Operation;
Expand Down Expand Up @@ -51,7 +50,7 @@ async fn test_kv_api_restart_cluster_write_read() -> anyhow::Result<()> {
tracing::info!("--- test write on every node: {}", key_suffix);

for tc in tcs.iter() {
let client = tc.flight_client().await?;
let client = tc.grpc_client().await?;

let k = make_key(tc, key_suffix);
let res = client
Expand Down Expand Up @@ -82,14 +81,11 @@ async fn test_kv_api_restart_cluster_write_read() -> anyhow::Result<()> {
tracing::info!("--- shutdown the cluster");
let stopped_tcs = {
let mut stopped_tcs = vec![];
for mut tc in tcs {
for tc in tcs {
// TODO(xp): remove this field, or split MetaSrvTestContext into two struct:
// one for metasrv and one for meta_node
assert!(tc.meta_nodes.is_empty());

let mut f = tc.flight_srv.take().unwrap();
f.stop(None).await?;

stopped_tcs.push(tc);
}
stopped_tcs
Expand All @@ -103,10 +99,7 @@ async fn test_kv_api_restart_cluster_write_read() -> anyhow::Result<()> {
tcs.push(tc);
}

for tc in tcs.iter() {
let flight = tc.flight_srv.as_ref().unwrap();
let meta_node = flight.get_meta_node();

for meta_node in &tcs[0].meta_nodes {
tracing::info!("--- wait until a leader is observed");

let metrics = meta_node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ async fn test_meta_api_database_create_get_drop() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;

let follower1 = tcs[1].flight_client().await?;
let follower2 = tcs[2].flight_client().await?;
let follower1 = tcs[1].grpc_client().await?;
let follower2 = tcs[2].grpc_client().await?;

MetaApiTestSuite {}
.database_get_diff_nodes(&follower1, &follower2)
Expand All @@ -42,8 +42,8 @@ async fn test_meta_api_list_database() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;

let follower1 = tcs[1].flight_client().await?;
let follower2 = tcs[2].flight_client().await?;
let follower1 = tcs[1].grpc_client().await?;
let follower2 = tcs[2].grpc_client().await?;

MetaApiTestSuite {}
.list_database_diff_nodes(&follower1, &follower2)
Expand All @@ -57,8 +57,8 @@ async fn test_meta_api_table_create_get_drop() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;

let follower1 = tcs[1].flight_client().await?;
let follower2 = tcs[2].flight_client().await?;
let follower1 = tcs[1].grpc_client().await?;
let follower2 = tcs[2].grpc_client().await?;

MetaApiTestSuite {}
.table_get_diff_nodes(&follower1, &follower2)
Expand All @@ -72,8 +72,8 @@ async fn test_meta_api_list_table() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;

let follower1 = tcs[1].flight_client().await?;
let follower2 = tcs[2].flight_client().await?;
let follower1 = tcs[1].grpc_client().await?;
let follower2 = tcs[2].grpc_client().await?;

MetaApiTestSuite {}
.list_table_diff_nodes(&follower1, &follower2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async fn test_meta_api_database_create_get_drop() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;
let client0 = tcs[0].grpc_client().await?;
let client1 = tcs[1].grpc_client().await?;

MetaApiTestSuite {}
.database_get_diff_nodes(&client0, &client1)
Expand All @@ -41,8 +41,8 @@ async fn test_meta_api_list_database() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;
let client0 = tcs[0].grpc_client().await?;
let client1 = tcs[1].grpc_client().await?;

MetaApiTestSuite {}
.list_database_diff_nodes(&client0, &client1)
Expand All @@ -56,8 +56,8 @@ async fn test_meta_api_table_create_get_drop() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;
let client0 = tcs[0].grpc_client().await?;
let client1 = tcs[1].grpc_client().await?;
MetaApiTestSuite {}
.table_get_diff_nodes(&client0, &client1)
.await
Expand All @@ -70,8 +70,8 @@ async fn test_meta_api_list_table() -> anyhow::Result<()> {

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;
let client0 = tcs[0].grpc_client().await?;
let client1 = tcs[1].grpc_client().await?;

MetaApiTestSuite {}
.list_table_diff_nodes(&client0, &client1)
Expand Down
15 changes: 4 additions & 11 deletions metasrv/tests/it/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ use anyhow::Result;
use async_raft::NodeId;
use common_base::tokio;
use common_base::GlobalSequence;
use common_base::Stoppable;
use common_meta_flight::MetaFlightClient;
use common_meta_raft_store::protobuf::meta_service_client::MetaServiceClient;
use common_meta_raft_store::protobuf::GetReq;
use common_meta_raft_store::MetaGrpcClient;
use common_tracing::tracing;
use databend_meta::configs;
use databend_meta::meta_service::MetaNode;
Expand All @@ -39,11 +38,8 @@ pub async fn start_metasrv() -> Result<(MetaSrvTestContext, String)> {
}

pub async fn start_metasrv_with_context(tc: &mut MetaSrvTestContext) -> Result<()> {
let mn = MetaNode::start(&tc.config.raft_config).await?;
let mut srv = GrpcServer::create(tc.config.clone(), mn);
srv.start().await?;
MetaNode::start(&tc.config.raft_config).await?;

tc.flight_srv = Some(Box::new(srv));
Ok(())
}

Expand Down Expand Up @@ -83,8 +79,6 @@ pub struct MetaSrvTestContext {
pub config: configs::Config,

pub meta_nodes: Vec<Arc<MetaNode>>,

pub flight_srv: Option<Box<GrpcServer>>,
}

impl MetaSrvTestContext {
Expand Down Expand Up @@ -134,14 +128,13 @@ impl MetaSrvTestContext {
MetaSrvTestContext {
config,
meta_nodes: vec![],
flight_srv: None,
}
}

pub async fn flight_client(&self) -> anyhow::Result<MetaFlightClient> {
pub async fn grpc_client(&self) -> anyhow::Result<MetaGrpcClient> {
let addr = self.config.grpc_api_address.clone();

let client = MetaFlightClient::try_create(addr.as_str(), "root", "xxx").await?;
let client = MetaGrpcClient::try_create(addr.as_str(), "root", "xxx").await?;
Ok(client)
}

Expand Down
6 changes: 2 additions & 4 deletions query/src/catalogs/impls/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ impl MutableCatalog {
} else {
tracing::info!("use remote meta");

let meta_client_provider = Arc::new(MetaClientProvider::new(
conf.meta.to_grpc_client_config(),
conf.meta.to_flight_client_config(),
));
let meta_client_provider =
Arc::new(MetaClientProvider::new(conf.meta.to_grpc_client_config()));
let meta_remote = MetaRemote::create(meta_client_provider);
Arc::new(meta_remote)
};
Expand Down
5 changes: 1 addition & 4 deletions query/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ pub struct ClusterDiscovery {

impl ClusterDiscovery {
async fn create_meta_client(cfg: &Config) -> Result<Arc<dyn KVApi>> {
let meta_api_provider = MetaClientProvider::new(
cfg.meta.to_grpc_client_config(),
cfg.meta.to_flight_client_config(),
);
let meta_api_provider = MetaClientProvider::new(cfg.meta.to_grpc_client_config());
match meta_api_provider.try_get_kv_client().await {
Ok(client) => Ok(client),
Err(cause) => Err(cause.add_message_back("(while create cluster api).")),
Expand Down
14 changes: 4 additions & 10 deletions query/src/common/meta/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::Arc;

use common_exception::Result;
use common_meta_api::KVApi;
use common_meta_flight::MetaFlightClient;
use common_meta_flight::MetaFlightClientConf;
use common_meta_raft_store::MetaGrpcClient;
use common_meta_raft_store::MetaGrpcClientConf;

Expand All @@ -30,15 +28,11 @@ use common_meta_raft_store::MetaGrpcClientConf;
#[derive(Clone)]
pub struct MetaClientProvider {
grpc_conf: MetaGrpcClientConf,
flight_conf: MetaFlightClientConf,
}

impl MetaClientProvider {
pub fn new(grpc_conf: MetaGrpcClientConf, flight_conf: MetaFlightClientConf) -> Self {
MetaClientProvider {
grpc_conf,
flight_conf,
}
pub fn new(grpc_conf: MetaGrpcClientConf) -> Self {
MetaClientProvider { grpc_conf }
}

/// Get meta async client, trait is defined in MetaApi.
Expand All @@ -49,12 +43,12 @@ impl MetaClientProvider {

/// Get kv async client, operations trait defined in KVApi.
pub async fn try_get_kv_client(&self) -> Result<Arc<dyn KVApi>> {
let local = self.flight_conf.kv_service_config.address.is_empty();
let local = self.grpc_conf.meta_service_config.address.is_empty();
if local {
let meta_store = common_meta_embedded::MetaEmbedded::get_meta().await?;
Ok(meta_store)
} else {
let client = MetaFlightClient::try_new(&self.flight_conf).await?;
let client = MetaGrpcClient::try_new(&self.grpc_conf).await?;
Ok(Arc::new(client))
}
}
Expand Down

0 comments on commit 69365ca

Please sign in to comment.