Skip to content

Commit

Permalink
improve meta watch && Optimize the interaction process between query …
Browse files Browse the repository at this point in the history
…nodes and meta nodes
  • Loading branch information
bartliu827 committed Jul 3, 2023
1 parent 83302a8 commit 8637611
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 71 deletions.
8 changes: 6 additions & 2 deletions common/models/src/meta_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::auth::role::{CustomTenantRole, TenantRoleIdentifier};
use crate::node_info::NodeStatus;
use crate::oid::Oid;
use crate::predicate::domain::TimeRange;
use crate::schema::{DatabaseSchema, TableSchema};

Expand Down Expand Up @@ -199,16 +201,18 @@ impl DatabaseInfo {
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct TenantMetaData {
pub version: u64,
pub users: HashMap<String, UserInfo>,
pub dbs: HashMap<String, DatabaseInfo>,
pub roles: HashMap<String, CustomTenantRole<Oid>>,
pub members: HashMap<String, TenantRoleIdentifier>,
}

impl TenantMetaData {
pub fn new() -> Self {
Self {
version: 0,
users: HashMap::new(),
dbs: HashMap::new(),
roles: HashMap::new(),
members: HashMap::new(),
}
}

Expand Down
116 changes: 62 additions & 54 deletions meta/src/model/meta_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use config::Config;
use models::auth::role::TenantRoleIdentifier;
use models::auth::user::{admin_user, User, UserDesc, UserOptions};
use models::meta_data::*;
use models::node_info::NodeStatus;
Expand Down Expand Up @@ -52,6 +51,7 @@ pub struct AdminMeta {
watch_tenants: Arc<RwLock<HashSet<String>>>,
watch_notify: Sender<UseTenantInfo>,

users: RwLock<HashMap<String, UserDesc>>,
conn_map: RwLock<HashMap<u64, Channel>>,
data_nodes: RwLock<HashMap<u64, NodeInfo>>,

Expand All @@ -67,6 +67,8 @@ impl AdminMeta {
config: Config::default(),
watch_notify,
client: MetaHttpClient::new("".to_string()),

users: RwLock::new(HashMap::new()),
conn_map: RwLock::new(HashMap::new()),
data_nodes: RwLock::new(HashMap::new()),
tenants: RwLock::new(HashMap::new()),
Expand All @@ -86,6 +88,8 @@ impl AdminMeta {
config,
watch_notify,
client: MetaHttpClient::new(meta_url),

users: RwLock::new(HashMap::new()),
conn_map: RwLock::new(HashMap::new()),
data_nodes: RwLock::new(HashMap::new()),
tenants: RwLock::new(HashMap::new()),
Expand All @@ -95,7 +99,7 @@ impl AdminMeta {
watch_version: Arc::new(AtomicU64::new(0)),
});

let base_ver = admin.sync_all_data_node().await.unwrap();
let base_ver = admin.sync_gobal_info().await.unwrap();
admin.watch_version.store(base_ver, Ordering::Relaxed);

tokio::spawn(AdminMeta::watch_task_manager(admin.clone(), receiver));
Expand Down Expand Up @@ -173,6 +177,30 @@ impl AdminMeta {
Ok(id)
}

pub async fn sync_gobal_info(&self) -> MetaResult<u64> {
let req = command::ReadCommand::DataNodes(self.config.cluster.name.clone());
let (resp, version) = self.client.read::<(Vec<NodeInfo>, u64)>(&req).await?;
{
let mut nodes = self.data_nodes.write().await;
nodes.clear();
for item in resp.iter() {
nodes.insert(item.id, item.clone());
}
}

let req = command::ReadCommand::Users(self.cluster());
let resp = self.client.read::<Vec<UserDesc>>(&req).await?;
{
let mut users = self.users.write().await;
users.clear();
for item in resp.iter() {
users.insert(item.name().to_owned(), item.clone());
}
}

Ok(version)
}

/******************** Watch Meta Data Change Begin *********************/
pub async fn use_tenant(&self, name: &str) -> MetaResult<()> {
if self.watch_tenants.read().await.contains(name) {
Expand Down Expand Up @@ -268,26 +296,21 @@ impl AdminMeta {
request.3 = watch_data.max_ver;
} else {
info!("watch response wrong {:?}", watch_rsp);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
}
}

pub async fn process_full_sync(&self) -> u64 {
let base_ver;
loop {
if let Ok(ver) = self.sync_all_data_node().await {
base_ver = ver;
break;
if let Ok(base_ver) = self.sync_gobal_info().await {
self.tenants.write().await.clear();
return base_ver;
} else {
info!("sync all data node failed");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}

self.tenants.write().await.clear();

base_ver
}

pub async fn process_watch_data(&self, watch_data: &command::WatchData) {
Expand All @@ -311,6 +334,7 @@ impl AdminMeta {
let _ = self.process_watch_log(entry).await;
} else if len == 3 && strs[2] == key_path::AUTO_INCR_ID {
} else if len == 4 && strs[2] == key_path::USERS {
let _ = self.process_watch_log(entry).await;
}
}
}
Expand All @@ -330,18 +354,25 @@ impl AdminMeta {
self.conn_map.write().await.remove(&node_id);
}
}
} else if len == 4 && strs[2] == key_path::USERS {
if entry.tye == command::ENTRY_LOG_TYPE_SET {
if let Ok(user) = serde_json::from_str::<UserDesc>(&entry.val) {
self.users.write().await.insert(strs[3].to_owned(), user);
}
} else if entry.tye == command::ENTRY_LOG_TYPE_DEL {
self.users.write().await.remove(strs[3]);
}
}

Ok(())
}
// **[4] /cluster_name/users/user ->
// **[3] /cluster_name/auto_incr_id -> id

// **[3] /cluster_name/auto_incr_id -> id
// **[4] /cluster_name/users/name -> [UserDesc]
// **[4] /cluster_name/data_nodes/node_id -> [NodeInfo] 集群、数据节点等信息

// **[6] /cluster_name/tenants/tenant/roles/roles ->
// **[6] /cluster_name/tenants/tenant/members/user_id ->
// **[6] /cluster_name/tenants/tenant/users/name -> [UserInfo] 租户下用户信息、访问权限等 -- delete
// **[6] /cluster_name/tenants/tenant/roles/name -> [CustomTenantRole<Oid>]
// **[6] /cluster_name/tenants/tenant/members/oid -> [TenantRoleIdentifier]
// **[6] /cluster_name/tenants/tenant/dbs/db_name -> [DatabaseInfo] db相关信息、保留策略等
// **[8] /cluster_name/tenants/tenant/dbs/db_name/buckets/id -> [BucketInfo] bucket相关信息
// **[8] /cluster_name/tenants/tenant/dbs/db_name/schemas/name -> [TskvTableSchema] schema相关信息
Expand All @@ -350,19 +381,6 @@ impl AdminMeta {
/******************** Watch Meta Data Change End *********************/

/******************** Data Node Operation Begin *********************/
pub async fn sync_all_data_node(&self) -> MetaResult<u64> {
let req = command::ReadCommand::DataNodes(self.config.cluster.name.clone());
let (resp, version) = self.client.read::<(Vec<NodeInfo>, u64)>(&req).await?;
{
let mut nodes = self.data_nodes.write().await;
for item in resp.iter() {
nodes.insert(item.id, item.clone());
}
}

Ok(version)
}

pub async fn add_data_node(&self) -> MetaResult<()> {
let mut attribute = NodeAttribute::default();
if self.config.node_basic.cold_data_server {
Expand Down Expand Up @@ -482,14 +500,19 @@ impl AdminMeta {
user_name: &str,
tenant_name: Option<&str>,
) -> MetaResult<User> {
let user_desc = self
.user(user_name)
.await?
.ok_or_else(|| MetaError::UserNotFound {
user: user_name.to_string(),
})?;
let user_desc = {
let cache = self.users.read().await.get(user_name).cloned();
if let Some(user) = cache {
user.clone()
} else {
self.user(user_name)
.await?
.ok_or_else(|| MetaError::UserNotFound {
user: user_name.to_string(),
})?
}
};

// admin user
if user_desc.is_admin() {
return Ok(admin_user(user_desc));
}
Expand All @@ -503,22 +526,7 @@ impl AdminMeta {
tenant: tenant_name.to_string(),
})?;

let tenant_id = *client.tenant().id();
let role = client.member_role(user_desc.id()).await?.ok_or_else(|| {
MetaError::MemberNotFound {
member_name: user_desc.name().to_string(),
tenant_name: tenant_name.to_string(),
}
})?;

let privileges = match role {
TenantRoleIdentifier::System(sys_role) => sys_role.to_privileges(&tenant_id),
TenantRoleIdentifier::Custom(ref role_name) => client
.custom_role(role_name)
.await?
.map(|e| e.to_privileges(&tenant_id))
.unwrap_or_default(),
};
let privileges = client.user_privileges(&user_desc).await?;

return Ok(User::new(user_desc, privileges));
}
Expand Down Expand Up @@ -628,10 +636,10 @@ impl AdminMeta {
if self.tenants.write().await.remove(name).is_some() {
let req = command::WriteCommand::DropTenant(self.cluster(), name.to_string());

let exist = self.client.write::<bool>(&req).await?;
self.client.write::<()>(&req).await?;
self.limiters.write().await.remove(name);

Ok(exist)
Ok(true)
} else {
Ok(false)
}
Expand Down
75 changes: 71 additions & 4 deletions meta/src/model/meta_tenant.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![allow(dead_code, clippy::if_same_then_else)]

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use client::MetaHttpClient;
use config::TenantObjectLimiterConfig;
use models::auth::privilege::DatabasePrivilege;
use models::auth::privilege::{DatabasePrivilege, Privilege};
use models::auth::role::{CustomTenantRole, SystemTenantRole, TenantRoleIdentifier};
use models::auth::user::UserDesc;
use models::meta_data::*;
use models::oid::{Identifier, Oid};
use models::schema::{DatabaseSchema, ExternalTableSchema, TableSchema, Tenant, TskvTableSchema};
Expand Down Expand Up @@ -146,7 +147,7 @@ impl TenantMeta {
max_users_number, ..
} = limiter_config;

let user_number = self.data.read().users.len();
let user_number = self.data.read().members.len();

if let Some(max) = max_users_number {
if user_number >= *max {
Expand Down Expand Up @@ -182,6 +183,53 @@ impl TenantMeta {
self.client.write::<()>(&req).await
}

pub async fn user_privileges(
&self,
user_desc: &UserDesc,
) -> MetaResult<HashSet<Privilege<Oid>>> {
let role = {
let cache = self
.data
.read()
.members
.get(&user_desc.id().to_string())
.cloned();
if let Some(role) = cache {
role.clone()
} else {
self.member_role(user_desc.id()).await?.ok_or_else(|| {
MetaError::MemberNotFound {
member_name: user_desc.name().to_string(),
tenant_name: self.tenant_name(),
}
})?
}
};

let tenant_id = self.tenant().id();
let privileges = match role {
TenantRoleIdentifier::System(sys_role) => sys_role.to_privileges(tenant_id),
TenantRoleIdentifier::Custom(ref role_name) => {
let cache = self
.data
.read()
.roles
.get(&user_desc.id().to_string())
.cloned();
if let Some(role) = cache {
role.to_privileges(tenant_id)
} else {
self.custom_role(role_name)
.await?
.map(|e| e.to_privileges(tenant_id))
.unwrap_or_default()
}
}
};

Ok(privileges)
}

// tenant member start

pub async fn member_role(&self, user_id: &Oid) -> MetaResult<Option<TenantRoleIdentifier>> {
Expand Down Expand Up @@ -655,6 +703,8 @@ impl TenantMeta {
self.data.read().version
}

// **[6] /cluster_name/tenants/tenant/roles/name -> [CustomTenantRole<Oid>]
// **[6] /cluster_name/tenants/tenant/members/oid -> [TenantRoleIdentifier]
pub async fn process_watch_log(&self, entry: &EntryLog) -> MetaResult<()> {
let strs: Vec<&str> = entry.key.split('/').collect();

Expand Down Expand Up @@ -716,9 +766,26 @@ impl TenantMeta {
} else if entry.tye == command::ENTRY_LOG_TYPE_DEL {
data.dbs.remove(db_name);
}
} else if len == 6 && strs[4] == key_path::USERS && strs[2] == key_path::TENANTS {
} else if len == 6 && strs[4] == key_path::MEMBERS && strs[2] == key_path::TENANTS {
let key = strs[5];
let mut data = self.data.write();
if entry.tye == command::ENTRY_LOG_TYPE_SET {
if let Ok(info) = serde_json::from_str::<TenantRoleIdentifier>(&entry.val) {
data.members.insert(key.to_owned(), info);
}
} else if entry.tye == command::ENTRY_LOG_TYPE_DEL {
data.members.remove(key);
}
} else if len == 6 && strs[4] == key_path::ROLES && strs[2] == key_path::TENANTS {
let key = strs[5];
let mut data = self.data.write();
if entry.tye == command::ENTRY_LOG_TYPE_SET {
if let Ok(info) = serde_json::from_str::<CustomTenantRole<Oid>>(&entry.val) {
data.roles.insert(key.to_owned(), info);
}
} else if entry.tye == command::ENTRY_LOG_TYPE_DEL {
data.roles.remove(key);
}
}

Ok(())
Expand Down
9 changes: 0 additions & 9 deletions meta/src/store/key_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use models::oid::Oid;
// ** /cluster_name/auto_incr_id -> id
// ** /cluster_name/data_nodes/node_id -> [NodeInfo] 集群、数据节点等信息

// ** /cluster_name/tenant_name/users/name -> [UserInfo] 租户下用户信息、访问权限等
// ** /cluster_name/tenant_name/dbs/db_name -> [DatabaseInfo] db相关信息、保留策略等
// ** /cluster_name/tenant_name/dbs/db_name/buckets/id -> [BucketInfo] bucket相关信息
// ** /cluster_name/tenant_name/dbs/db_name/schemas/name -> [TskvTableSchema] schema相关信息
Expand Down Expand Up @@ -61,14 +60,6 @@ impl KeyPath {
format!("/{}/data_nodes_metrics/{}", cluster, id)
}

pub fn tenant_users(cluster: &str, tenant: &str) -> String {
format!("/{}/tenants/{}/users", cluster, tenant)
}

// pub fn tenant_user_name(cluster: &str, tenant: &str, name: &str) -> String {
// format!("/{}/{}/users/{}", cluster, tenant, name)
// }

pub fn tenant_dbs(cluster: &str, tenant: &str) -> String {
format!("/{}/tenants/{}/dbs", cluster, tenant)
}
Expand Down

0 comments on commit 8637611

Please sign in to comment.