diff --git a/Cargo.lock b/Cargo.lock index f96a1fcade536..0ae6b00effff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8574,16 +8574,6 @@ dependencies = [ "libc", ] -[[package]] -name = "macros" -version = "0.9.0" -source = "git+https://github.com/drmingdrmer/openraft?tag=v0.9.0-alpha.11#c9a463f5ce73d1e7dd66eabfe909fe8d5a087f0e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", -] - [[package]] name = "maplit" version = "1.0.2" @@ -9453,15 +9443,15 @@ dependencies = [ [[package]] name = "openraft" version = "0.9.0" -source = "git+https://github.com/drmingdrmer/openraft?tag=v0.9.0-alpha.11#c9a463f5ce73d1e7dd66eabfe909fe8d5a087f0e" +source = "git+https://github.com/datafuselabs/openraft?tag=v0.9.0#bec85f5bebd640d69c6999540e6bd2fe40fbc5b7" dependencies = [ "anyerror", "byte-unit", "clap 4.5.1", "derive_more", "futures", - "macros", "maplit", + "openraft-macros", "rand 0.8.5", "serde", "thiserror", @@ -9471,6 +9461,16 @@ dependencies = [ "validit", ] +[[package]] +name = "openraft-macros" +version = "0.9.0" +source = "git+https://github.com/datafuselabs/openraft?tag=v0.9.0#bec85f5bebd640d69c6999540e6bd2fe40fbc5b7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "opensrv-mysql" version = "0.5.0" diff --git a/src/meta/app/src/tenant/mod.rs b/src/meta/app/src/tenant/mod.rs index b720d74ad9e81..ef2d46835f3be 100644 --- a/src/meta/app/src/tenant/mod.rs +++ b/src/meta/app/src/tenant/mod.rs @@ -15,6 +15,8 @@ mod quota; #[allow(clippy::module_inception)] mod tenant; +mod tenant_quota_ident; pub use quota::TenantQuota; pub use tenant::Tenant; +pub use tenant_quota_ident::TenantQuotaIdent; diff --git a/src/meta/app/src/tenant/tenant_quota_ident.rs b/src/meta/app/src/tenant/tenant_quota_ident.rs new file mode 100644 index 0000000000000..25fe470024f55 --- /dev/null +++ b/src/meta/app/src/tenant/tenant_quota_ident.rs @@ -0,0 +1,85 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant::Tenant; + +/// QuotaIdent is a unique identifier for a quota. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TenantQuotaIdent { + pub tenant: Tenant, +} + +impl TenantQuotaIdent { + pub fn new(tenant: Tenant) -> Self { + Self { tenant } + } +} + +mod kvapi_key_impl { + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::KeyError; + + use crate::tenant::Tenant; + use crate::tenant::TenantQuota; + use crate::tenant::TenantQuotaIdent; + use crate::KeyWithTenant; + + impl kvapi::Key for TenantQuotaIdent { + const PREFIX: &'static str = "__fd_quotas"; + type ValueType = TenantQuota; + + fn parent(&self) -> Option { + Some(self.tenant.to_string_key()) + } + + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + b.push_str(self.tenant_name()) + } + + fn decode_key(p: &mut kvapi::KeyParser) -> Result { + let tenant = p.next_nonempty()?; + Ok(TenantQuotaIdent::new(Tenant::new_nonempty(tenant))) + } + } + + impl KeyWithTenant for TenantQuotaIdent { + fn tenant(&self) -> &Tenant { + &self.tenant + } + } + + impl kvapi::Value for TenantQuota { + fn dependency_keys(&self) -> impl IntoIterator { + [] + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::*; + + #[test] + fn test_quota_ident() { + let tenant = Tenant::new("test"); + let ident = TenantQuotaIdent::new(tenant.clone()); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_quotas/test"); + + assert_eq!(ident, TenantQuotaIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/query/management/src/quota/quota_mgr.rs b/src/query/management/src/quota/quota_mgr.rs index 499fa6c2f7557..12d744d131d98 100644 --- a/src/query/management/src/quota/quota_mgr.rs +++ b/src/query/management/src/quota/quota_mgr.rs @@ -17,44 +17,53 @@ use std::sync::Arc; use databend_common_base::base::escape_for_key; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_api::kv_pb_api::KVPbApi; +use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant::TenantQuota; +use databend_common_meta_app::tenant::TenantQuotaIdent; use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_types::ConflictSeq; use databend_common_meta_types::IntoSeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; +use databend_common_meta_types::NonEmptyString; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; +use databend_common_meta_types::UpsertKV; +use databend_common_meta_types::With; use super::quota_api::QuotaApi; -static QUOTA_API_KEY_PREFIX: &str = "__fd_quotas"; - pub struct QuotaMgr { kv_api: Arc>, - key: String, + ident: TenantQuotaIdent, } impl QuotaMgr { - pub fn create(kv_api: Arc>, tenant: &str) -> Result { - if tenant.is_empty() { - return Err(ErrorCode::TenantIsEmpty( - "Tenant can not empty(while quota mgr create)", - )); - } - Ok(QuotaMgr { + pub fn create( + kv_api: Arc>, + tenant: &NonEmptyString, + ) -> Self { + QuotaMgr { kv_api, - key: format!("{}/{}", QUOTA_API_KEY_PREFIX, escape_for_key(tenant)?), - }) + ident: TenantQuotaIdent::new(Tenant::new_nonempty(tenant.clone())), + } + } + + fn key(&self) -> String { + self.ident.to_string_key() } } +// TODO: use pb to replace json #[async_trait::async_trait] impl QuotaApi for QuotaMgr { #[async_backtrace::framed] async fn get_quota(&self, seq: MatchSeq) -> Result> { - let res = self.kv_api.get_kv(&self.key).await?; + let res = self.kv_api.get_kv(&self.key()).await?; match res { Some(seq_value) => match seq.match_seq(&seq_value) { Ok(_) => Ok(seq_value.into_seqv()?), @@ -69,12 +78,7 @@ impl QuotaApi for QuotaMgr { let value = serde_json::to_vec(quota)?; let res = self .kv_api - .upsert_kv(UpsertKVReq::new( - &self.key, - seq, - Operation::Update(value), - None, - )) + .upsert_kv(UpsertKV::update(&self.key(), &value).with(seq)) .await?; match res.result { diff --git a/src/query/service/src/interpreters/interpreter_database_create.rs b/src/query/service/src/interpreters/interpreter_database_create.rs index ed427043d20ce..8e1be6f8aa55c 100644 --- a/src/query/service/src/interpreters/interpreter_database_create.rs +++ b/src/query/service/src/interpreters/interpreter_database_create.rs @@ -109,7 +109,7 @@ impl Interpreter for CreateDatabaseInterpreter { debug!("ctx.id" = self.ctx.get_id().as_str(); "create_database_execute"); let tenant = self.plan.tenant.clone(); - let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; + let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant); let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let databases = catalog.list_databases(tenant.as_str()).await?; diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs index f349846af5a9e..d3ab10558fce3 100644 --- a/src/query/service/src/interpreters/interpreter_table_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_create.rs @@ -117,7 +117,7 @@ impl Interpreter for CreateTableInterpreter { .check_enterprise_enabled(self.ctx.get_license_key(), ComputedColumn)?; } - let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; + let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant); let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; let engine = self.plan.engine; let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; diff --git a/src/query/service/src/interpreters/interpreter_user_create.rs b/src/query/service/src/interpreters/interpreter_user_create.rs index 508358f53d675..38b88a6a79f82 100644 --- a/src/query/service/src/interpreters/interpreter_user_create.rs +++ b/src/query/service/src/interpreters/interpreter_user_create.rs @@ -62,7 +62,7 @@ impl Interpreter for CreateUserInterpreter { let user_mgr = UserApiProvider::instance(); let users = user_mgr.get_users(&tenant).await?; - let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; + let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant); let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; if quota.max_users != 0 && users.len() >= quota.max_users as usize { return Err(ErrorCode::TenantQuotaExceeded(format!( diff --git a/src/query/service/src/interpreters/interpreter_user_stage_create.rs b/src/query/service/src/interpreters/interpreter_user_stage_create.rs index 4464522cb1e06..953dcdd9cd4aa 100644 --- a/src/query/service/src/interpreters/interpreter_user_stage_create.rs +++ b/src/query/service/src/interpreters/interpreter_user_stage_create.rs @@ -77,7 +77,7 @@ impl Interpreter for CreateUserStageInterpreter { ErrorCode::TenantIsEmpty("tenant is empty when CreateUserStateInterpreter") })?; - let quota_api = user_mgr.get_tenant_quota_api_client(&tenant)?; + let quota_api = user_mgr.tenant_quota_api(&tenant); let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; let stages = user_mgr.get_stages(&tenant).await?; if quota.max_stages != 0 && stages.len() >= quota.max_stages as usize { diff --git a/src/query/service/src/table_functions/others/tenant_quota.rs b/src/query/service/src/table_functions/others/tenant_quota.rs index 69398ec494c41..8ba238f6d660c 100644 --- a/src/query/service/src/table_functions/others/tenant_quota.rs +++ b/src/query/service/src/table_functions/others/tenant_quota.rs @@ -245,7 +245,7 @@ impl AsyncSource for TenantQuotaSource { } tenant = args[0].clone(); } - let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; + let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant); let res = quota_api.get_quota(MatchSeq::GE(0)).await?; let mut quota = res.data; diff --git a/src/query/users/src/user_api.rs b/src/query/users/src/user_api.rs index b44918cc2b2aa..1db9e8f08b052 100644 --- a/src/query/users/src/user_api.rs +++ b/src/query/users/src/user_api.rs @@ -67,7 +67,7 @@ impl UserApiProvider { GlobalInstance::set(Self::try_create(conf, idm_config, tenant).await?); let user_mgr = UserApiProvider::instance(); if let Some(q) = quota { - let i = user_mgr.get_tenant_quota_api_client(tenant)?; + let i = user_mgr.tenant_quota_api(tenant); let res = i.get_quota(MatchSeq::GE(0)).await?; i.set_quota(&q, MatchSeq::Exact(res.seq)).await?; } @@ -149,14 +149,8 @@ impl UserApiProvider { )?)) } - pub fn get_tenant_quota_api_client( - &self, - tenant: &NonEmptyString, - ) -> Result> { - Ok(Arc::new(QuotaMgr::create( - self.client.clone(), - tenant.as_str(), - )?)) + pub fn tenant_quota_api(&self, tenant: &NonEmptyString) -> Arc { + Arc::new(QuotaMgr::create(self.client.clone(), tenant)) } pub fn setting_api(&self, tenant: &NonEmptyString) -> Arc {