diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 878a5e13..8f5e9aa2 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -60,6 +60,13 @@ jobs: - 2181:2181 env: ZOO_MY_ID: 1 + nacos: + image: nacos/nacos-server:v2.3.1 + ports: + - 8848:8848 + - 9848:9848 + env: + MODE: standalone steps: - uses: actions/checkout@main - uses: actions-rs/toolchain@v1 diff --git a/Cargo.toml b/Cargo.toml index 0abff00d..50d46f0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ serde_yaml = "0.9.4" # yaml file parser once_cell = "1.16.0" itertools = "0.10.1" bytes = "1.0" +url = "2.5.0" diff --git a/common/base/Cargo.toml b/common/base/Cargo.toml index 40579e0d..678af777 100644 --- a/common/base/Cargo.toml +++ b/common/base/Cargo.toml @@ -6,6 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -urlencoding.workspace = true -http = "0.2" -dubbo-logger.workspace = true \ No newline at end of file +dubbo-logger.workspace = true +url.workspace = true \ No newline at end of file diff --git a/common/base/src/extension_param.rs b/common/base/src/extension_param.rs new file mode 100644 index 00000000..93e0a16c --- /dev/null +++ b/common/base/src/extension_param.rs @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::{url::UrlParam, StdError}; +use std::{borrow::Cow, convert::Infallible, str::FromStr}; + +pub struct ExtensionName(String); + +impl ExtensionName { + pub fn new(name: String) -> Self { + ExtensionName(name) + } +} + +impl UrlParam for ExtensionName { + type TargetType = String; + + fn name() -> &'static str { + "extension-name" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for ExtensionName { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(ExtensionName::new(s.to_string())) + } +} + +pub enum ExtensionType { + Registry, +} + +impl UrlParam for ExtensionType { + type TargetType = String; + + fn name() -> &'static str { + "extension-type" + } + + fn value(&self) -> Self::TargetType { + match self { + ExtensionType::Registry => "registry".to_owned(), + } + } + + fn as_str(&self) -> Cow { + match self { + ExtensionType::Registry => Cow::Borrowed("registry"), + } + } +} + +impl FromStr for ExtensionType { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + match s { + "registry" => Ok(ExtensionType::Registry), + _ => panic!("the extension type enum is not in range"), + } + } +} diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs index b97b342f..dcc92564 100644 --- a/common/base/src/lib.rs +++ b/common/base/src/lib.rs @@ -19,8 +19,12 @@ allow(dead_code, unused_imports, unused_variables, unused_mut) )] pub mod constants; +pub mod extension_param; pub mod node; +pub mod registry_param; pub mod url; pub use node::Node; pub use url::Url; + +pub type StdError = Box; diff --git a/common/base/src/registry_param.rs b/common/base/src/registry_param.rs new file mode 100644 index 00000000..8aa7c07f --- /dev/null +++ b/common/base/src/registry_param.rs @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::{url::UrlParam, StdError, Url}; +use std::{borrow::Cow, convert::Infallible, str::FromStr}; + +pub struct RegistryUrl(Url); + +impl RegistryUrl { + pub fn new(url: Url) -> Self { + Self(url) + } +} + +impl UrlParam for RegistryUrl { + type TargetType = Url; + + fn name() -> &'static str { + "registry" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for RegistryUrl { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.parse()?)) + } +} + +pub struct ServiceNamespace(String); + +impl ServiceNamespace { + pub fn new(namespace: String) -> Self { + Self(namespace) + } +} + +impl UrlParam for ServiceNamespace { + type TargetType = String; + + fn name() -> &'static str { + "namespace" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for ServiceNamespace { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for ServiceNamespace { + fn default() -> Self { + Self("public".to_string()) + } +} + +pub struct AppName(String); + +impl AppName { + pub fn new(app_name: String) -> Self { + Self(app_name) + } +} + +impl UrlParam for AppName { + type TargetType = String; + + fn name() -> &'static str { + "app_name" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for AppName { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for AppName { + fn default() -> Self { + Self("UnknownApp".to_string()) + } +} + +pub struct InterfaceName(String); + +impl InterfaceName { + pub fn new(interface_name: String) -> Self { + Self(interface_name) + } +} + +impl UrlParam for InterfaceName { + type TargetType = String; + + fn name() -> &'static str { + "interface" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for InterfaceName { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for InterfaceName { + fn default() -> Self { + Self("".to_string()) + } +} + +pub struct Category(String); + +impl Category { + pub fn new(category: String) -> Self { + Self(category) + } +} + +impl UrlParam for Category { + type TargetType = String; + + fn name() -> &'static str { + "category" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for Category { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for Category { + fn default() -> Self { + Self("".to_string()) + } +} + +pub struct Version(String); + +impl Version { + pub fn new(version: String) -> Self { + Self(version) + } +} + +impl UrlParam for Version { + type TargetType = String; + + fn name() -> &'static str { + "version" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for Version { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for Version { + fn default() -> Self { + Self("".to_string()) + } +} + +pub struct Group(String); + +impl Group { + pub fn new(group: String) -> Self { + Self(group) + } +} + +impl UrlParam for Group { + type TargetType = String; + + fn name() -> &'static str { + "group" + } + + fn value(&self) -> Self::TargetType { + self.0.clone() + } + + fn as_str(&self) -> Cow { + self.0.as_str().into() + } +} + +impl FromStr for Group { + type Err = StdError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for Group { + fn default() -> Self { + Self("".to_string()) + } +} + +pub enum Side { + Provider, + Consumer, +} + +impl UrlParam for Side { + type TargetType = String; + + fn name() -> &'static str { + "side" + } + + fn value(&self) -> Self::TargetType { + match self { + Side::Consumer => "consumer".to_owned(), + Side::Provider => "provider".to_owned(), + } + } + + fn as_str(&self) -> Cow { + match self { + Side::Consumer => Cow::Borrowed("consumer"), + Side::Provider => Cow::Borrowed("provider"), + } + } +} + +impl FromStr for Side { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "consumer" => Ok(Side::Consumer), + "provider" => Ok(Side::Provider), + _ => Ok(Side::Consumer), + } + } +} + +impl Default for Side { + fn default() -> Self { + Side::Consumer + } +} + +pub struct StaticInvokerUrls(String); + +impl UrlParam for StaticInvokerUrls { + type TargetType = Vec; + + fn name() -> &'static str { + "static-invoker-urls" + } + + fn value(&self) -> Self::TargetType { + self.0.split(",").map(|url| url.parse().unwrap()).collect() + } + + fn as_str(&self) -> Cow { + Cow::Borrowed(&self.0) + } +} + +impl FromStr for StaticInvokerUrls { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + +impl Default for StaticInvokerUrls { + fn default() -> Self { + Self(String::default()) + } +} diff --git a/common/base/src/url.rs b/common/base/src/url.rs index 82b026fd..ac97f261 100644 --- a/common/base/src/url.rs +++ b/common/base/src/url.rs @@ -16,237 +16,162 @@ */ use std::{ + borrow::Cow, collections::HashMap, - fmt::{Display, Formatter}, + fmt::{Debug, Display, Formatter}, + str::FromStr, }; -use crate::constants::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY}; -use http::Uri; - -#[derive(Debug, Clone, Default, PartialEq)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Url { - pub raw_url_string: String, - // value of scheme is different to base name, eg. triple -> tri:// - pub scheme: String, - pub location: String, - pub ip: String, - pub port: String, - // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}' - pub service_key: String, - // same to interfaceName - pub service_name: String, - pub params: HashMap, + inner: url::Url, } impl Url { - pub fn new() -> Self { - Default::default() + pub fn empty() -> Self { + "empty://localhost".parse().unwrap() + } + + pub fn protocol(&self) -> &str { + self.inner.scheme() + } + + pub fn host(&self) -> Option<&str> { + self.inner.host_str() + } + + pub fn authority(&self) -> &str { + self.inner.authority() + } + + pub fn username(&self) -> &str { + self.inner.username() + } + + pub fn password(&self) -> Option<&str> { + self.inner.password() } - pub fn from_url(url: &str) -> Option { - // url: triple://127.0.0.1:8888/helloworld.Greeter - let uri = url - .parse::() - .map_err(|err| { - dubbo_logger::tracing::error!("fail to parse url({}), err: {:?}", url, err); - }) - .unwrap(); - let query = uri.path_and_query().unwrap().query(); - let mut url_inst = Self { - raw_url_string: url.to_string(), - scheme: uri.scheme_str()?.to_string(), - ip: uri.authority()?.host().to_string(), - port: uri.authority()?.port()?.to_string(), - location: uri.authority()?.to_string(), - service_key: uri.path().trim_start_matches('/').to_string(), - service_name: uri.path().trim_start_matches('/').to_string(), - params: if let Some(..) = query { - Url::decode(query.unwrap()) - } else { - HashMap::new() - }, - }; - url_inst.renew_raw_url_string(); - Some(url_inst) + pub fn port(&self) -> Option { + self.inner.port_or_known_default() } - pub fn get_service_key(&self) -> String { - self.service_key.clone() + pub fn path(&self) -> &str { + self.inner.path() } - pub fn get_service_name(&self) -> String { - self.service_name.clone() + pub fn query(&self) -> Option { + self.inner + .query_pairs() + .find(|(k, _)| k == T::name()) + .map(|(_, v)| T::from_str(&v).ok()) + .flatten() } - pub fn get_param(&self, key: &str) -> Option { - self.params.get(key).cloned() + pub fn query_param_by_key(&self, key: &str) -> Option { + self.inner + .query_pairs() + .find(|(k, _)| k == key) + .map(|(_, v)| v.into_owned()) } - fn encode_param(&self) -> String { - let mut params_vec: Vec = Vec::new(); - for (k, v) in self.params.iter() { - // let tmp = format!("{}={}", k, v); - params_vec.push(format!("{}={}", k, v)); - } - if params_vec.is_empty() { - "".to_string() - } else { - format!("?{}", params_vec.join("&")) - } + pub fn all_query_params(&self) -> HashMap { + self.inner + .query_pairs() + .map(|(k, v)| (k.into_owned(), v.into_owned())) + .collect() } - pub fn params_count(&self) -> usize { - self.params.len() + pub fn set_protocol(&mut self, protocol: &str) { + let _ = self.inner.set_scheme(protocol); } - fn decode(raw_query_string: &str) -> HashMap { - let mut params = HashMap::new(); - let p: Vec = raw_query_string - .split('&') - .map(|v| v.trim().to_string()) - .collect(); - for v in p.iter() { - let values: Vec = v.split('=').map(|v| v.trim().to_string()).collect(); - if values.len() != 2 { - continue; - } - params.insert(values[0].clone(), values[1].clone()); - } - params + pub fn set_host(&mut self, host: &str) { + let _ = self.inner.set_host(Some(host)); } - pub fn set_param(&mut self, key: &str, value: &str) { - self.params.insert(key.to_string(), value.to_string()); - self.renew_raw_url_string(); + pub fn set_port(&mut self, port: u16) { + let _ = self.inner.set_port(Some(port)); } - pub fn raw_url_string(&self) -> String { - self.raw_url_string.clone() + pub fn set_username(&mut self, username: &str) { + let _ = self.inner.set_username(username); } - pub fn encoded_raw_url_string(&self) -> String { - urlencoding::encode(self.raw_url_string.as_str()).to_string() + pub fn set_password(&mut self, password: &str) { + let _ = self.inner.set_password(Some(password)); } - fn build_service_key(&self) -> String { - format!( - "{group}/{interfaceName}:{version}", - group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()), - interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()), - version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string()) - ) + pub fn set_path(&mut self, path: &str) { + let _ = self.inner.set_path(path); } - pub fn to_url(&self) -> String { - self.raw_url_string() + pub fn extend_pairs(&mut self, pairs: impl Iterator) { + let mut query_pairs = self.inner.query_pairs_mut(); + query_pairs.extend_pairs(pairs); } - fn renew_raw_url_string(&mut self) { - self.raw_url_string = format!( - "{}://{}:{}/{}{}", - self.scheme, - self.ip, - self.port, - self.service_name, - self.encode_param() - ); - self.service_key = self.build_service_key() + pub fn add_query_param(&mut self, param: T) { + let mut pairs = self.inner.query_pairs_mut(); + pairs.append_pair(T::name(), ¶m.as_str()); } - // short_url is used for tcp listening - pub fn short_url(&self) -> String { - format!( - "{}://{}:{}/{}", - self.scheme, self.ip, self.port, self.service_name - ) + pub fn remove_query_param(&mut self) { + let query = self.inner.query_pairs().filter(|(k, v)| k.ne(T::name())); + let mut inner_url = self.inner.clone(); + inner_url.query_pairs_mut().clear().extend_pairs(query); + self.inner = inner_url; } - pub fn protocol(&self) -> String { - self.scheme.clone() + pub fn remove_all_param(&mut self) { + self.inner.query_pairs_mut().clear(); } - pub fn get_ip_port(&self) -> String { - format!("{}:{}", self.ip, self.port) + pub fn as_str(&self) -> &str { + self.inner.as_str() + } + + pub fn short_url_without_query(&self) -> String { + let mut url = self.inner.clone(); + url.set_query(Some("")); + url.into() + } +} + +impl FromStr for Url { + type Err = url::ParseError; + + fn from_str(s: &str) -> Result { + Ok(Url { + inner: url::Url::parse(s)?, + }) } } impl Display for Url { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(self.raw_url_string().as_str()) + std::fmt::Display::fmt(&self.inner, f) } } -impl Into for Url { - fn into(self) -> Uri { - self.raw_url_string.parse::().unwrap() +impl Debug for Url { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(&self.inner, f) } } -impl From<&str> for Url { - fn from(url: &str) -> Self { - Url::from_url(url).unwrap() +impl From for String { + fn from(url: Url) -> Self { + url.inner.into() } } -#[cfg(test)] -mod tests { - use crate::{ - constants::{ANYHOST_KEY, VERSION_KEY}, - url::Url, - }; - - #[test] - fn test_from_url() { - let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\ - application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\ - environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\ - module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\ - side=provider&timeout=3000×tamp=1556509797245&version=1.0.0&application=test"); - assert_eq!( - u1.as_ref().unwrap().service_key, - "default/com.ikurento.user.UserProvider:1.0.0" - ); - assert_eq!( - u1.as_ref() - .unwrap() - .get_param(ANYHOST_KEY) - .unwrap() - .as_str(), - "true" - ); - assert_eq!( - u1.as_ref() - .unwrap() - .get_param("default.timeout") - .unwrap() - .as_str(), - "10000" - ); - assert_eq!(u1.as_ref().unwrap().scheme, "tri"); - assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1"); - assert_eq!(u1.as_ref().unwrap().port, "20000"); - assert_eq!(u1.as_ref().unwrap().params_count(), 18); - u1.as_mut().unwrap().set_param("key1", "value1"); - assert_eq!( - u1.as_ref().unwrap().get_param("key1").unwrap().as_str(), - "value1" - ); - assert_eq!( - u1.as_ref() - .unwrap() - .get_param(VERSION_KEY) - .unwrap() - .as_str(), - "1.0.0" - ); - } - - #[test] - fn test2() { - let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into(); - assert_eq!( - url.raw_url_string(), - "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter" - ) - } +pub trait UrlParam: FromStr { + type TargetType; + + fn name() -> &'static str; + + fn value(&self) -> Self::TargetType; + + fn as_str(&self) -> Cow; } diff --git a/config/src/router.rs b/config/src/router.rs index b45bd478..7976f6ea 100644 --- a/config/src/router.rs +++ b/config/src/router.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] diff --git a/config/src/service.rs b/config/src/service.rs index 1f85a926..8a1f1910 100644 --- a/config/src/service.rs +++ b/config/src/service.rs @@ -41,16 +41,4 @@ impl ServiceConfig { pub fn protocol(self, protocol: String) -> Self { Self { protocol, ..self } } - - // pub fn get_url(&self) -> Vec { - // let mut urls = Vec::new(); - // for (_, conf) in self.protocol_configs.iter() { - // urls.push(Url { - // url: conf.to_owned().to_url(), - // service_key: "".to_string(), - // }); - // } - - // urls - // } } diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs index 8a00c9fb..a223ddf6 100644 --- a/dubbo/src/cluster/failover.rs +++ b/dubbo/src/cluster/failover.rs @@ -1,12 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use std::task::Poll; +use dubbo_base::StdError; use futures_util::future; use http::Request; use tower::{retry::Retry, util::Oneshot, ServiceExt}; use tower_service::Service; -use crate::StdError; - pub struct Failover { inner: N, // loadbalancer service } diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs index 73aca005..21b525ac 100644 --- a/dubbo/src/cluster/router/condition/condition_router.rs +++ b/dubbo/src/cluster/router/condition/condition_router.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::{ cluster::router::{condition::single_router::ConditionSingleRouter, Router}, codegen::RpcInvocation, diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs index 92bbe2da..2ee33d6e 100644 --- a/dubbo/src/cluster/router/condition/matcher.rs +++ b/dubbo/src/cluster/router/condition/matcher.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use regex::Regex; use std::{collections::HashSet, error::Error, option::Option}; diff --git a/dubbo/src/cluster/router/condition/mod.rs b/dubbo/src/cluster/router/condition/mod.rs index 7285b88f..d4a83b90 100644 --- a/dubbo/src/cluster/router/condition/mod.rs +++ b/dubbo/src/cluster/router/condition/mod.rs @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ pub mod condition_router; pub mod matcher; pub mod single_router; diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs index 5f06aa8f..54c61cb1 100644 --- a/dubbo/src/cluster/router/condition/single_router.rs +++ b/dubbo/src/cluster/router/condition/single_router.rs @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ use dubbo_base::Url; use dubbo_logger::tracing::info; use regex::Regex; diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs index 7ad5e1b6..77729503 100644 --- a/dubbo/src/cluster/router/manager/condition_manager.rs +++ b/dubbo/src/cluster/router/manager/condition_manager.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::cluster::router::condition::{ condition_router::{ConditionRouter, ConditionSingleRouters}, single_router::ConditionSingleRouter, diff --git a/dubbo/src/cluster/router/manager/mod.rs b/dubbo/src/cluster/router/manager/mod.rs index 025f6c16..593fa22d 100644 --- a/dubbo/src/cluster/router/manager/mod.rs +++ b/dubbo/src/cluster/router/manager/mod.rs @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ mod condition_manager; pub mod router_manager; mod tag_manager; diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs index e963181e..e6c8b6c3 100644 --- a/dubbo/src/cluster/router/manager/router_manager.rs +++ b/dubbo/src/cluster/router/manager/router_manager.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::cluster::router::{ manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager}, nacos_config_center::nacos_client::NacosClient, diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs index 8dc24999..f028af21 100644 --- a/dubbo/src/cluster/router/manager/tag_manager.rs +++ b/dubbo/src/cluster/router/manager/tag_manager.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner}; use dubbo_config::router::TagRouterConfig; use std::sync::{Arc, RwLock}; diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs index 17c9aec2..edc081b8 100644 --- a/dubbo/src/cluster/router/mod.rs +++ b/dubbo/src/cluster/router/mod.rs @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ pub mod condition; pub mod manager; pub mod nacos_config_center; diff --git a/dubbo/src/cluster/router/nacos_config_center/mod.rs b/dubbo/src/cluster/router/nacos_config_center/mod.rs index 7878fa9f..71722315 100644 --- a/dubbo/src/cluster/router/nacos_config_center/mod.rs +++ b/dubbo/src/cluster/router/nacos_config_center/mod.rs @@ -1 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ pub mod nacos_client; diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs index ce72641a..68b5f096 100644 --- a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs +++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::cluster::router::manager::router_manager::{ get_global_router_manager, RouterConfigChangeEvent, }; diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs index 42d5826f..601bc5e1 100644 --- a/dubbo/src/cluster/router/router_chain.rs +++ b/dubbo/src/cluster/router/router_chain.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::{cluster::router::BoxRouter, invocation::RpcInvocation}; use dubbo_base::Url; use std::{collections::HashMap, sync::Arc}; diff --git a/dubbo/src/cluster/router/tag/mod.rs b/dubbo/src/cluster/router/tag/mod.rs index 6ac5b218..673a7201 100644 --- a/dubbo/src/cluster/router/tag/mod.rs +++ b/dubbo/src/cluster/router/tag/mod.rs @@ -1 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ pub mod tag_router; diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs index 7a83ea57..3d28f936 100644 --- a/dubbo/src/cluster/router/tag/tag_router.rs +++ b/dubbo/src/cluster/router/tag/tag_router.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use crate::{ cluster::router::{utils::to_original_map, Router}, codegen::RpcInvocation, diff --git a/dubbo/src/cluster/router/utils.rs b/dubbo/src/cluster/router/utils.rs index 2ca50fcc..eca98f6e 100644 --- a/dubbo/src/cluster/router/utils.rs +++ b/dubbo/src/cluster/router/utils.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use dubbo_base::Url; use std::{collections::HashMap, string::String}; diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 452f560d..eb1d3850 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -22,6 +22,7 @@ pub use std::{ pub use async_trait::async_trait; pub use bytes::Bytes; +pub use dubbo_base::StdError; pub use http_body::Body; pub use hyper::Body as hyperBody; pub use tower_service::Service; @@ -39,7 +40,7 @@ pub use super::{ TripleServer, }, }, - BoxBody, BoxFuture, StdError, + BoxBody, BoxFuture, }; pub use crate::{ filter::{service::FilterService, Filter}, diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs index f0d9ebe2..ace67e68 100644 --- a/dubbo/src/directory/mod.rs +++ b/dubbo/src/directory/mod.rs @@ -27,21 +27,21 @@ use crate::{ invocation::Invocation, invoker::{clone_invoker::CloneInvoker, NewInvoker}, param::Param, - registry::n_registry::Registry, svc::NewService, - StdError, }; -use dubbo_base::Url; -use dubbo_logger::tracing::debug; -use futures_core::ready; +use dubbo_base::{StdError, Url}; +use dubbo_logger::tracing::{debug, error}; use futures_util::future; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tower::{ buffer::Buffer, discover::{Change, Discover}, + ServiceExt, }; +use crate::extension::registry_extension::{proxy::RegistryProxy, Registry}; +use dubbo_base::registry_param::InterfaceName; use tower_service::Service; type BufferedDirectory = @@ -49,7 +49,8 @@ type BufferedDirectory = pub struct NewCachedDirectory where - N: Registry + Clone + Send + Sync + 'static, + N: Service<(), Response = RegistryProxy> + Send + Clone + 'static, + >::Future: Send + 'static, { inner: CachedDirectory, RpcInvocation>, } @@ -76,7 +77,8 @@ pub struct Directory { impl NewCachedDirectory where - N: Registry + Clone + Send + Sync + 'static, + N: Service<(), Response = RegistryProxy> + Send + Clone + 'static, + >::Future: Send + 'static, { pub fn layer() -> impl tower_layer::Layer { tower_layer::layer_fn(|inner: N| { @@ -92,7 +94,8 @@ impl NewService for NewCachedDirectory where T: Param, // service registry - N: Registry + Clone + Send + Sync + 'static, + N: Service<(), Response = RegistryProxy> + Send + Clone + 'static, + >::Future: Send + 'static, { type Service = BufferedDirectory; @@ -151,14 +154,15 @@ impl NewService for NewDirectory where T: Param, // service registry - N: Registry + Clone + Send + Sync + 'static, + N: Service<(), Response = RegistryProxy> + Send + Clone + 'static, + >::Future: Send + 'static, { type Service = BufferedDirectory; fn new_service(&self, target: T) -> Self::Service { let service_name = target.param().get_target_service_unique_name(); - let registry = self.inner.clone(); + let fut = self.inner.clone().oneshot(()); let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE); @@ -166,7 +170,14 @@ where // todo use dubbo url model generate subscribe url // category:serviceInterface:version:group let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", service_name); - let subscribe_url = Url::from_url(&consumer_url).unwrap(); + let mut subscribe_url: Url = consumer_url.parse().unwrap(); + subscribe_url.add_query_param(InterfaceName::new(service_name)); + + let Ok(registry) = fut.await else { + error!("registry extension load failed."); + return; + }; + let receiver = registry.subscribe(subscribe_url).await; debug!("discover start!"); match receiver { diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs new file mode 100644 index 00000000..5641bea8 --- /dev/null +++ b/dubbo/src/extension/mod.rs @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod registry_extension; + +use crate::{ + extension::registry_extension::proxy::RegistryProxy, registry::registry::StaticRegistry, +}; +use dubbo_base::{extension_param::ExtensionType, url::UrlParam, StdError, Url}; +use dubbo_logger::tracing::{error, info}; +use thiserror::Error; +use tokio::sync::oneshot; + +pub static EXTENSIONS: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| ExtensionDirectory::init()); + +#[derive(Default)] +struct ExtensionDirectory { + registry_extension_loader: registry_extension::RegistryExtensionLoader, +} + +impl ExtensionDirectory { + fn init() -> ExtensionDirectoryCommander { + let (tx, mut rx) = tokio::sync::mpsc::channel::(64); + + tokio::spawn(async move { + let mut extension_directory = ExtensionDirectory::default(); + + // register static registry extension + let _ = extension_directory + .register( + StaticRegistry::name(), + StaticRegistry::convert_to_extension_factories(), + ExtensionType::Registry, + ) + .await; + + while let Some(extension_opt) = rx.recv().await { + match extension_opt { + ExtensionOpt::Register( + extension_name, + extension_factories, + extension_type, + tx, + ) => { + let result = extension_directory + .register(extension_name, extension_factories, extension_type) + .await; + let _ = tx.send(result); + } + ExtensionOpt::Remove(extension_name, extension_type, tx) => { + let result = extension_directory + .remove(extension_name, extension_type) + .await; + let _ = tx.send(result); + } + ExtensionOpt::Load(url, extension_type, tx) => { + let result = extension_directory.load(url, extension_type).await; + let _ = tx.send(result); + } + } + } + }); + + ExtensionDirectoryCommander { sender: tx } + } + + async fn register( + &mut self, + extension_name: String, + extension_factories: ExtensionFactories, + extension_type: ExtensionType, + ) -> Result<(), StdError> { + match extension_type { + ExtensionType::Registry => match extension_factories { + ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => { + self.registry_extension_loader + .register(extension_name, registry_extension_factory) + .await; + Ok(()) + } + }, + } + } + + async fn remove( + &mut self, + extension_name: String, + extension_type: ExtensionType, + ) -> Result<(), StdError> { + match extension_type { + ExtensionType::Registry => { + self.registry_extension_loader.remove(extension_name).await; + Ok(()) + } + } + } + + async fn load( + &mut self, + url: Url, + extension_type: ExtensionType, + ) -> Result { + match extension_type { + ExtensionType::Registry => { + let extension = self.registry_extension_loader.load(&url).await; + match extension { + Ok(extension) => Ok(Extensions::Registry(extension)), + Err(err) => { + error!("load extension failed: {}", err); + Err(err) + } + } + } + } + } +} + +pub struct ExtensionDirectoryCommander { + sender: tokio::sync::mpsc::Sender, +} + +impl ExtensionDirectoryCommander { + #[allow(private_bounds)] + pub async fn register(&self) -> Result<(), StdError> + where + T: Extension, + T: ExtensionMetaInfo, + T: ConvertToExtensionFactories, + { + let extension_name = T::name(); + let extension_factories = T::convert_to_extension_factories(); + let extension_type = T::extension_type(); + + info!( + "register extension: {}, type: {}", + extension_name, + extension_type.as_str() + ); + + let (tx, rx) = oneshot::channel(); + + let send = self + .sender + .send(ExtensionOpt::Register( + extension_name.clone(), + extension_factories, + extension_type, + tx, + )) + .await; + + let Ok(_) = send else { + let err_msg = format!("register extension {} failed", extension_name); + return Err(RegisterExtensionError::new(err_msg).into()); + }; + + let ret = rx.await; + + match ret { + Ok(_) => Ok(()), + Err(_) => { + let err_msg = format!("register extension {} failed", extension_name); + Err(RegisterExtensionError::new(err_msg).into()) + } + } + } + + #[allow(private_bounds)] + pub async fn remove(&self) -> Result<(), StdError> + where + T: Extension, + T: ExtensionMetaInfo, + { + let extension_name = T::name(); + let extension_type = T::extension_type(); + + info!( + "remove extension: {}, type: {}", + extension_name, + extension_type.as_str() + ); + + let (tx, rx) = oneshot::channel(); + + let send = self + .sender + .send(ExtensionOpt::Remove( + extension_name.clone(), + extension_type, + tx, + )) + .await; + + let Ok(_) = send else { + let err_msg = format!("remove extension {} failed", extension_name); + return Err(RemoveExtensionError::new(err_msg).into()); + }; + + let ret = rx.await; + + match ret { + Ok(_) => Ok(()), + Err(_) => { + let err_msg = format!("remove extension {} failed", extension_name); + Err(RemoveExtensionError::new(err_msg).into()) + } + } + } + + pub async fn load_registry(&self, url: Url) -> Result { + let url_str = url.to_string(); + info!("load registry extension: {}", url_str); + + let (tx, rx) = oneshot::channel(); + + let send = self + .sender + .send(ExtensionOpt::Load(url, ExtensionType::Registry, tx)) + .await; + + let Ok(_) = send else { + let err_msg = format!("load registry extension failed: {}", url_str); + return Err(LoadExtensionError::new(err_msg).into()); + }; + + let extensions = rx.await; + + let Ok(extension) = extensions else { + let err_msg = format!("load registry extension failed: {}", url_str); + return Err(LoadExtensionError::new(err_msg).into()); + }; + + let Ok(extensions) = extension else { + let err_msg = format!("load registry extension failed: {}", url_str); + return Err(LoadExtensionError::new(err_msg).into()); + }; + + match extensions { + Extensions::Registry(proxy) => Ok(proxy), + } + } +} + +enum ExtensionOpt { + Register( + String, + ExtensionFactories, + ExtensionType, + oneshot::Sender>, + ), + Remove(String, ExtensionType, oneshot::Sender>), + Load( + Url, + ExtensionType, + oneshot::Sender>, + ), +} + +pub(crate) trait Sealed {} + +#[allow(private_bounds)] +#[async_trait::async_trait] +pub trait Extension: Sealed { + type Target; + + fn name() -> String; + + async fn create(url: &Url) -> Result; +} + +#[allow(private_bounds)] +pub(crate) trait ExtensionMetaInfo { + fn extension_type() -> ExtensionType; +} + +pub(crate) enum Extensions { + Registry(RegistryProxy), +} + +pub(crate) enum ExtensionFactories { + RegistryExtensionFactory(registry_extension::RegistryExtensionFactory), +} + +#[allow(private_bounds)] +pub(crate) trait ConvertToExtensionFactories { + fn convert_to_extension_factories() -> ExtensionFactories; +} + +#[derive(Error, Debug)] +#[error("{0}")] +pub(crate) struct RegisterExtensionError(String); + +impl RegisterExtensionError { + pub fn new(msg: String) -> Self { + RegisterExtensionError(msg) + } +} + +#[derive(Error, Debug)] +#[error("{0}")] +pub struct RemoveExtensionError(String); + +impl RemoveExtensionError { + pub fn new(msg: String) -> Self { + RemoveExtensionError(msg) + } +} + +#[derive(Error, Debug)] +#[error("{0}")] +pub struct LoadExtensionError(String); + +impl LoadExtensionError { + pub fn new(msg: String) -> Self { + LoadExtensionError(msg) + } +} diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs new file mode 100644 index 00000000..e27d6a58 --- /dev/null +++ b/dubbo/src/extension/registry_extension.rs @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{collections::HashMap, future::Future, pin::Pin}; + +use async_trait::async_trait; +use thiserror::Error; +use tokio::sync::mpsc::Receiver; +use tower::discover::Change; + +use dubbo_base::{ + extension_param::ExtensionName, registry_param::RegistryUrl, url::UrlParam, StdError, Url, +}; +use proxy::RegistryProxy; + +use crate::extension::{ + ConvertToExtensionFactories, Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, +}; + +// extension://0.0.0.0/?extension-type=registry&extension-name=nacos®istry-url=nacos://127.0.0.1:8848 +pub fn to_extension_url(registry_url: Url) -> Url { + let mut registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap(); + + let protocol = registry_url.protocol(); + + registry_extension_loader_url.add_query_param(ExtensionType::Registry); + registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string())); + registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url)); + + registry_extension_loader_url +} + +pub type ServiceChange = Change; +pub type DiscoverStream = Receiver>; + +#[async_trait] +pub trait Registry { + async fn register(&self, url: Url) -> Result<(), StdError>; + + async fn unregister(&self, url: Url) -> Result<(), StdError>; + + async fn subscribe(&self, url: Url) -> Result; + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError>; + + fn url(&self) -> &Url; +} + +impl crate::extension::Sealed for T where T: Registry + Send + 'static {} + +impl ExtensionMetaInfo for T +where + T: Registry + Send + 'static, + T: Extension>, +{ + fn extension_type() -> ExtensionType { + ExtensionType::Registry + } +} + +impl ConvertToExtensionFactories for T +where + T: Registry + Send + 'static, + T: Extension>, +{ + fn convert_to_extension_factories() -> ExtensionFactories { + fn constrain(f: F) -> F + where + F: for<'a> Fn( + &'a Url, + ) -> Pin< + Box< + dyn Future, StdError>> + + Send + + 'a, + >, + >, + { + f + } + + let constructor = constrain(|url: &Url| { + let f = ::create(url); + Box::pin(f) + }); + + ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(constructor)) + } +} + +#[derive(Default)] +pub(super) struct RegistryExtensionLoader { + factories: HashMap, +} + +impl RegistryExtensionLoader { + pub(crate) async fn register( + &mut self, + extension_name: String, + factory: RegistryExtensionFactory, + ) { + self.factories.insert(extension_name, factory); + } + + pub(crate) async fn remove(&mut self, extension_name: String) { + self.factories.remove(&extension_name); + } + + pub(crate) async fn load(&mut self, url: &Url) -> Result { + let extension_name = url.query::().unwrap(); + let extension_name = extension_name.value(); + let factory = self.factories.get_mut(&extension_name).ok_or_else(|| { + RegistryExtensionLoaderError::new(format!( + "registry extension loader error: extension name {} not found", + extension_name + )) + })?; + factory.create(url).await + } +} + +type RegistryConstructor = for<'a> fn( + &'a Url, +) -> Pin< + Box, StdError>> + Send + 'a>, +>; + +pub(crate) struct RegistryExtensionFactory { + constructor: RegistryConstructor, + instances: HashMap, +} + +impl RegistryExtensionFactory { + pub(super) fn new(constructor: RegistryConstructor) -> Self { + Self { + constructor, + instances: HashMap::new(), + } + } +} + +impl RegistryExtensionFactory { + pub(super) async fn create(&mut self, url: &Url) -> Result { + let registry_url = url.query::().unwrap(); + let registry_url = registry_url.value(); + let url_str = registry_url.as_str().to_string(); + match self.instances.get(&url_str) { + Some(proxy) => { + let proxy = proxy.clone(); + Ok(proxy) + } + None => { + let registry = (self.constructor)(url).await?; + let proxy = >>::from(registry); + self.instances.insert(url_str, proxy.clone()); + Ok(proxy) + } + } + } +} + +#[derive(Error, Debug)] +#[error("{0}")] +pub(crate) struct RegistryExtensionLoaderError(String); + +impl RegistryExtensionLoaderError { + pub(crate) fn new(msg: String) -> Self { + RegistryExtensionLoaderError(msg) + } +} + +pub mod proxy { + use async_trait::async_trait; + use thiserror::Error; + use tokio::sync::oneshot; + + use dubbo_base::{StdError, Url}; + use dubbo_logger::tracing::error; + + use crate::extension::registry_extension::{DiscoverStream, Registry}; + + pub(super) enum RegistryOpt { + Register(Url, oneshot::Sender>), + Unregister(Url, oneshot::Sender>), + Subscribe(Url, oneshot::Sender>), + UnSubscribe(Url, oneshot::Sender>), + } + + #[derive(Clone)] + pub struct RegistryProxy { + sender: tokio::sync::mpsc::Sender, + url: Url, + } + + #[async_trait] + impl Registry for RegistryProxy { + async fn register(&self, url: Url) -> Result<(), StdError> { + let (tx, rx) = oneshot::channel(); + + match self + .sender + .send(RegistryOpt::Register(url.clone(), tx)) + .await + { + Ok(_) => match rx.await { + Ok(result) => result, + Err(_) => { + error!( + "registry proxy error: receive register response failed, url: {}", + url + ); + return Err( + RegistryProxyError::new("receive register response failed").into() + ); + } + }, + Err(_) => { + error!( + "registry proxy error: send register request failed, url: {}", + url + ); + return Err(RegistryProxyError::new("send register opt failed").into()); + } + } + } + + async fn unregister(&self, url: Url) -> Result<(), StdError> { + let (tx, rx) = oneshot::channel(); + match self + .sender + .send(RegistryOpt::Unregister(url.clone(), tx)) + .await + { + Ok(_) => match rx.await { + Ok(result) => result, + Err(_) => { + error!( + "registry proxy error: receive unregister response failed, url: {}", + url + ); + return Err( + RegistryProxyError::new("receive unregister response failed").into(), + ); + } + }, + Err(_) => { + error!( + "registry proxy error: send unregister request failed, url: {}", + url + ); + return Err(RegistryProxyError::new("send unregister opt failed").into()); + } + } + } + + async fn subscribe(&self, url: Url) -> Result { + let (tx, rx) = oneshot::channel(); + + match self + .sender + .send(RegistryOpt::Subscribe(url.clone(), tx)) + .await + { + Ok(_) => match rx.await { + Ok(result) => result, + Err(_) => { + error!( + "registry proxy error: receive subscribe response failed, url: {}", + url + ); + return Err( + RegistryProxyError::new("receive subscribe response failed").into() + ); + } + }, + Err(_) => { + error!( + "registry proxy error: send subscribe request failed, url: {}", + url + ); + return Err(RegistryProxyError::new("send subscribe opt failed").into()); + } + } + } + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { + let (tx, rx) = oneshot::channel(); + match self + .sender + .send(RegistryOpt::UnSubscribe(url.clone(), tx)) + .await + { + Ok(_) => { + match rx.await { + Ok(result) => result, + Err(_) => { + error!("registry proxy error: receive unsubscribe response failed, url: {}", url); + return Err(RegistryProxyError::new( + "receive unsubscribe response failed", + ) + .into()); + } + } + } + Err(_) => { + error!( + "registry proxy error: send unsubscribe request failed, url: {}", + url + ); + return Err(RegistryProxyError::new("send unsubscribe opt failed").into()); + } + } + } + + fn url(&self) -> &Url { + &self.url + } + } + + impl From> for RegistryProxy { + fn from(registry: Box) -> Self { + let url = registry.url().clone(); + + let (sender, mut receiver) = tokio::sync::mpsc::channel(1024); + + tokio::spawn(async move { + while let Some(opt) = receiver.recv().await { + match opt { + RegistryOpt::Register(url, tx) => { + let register = registry.register(url).await; + if let Err(_) = tx.send(register) { + error!("registry proxy error: send register response failed"); + } + } + RegistryOpt::Unregister(url, tx) => { + let unregister = registry.unregister(url).await; + if let Err(_) = tx.send(unregister) { + error!("registry proxy error: send unregister response failed"); + } + } + RegistryOpt::Subscribe(url, tx) => { + let subscribe = registry.subscribe(url).await; + if let Err(_) = tx.send(subscribe) { + error!("registry proxy error: send subscribe response failed"); + } + } + RegistryOpt::UnSubscribe(url, tx) => { + let unsubscribe = registry.unsubscribe(url).await; + if let Err(_) = tx.send(unsubscribe) { + error!("registry proxy error: send unsubscribe response failed"); + } + } + } + } + }); + + RegistryProxy { sender, url } + } + } + + #[derive(Error, Debug)] + #[error("registry proxy error: {0}")] + pub(crate) struct RegistryProxyError(String); + + impl RegistryProxyError { + pub(crate) fn new(msg: &str) -> Self { + RegistryProxyError(msg.to_string()) + } + } +} diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 2666d259..f3c6dc1a 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -15,20 +15,13 @@ * limitations under the License. */ -use std::{ - collections::HashMap, - error::Error, - pin::Pin, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, error::Error, pin::Pin}; use crate::{ + extension, + extension::registry_extension::Registry, protocol::{BoxExporter, Protocol}, - registry::{ - n_registry::{ArcRegistry, Registry}, - protocol::RegistryProtocol, - types::{Registries, RegistriesOperation}, - }, + registry::protocol::RegistryProtocol, }; use dubbo_base::Url; use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig}; @@ -40,7 +33,7 @@ use futures::{future, Future}; #[derive(Default)] pub struct Dubbo { protocols: HashMap>, - registries: Option, + registries: Vec, service_registry: HashMap>, // registry: Urls config: Option<&'static RootConfig>, } @@ -49,7 +42,7 @@ impl Dubbo { pub fn new() -> Dubbo { Self { protocols: HashMap::new(), - registries: None, + registries: Vec::default(), service_registry: HashMap::new(), config: None, } @@ -60,14 +53,10 @@ impl Dubbo { self } - pub fn add_registry(mut self, registry_key: &str, registry: ArcRegistry) -> Self { - if self.registries.is_none() { - self.registries = Some(Arc::new(Mutex::new(HashMap::new()))); - } - self.registries - .as_ref() - .unwrap() - .insert(registry_key.to_string(), registry); + pub fn add_registry(mut self, registry: &str) -> Self { + let url: Url = registry.parse().unwrap(); + let url = extension::registry_extension::to_extension_url(url); + self.registries.push(url); self } @@ -88,10 +77,15 @@ impl Dubbo { let protocol = root_config .protocols .get_protocol_or_default(service_config.protocol.as_str()); - let protocol_url = - format!("{}/{}", protocol.to_url(), service_config.interface.clone(),); + let interface_name = service_config.interface.clone(); + let protocol_url = format!( + "{}/{}?interface={}", + protocol.to_url(), + interface_name, + interface_name + ); tracing::info!("protocol_url: {:?}", protocol_url); - Url::from_url(&protocol_url) + protocol_url.parse().ok() } else { return Err(format!("base {:?} not exists", service_config.protocol).into()); }; @@ -117,9 +111,20 @@ impl Dubbo { self.init().unwrap(); tracing::info!("starting..."); // TODO: server registry + + let mut registry_extensions = Vec::new(); + + for registry_url in &self.registries { + let registry_url = registry_url.clone(); + let registry_extension = extension::EXTENSIONS.load_registry(registry_url).await; + if let Ok(registry_extension) = registry_extension { + registry_extensions.push(registry_extension); + } + } + let mem_reg = Box::new( RegistryProtocol::new() - .with_registries(self.registries.as_ref().unwrap().clone()) + .with_registries(registry_extensions.clone()) .with_services(self.service_registry.clone()), ); let mut async_vec: Vec + Send>>> = Vec::new(); @@ -128,15 +133,10 @@ impl Dubbo { tracing::info!("base: {:?}, service url: {:?}", name, url); let exporter = mem_reg.clone().export(url.to_owned()); async_vec.push(exporter); + //TODO multiple registry - if self.registries.is_some() { - let _ = self - .registries - .as_ref() - .unwrap() - .default_registry() - .register(url.clone()) - .await; + for registry_extension in ®istry_extensions { + let _ = registry_extension.register(url.clone()).await; } } } diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs index 4de8f899..913910a5 100644 --- a/dubbo/src/invoker/clone_body.rs +++ b/dubbo/src/invoker/clone_body.rs @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ use std::{ collections::VecDeque, pin::Pin, @@ -8,13 +24,12 @@ use std::{ use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures_core::ready; +use dubbo_base::StdError; use http::HeaderMap; use http_body::Body; use pin_project::pin_project; use thiserror::Error; -use crate::StdError; - #[derive(Error, Debug)] #[error("buffered body reach max capacity.")] pub struct ReachMaxCapacityError; diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs index c1fa00d8..557d76e0 100644 --- a/dubbo/src/invoker/clone_invoker.rs +++ b/dubbo/src/invoker/clone_invoker.rs @@ -1,5 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ use std::{mem, pin::Pin, task::Poll}; +use dubbo_base::StdError; use dubbo_logger::tracing::debug; use futures_core::{future::BoxFuture, ready, Future, TryFuture}; use futures_util::FutureExt; @@ -16,8 +33,6 @@ use tokio_util::sync::ReusableBoxFuture; use tower::{buffer::Buffer, ServiceExt}; use tower_service::Service; -use crate::StdError; - use super::clone_body::CloneBody; enum Inner { diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs index 92b8b462..1c87c0ec 100644 --- a/dubbo/src/invoker/mod.rs +++ b/dubbo/src/invoker/mod.rs @@ -1,5 +1,19 @@ -use dubbo_base::Url; - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService}; pub mod clone_body; @@ -13,7 +27,7 @@ impl NewService for NewInvoker { fn new_service(&self, url: String) -> Self::Service { // todo create another invoker by url protocol - let url = Url::from_url(&url).unwrap(); + let url = url.parse().unwrap(); CloneInvoker::new(TripleInvoker::new(url)) } } diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index d397b42b..1a521a2e 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -19,6 +19,7 @@ pub mod cluster; pub mod codegen; pub mod context; pub mod directory; +pub mod extension; pub mod filter; mod framework; pub mod invocation; @@ -38,7 +39,6 @@ use std::{future::Future, pin::Pin}; pub use framework::Dubbo; -pub type StdError = Box; pub type BoxFuture = self::Pin> + Send + 'static>>; pub(crate) type Error = Box; pub type BoxBody = http_body::combinators::UnsyncBoxBody; diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs index 4e26781d..74f22174 100644 --- a/dubbo/src/loadbalancer/mod.rs +++ b/dubbo/src/loadbalancer/mod.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use dubbo_base::StdError; use futures_core::future::BoxFuture; use tower::{discover::ServiceList, ServiceExt}; use tower_service::Service; @@ -7,7 +24,6 @@ use crate::{ invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker}, param::Param, svc::NewService, - StdError, }; use crate::protocol::triple::triple_invoker::TripleInvoker; diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs index bef50419..298c3b31 100644 --- a/dubbo/src/param.rs +++ b/dubbo/src/param.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + pub trait Param { fn param(&self) -> T; } diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index c8451e3c..cb6b08cc 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -35,7 +35,7 @@ pub struct TripleInvoker { impl TripleInvoker { pub fn new(url: Url) -> TripleInvoker { - let uri = http::Uri::from_str(&url.to_url()).unwrap(); + let uri = http::Uri::from_str(url.as_str()).unwrap(); Self { url, conn: Connection::new().with_host(uri).build(), @@ -55,7 +55,7 @@ impl TripleInvoker { let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap(); - let authority = self.url.clone().get_ip_port(); + let authority = self.url.authority(); let uri = Uri::builder() .scheme("http") diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs index 71f6edc4..27174ba0 100644 --- a/dubbo/src/protocol/triple/triple_protocol.rs +++ b/dubbo/src/protocol/triple/triple_protocol.rs @@ -15,10 +15,10 @@ * limitations under the License. */ -use std::{boxed::Box, collections::HashMap}; +use std::collections::HashMap; use async_trait::async_trait; -use dubbo_base::Url; +use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url}; use super::{ triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer, @@ -44,8 +44,9 @@ impl TripleProtocol { } pub fn get_server(&self, url: Url) -> Option { + let interface_name = url.query::().unwrap(); self.servers - .get(&url.service_key) + .get(interface_name.value().as_str()) .map(|data| data.to_owned()) } } @@ -61,8 +62,12 @@ impl Protocol for TripleProtocol { async fn export(mut self, url: Url) -> BoxExporter { // service_key is same to key of TRIPLE_SERVICES let server = TripleServer::new(); - self.servers.insert(url.service_key.clone(), server.clone()); - server.serve(url.short_url().as_str().into()).await; + + let interface_name = url.query::().unwrap(); + let interface_name = interface_name.value(); + + self.servers.insert(interface_name, server.clone()); + server.serve(url).await; Box::new(TripleExporter::new()) } diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index b82fda8d..08ae175b 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -16,47 +16,43 @@ */ #![allow(unused_variables, dead_code, missing_docs)] + +use crate::{extension, extension::registry_extension::proxy::RegistryProxy}; +use dubbo_base::{StdError, Url}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + pub mod integration; -pub mod n_registry; pub mod protocol; -pub mod types; - -// use std::{ -// fmt::{Debug, Formatter}, -// sync::Arc, -// }; - -// use dubbo_base::Url; - -// pub type RegistryNotifyListener = Arc; -// pub trait Registry { -// fn register(&mut self, url: Url) -> Result<(), crate::StdError>; -// fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>; - -// fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>; -// fn unsubscribe( -// &self, -// url: Url, -// listener: RegistryNotifyListener, -// ) -> Result<(), crate::StdError>; -// } - -// pub trait NotifyListener { -// fn notify(&self, event: ServiceEvent); -// fn notify_all(&self, event: ServiceEvent); -// } - -// #[derive(Debug)] -// pub struct ServiceEvent { -// pub key: String, -// pub action: String, -// pub service: Vec, -// } - -// pub type BoxRegistry = Box; - -// impl Debug for BoxRegistry { -// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { -// f.write_str("BoxRegistry") -// } -// } +pub mod registry; + +#[derive(Clone)] +pub struct MkRegistryService { + registry_url: Url, +} + +impl MkRegistryService { + pub fn new(registry_url: Url) -> Self { + Self { registry_url } + } +} + +impl Service<()> for MkRegistryService { + type Response = RegistryProxy; + type Error = StdError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: ()) -> Self::Future { + let fut = extension::EXTENSIONS.load_registry(self.registry_url.clone()); + Box::pin(fut) + } +} diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs deleted file mode 100644 index abcd56b8..00000000 --- a/dubbo/src/registry/n_registry.rs +++ /dev/null @@ -1,203 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use async_trait::async_trait; -use dubbo_base::Url; -use thiserror::Error; -use tokio::sync::{ - mpsc::{self, Receiver}, - Mutex, -}; -use tower::discover::Change; - -use crate::StdError; - -pub type ServiceChange = Change; -pub type DiscoverStream = Receiver>; -pub type BoxRegistry = Box; - -#[async_trait] -pub trait Registry { - async fn register(&self, url: Url) -> Result<(), StdError>; - - async fn unregister(&self, url: Url) -> Result<(), StdError>; - - async fn subscribe(&self, url: Url) -> Result; - - async fn unsubscribe(&self, url: Url) -> Result<(), StdError>; -} - -#[derive(Clone)] -pub struct ArcRegistry { - inner: Arc, -} - -pub enum RegistryComponent { - NacosRegistry(ArcRegistry), - ZookeeperRegistry, - StaticRegistry(StaticRegistry), -} - -pub struct StaticServiceValues { - listeners: Vec>>, - urls: HashSet, -} - -#[derive(Default)] -pub struct StaticRegistry { - urls: Mutex>, -} - -impl ArcRegistry { - pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self { - Self { - inner: Arc::new(registry), - } - } -} - -#[async_trait] -impl Registry for ArcRegistry { - async fn register(&self, url: Url) -> Result<(), StdError> { - self.inner.register(url).await - } - - async fn unregister(&self, url: Url) -> Result<(), StdError> { - self.inner.unregister(url).await - } - - async fn subscribe(&self, url: Url) -> Result { - self.inner.subscribe(url).await - } - - async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { - self.inner.unsubscribe(url).await - } -} - -#[async_trait] -impl Registry for RegistryComponent { - async fn register(&self, url: Url) -> Result<(), StdError> { - todo!() - } - - async fn unregister(&self, url: Url) -> Result<(), StdError> { - todo!() - } - - async fn subscribe(&self, url: Url) -> Result { - match self { - RegistryComponent::NacosRegistry(registry) => registry.subscribe(url).await, - RegistryComponent::ZookeeperRegistry => todo!(), - RegistryComponent::StaticRegistry(registry) => registry.subscribe(url).await, - } - } - - async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { - todo!() - } -} - -impl StaticRegistry { - pub fn new(urls: Vec) -> Self { - let mut map = HashMap::with_capacity(urls.len()); - - for url in urls { - let service_name = url.get_service_name(); - let static_values = map - .entry(service_name) - .or_insert_with(|| StaticServiceValues { - listeners: Vec::new(), - urls: HashSet::new(), - }); - let url = url.to_string(); - static_values.urls.insert(url.clone()); - } - - Self { - urls: Mutex::new(map), - } - } -} - -#[async_trait] -impl Registry for StaticRegistry { - async fn register(&self, url: Url) -> Result<(), StdError> { - let service_name = url.get_service_name(); - let mut lock = self.urls.lock().await; - - let static_values = lock - .entry(service_name) - .or_insert_with(|| StaticServiceValues { - listeners: Vec::new(), - urls: HashSet::new(), - }); - let url = url.to_string(); - static_values.urls.insert(url.clone()); - - static_values.listeners.retain(|listener| { - let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ()))); - ret.is_ok() - }); - - Ok(()) - } - - async fn unregister(&self, url: Url) -> Result<(), StdError> { - let service_name = url.get_service_name(); - let mut lock = self.urls.lock().await; - - match lock.get_mut(&service_name) { - None => Ok(()), - Some(static_values) => { - let url = url.to_string(); - static_values.urls.remove(&url); - static_values.listeners.retain(|listener| { - let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone()))); - ret.is_ok() - }); - if static_values.urls.is_empty() { - lock.remove(&service_name); - } - Ok(()) - } - } - } - - async fn subscribe(&self, url: Url) -> Result { - let service_name = url.get_service_name(); - - let change_rx = { - let mut lock = self.urls.lock().await; - let static_values = lock - .entry(service_name) - .or_insert_with(|| StaticServiceValues { - listeners: Vec::new(), - urls: HashSet::new(), - }); - - let (tx, change_rx) = mpsc::channel(64); - static_values.listeners.push(tx); - - for url in static_values.urls.iter() { - static_values.listeners.retain(|listener| { - let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ()))); - ret.is_ok() - }); - } - change_rx - }; - - Ok(change_rx) - } - - async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { - Ok(()) - } -} - -#[derive(Error, Debug)] -#[error("static registry error: {0}")] -struct StaticRegistryError(String); diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs index b9ba7221..76350a42 100644 --- a/dubbo/src/registry/protocol.rs +++ b/dubbo/src/registry/protocol.rs @@ -15,26 +15,25 @@ * limitations under the License. */ -use dubbo_base::Url; +use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url}; use dubbo_logger::tracing; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use super::n_registry::{ArcRegistry, Registry, StaticRegistry}; use crate::{ + extension::registry_extension::{proxy::RegistryProxy, Registry}, protocol::{ triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol}, BoxExporter, BoxInvoker, Protocol, }, - registry::types::Registries, }; #[derive(Clone, Default)] pub struct RegistryProtocol { // registerAddr: Registry - registries: Option, + registries: Vec, // providerUrl: Exporter exporters: Arc>>, // serviceName: registryUrls @@ -44,14 +43,14 @@ pub struct RegistryProtocol { impl RegistryProtocol { pub fn new() -> Self { RegistryProtocol { - registries: None, + registries: Vec::default(), exporters: Arc::new(RwLock::new(HashMap::new())), services: HashMap::new(), } } - pub fn with_registries(mut self, registries: Registries) -> Self { - self.registries = Some(registries); + pub fn with_registries(mut self, registries: Vec) -> Self { + self.registries.extend(registries); self } @@ -59,19 +58,6 @@ impl RegistryProtocol { self.services.extend(services); self } - - pub fn get_registry(&mut self, url: Url) -> ArcRegistry { - let mem = StaticRegistry::default(); - let mem = ArcRegistry::new(mem); - self.registries - .as_ref() - .unwrap() - .lock() - .unwrap() - .insert(url.location, mem.clone()); - - mem - } } #[async_trait::async_trait] @@ -88,23 +74,23 @@ impl Protocol for RegistryProtocol { // init Exporter based on provider_url // server registry based on register_url // start server health check - let registry_url = self.services.get(url.get_service_name().as_str()); + let service_name = url.query::().unwrap(); + let registry_url = self.services.get(service_name.as_str().as_ref()); if let Some(urls) = registry_url { - for url in urls.clone().iter() { - if !url.service_key.is_empty() { - let reg = self.get_registry(url.clone()); - let _ = reg.register(url.clone()).await; + for url in urls.iter() { + for registry_proxy in &self.registries { + let _ = registry_proxy.register(url.clone()).await; } } } - match url.clone().scheme.as_str() { + match url.clone().protocol() { "tri" => { let pro = Box::new(TripleProtocol::new()); return pro.export(url).await; } _ => { - tracing::error!("base {:?} not implemented", url.scheme); + tracing::error!("base {:?} not implemented", url.protocol()); Box::new(TripleExporter::new()) } } diff --git a/dubbo/src/registry/registry.rs b/dubbo/src/registry/registry.rs new file mode 100644 index 00000000..85a81683 --- /dev/null +++ b/dubbo/src/registry/registry.rs @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::{HashMap, HashSet}; + +use async_trait::async_trait; +use itertools::Itertools; +use thiserror::Error; +use tokio::sync::{ + mpsc::{self}, + Mutex, +}; + +use dubbo_base::{ + extension_param::{ExtensionName, ExtensionType}, + registry_param::{InterfaceName, RegistryUrl, StaticInvokerUrls}, + url::UrlParam, + StdError, Url, +}; + +use crate::extension::{ + registry_extension::{DiscoverStream, Registry, ServiceChange}, + Extension, +}; + +pub struct StaticServiceValues { + listeners: Vec>>, + urls: HashSet, +} + +pub struct StaticRegistry { + urls: Mutex>, + self_url: Url, +} + +impl StaticRegistry { + pub fn to_extension_url(static_invoker_urls: Vec) -> Url { + let static_invoker_urls: StaticInvokerUrls = + static_invoker_urls.iter().join(",").parse().unwrap(); + let mut static_registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap(); + + static_registry_extension_loader_url.add_query_param(ExtensionType::Registry); + static_registry_extension_loader_url.add_query_param(ExtensionName::new(Self::name())); + static_registry_extension_loader_url + .add_query_param(RegistryUrl::new("static://127.0.0.1".parse().unwrap())); + static_registry_extension_loader_url.add_query_param(static_invoker_urls); + + static_registry_extension_loader_url + } +} + +impl StaticRegistry { + pub fn new(url: Url) -> Self { + let static_urls = url.query::(); + let static_urls = match static_urls { + None => Vec::default(), + Some(static_urls) => static_urls.value(), + }; + + let mut map = HashMap::with_capacity(static_urls.len()); + + for url in static_urls { + let interface_name = url.query::().unwrap(); + let interface_name = interface_name.value(); + + let static_values = map + .entry(interface_name) + .or_insert_with(|| StaticServiceValues { + listeners: Vec::new(), + urls: HashSet::new(), + }); + let url = url.to_string(); + static_values.urls.insert(url.clone()); + } + + let self_url = "static://0.0.0.0".parse().unwrap(); + + Self { + urls: Mutex::new(map), + self_url, + } + } +} + +impl Default for StaticRegistry { + fn default() -> Self { + let self_url = "static://0.0.0.0".parse().unwrap(); + + Self { + self_url, + urls: Mutex::new(HashMap::new()), + } + } +} +#[async_trait] +impl Registry for StaticRegistry { + async fn register(&self, url: Url) -> Result<(), StdError> { + let interface_name = url.query::().unwrap(); + let interface_name = interface_name.value(); + + let mut lock = self.urls.lock().await; + + let static_values = lock + .entry(interface_name) + .or_insert_with(|| StaticServiceValues { + listeners: Vec::new(), + urls: HashSet::new(), + }); + let url = url.to_string(); + static_values.urls.insert(url.clone()); + + static_values.listeners.retain(|listener| { + let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ()))); + ret.is_ok() + }); + + Ok(()) + } + + async fn unregister(&self, url: Url) -> Result<(), StdError> { + let interface_name = url.query::().unwrap(); + let interface_name = interface_name.value(); + + let mut lock = self.urls.lock().await; + + match lock.get_mut(&interface_name) { + None => Ok(()), + Some(static_values) => { + let url = url.to_string(); + static_values.urls.remove(&url); + static_values.listeners.retain(|listener| { + let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone()))); + ret.is_ok() + }); + if static_values.urls.is_empty() { + lock.remove(&interface_name); + } + Ok(()) + } + } + } + + async fn subscribe(&self, url: Url) -> Result { + let interface_name = url.query::().unwrap(); + let interface_name = interface_name.value(); + + let change_rx = { + let mut lock = self.urls.lock().await; + let static_values = lock + .entry(interface_name) + .or_insert_with(|| StaticServiceValues { + listeners: Vec::new(), + urls: HashSet::new(), + }); + + let (tx, change_rx) = mpsc::channel(64); + static_values.listeners.push(tx); + + for listener in &static_values.listeners { + for url in &static_values.urls { + let _ = listener + .send(Ok(ServiceChange::Insert(url.clone(), ()))) + .await; + } + } + + change_rx + }; + + Ok(change_rx) + } + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { + Ok(()) + } + + fn url(&self) -> &Url { + &self.self_url + } +} + +#[async_trait::async_trait] +impl Extension for StaticRegistry { + type Target = Box; + + fn name() -> String { + "static".to_string() + } + + async fn create(url: &Url) -> Result { + // url example: + // extension://0.0.0.0?extension-type=registry&extension-name=static®istry=static://127.0.0.1 + let static_invoker_urls = url.query::(); + + let registry_url = url.query::().unwrap(); + let mut registry_url = registry_url.value(); + + if let Some(static_invoker_urls) = static_invoker_urls { + registry_url.add_query_param(static_invoker_urls); + } + + let static_registry = StaticRegistry::new(registry_url); + + Ok(Box::new(static_registry)) + } +} +#[derive(Error, Debug)] +#[error("static registry error: {0}")] +struct StaticRegistryError(String); diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs deleted file mode 100644 index 5c1687da..00000000 --- a/dubbo/src/registry/types.rs +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -use itertools::Itertools; - -use super::n_registry::ArcRegistry; - -pub type Registries = Arc>>; - -pub const DEFAULT_REGISTRY_KEY: &str = "default"; - -pub trait RegistriesOperation { - fn get(&self, registry_key: &str) -> ArcRegistry; - fn insert(&self, registry_key: String, registry: ArcRegistry); - fn default_registry(&self) -> ArcRegistry; -} - -impl RegistriesOperation for Registries { - fn get(&self, registry_key: &str) -> ArcRegistry { - self.as_ref() - .lock() - .unwrap() - .get(registry_key) - .unwrap() - .clone() - } - - fn insert(&self, registry_key: String, registry: ArcRegistry) { - self.as_ref().lock().unwrap().insert(registry_key, registry); - } - - fn default_registry(&self) -> ArcRegistry { - let guard = self.as_ref().lock().unwrap(); - let (_, result) = guard - .iter() - .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY) - .unwrap() - .to_owned(); - result.clone() - } -} diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs index c2448642..28dfda7c 100644 --- a/dubbo/src/route/mod.rs +++ b/dubbo/src/route/mod.rs @@ -1,5 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use std::pin::Pin; +use dubbo_base::StdError; use dubbo_logger::tracing::debug; use futures_core::{ready, Future}; use futures_util::{future::Ready, FutureExt, TryFutureExt}; @@ -11,7 +29,6 @@ use crate::{ invoker::clone_invoker::CloneInvoker, param::Param, svc::NewService, - StdError, }; pub struct NewRoutes { @@ -20,6 +37,7 @@ pub struct NewRoutes { pub struct NewRoutesFuture { inner: RoutesFutureInnerState, + #[allow(dead_code)] target: T, } @@ -39,6 +57,7 @@ pub enum RoutesFutureInnerState { #[derive(Clone)] pub struct Routes { + #[allow(dead_code)] target: T, invokers: Vec>, } diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs index db59b92d..f8466636 100644 --- a/dubbo/src/svc.rs +++ b/dubbo/src/svc.rs @@ -1,4 +1,20 @@ -use std::{marker::PhantomData, sync::Arc}; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::sync::Arc; pub trait NewService { type Service; @@ -45,32 +61,3 @@ impl NewService for ArcNewService { self.inner.new_service(t) } } - -// inner: Box> + Send>>> + Send>, -pub struct BoxedService { - inner: N, - _mark: PhantomData, -} - -impl BoxedService { - pub fn layer() -> impl tower_layer::Layer { - tower_layer::layer_fn(|inner: N| Self { - inner, - _mark: PhantomData, - }) - } -} - -// impl NewService for BoxedService -// where -// N: NewService, -// N::Service: Service + Send, -// >::Future: Send + 'static, -// { -// type Service = Box>::Response, Error = >::Error, Future = Pin>::Response, >::Error>> + Send>>> + Send>; - -// fn new_service(&self, target: T) -> Self::Service { -// let service = self.inner.new_service(target); -// Box::new(service.map_future(|f|Box::pin(f) as _)) -// } -// } diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 0dadbcc1..94c855b4 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -18,14 +18,11 @@ use std::sync::Arc; use crate::{ - cluster::NewCluster, - directory::NewCachedDirectory, - loadbalancer::NewLoadBalancer, - registry::n_registry::{ArcRegistry, RegistryComponent, StaticRegistry}, - route::NewRoutes, - utils::boxed_clone::BoxCloneService, + cluster::NewCluster, directory::NewCachedDirectory, extension, loadbalancer::NewLoadBalancer, + route::NewRoutes, utils::boxed_clone::BoxCloneService, }; +use crate::registry::{registry::StaticRegistry, MkRegistryService}; use aws_smithy_http::body::SdkBody; use dubbo_base::Url; use tower::ServiceBuilder; @@ -33,15 +30,15 @@ use tower::ServiceBuilder; pub type ClientBoxService = BoxCloneService, http::Response, crate::Error>; -pub type ServiceMK = Arc>>>>; +pub type ServiceMK = + Arc>>>>; #[derive(Default)] pub struct ClientBuilder { pub timeout: Option, pub connector: &'static str, - registry: Option, + registry_extension_url: Option, pub direct: bool, - host: String, } impl ClientBuilder { @@ -49,22 +46,18 @@ impl ClientBuilder { ClientBuilder { timeout: None, connector: "", - registry: None, + registry_extension_url: None, direct: false, - host: "".to_string(), } } pub fn from_static(host: &str) -> ClientBuilder { + let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]); Self { timeout: None, connector: "", - registry: Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url( - host, - ) - .unwrap()]))), + registry_extension_url: Some(registry_extension_url), direct: true, - host: host.to_string(), } } @@ -75,19 +68,19 @@ impl ClientBuilder { } } - pub fn with_registry(self, registry: ArcRegistry) -> Self { + pub fn with_registry(self, registry: Url) -> Self { + let registry_extension_url = extension::registry_extension::to_extension_url(registry); Self { - registry: Some(registry), + registry_extension_url: Some(registry_extension_url), ..self } } pub fn with_host(self, host: &'static str) -> Self { + let registry_extension_url = StaticRegistry::to_extension_url(vec![host.parse().unwrap()]); + Self { - registry: Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url( - host, - ) - .unwrap()]))), + registry_extension_url: Some(registry_extension_url), ..self } } @@ -101,14 +94,17 @@ impl ClientBuilder { } pub fn build(mut self) -> ServiceMK { - let registry = self.registry.take().expect("registry must not be empty"); + let registry = self + .registry_extension_url + .take() + .expect("registry must not be empty"); let mk_service = ServiceBuilder::new() .layer(NewCluster::layer()) .layer(NewLoadBalancer::layer()) .layer(NewRoutes::layer()) .layer(NewCachedDirectory::layer()) - .service(registry); + .service(MkRegistryService::new(registry)); Arc::new(mk_service) } diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs index 9ef71529..b473dd89 100644 --- a/dubbo/src/triple/server/builder.rs +++ b/dubbo/src/triple/server/builder.rs @@ -21,7 +21,7 @@ use std::{ str::FromStr, }; -use dubbo_base::Url; +use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url}; use dubbo_logger::tracing; use http::{Request, Response, Uri}; use hyper::body::Body; @@ -132,7 +132,7 @@ impl ServerBuilder { impl From for ServerBuilder { fn from(u: Url) -> Self { - let uri = match http::Uri::from_str(&u.raw_url_string()) { + let uri = match http::Uri::from_str(&u.as_str()) { Ok(v) => v, Err(err) => { tracing::error!("http uri parse error: {}, url: {:?}", err, &u); @@ -142,10 +142,14 @@ impl From for ServerBuilder { let authority = uri.authority().unwrap(); + let service_name = u.query::().unwrap().value(); + Self { - listener: u.get_param("listener").unwrap_or("tcp".to_string()), + listener: u + .query_param_by_key("listener") + .unwrap_or("tcp".to_string()), addr: authority.to_string().to_socket_addrs().unwrap().next(), - service_names: vec![u.service_name], + service_names: vec![service_name], server: DubboServer::default(), certs: Vec::new(), keys: Vec::new(), diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index cb0b9d71..212c8807 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -15,12 +15,11 @@ * limitations under the License. */ +use dubbo_base::StdError; use hyper::client::{conn::Builder, service::Connect}; use tower_service::Service; -use crate::{ - boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector, StdError, -}; +use crate::{boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector}; type HyperConnect = Connect< crate::utils::boxed_clone::BoxCloneService, diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 0a2f150b..18939a29 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -34,9 +34,10 @@ async fn main() { // let builder = ClientBuilder::new() // .with_connector("unix") // .with_host("unix://127.0.0.1:8888"); - let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888") - .with_timeout(1000000) - .with_direct(true); + let builder = + ClientBuilder::from_static(&"http://127.0.0.1:8888?interface=grpc.examples.echo.Echo") + .with_timeout(1000000) + .with_direct(true); let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs index 90efc1a6..010821e2 100644 --- a/examples/echo/src/echo/server.rs +++ b/examples/echo/src/echo/server.rs @@ -70,7 +70,8 @@ async fn main() { // Dubbo::new() // .with_config({ // let mut r = RootConfig::new(); - // r.test_config(); + // r.test_config + // (); // r // }) // .start() diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index cb92ed0b..14743ae3 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -20,11 +20,9 @@ pub mod protos { include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs")); } -use std::env; +use dubbo::codegen::*; -use dubbo::{codegen::*, registry::n_registry::ArcRegistry}; - -use dubbo_base::Url; +use dubbo::extension; use futures_util::StreamExt; use protos::{greeter_client::GreeterClient, GreeterRequest}; use registry_nacos::NacosRegistry; @@ -33,9 +31,9 @@ use registry_nacos::NacosRegistry; async fn main() { dubbo_logger::init(); - let builder = ClientBuilder::new().with_registry(ArcRegistry::new(NacosRegistry::new( - Url::from_url("nacos://127.0.0.1:8848").unwrap(), - ))); + let _ = extension::EXTENSIONS.register::().await; + + let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap()); let mut cli = GreeterClient::new(builder); diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs index fd436e52..a652ed84 100644 --- a/examples/greeter/src/greeter/server.rs +++ b/examples/greeter/src/greeter/server.rs @@ -18,13 +18,12 @@ use std::{io::ErrorKind, pin::Pin}; use async_trait::async_trait; -use dubbo_base::Url; use futures_util::{Stream, StreamExt}; use registry_nacos::NacosRegistry; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use dubbo::{codegen::*, registry::n_registry::ArcRegistry, Dubbo}; +use dubbo::{codegen::*, extension, Dubbo}; use dubbo_config::RootConfig; use dubbo_logger::{ tracing::{info, span}, @@ -34,8 +33,6 @@ use protos::{ greeter_server::{register_server, Greeter}, GreeterReply, GreeterRequest, }; -use registry_zookeeper::ZookeeperRegistry; - pub mod protos { #![allow(non_camel_case_types)] include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs")); @@ -59,10 +56,10 @@ async fn main() { Err(_err) => panic!("err: {:?}", _err), // response was droped }; - let nacos_registry = NacosRegistry::new(Url::from_url("nacos://127.0.0.1:8848").unwrap()); + let _ = extension::EXTENSIONS.register::().await; let mut f = Dubbo::new() .with_config(r) - .add_registry("nacos-registry", ArcRegistry::new(nacos_registry)); + .add_registry("nacos://127.0.0.1:8848/"); f.start().await; } diff --git a/protocol/base/src/invoker.rs b/protocol/base/src/invoker.rs index 14de6ce8..d676fa49 100644 --- a/protocol/base/src/invoker.rs +++ b/protocol/base/src/invoker.rs @@ -66,9 +66,9 @@ impl Node for BaseInvoker { impl Display for BaseInvoker { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Invoker") - .field("protocol", &self.url.scheme) - .field("host", &self.url.ip) - .field("path", &self.url.location) + .field("protocol", &self.url.protocol()) + .field("host", &self.url.host()) + .field("path", &self.url.path()) .finish() } } diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs index ad34237e..204846b2 100644 --- a/registry/nacos/src/lib.rs +++ b/registry/nacos/src/lib.rs @@ -14,84 +14,47 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -mod utils; use async_trait::async_trait; -use dubbo_base::Url; +use dubbo_base::{StdError, Url}; use std::{collections::HashMap, sync::Arc}; -use tokio::{select, sync::mpsc}; +use tokio::sync::mpsc; -use anyhow::anyhow; -use dubbo::{ - registry::n_registry::{DiscoverStream, Registry, ServiceChange}, - StdError, +use dubbo::extension::{ + registry_extension::{DiscoverStream, Registry, ServiceChange}, + Extension, }; -use dubbo_logger::tracing::{debug, error, info}; -use nacos_sdk::api::naming::{ - NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance, +use dubbo_base::{ + registry_param::{ + AppName, Category, Group, InterfaceName, RegistryUrl, ServiceNamespace, Version, + }, + url::UrlParam, }; - -use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, match_range}; - -const VERSION_KEY: &str = "version"; - -const GROUP_KEY: &str = "group"; - -const DEFAULT_GROUP: &str = "DEFAULT_GROUP"; - -const PROVIDER_SIDE: &str = "provider"; - -const DEFAULT_CATEGORY: &str = PROVIDERS_CATEGORY; - -const SIDE_KEY: &str = "side"; - -const REGISTER_CONSUMER_URL_KEY: &str = "register-consumer-url"; - -const SERVICE_NAME_SEPARATOR: &str = ":"; - -const CATEGORY_KEY: &str = "category"; - -const PROVIDERS_CATEGORY: &str = "providers"; - -#[allow(dead_code)] -const ADMIN_PROTOCOL: &str = "admin"; - -#[allow(dead_code)] -const INNERCLASS_SYMBOL: &str = "$"; - -#[allow(dead_code)] -const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___"; +use dubbo_logger::tracing::info; +use nacos_sdk::api::{ + naming::{NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance}, + props::ClientProps, +}; +use tokio::sync::{watch, Notify}; pub struct NacosRegistry { - nacos_naming_service: Arc, + url: Url, + nacos_service: Arc, } impl NacosRegistry { - pub fn new(url: Url) -> Self { - let (nacos_client_props, enable_auth) = build_nacos_client_props(&url); - - let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props); - - if enable_auth { - nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http(); - } - - let nacos_naming_service = nacos_naming_builder.build().unwrap(); - - Self { - nacos_naming_service: Arc::new(nacos_naming_service), - } + pub fn new(url: Url, nacos_service: Arc) -> Self { + Self { url, nacos_service } } -} -impl NacosRegistry { - fn create_nacos_service_instance(url: Url) -> ServiceInstance { - let ip = url.ip; - let port = url.port; - nacos_sdk::api::naming::ServiceInstance { - ip, - port: port.parse().unwrap(), - metadata: url.params, + fn create_nacos_service_instance(url: &Url) -> ServiceInstance { + let ip = url.host().unwrap(); + let port = url.port().unwrap(); + + ServiceInstance { + ip: ip.to_string(), + port: port.into(), + metadata: url.all_query_params(), ..Default::default() } } @@ -144,447 +107,312 @@ impl NacosRegistry { #[async_trait] impl Registry for NacosRegistry { - async fn register(&self, url: Url) -> Result<(), dubbo::StdError> { - // let side = url.get_param(SIDE_KEY).unwrap_or_default(); - // let register_consumer = url - // .get_param(REGISTER_CONSUMER_URL_KEY) - // .unwrap_or_else(|| false.to_string()) - // .parse::() - // .unwrap_or(false); - // if side.ne(PROVIDER_SIDE) && !register_consumer { - // warn!("Please set 'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url registration."); - // return Ok(()); - // } - - let nacos_service_name = NacosServiceName::new(&url); - - let group_name = Some( - nacos_service_name - .get_group_with_default(DEFAULT_GROUP) - .to_string(), - ); - let nacos_service_name = nacos_service_name.to_register_str(); - - let nacos_service_instance = Self::create_nacos_service_instance(url); - - info!("register service: {}", nacos_service_name); - let ret = self - .nacos_naming_service - .register_instance(nacos_service_name, group_name, nacos_service_instance) - .await; - if let Err(e) = ret { - error!("register to nacos occur an error: {:?}", e); - return Err(anyhow!("register to nacos occur an error: {:?}", e).into()); - } + async fn register(&self, url: Url) -> Result<(), StdError> { + let service_name = NacosServiceName::new(&url); + + let group_name = service_name.group(); + + let registry_service_name_str = service_name.value(); + + let service_instance = Self::create_nacos_service_instance(&url); + + self.nacos_service + .register_instance( + registry_service_name_str.to_owned(), + Some(group_name.to_owned()), + service_instance, + ) + .await?; Ok(()) } - async fn unregister(&self, url: Url) -> Result<(), dubbo::StdError> { - let nacos_service_name = NacosServiceName::new(&url); + async fn unregister(&self, url: Url) -> Result<(), StdError> { + let service_name = NacosServiceName::new(&url); - let group_name = Some( - nacos_service_name - .get_group_with_default(DEFAULT_GROUP) - .to_string(), - ); - let nacos_service_name = nacos_service_name.to_register_str(); + let group_name = service_name.group(); - let nacos_service_instance = Self::create_nacos_service_instance(url); + let registry_service_name_str = service_name.value(); - info!("deregister service: {}", nacos_service_name); + let service_instance = Self::create_nacos_service_instance(&url); + + self.nacos_service + .deregister_instance( + registry_service_name_str.to_owned(), + Some(group_name.to_owned()), + service_instance, + ) + .await?; - let ret = self - .nacos_naming_service - .deregister_instance(nacos_service_name, group_name, nacos_service_instance) - .await; - if let Err(e) = ret { - error!("deregister service from nacos occur an error: {:?}", e); - return Err(anyhow!("deregister service from nacos occur an error: {:?}", e).into()); - } Ok(()) } async fn subscribe(&self, url: Url) -> Result { let service_name = NacosServiceName::new(&url); - let service_group = service_name - .get_group_with_default(DEFAULT_GROUP) - .to_string(); - let subscriber_url = service_name.to_subscriber_str(); - info!("subscribe: {}", subscriber_url); - let (listener, mut change_receiver) = ServiceChangeListener::new(); - let arc_listener = Arc::new(listener); + let group_name = service_name.group().to_owned(); + + let registry_service_name_str = service_name.value().to_owned(); + + let all_instance = self + .nacos_service + .get_all_instances( + registry_service_name_str.to_owned(), + Some(group_name.to_owned()), + Vec::default(), + false, + ) + .await?; - let (discover_tx, discover_rx) = mpsc::channel(64); + let (tx, rx) = mpsc::channel(1024); - let nacos_naming_service = self.nacos_naming_service.clone(); + let (event_listener, mut listener_change_rx, closed) = NacosNamingEventListener::new(); + let event_listener = Arc::new(event_listener); - let listener_in_task = arc_listener.clone(); - let service_group_in_task = service_group.clone(); - let subscriber_url_in_task = subscriber_url.clone(); + let nacos_service_cloned = self.nacos_service.clone(); + let event_listener_cloned = event_listener.clone(); + let registry_service_name_str_clone = registry_service_name_str.clone(); + let group_name_clone = group_name.clone(); tokio::spawn(async move { - let listener = listener_in_task; - let service_group = service_group_in_task; - let subscriber_url = subscriber_url_in_task; + let mut current_instances = all_instance; + for instance in ¤t_instances { + let url = instance_to_url(instance).as_str().to_owned(); + let _ = tx.send(Ok(ServiceChange::Insert(url, ()))).await; + } - let mut current_instances = Vec::new(); loop { - let change = select! { - _ = discover_tx.closed() => { - debug!("service {} change task quit, unsubscribe.", subscriber_url); - None + let change = tokio::select! { + _ = closed.notified() => { + break; }, - change = change_receiver.recv() => change + change = listener_change_rx.changed() => change }; - match change { - Some(instances) => { - debug!("service {} changed", subscriber_url); - let (remove_instances, add_instances) = - NacosRegistry::diff(¤t_instances, &instances); - - for instance in remove_instances { - let service_name = instance.service_name.as_ref(); - let url = match service_name { - None => { - format!("triple://{}:{}", instance.ip(), instance.port()) - } - Some(service_name) => { - format!( - "triple://{}:{}/{}", - instance.ip(), - instance.port(), - service_name - ) - } - }; - - match discover_tx.send(Ok(ServiceChange::Remove(url))).await { - Ok(_) => {} - Err(e) => { - error!( - "send service change failed: {:?}, maybe user unsubscribe", - e - ); - break; - } - } - } - - for instance in add_instances { - let service_name = instance.service_name.as_ref(); - let url = match service_name { - None => { - format!("triple://{}:{}", instance.ip(), instance.port()) - } - Some(service_name) => { - format!( - "triple://{}:{}/{}", - instance.ip(), - instance.port(), - service_name - ) - } - }; - - match discover_tx.send(Ok(ServiceChange::Insert(url, ()))).await { - Ok(_) => {} - Err(e) => { - error!( - "send service change failed: {:?}, maybe user unsubscribe", - e - ); - break; - } - } - } - current_instances = instances; - } - None => { - error!( - "receive service change task quit, unsubscribe {}.", - subscriber_url - ); - break; - } + if change.is_err() { + break; } - } - debug!("unsubscribe service: {}", subscriber_url); - // unsubscribe - let unsubscribe = nacos_naming_service - .unsubscribe(subscriber_url, Some(service_group), Vec::new(), listener) - .await; + let change = listener_change_rx.borrow_and_update().clone(); + if change.is_empty() { + continue; + } + let (remove_instances, add_instances) = Self::diff(¤t_instances, &change); + + for remove_instance in remove_instances { + let url = instance_to_url(remove_instance).as_str().to_owned(); + let Ok(_) = tx.send(Ok(ServiceChange::Remove(url))).await else { + break; + }; + } - match unsubscribe { - Ok(_) => {} - Err(e) => { - error!("unsubscribe service failed: {:?}", e); + for add_instance in add_instances { + let url = instance_to_url(add_instance).as_str().to_owned(); + let Ok(_) = tx.send(Ok(ServiceChange::Insert(url, ()))).await else { + break; + }; } + + current_instances = change; } - }); - let all_instance = self - .nacos_naming_service - .get_all_instances( - subscriber_url.clone(), - Some(service_group.clone()), - Vec::new(), - false, - ) - .await?; - let _ = arc_listener.changed(all_instance); + info!("unsubscribe"); + let _ = nacos_service_cloned + .unsubscribe( + registry_service_name_str_clone, + Some(group_name_clone), + Vec::default(), + event_listener_cloned, + ) + .await; + }); - match self - .nacos_naming_service + let _ = self + .nacos_service .subscribe( - subscriber_url.clone(), - Some(service_group.clone()), - Vec::new(), - arc_listener, + registry_service_name_str, + Some(group_name), + Vec::default(), + event_listener, ) - .await - { - Ok(_) => {} - Err(e) => { - error!("subscribe service failed: {:?}", e); - return Err(anyhow!("subscribe service failed: {:?}", e).into()); - } - } + .await?; - Ok(discover_rx) + Ok(rx) } - async fn unsubscribe(&self, url: Url) -> Result<(), dubbo::StdError> { - let service_name = NacosServiceName::new(&url); - let subscriber_url = service_name.to_subscriber_str(); - info!("unsubscribe: {}", &subscriber_url); - + async fn unsubscribe(&self, _: Url) -> Result<(), StdError> { Ok(()) } + + fn url(&self) -> &Url { + &self.url + } } -struct NacosServiceName { - category: String, +#[async_trait] +impl Extension for NacosRegistry { + type Target = Box; - service_interface: String, + fn name() -> String { + "nacos".to_string() + } - version: String, + async fn create(url: &Url) -> Result { + // url example: + // extension://0.0.0.0?extension-type=registry&extension-name=nacos®istry=nacos://127.0.0.1:8848 + let registry_url = url.query::().unwrap(); + let registry_url = registry_url.value(); - group: String, -} + let host = registry_url.host().unwrap(); + let port = registry_url.port().unwrap_or(8848); -impl NacosServiceName { - fn new(url: &Url) -> NacosServiceName { - let service_interface = url.get_service_name(); + let nacos_server_addr = format!("{}:{}", host, port); - let category = url.get_param(CATEGORY_KEY).unwrap_or_default(); + let namespace = registry_url.query::().unwrap_or_default(); + let namespace = namespace.value(); - let version = url.get_param(VERSION_KEY).unwrap_or_default(); + let app_name = registry_url.query::().unwrap_or_default(); + let app_name = app_name.value(); - let group = url.get_param(GROUP_KEY).unwrap_or_default(); + let user_name = registry_url.username(); + let password = registry_url.password().unwrap_or_default(); - Self { - category, - service_interface: service_interface.clone(), - version, - group, + let nacos_client_props = ClientProps::new() + .server_addr(nacos_server_addr) + .namespace(namespace) + .app_name(app_name) + .auth_username(user_name) + .auth_password(password); + + let mut nacos_naming_builder = NamingServiceBuilder::new(nacos_client_props); + + if !user_name.is_empty() { + nacos_naming_builder = nacos_naming_builder.enable_auth_plugin_http(); } - } - #[allow(dead_code)] - fn from_service_name_str(service_name_str: &str) -> Self { - let mut splitter = service_name_str.split(SERVICE_NAME_SEPARATOR); + let nacos_naming_service = nacos_naming_builder.build().unwrap(); - let category = splitter.next().unwrap_or_default().to_string(); - let service_interface = splitter.next().unwrap_or_default().to_string(); - let version = splitter.next().unwrap_or_default().to_string(); - let group = splitter.next().unwrap_or_default().to_string(); + let nacos_registry = NacosRegistry::new(registry_url, Arc::new(nacos_naming_service)); - Self { - category, - service_interface, - version, - group, - } + Ok(Box::new(nacos_registry)) } +} - #[allow(dead_code)] - fn version(&self) -> &str { - &self.version - } +fn instance_to_url(instance: &ServiceInstance) -> Url { + let mut url = Url::empty(); + url.set_protocol("provider"); + url.set_host(instance.ip()); + url.set_port(instance.port().try_into().unwrap_or_default()); + url.extend_pairs( + instance + .metadata() + .iter() + .map(|(k, v)| (k.clone(), v.clone())), + ); - #[allow(dead_code)] - fn get_version_with_default<'a>(&'a self, default: &'a str) -> &str { - if self.version.is_empty() { - default - } else { - &self.version - } - } + url +} - #[allow(dead_code)] - fn group(&self) -> &str { - &self.group +struct NacosNamingEventListener { + tx: watch::Sender>, + closed: Arc, +} + +impl NacosNamingEventListener { + fn new() -> (Self, watch::Receiver>, Arc) { + let (tx, rx) = watch::channel(Vec::new()); + + let closed = Arc::new(Notify::new()); + let this = Self { + tx, + closed: closed.clone(), + }; + (this, rx, closed) } +} - fn get_group_with_default<'a>(&'a self, default: &'a str) -> &str { - if self.group.is_empty() { - default - } else { - &self.group +impl NamingEventListener for NacosNamingEventListener { + fn event(&self, event: Arc) { + match event.instances { + Some(ref instances) => { + let instances = instances.clone(); + let send = self.tx.send(instances); + match send { + Ok(_) => {} + Err(_) => { + self.closed.notify_waiters(); + } + } + } + None => {} } } +} +struct NacosServiceName { #[allow(dead_code)] - fn category(&self) -> &str { - &self.category - } + category: String, #[allow(dead_code)] - fn get_category_with_default<'a>(&'a self, default: &'a str) -> &str { - if self.category.is_empty() { - default - } else { - &self.category - } - } + interface: String, #[allow(dead_code)] - fn service_interface(&self) -> &str { - &self.service_interface - } + version: String, #[allow(dead_code)] - fn get_service_interface_with_default<'a>(&'a self, default: &'a str) -> &str { - if self.service_interface.is_empty() { - default - } else { - &self.service_interface - } - } + group: String, - fn to_register_str(&self) -> String { - let category = if self.category.is_empty() { - DEFAULT_CATEGORY - } else { - &self.category - }; - format!( - "{}:{}:{}:{}", - category, self.service_interface, self.version, self.group - ) - } + #[allow(dead_code)] + value: String, +} - fn to_subscriber_str(&self) -> String { - let category = if is_concrete_str(&self.service_interface) { - DEFAULT_CATEGORY - } else { - &self.category - }; +impl NacosServiceName { + fn new(url: &Url) -> Self { + let interface = url.query::().unwrap(); + let interface = interface.value(); - format!( - "{}:{}:{}:{}", - category, self.service_interface, self.version, self.group - ) - } + let category = url.query::().unwrap_or_default(); + let category = category.value(); - #[allow(dead_code)] - fn to_subscriber_legacy_string(&self) -> String { - let mut legacy_string = DEFAULT_CATEGORY.to_owned(); - if !self.service_interface.is_empty() { - legacy_string.push_str(SERVICE_NAME_SEPARATOR); - legacy_string.push_str(&self.service_interface); - } + let version = url.query::().unwrap_or_default(); + let version = version.value(); - if !self.version.is_empty() { - legacy_string.push_str(SERVICE_NAME_SEPARATOR); - legacy_string.push_str(&self.version); - } + let group = url.query::().unwrap_or_default(); + let group = group.value(); - if !self.group.is_empty() { - legacy_string.push_str(SERVICE_NAME_SEPARATOR); - legacy_string.push_str(&self.group); - } + let value = format!("{}:{}:{}:{}", category, interface, version, group); - legacy_string + Self { + category, + interface, + version, + group, + value, + } } #[allow(dead_code)] - fn is_concrete(&self) -> bool { - is_concrete_str(&self.service_interface) - && is_concrete_str(&self.version) - && is_concrete_str(&self.group) + fn category(&self) -> &str { + &self.category } #[allow(dead_code)] - fn is_compatible(&self, other: &NacosServiceName) -> bool { - if !other.is_concrete() { - return false; - } - - if !self.category.eq(&other.category) && !match_range(&self.category, &other.category) { - return false; - } - - if is_wildcard_str(&self.version) { - return true; - } - - if is_wildcard_str(&self.group) { - return true; - } - - if !&self.version.eq(&other.version) && !match_range(&self.version, &other.version) { - return false; - } - - if !self.group.eq(&other.group) && !match_range(&self.group, &other.group) { - return false; - } - - true + fn interface(&self) -> &str { + &self.interface } -} - -struct ServiceChangeListener { - tx: mpsc::Sender>, -} - -impl ServiceChangeListener { - pub fn new() -> (Self, mpsc::Receiver>) { - let (tx, rx) = mpsc::channel(64); - let this = Self { tx }; - (this, rx) + #[allow(dead_code)] + fn version(&self) -> &str { + &self.version } - pub fn changed(&self, instances: Vec) -> Result<(), dubbo::StdError> { - match self.tx.try_send(instances) { - Ok(_) => Ok(()), - Err(e) => { - error!("send service change failed: {:?}", e); - Err(anyhow!("send service change failed: {:?}", e).into()) - } - } + #[allow(dead_code)] + fn group(&self) -> &str { + &self.group } -} - -impl NamingEventListener for ServiceChangeListener { - fn event(&self, event: Arc) { - debug!("service change {}", event.service_name.clone()); - debug!("nacos event: {:?}", event); - let instances = event.instances.as_ref(); - match instances { - None => { - let _ = self.changed(Vec::default()); - } - Some(instances) => { - let _ = self.changed(instances.clone()); - } - } + #[allow(dead_code)] + fn value(&self) -> &str { + &self.value } } @@ -593,7 +421,9 @@ pub mod tests { use core::time; use std::thread; + use tracing::error; + use dubbo_base::{extension_param::ExtensionName, registry_param::Side}; use tracing::metadata::LevelFilter; use super::*; @@ -610,13 +440,17 @@ pub mod tests { .with_max_level(LevelFilter::DEBUG) .init(); - let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap(); - let registry = NacosRegistry::new(nacos_registry_url); + let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry" + .parse() + .unwrap(); + extension_url.add_query_param(ExtensionName::new("nacos".to_string())); + extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap())); + + let registry = NacosRegistry::create(&extension_url).await.unwrap(); - let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap(); - service_url - .params - .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned()); + let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap(); + + service_url.add_query_param(Side::Provider); let ret = registry.register(service_url).await; @@ -638,13 +472,17 @@ pub mod tests { .with_max_level(LevelFilter::DEBUG) .init(); - let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap(); - let registry = NacosRegistry::new(nacos_registry_url); + let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry" + .parse() + .unwrap(); + extension_url.add_query_param(ExtensionName::new("nacos".to_string())); + extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap())); + + let registry = NacosRegistry::create(&extension_url).await.unwrap(); + + let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap(); - let mut service_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap(); - service_url - .params - .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned()); + service_url.add_query_param(Side::Provider); let ret = registry.register(service_url).await; @@ -653,7 +491,7 @@ pub mod tests { let sleep_millis = time::Duration::from_secs(10); thread::sleep(sleep_millis); - let unregister_url = Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap(); + let unregister_url = "tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap(); let ret = registry.unregister(unregister_url).await; info!("deregister result: {:?}", ret); @@ -674,19 +512,23 @@ pub mod tests { .with_max_level(LevelFilter::DEBUG) .init(); - let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap(); - let registry = NacosRegistry::new(nacos_registry_url); + let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry" + .parse() + .unwrap(); + extension_url.add_query_param(ExtensionName::new("nacos".to_string())); + extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap())); - let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap(); - service_url - .params - .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned()); + let registry = NacosRegistry::create(&extension_url).await.unwrap(); + + let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap(); + + service_url.add_query_param(Side::Provider); let ret = registry.register(service_url).await; info!("register result: {:?}", ret); - let subscribe_url = Url::from_url("consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap(); + let subscribe_url = "consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap(); let subscribe_ret = registry.subscribe(subscribe_url).await; if let Err(e) = subscribe_ret { @@ -714,19 +556,23 @@ pub mod tests { .with_max_level(LevelFilter::DEBUG) .init(); - let nacos_registry_url = Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap(); - let registry = NacosRegistry::new(nacos_registry_url); + let mut extension_url: Url = "extension://0.0.0.0?extension-type=registry" + .parse() + .unwrap(); + extension_url.add_query_param(ExtensionName::new("nacos".to_string())); + extension_url.add_query_param(RegistryUrl::new("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015".parse().unwrap())); + + let registry = NacosRegistry::create(&extension_url).await.unwrap(); - let mut service_url = Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap(); - service_url - .params - .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned()); + let mut service_url: Url = "tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807".parse().unwrap(); + + service_url.add_query_param(Side::Provider); let ret = registry.register(service_url).await; info!("register result: {:?}", ret); - let subscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap(); + let subscribe_url = "provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider".parse().unwrap(); let ret = registry.subscribe(subscribe_url).await; @@ -742,13 +588,7 @@ pub mod tests { let sleep_millis = time::Duration::from_secs(40); thread::sleep(sleep_millis); - let unsubscribe_url = Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap(); - let ret = registry.unsubscribe(unsubscribe_url).await; - - if let Err(e) = ret { - error!("error message: {:?}", e); - return; - } + drop(rx); let sleep_millis = time::Duration::from_secs(40); thread::sleep(sleep_millis); diff --git a/registry/nacos/src/utils/mod.rs b/registry/nacos/src/utils/mod.rs deleted file mode 100644 index b247f60d..00000000 --- a/registry/nacos/src/utils/mod.rs +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use dubbo_base::Url; -use nacos_sdk::api::props::ClientProps; - -const APP_NAME_KEY: &str = "AppName"; - -const UNKNOWN_APP: &str = "UnknownApp"; - -const NAMESPACE_KEY: &str = "namespace"; - -const DEFAULT_NAMESPACE: &str = "public"; - -const USERNAME_KEY: &str = "username"; - -const PASSWORD_KEY: &str = "password"; - -const BACKUP_KEY: &str = "backup"; - -const WILDCARD: &str = "*"; - -const RANGE_STR_SEPARATOR: &str = ","; - -pub(crate) fn build_nacos_client_props(url: &Url) -> (nacos_sdk::api::props::ClientProps, bool) { - let host = &url.ip; - let port = &url.port; - let backup = url - .get_param(BACKUP_KEY) - .map(|mut data| { - data.insert(0, ','); - data - }) - .unwrap_or_default(); - let server_addr = format!("{}:{}{}", host, port, backup); - - let namespace = url - .get_param(NAMESPACE_KEY) - .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string()); - let app_name = url - .get_param(APP_NAME_KEY) - .unwrap_or_else(|| UNKNOWN_APP.to_string()); - let username = url.get_param(USERNAME_KEY).unwrap_or_default(); - let password = url.get_param(PASSWORD_KEY).unwrap_or_default(); - - let enable_auth = !password.is_empty() && !username.is_empty(); - - // todo ext parameters - - let mut client_props = ClientProps::new(); - - client_props = client_props - .server_addr(server_addr) - .namespace(namespace) - .app_name(app_name) - .auth_username(username) - .auth_password(password); - - (client_props, enable_auth) -} - -pub(crate) fn is_wildcard_str(str: &str) -> bool { - str.eq(WILDCARD) -} - -pub(crate) fn is_range_str(str: &str) -> bool { - let ret = str.split(RANGE_STR_SEPARATOR); - let count = ret.count(); - count > 1 -} - -pub(crate) fn is_concrete_str(str: &str) -> bool { - !is_wildcard_str(str) && !is_range_str(str) -} - -pub(crate) fn match_range(range: &str, value: &str) -> bool { - if range.is_empty() { - return true; - } - - if !is_range_str(range) { - return false; - } - - range - .split(RANGE_STR_SEPARATOR) - .any(|data| (*data).eq(value)) -} diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs index f3733d50..dc44899d 100644 --- a/registry/zookeeper/src/lib.rs +++ b/registry/zookeeper/src/lib.rs @@ -22,17 +22,15 @@ use std::{collections::HashMap, env, sync::Arc, time::Duration}; use async_trait::async_trait; use dubbo_base::{ constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY}, - Url, + StdError, Url, }; use dubbo_logger::tracing::{debug, error, info}; use serde::{Deserialize, Serialize}; use tokio::{select, sync::mpsc}; use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper}; -use dubbo::{ - registry::n_registry::{DiscoverStream, Registry, ServiceChange}, - StdError, -}; +use dubbo::extension::registry_extension::{DiscoverStream, Registry, ServiceChange}; +use dubbo_base::{registry_param::InterfaceName, url::UrlParam}; // 从url中获取服务注册的元数据 // rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) @@ -184,24 +182,24 @@ impl ZookeeperRegistry { ) -> (Vec, Vec) { let old_urls_map: HashMap = old_urls .iter() - .map(|url| dubbo_base::Url::from_url(url.as_str())) - .filter(|item| item.is_some()) + .map(|url| url.parse()) + .filter(|item| item.is_ok()) .map(|item| item.unwrap()) - .map(|item| { - let ip_port = item.get_ip_port(); - let url = item.encoded_raw_url_string(); + .map(|item: Url| { + let ip_port = item.authority().to_owned(); + let url = item.as_str().to_owned(); (ip_port, url) }) .collect(); let new_urls_map: HashMap = new_urls .iter() - .map(|url| dubbo_base::Url::from_url(url.as_str())) - .filter(|item| item.is_some()) + .map(|url| url.parse()) + .filter(|item| item.is_ok()) .map(|item| item.unwrap()) - .map(|item| { - let ip_port = item.get_ip_port(); - let url = item.encoded_raw_url_string(); + .map(|item: Url| { + let ip_port = item.authority().to_owned(); + let url = item.as_str().to_owned(); (ip_port, url) }) .collect(); @@ -263,24 +261,23 @@ impl Default for ZookeeperRegistry { impl Registry for ZookeeperRegistry { async fn register(&self, url: Url) -> Result<(), StdError> { debug!("register url: {}", url); + let interface_name = url.query::().unwrap().value(); + let url_str = url.as_str(); let zk_path = format!( "/{}/{}/{}/{}", - DUBBO_KEY, - url.service_name, - PROVIDERS_KEY, - url.encoded_raw_url_string() + DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str ); self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, CreateMode::Ephemeral)?; Ok(()) } async fn unregister(&self, url: Url) -> Result<(), StdError> { + let interface_name = url.query::().unwrap().value(); + let url_str = url.as_str(); + let zk_path = format!( "/{}/{}/{}/{}", - DUBBO_KEY, - url.service_name, - PROVIDERS_KEY, - url.encoded_raw_url_string() + DUBBO_KEY, interface_name, PROVIDERS_KEY, url_str ); self.delete_path(zk_path.as_str()); Ok(()) @@ -288,8 +285,9 @@ impl Registry for ZookeeperRegistry { // for consumer to find the changes of providers async fn subscribe(&self, url: Url) -> Result { - let service_name = url.get_service_name(); - let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY); + let interface_name = url.query::().unwrap().value(); + + let zk_path = format!("/{}/{}/{}", DUBBO_KEY, interface_name, PROVIDERS_KEY); debug!("subscribe service: {}", zk_path); @@ -302,12 +300,12 @@ impl Registry for ZookeeperRegistry { let zk_client_in_task = self.zk_client.clone(); let zk_path_in_task = zk_path.clone(); - let service_name_in_task = service_name.clone(); + let interface_name_in_task = interface_name.clone(); let arc_listener_in_task = arc_listener.clone(); tokio::spawn(async move { let zk_client = zk_client_in_task; let zk_path = zk_path_in_task; - let service_name = service_name_in_task; + let interface_name = interface_name_in_task; let listener = arc_listener_in_task; let mut current_urls = Vec::new(); @@ -383,12 +381,17 @@ impl Registry for ZookeeperRegistry { } async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { - let service_name = url.get_service_name(); - let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY); + let interface_name = url.query::().unwrap().value(); + + let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &interface_name, PROVIDERS_KEY); info!("unsubscribe service: {}", zk_path); Ok(()) } + + fn url(&self) -> &Url { + todo!() + } } pub struct ZooKeeperListener { diff --git a/remoting/base/src/exchange/client.rs b/remoting/base/src/exchange/client.rs index edbb4a63..71c42d4d 100644 --- a/remoting/base/src/exchange/client.rs +++ b/remoting/base/src/exchange/client.rs @@ -51,7 +51,7 @@ impl ExchangeClient { pub fn new(url: Url, client: BoxedClient, connection_timeout: Duration) -> Self { ExchangeClient { connection_timeout, - address: url.get_ip_port(), + address: url.authority().to_owned(), client: None, init: AtomicBool::new(false), active: AtomicI32::new(0),