Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-883 continue cluster management with namespace #1731

Merged
merged 89 commits into from Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
ae8b29a
ISSUE-883: add cluster to common and namespace to config
BohuTANG Jul 6, 2021
0557dfe
ISSUE-883: add cluster mgr
BohuTANG Jul 6, 2021
80d1879
ISSUE-883: add backends for cluster storage
BohuTANG Jul 7, 2021
c37cda2
ISSUE-883: add memory backend for test
BohuTANG Jul 7, 2021
b501231
ISSUE-883: add cluster mgr get/put to backend
BohuTANG Jul 7, 2021
6bc4259
ISSUE-883: add cluster executor
BohuTANG Jul 7, 2021
04d647c
ISSUE-883: add executor unregister from the namespace
BohuTANG Jul 7, 2021
5832a13
ISSUE-883: add cluster_mgr create
BohuTANG Jul 7, 2021
ae65209
ISSUE-883: replace scheduler cluster nodes
BohuTANG Jul 7, 2021
5b2dcca
Merge with master
BohuTANG Jul 8, 2021
7be8ce3
ISSUE-883: fix unit test
BohuTANG Jul 8, 2021
0064dc4
ISSUE-883: add more executor plans for shuffle
BohuTANG Jul 8, 2021
964aab3
ISSUE-883: add executor backend url config
BohuTANG Jul 8, 2021
b015ecd
ISSUE-883: change to executor_backend_url for config from_setting
BohuTANG Jul 10, 2021
7a8b3d3
ISSUE-883: add Restful in memory kv store
BohuTANG Jul 10, 2021
83dacf1
ISSUE-883: add local backend for test only
BohuTANG Jul 10, 2021
1825667
ISSUE-883: add memory backend
BohuTANG Jul 10, 2021
e18b156
ISSUE-883: add states api for key/value state storage
BohuTANG Jul 10, 2021
f86d030
ISSUE-883: add serde trait to client
BohuTANG Jul 10, 2021
144f866
ISSUE-883: change from hashmap to sled
BohuTANG Jul 11, 2021
5a5d695
ISSUE-883: local backend ok
BohuTANG Jul 11, 2021
46ad9b6
ISSUE-883: add local backend unit test
BohuTANG Jul 12, 2021
a5a49e8
ISSUE-883: add backend client unit test
BohuTANG Jul 12, 2021
85de7b3
ISSUE-883: add cluster mgr unit tests
BohuTANG Jul 12, 2021
180e0f9
ISSUE-883: add local backend to kv api
BohuTANG Jul 12, 2021
b836a20
ISSUE-883: fix executor address from flight api address
BohuTANG Jul 12, 2021
889046d
ISSUE-883: impl backend memory
BohuTANG Jul 12, 2021
e287c2f
ISSUE-883: remove cluster backend and rename cluster mgr to cluster c…
BohuTANG Jul 12, 2021
d8e2926
ISSUE-883: add http cluster api
BohuTANG Jul 12, 2021
31e2598
ISSUE-883: cluster list api
BohuTANG Jul 13, 2021
ac3e388
ISSUE-883: add kv unit test
BohuTANG Jul 13, 2021
5b5f569
ISSUE-883: add cluster unit test
BohuTANG Jul 13, 2021
681c62d
ISSUE-883: rename memory backend to http backend
BohuTANG Jul 13, 2021
0a16f1d
ISSUE-883: better http api unit test
BohuTANG Jul 13, 2021
c071ed8
ISSUE-883: make http cluster test work
BohuTANG Jul 13, 2021
ee7443a
ISSUE-883: rename config executor to prefix with cluster
BohuTANG Jul 13, 2021
2310d98
ISSUE-883: change the http backend api get/list to GET method
BohuTANG Jul 13, 2021
1aa991b
ISSUE-883: backend http work with kv api
BohuTANG Jul 15, 2021
87f2f9d
ISSUE-883: tests with namespace register
BohuTANG Jul 15, 2021
4b783f1
ISSUE-883: add start cluster registry to tests
BohuTANG Jul 15, 2021
ba75c57
Merge with master
BohuTANG Jul 15, 2021
832bab2
ISSUE-883: fix mysql handler test
BohuTANG Jul 16, 2021
028c932
Merge branch 'master' into cluster_manager
zhang2014 Jul 20, 2021
5475689
Try fix build failure
zhang2014 Jul 23, 2021
51e74fd
Try fix build failure
zhang2014 Jul 26, 2021
3e89b07
Add responses helper
zhang2014 Jul 26, 2021
3547ad5
ISSUE-883 cluster restful API
zhang2014 Jul 27, 2021
f8d7882
Merge branch 'master' into cluster_manager
zhang2014 Jul 27, 2021
5ca23fe
Try fix build failure
zhang2014 Aug 2, 2021
9b93b47
Merge branch 'cluster_manager' into new_cluster_manager
zhang2014 Sep 5, 2021
d8ac4fd
Revert "Merge branch 'cluster_manager' into new_cluster_manager"
zhang2014 Sep 5, 2021
05b9c51
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 8, 2021
bdab489
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 10, 2021
6826f2c
Partial implement
zhang2014 Sep 10, 2021
64690f6
escape for key
zhang2014 Sep 10, 2021
825c90a
Modify namespace API
zhang2014 Sep 12, 2021
7abbfa3
Some refactor
zhang2014 Sep 12, 2021
f27fd87
Unescape node key
zhang2014 Sep 12, 2021
dc93c11
Try fix match range wrong
zhang2014 Sep 13, 2021
c0a607d
Async create_context for session
zhang2014 Sep 13, 2021
6e413ac
Try fix build failure
zhang2014 Sep 15, 2021
66b6236
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 15, 2021
cae890b
Try fix CI failure
zhang2014 Sep 20, 2021
d0261c3
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 20, 2021
98f4806
Try fix build failure
zhang2014 Sep 20, 2021
d1bf3f5
Try fix ci failure
zhang2014 Sep 21, 2021
26db221
Remove useless file
zhang2014 Sep 21, 2021
d1b610e
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 21, 2021
1bf1919
Try fix standalone stateless test failure
zhang2014 Sep 21, 2021
98b0d04
Try fix unit test build failure
zhang2014 Sep 22, 2021
3c7b5d9
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 22, 2021
e0c81de
Try fix some unit test failure
zhang2014 Sep 22, 2021
2f9bcd8
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 22, 2021
0bbb5ca
Try fix build failure after merge master
zhang2014 Sep 22, 2021
ed0edbb
Try fix unit test for tls
zhang2014 Sep 23, 2021
819e8c1
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 23, 2021
4bf7cc6
Try fix build failure after merge
zhang2014 Sep 23, 2021
873e01c
Try fix log test
zhang2014 Sep 23, 2021
5ab87fb
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 23, 2021
e955454
Fix build failure after merge master
zhang2014 Sep 23, 2021
ec5557a
Set context max_threads for ci env
zhang2014 Sep 23, 2021
0158e0a
Try fix http service tls test
zhang2014 Sep 24, 2021
52d6f67
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 24, 2021
84a4b4a
Uncomment cluster list test
zhang2014 Sep 24, 2021
0c72b1a
Make lint for code
zhang2014 Sep 24, 2021
403816c
Uncomment namespace manager test
zhang2014 Sep 25, 2021
30b776d
Merge branch 'master' into new_cluster_manager
zhang2014 Sep 25, 2021
32ee263
Uncomment cluster test
zhang2014 Sep 25, 2021
f9914f2
Try fix License checker
zhang2014 Sep 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions common/clickhouse-srv/src/protocols/protocol_query.rs
Expand Up @@ -106,6 +106,7 @@ impl QueryClientInfo {
}
}

#[allow(dead_code)]
#[derive(Default, Debug)]
pub struct QueryRequest {
pub(crate) query_id: String,
Expand Down
1 change: 1 addition & 0 deletions common/clickhouse-srv/src/types/mod.rs
Expand Up @@ -117,6 +117,7 @@ impl fmt::Debug for ServerInfo {
}

#[derive(Clone)]
#[allow(dead_code)]
pub(crate) struct Context {
pub(crate) server_info: ServerInfo,
pub(crate) hostname: String,
Expand Down
10 changes: 10 additions & 0 deletions common/exception/src/exception.rs
Expand Up @@ -77,6 +77,15 @@ impl ErrorCode {
}
}

pub fn add_message_back(self, msg: impl AsRef<str>) -> Self {
Self {
code: self.code(),
display_text: format!("{}{}", self.display_text, msg.as_ref()),
cause: self.cause,
backtrace: self.backtrace,
}
}

pub fn backtrace(&self) -> Option<ErrorCodeBacktrace> {
self.backtrace.clone()
}
Expand Down Expand Up @@ -168,6 +177,7 @@ build_exceptions! {
AuthenticateFailure(51),
TLSConfigurationFailure(52),
UnknownSession(53),
UnexpectedError(54),

// uncategorized
UnexpectedResponseType(600),
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/aggregates/aggregate_avg.rs
Expand Up @@ -59,7 +59,7 @@ where T: std::ops::Add<Output = T> + DFPrimitiveType
#[derive(Clone)]
pub struct AggregateAvgFunction<T, SumT> {
display_name: String,
arguments: Vec<DataField>,
_arguments: Vec<DataField>,
t: PhantomData<T>,
sum_t: PhantomData<SumT>,
}
Expand Down Expand Up @@ -170,7 +170,7 @@ where
) -> Result<AggregateFunctionRef> {
Ok(Arc::new(Self {
display_name: display_name.to_string(),
arguments,
_arguments: arguments,
t: PhantomData,
sum_t: PhantomData,
}))
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/aggregates/aggregate_stddev_pop.rs
Expand Up @@ -71,7 +71,7 @@ impl AggregateStddevPopState {
#[derive(Clone)]
pub struct AggregateStddevPopFunction<T> {
display_name: String,
arguments: Vec<DataField>,
_arguments: Vec<DataField>,
t: PhantomData<T>,
}

Expand Down Expand Up @@ -202,7 +202,7 @@ where T: DFPrimitiveType + AsPrimitive<f64>
) -> Result<AggregateFunctionRef> {
Ok(Arc::new(Self {
display_name: display_name.to_string(),
arguments,
_arguments: arguments,
t: PhantomData,
}))
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/aggregates/aggregate_sum.rs
Expand Up @@ -61,7 +61,7 @@ where
#[derive(Clone)]
pub struct AggregateSumFunction<T, SumT> {
display_name: String,
arguments: Vec<DataField>,
_arguments: Vec<DataField>,
t: PhantomData<T>,
sum_t: PhantomData<SumT>,
}
Expand Down Expand Up @@ -185,7 +185,7 @@ where
) -> Result<AggregateFunctionRef> {
Ok(Arc::new(Self {
display_name: display_name.to_owned(),
arguments,
_arguments: arguments,
t: PhantomData,
sum_t: PhantomData,
}))
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/aggregates/aggregate_window_funnel.rs
Expand Up @@ -162,7 +162,7 @@ where T: Ord
#[derive(Clone)]
pub struct AggregateWindowFunnelFunction<T> {
display_name: String,
arguments: Vec<DataField>,
_arguments: Vec<DataField>,
event_size: usize,
window: u64,
t: PhantomData<T>,
Expand Down Expand Up @@ -300,7 +300,7 @@ where
let window = params[0].as_u64()?;
Ok(Arc::new(Self {
display_name: display_name.to_owned(),
arguments,
_arguments: arguments,
event_size,
window,
t: PhantomData,
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/conditionals/if.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct IfFunction {
display_name: String,
_display_name: String,
}

impl IfFunction {
pub fn try_create_func(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(IfFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/expressions/cast.rs
Expand Up @@ -24,15 +24,15 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct CastFunction {
display_name: String,
_display_name: String,
/// The data type to cast to
cast_type: DataType,
}

impl CastFunction {
pub fn create(display_name: String, cast_type: DataType) -> Result<Box<dyn Function>> {
Ok(Box::new(Self {
display_name,
_display_name: display_name,
cast_type,
}))
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/function_column.rs
Expand Up @@ -26,14 +26,14 @@ use crate::scalars::Function;
#[derive(Clone, Debug)]
pub struct ColumnFunction {
value: String,
saved: Option<DataValue>,
_saved: Option<DataValue>,
}

impl ColumnFunction {
pub fn try_create(value: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(ColumnFunction {
value: value.to_string(),
saved: None,
_saved: None,
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/nullables/is_not_null.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct IsNotNullFunction {
display_name: String,
_display_name: String,
}

impl IsNotNullFunction {
pub fn try_create_func(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(IsNotNullFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/nullables/is_null.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct IsNullFunction {
display_name: String,
_display_name: String,
}

impl IsNullFunction {
pub fn try_create_func(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(IsNullFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/strings/substring.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct SubstringFunction {
display_name: String,
_display_name: String,
}

impl SubstringFunction {
pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(SubstringFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/udfs/crash_me.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct CrashMeFunction {
display_name: String,
_display_name: String,
}

impl CrashMeFunction {
pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(CrashMeFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/udfs/to_type_name.rs
Expand Up @@ -25,13 +25,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct ToTypeNameFunction {
display_name: String,
_display_name: String,
}

impl ToTypeNameFunction {
pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(ToTypeNameFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/functions/src/scalars/udfs/version.rs
Expand Up @@ -24,13 +24,13 @@ use crate::scalars::Function;

#[derive(Clone)]
pub struct VersionFunction {
display_name: String,
_display_name: String,
}

impl VersionFunction {
pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(VersionFunction {
display_name: display_name.to_string(),
_display_name: display_name.to_string(),
}))
}
}
Expand Down
1 change: 1 addition & 0 deletions common/management/Cargo.toml
Expand Up @@ -24,6 +24,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.9.8"
sha1 = "0.6.0"
tempfile = "3.2.0"

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
3 changes: 3 additions & 0 deletions common/management/src/lib.rs
Expand Up @@ -16,6 +16,9 @@
mod namespace;
mod user;

pub use namespace::NamespaceApi;
pub use namespace::NamespaceMgr;
pub use namespace::NodeInfo;
pub use user::user_api::AuthType;
pub use user::user_api::UserInfo;
pub use user::user_api::UserMgrApi;
Expand Down
7 changes: 4 additions & 3 deletions common/management/src/namespace/mod.rs
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.
//

pub use namespace_api::NamespaceApi;
pub use namespace_api::NodeInfo;

#[cfg(test)]
mod namespace_mgr_test;

mod namespace_api;
mod namespace_mgr;

pub use namespace_api::NamespaceApi;
pub use namespace_api::NodeInfo;
pub use namespace_mgr::NamespaceMgr;
50 changes: 21 additions & 29 deletions common/management/src/namespace/namespace_api.rs
Expand Up @@ -17,20 +17,17 @@ use std::convert::TryFrom;

use common_exception::ErrorCode;
use common_exception::Result;
use common_metatypes::SeqValue;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct NodeInfo {
#[serde(default)]
pub id: String,
#[serde(default)]
pub cpu_nums: u32,
pub cpu_nums: u64,
#[serde(default)]
pub version: u32,
#[serde(default)]
pub ip: String,
#[serde(default)]
pub port: u32,
pub flight_address: String,
}

impl TryFrom<Vec<u8>> for NodeInfo {
Expand All @@ -47,33 +44,28 @@ impl TryFrom<Vec<u8>> for NodeInfo {
}
}

pub trait NamespaceApi {
impl NodeInfo {
pub fn create(id: String, cpu_nums: u64, flight_address: String) -> NodeInfo {
NodeInfo {
id,
cpu_nums,
version: 0,
flight_address,
}
}
}

#[async_trait::async_trait]
pub trait NamespaceApi: Sync + Send {
// Add a new node info to /tenant/namespace/node-name.
fn add_node(&self, tenant_id: String, namespace_id: String, node: NodeInfo) -> Result<u64>;
async fn add_node(&self, node: NodeInfo) -> Result<u64>;

// Get the tenant's namespace all nodes.
fn get_nodes(
&self,
tenant_id: String,
namespace_id: String,
seq: Option<u64>,
) -> Result<Vec<SeqValue<NodeInfo>>>;

// Update the tenant's namespace node.
fn update_node(
&self,
tenant_id: String,
namespace_id: String,
node: NodeInfo,
seq: Option<u64>,
) -> Result<Option<u64>>;
async fn get_nodes(&self) -> Result<Vec<NodeInfo>>;

// Drop the tenant's namespace one node by node.id.
fn drop_node(
&self,
tenant_id: String,
namespace_id: String,
node_id: String,
seq: Option<u64>,
) -> Result<()>;
async fn drop_node(&self, node_id: String, seq: Option<u64>) -> Result<()>;

// Keep the tenant's namespace node alive.
async fn heartbeat(&self, node_id: String, seq: Option<u64>) -> Result<u64>;
}