Skip to content

Commit

Permalink
refactor: use kvapi::Key to define meta-service key for QuotaMgr
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Mar 12, 2024
1 parent 0538cfa commit 5d173ea
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 45 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/meta/app/src/tenant/mod.rs
Expand Up @@ -15,6 +15,8 @@
mod quota;
#[allow(clippy::module_inception)]
mod tenant;
mod tenant_quota_ident;

pub use quota::TenantQuota;
pub use tenant::Tenant;
pub use tenant_quota_ident::TenantQuotaIdent;
85 changes: 85 additions & 0 deletions src/meta/app/src/tenant/tenant_quota_ident.rs
@@ -0,0 +1,85 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant::Tenant;

/// QuotaIdent is a unique identifier for a quota.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TenantQuotaIdent {
pub tenant: Tenant,
}

impl TenantQuotaIdent {
pub fn new(tenant: Tenant) -> Self {
Self { tenant }
}
}

mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KeyError;

use crate::tenant::Tenant;
use crate::tenant::TenantQuota;
use crate::tenant::TenantQuotaIdent;
use crate::KeyWithTenant;

impl kvapi::Key for TenantQuotaIdent {
const PREFIX: &'static str = "__fd_quotas";
type ValueType = TenantQuota;

fn parent(&self) -> Option<String> {
Some(self.tenant.to_string_key())
}

fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
b.push_str(self.tenant_name())
}

fn decode_key(p: &mut kvapi::KeyParser) -> Result<Self, KeyError> {
let tenant = p.next_nonempty()?;
Ok(TenantQuotaIdent::new(Tenant::new_nonempty(tenant)))
}
}

impl KeyWithTenant for TenantQuotaIdent {
fn tenant(&self) -> &Tenant {
&self.tenant
}
}

impl kvapi::Value for TenantQuota {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
}
}
}

#[cfg(test)]
mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use super::*;

#[test]
fn test_quota_ident() {
let tenant = Tenant::new("test");
let ident = TenantQuotaIdent::new(tenant.clone());

let key = ident.to_string_key();
assert_eq!(key, "__fd_quotas/test");

assert_eq!(ident, TenantQuotaIdent::from_str_key(&key).unwrap());
}
}
42 changes: 23 additions & 19 deletions src/query/management/src/quota/quota_mgr.rs
Expand Up @@ -17,44 +17,53 @@ use std::sync::Arc;
use databend_common_base::base::escape_for_key;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant::TenantQuota;
use databend_common_meta_app::tenant::TenantQuotaIdent;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::ConflictSeq;
use databend_common_meta_types::IntoSeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;

use super::quota_api::QuotaApi;

static QUOTA_API_KEY_PREFIX: &str = "__fd_quotas";

pub struct QuotaMgr {
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
key: String,
ident: TenantQuotaIdent,
}

impl QuotaMgr {
pub fn create(kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>, tenant: &str) -> Result<Self> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while quota mgr create)",
));
}
Ok(QuotaMgr {
pub fn create(
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
tenant: &NonEmptyString,
) -> Self {
QuotaMgr {
kv_api,
key: format!("{}/{}", QUOTA_API_KEY_PREFIX, escape_for_key(tenant)?),
})
ident: TenantQuotaIdent::new(Tenant::new_nonempty(tenant.clone())),
}
}

fn key(&self) -> String {
self.ident.to_string_key()
}
}

// TODO: use pb to replace json
#[async_trait::async_trait]
impl QuotaApi for QuotaMgr {
#[async_backtrace::framed]
async fn get_quota(&self, seq: MatchSeq) -> Result<SeqV<TenantQuota>> {
let res = self.kv_api.get_kv(&self.key).await?;
let res = self.kv_api.get_kv(&self.key()).await?;
match res {
Some(seq_value) => match seq.match_seq(&seq_value) {
Ok(_) => Ok(seq_value.into_seqv()?),
Expand All @@ -69,12 +78,7 @@ impl QuotaApi for QuotaMgr {
let value = serde_json::to_vec(quota)?;
let res = self
.kv_api
.upsert_kv(UpsertKVReq::new(
&self.key,
seq,
Operation::Update(value),
None,
))
.upsert_kv(UpsertKV::update(&self.key(), &value).with(seq))
.await?;

match res.result {
Expand Down
Expand Up @@ -109,7 +109,7 @@ impl Interpreter for CreateDatabaseInterpreter {
debug!("ctx.id" = self.ctx.get_id().as_str(); "create_database_execute");

let tenant = self.plan.tenant.clone();
let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant);
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let databases = catalog.list_databases(tenant.as_str()).await?;
Expand Down
Expand Up @@ -117,7 +117,7 @@ impl Interpreter for CreateTableInterpreter {
.check_enterprise_enabled(self.ctx.get_license_key(), ComputedColumn)?;
}

let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant);
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let engine = self.plan.engine;
let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
Expand Down
Expand Up @@ -62,7 +62,7 @@ impl Interpreter for CreateUserInterpreter {
let user_mgr = UserApiProvider::instance();
let users = user_mgr.get_users(&tenant).await?;

let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant);
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
if quota.max_users != 0 && users.len() >= quota.max_users as usize {
return Err(ErrorCode::TenantQuotaExceeded(format!(
Expand Down
Expand Up @@ -77,7 +77,7 @@ impl Interpreter for CreateUserStageInterpreter {
ErrorCode::TenantIsEmpty("tenant is empty when CreateUserStateInterpreter")
})?;

let quota_api = user_mgr.get_tenant_quota_api_client(&tenant)?;
let quota_api = user_mgr.tenant_quota_api(&tenant);
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let stages = user_mgr.get_stages(&tenant).await?;
if quota.max_stages != 0 && stages.len() >= quota.max_stages as usize {
Expand Down
Expand Up @@ -245,7 +245,7 @@ impl AsyncSource for TenantQuotaSource {
}
tenant = args[0].clone();
}
let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota_api = UserApiProvider::instance().tenant_quota_api(&tenant);
let res = quota_api.get_quota(MatchSeq::GE(0)).await?;
let mut quota = res.data;

Expand Down
12 changes: 3 additions & 9 deletions src/query/users/src/user_api.rs
Expand Up @@ -67,7 +67,7 @@ impl UserApiProvider {
GlobalInstance::set(Self::try_create(conf, idm_config, tenant).await?);
let user_mgr = UserApiProvider::instance();
if let Some(q) = quota {
let i = user_mgr.get_tenant_quota_api_client(tenant)?;
let i = user_mgr.tenant_quota_api(tenant);
let res = i.get_quota(MatchSeq::GE(0)).await?;
i.set_quota(&q, MatchSeq::Exact(res.seq)).await?;
}
Expand Down Expand Up @@ -149,14 +149,8 @@ impl UserApiProvider {
)?))
}

pub fn get_tenant_quota_api_client(
&self,
tenant: &NonEmptyString,
) -> Result<Arc<dyn QuotaApi>> {
Ok(Arc::new(QuotaMgr::create(
self.client.clone(),
tenant.as_str(),
)?))
pub fn tenant_quota_api(&self, tenant: &NonEmptyString) -> Arc<dyn QuotaApi> {
Arc::new(QuotaMgr::create(self.client.clone(), tenant))
}

pub fn setting_api(&self, tenant: &NonEmptyString) -> Arc<dyn SettingApi> {
Expand Down

0 comments on commit 5d173ea

Please sign in to comment.