Skip to content

Commit

Permalink
refactor: rename data_in to coord_data_in
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuoTiJia committed Sep 11, 2023
1 parent 818b7af commit 9c01221
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 43 deletions.
16 changes: 8 additions & 8 deletions config/src/limiter_config.rs
Expand Up @@ -40,10 +40,10 @@ pub struct Bucket {

/// Per-tenant request rate-limiter configuration.
///
/// Each field is an optional token [`Bucket`] for one class of
/// coordinator-level traffic; `None` means that class is not limited.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct RequestLimiterConfig {
    /// Limits data flowing into the coordinator (charged per write
    /// payload length — see `check_coord_data_in`).
    pub coord_data_in: Option<Bucket>,
    /// Limits data flowing out of the coordinator (charged per result
    /// batch size — see `check_coord_data_out`).
    pub coord_data_out: Option<Bucket>,
    /// Limits the number of query requests (one token per query).
    pub coord_queries: Option<Bucket>,
    /// Limits the number of write requests (one token per write).
    pub coord_writes: Option<Bucket>,
}

#[test]
Expand All @@ -59,20 +59,20 @@ max_replicate_number = 2
max_retention_time = 30
[request_config.data_in]
[request_config.coord_data_in]
local_bucket = {max = 100, initial = 0}
remote_bucket = {max = 100, initial = 0, refill = 100, interval = 100}
[request_config.data_out]
[request_config.coord_data_out]
local_bucket = {max = 100, initial = 0}
remote_bucket = {max = 100, initial = 0, refill = 100, interval = 100}
[request_config.data_writes]
[request_config.coord_writes]
local_bucket = {max = 100, initial = 0}
remote_bucket = {max = 100, initial = 0, refill = 100, interval = 100}
[request_config.data_queries]
[request_config.coord_queries]
local_bucket = {max = 100, initial = 0}
remote_bucket = {max = 100, initial = 0, refill = 100, interval = 100}
"#;
Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/reader/mod.rs
Expand Up @@ -126,7 +126,7 @@ impl<O: VnodeOpener> CheckedCoordinatorRecordBatchStream<O> {
let limiter = self.limiter.clone();
let future = async move {
limiter
.check_data_out(batch_memory)
.check_coord_data_out(batch_memory)
.await
.map_err(CoordinatorError::from)
};
Expand Down
6 changes: 3 additions & 3 deletions coordinator/src/service.rs
Expand Up @@ -302,7 +302,7 @@ impl CoordService {

let checker = async move {
meta.limiter(&tenant)
.check_query()
.check_coord_queries()
.await
.map_err(CoordinatorError::from)
};
Expand Down Expand Up @@ -348,8 +348,8 @@ impl CoordService {
let limiter = self.meta.limiter(tenant);
let write_size = points.len();

limiter.check_write().await?;
limiter.check_data_in(write_size).await?;
limiter.check_coord_writes().await?;
limiter.check_coord_data_in(write_size).await?;

self.metrics.data_in(tenant, db).inc(write_size as u64);
}
Expand Down
9 changes: 5 additions & 4 deletions meta/src/limiter/limiter_kind.rs
Expand Up @@ -4,11 +4,12 @@ use serde::{Deserialize, Serialize};

/// The class of request a limiter bucket applies to.
///
/// Used as the key of the per-kind bucket maps built by the local and
/// remote request limiters; variants mirror the fields of
/// `RequestLimiterConfig`.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RequestLimiterKind {
    /// Data flowing into the coordinator.
    CoordDataIn,
    /// Data flowing out of the coordinator.
    CoordDataOut,
    /// Query request count.
    CoordQueries,
    /// Write request count.
    CoordWrites,
}

impl Display for RequestLimiterKind {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
Expand Down
24 changes: 12 additions & 12 deletions meta/src/limiter/local_request_limiter.rs
Expand Up @@ -66,10 +66,10 @@ impl LocalRequestLimiter {
match limiter_config {
Some(config) => {
let mut buckets = HashMap::new();
insert_local_bucket(&mut buckets, DataIn, config.data_in.as_ref());
insert_local_bucket(&mut buckets, DataOut, config.data_out.as_ref());
insert_local_bucket(&mut buckets, Writes, config.writes.as_ref());
insert_local_bucket(&mut buckets, Queries, config.queries.as_ref());
insert_local_bucket(&mut buckets, CoordDataIn, config.coord_data_in.as_ref());
insert_local_bucket(&mut buckets, CoordDataOut, config.coord_data_out.as_ref());
insert_local_bucket(&mut buckets, CoordWrites, config.coord_writes.as_ref());
insert_local_bucket(&mut buckets, CoordQueries, config.coord_queries.as_ref());
buckets
}
None => HashMap::new(),
Expand Down Expand Up @@ -165,22 +165,22 @@ impl LocalRequestLimiter {

#[async_trait]
impl RequestLimiter for LocalRequestLimiter {
async fn check_data_in(&self, data_len: usize) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::DataIn, data_len)
async fn check_coord_data_in(&self, data_len: usize) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::CoordDataIn, data_len)
.await
}

async fn check_data_out(&self, data_len: usize) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::DataOut, data_len)
async fn check_coord_data_out(&self, data_len: usize) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::CoordDataOut, data_len)
.await
}

async fn check_query(&self) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::Queries, 1).await
async fn check_coord_queries(&self) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::CoordQueries, 1).await
}

async fn check_write(&self) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::Writes, 1).await
async fn check_coord_writes(&self) -> MetaResult<()> {
self.check_bucket(RequestLimiterKind::CoordWrites, 1).await
}
}

Expand Down
8 changes: 4 additions & 4 deletions meta/src/limiter/mod.rs
Expand Up @@ -42,8 +42,8 @@ pub type LimiterRef = Arc<dyn RequestLimiter>;
///
#[async_trait]
pub trait RequestLimiter: Send + Sync + Debug {
    /// Checks `data_len` units of inbound data; `Err` when the
    /// data-in limit is exhausted.
    async fn check_coord_data_in(&self, data_len: usize) -> MetaResult<()>;
    /// Checks `data_len` units of outbound data; `Err` when the
    /// data-out limit is exhausted.
    async fn check_coord_data_out(&self, data_len: usize) -> MetaResult<()>;
    /// Checks a single query request; `Err` when the query limit is exhausted.
    async fn check_coord_queries(&self) -> MetaResult<()>;
    /// Checks a single write request; `Err` when the write limit is exhausted.
    async fn check_coord_writes(&self) -> MetaResult<()>;
}
8 changes: 4 additions & 4 deletions meta/src/limiter/none_limiter.rs
Expand Up @@ -19,19 +19,19 @@ impl Debug for NoneLimiter {

#[async_trait]
impl RequestLimiter for NoneLimiter {
async fn check_data_in(&self, _data_len: usize) -> MetaResult<()> {
async fn check_coord_data_in(&self, _data_len: usize) -> MetaResult<()> {
Ok(())
}

async fn check_data_out(&self, _data_len: usize) -> MetaResult<()> {
async fn check_coord_data_out(&self, _data_len: usize) -> MetaResult<()> {
Ok(())
}

async fn check_query(&self) -> MetaResult<()> {
async fn check_coord_queries(&self) -> MetaResult<()> {
Ok(())
}

async fn check_write(&self) -> MetaResult<()> {
async fn check_coord_writes(&self) -> MetaResult<()> {
Ok(())
}
}
8 changes: 4 additions & 4 deletions meta/src/limiter/remote_request_limiter.rs
Expand Up @@ -19,10 +19,10 @@ impl RemoteRequestLimiter {
Some(config) => {
use RequestLimiterKind::*;
let mut buckets = HashMap::new();
insert_remote_bucket(&mut buckets, DataIn, config.data_in.as_ref());
insert_remote_bucket(&mut buckets, DataOut, config.data_out.as_ref());
insert_remote_bucket(&mut buckets, Queries, config.queries.as_ref());
insert_remote_bucket(&mut buckets, Writes, config.writes.as_ref());
insert_remote_bucket(&mut buckets, CoordDataIn, config.coord_data_in.as_ref());
insert_remote_bucket(&mut buckets, CoordDataOut, config.coord_data_out.as_ref());
insert_remote_bucket(&mut buckets, CoordQueries, config.coord_queries.as_ref());
insert_remote_bucket(&mut buckets, CoordWrites, config.coord_writes.as_ref());
buckets
}
None => HashMap::new(),
Expand Down
Expand Up @@ -103,15 +103,15 @@ test_ts_tenant1,"{""comment"":null,""limiter_config"":null}"
200 OK


-- EXECUTE SQL: alter tenant test_ts_tenant1 set _limiter = '{"object_config":{"max_users_number":1,"max_databases":3,"max_shard_number":2,"max_replicate_number":2,"max_retention_time":30},"request_config":{"data_in":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"data_out":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"queries":null,"writes":null}}'; --
-- EXECUTE SQL: alter tenant test_ts_tenant1 set _limiter = '{"object_config":{"max_users_number":1,"max_databases":3,"max_shard_number":2,"max_replicate_number":2,"max_retention_time":30},"request_config":{"coord_data_in":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"coord_data_out":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"coord_queries":null,"coord_writes":null}}'; --
200 OK


-- EXECUTE SQL: select * from cluster_schema.tenants where tenant_name in ('test_ts_tenant1'); --
-- AFTER_SORT --
200 OK
tenant_name,tenant_options
test_ts_tenant1,"{""comment"":""hello world"",""limiter_config"":{""object_config"":{""max_users_number"":1,""max_databases"":3,""max_shard_number"":2,""max_replicate_number"":2,""max_retention_time"":30},""request_config"":{""data_in"":{""remote_bucket"":{""max"":100,""initial"":0,""refill"":100,""interval"":100},""local_bucket"":{""max"":100,""initial"":0}},""data_out"":{""remote_bucket"":{""max"":100,""initial"":0,""refill"":100,""interval"":100},""local_bucket"":{""max"":100,""initial"":0}},""queries"":null,""writes"":null}}}"
test_ts_tenant1,"{""comment"":""hello world"",""limiter_config"":{""object_config"":{""max_users_number"":1,""max_databases"":3,""max_shard_number"":2,""max_replicate_number"":2,""max_retention_time"":30},""request_config"":{""coord_data_in"":{""remote_bucket"":{""max"":100,""initial"":0,""refill"":100,""interval"":100},""local_bucket"":{""max"":100,""initial"":0}},""coord_data_out"":{""remote_bucket"":{""max"":100,""initial"":0,""refill"":100,""interval"":100},""local_bucket"":{""max"":100,""initial"":0}},""coord_queries"":null,""coord_writes"":null}}}"

-- EXECUTE SQL: alter tenant test_ts_tenant1 unset comment; --
200 OK
Expand Down
Expand Up @@ -34,7 +34,7 @@ select * from cluster_schema.tenants where tenant_name in ('test_ts_tenant1');
drop tenant cnosdb;

alter tenant test_ts_tenant1 set comment = 'hello world';
alter tenant test_ts_tenant1 set _limiter = '{"object_config":{"max_users_number":1,"max_databases":3,"max_shard_number":2,"max_replicate_number":2,"max_retention_time":30},"request_config":{"data_in":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"data_out":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"queries":null,"writes":null}}';
alter tenant test_ts_tenant1 set _limiter = '{"object_config":{"max_users_number":1,"max_databases":3,"max_shard_number":2,"max_replicate_number":2,"max_retention_time":30},"request_config":{"coord_data_in":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"coord_data_out":{"remote_bucket":{"max":100,"initial":0,"refill":100,"interval":100},"local_bucket":{"max":100,"initial":0}},"coord_queries":null,"coord_writes":null}}';

select * from cluster_schema.tenants where tenant_name in ('test_ts_tenant1');

Expand Down

0 comments on commit 9c01221

Please sign in to comment.