diff --git a/application.yaml b/application.yaml index 902b0efd..bec29a67 100644 --- a/application.yaml +++ b/application.yaml @@ -25,33 +25,5 @@ dubbo: routers: consumer: - service: "org.apache.dubbo.sample.tri.Greeter" - url: triple://localhost:20000 - protocol: triple - nacos: - addr: "127.0.0.1:8848" - namespace: "" - app: "" - conditions: - - scope: "service" - force: false - runtime: true - enabled: true - key: "org.apache.dubbo.sample.tri.Greeter" - conditions: - - method=greet => port=8889 - - scope: "service" - force: true - runtime: true - enabled: true - key: "user.UserService" - conditions: - - method=get_s => port=2003 - tags: - force: true - enabled: true - key: shop-detail - tags: - - name: gray - matches: - - key: env - value: gray \ No newline at end of file + url: tri://127.0.0.1:20000 + protocol: triple \ No newline at end of file diff --git a/config/src/router.rs b/config/src/router.rs index 98aa14a6..b45bd478 100644 --- a/config/src/router.rs +++ b/config/src/router.rs @@ -2,9 +2,10 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct ConditionRouterConfig { + #[serde(rename = "configVersion")] + pub config_version: String, pub scope: String, pub force: bool, - pub runtime: bool, pub enabled: bool, pub key: String, pub conditions: Vec, @@ -12,6 +13,8 @@ pub struct ConditionRouterConfig { #[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] pub struct TagRouterConfig { + #[serde(rename = "configVersion")] + pub config_version: String, pub force: bool, pub enabled: bool, pub key: String, @@ -28,6 +31,7 @@ pub struct ConsumerConfig { #[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] pub struct Tag { pub name: String, + #[serde(rename = "match")] pub matches: Vec, } diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index 547878bf..66a3c43a 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -39,7 +39,7 @@ use tower::{ ready_cache::ReadyCache, }; -use crate::cluster::Directory; +use crate::{cluster::Directory, codegen::RpcInvocation, invocation::Invocation}; /// Directory. /// @@ -68,12 +68,12 @@ impl StaticDirectory { } impl Directory for StaticDirectory { - fn list(&self, service_name: String) -> Vec { + fn list(&self, inv: Arc) -> Vec { let url = Url::from_url(&format!( "tri://{}:{}/{}", self.uri.host().unwrap(), self.uri.port().unwrap(), - service_name, + inv.get_target_service_unique_name(), )) .unwrap(); let invoker = Box::new(TripleInvoker::new(url)); @@ -97,7 +97,8 @@ impl RegistryDirectory { } impl Directory for RegistryDirectory { - fn list(&self, service_name: String) -> Vec { + fn list(&self, inv: Arc) -> Vec { + let service_name = inv.get_target_service_unique_name(); let url = Url::from_url(&format!( "triple://{}:{}/{}", "127.0.0.1", "8888", service_name diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 7411a675..964150d5 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -23,6 +23,9 @@ use tower::{ready_cache::ReadyCache, ServiceExt}; use tower_service::Service; use crate::{ + cluster::router::{ + manager::router_manager::get_global_router_manager, router_chain::RouterChain, + }, codegen::RpcInvocation, invocation::Invocation, protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker}, @@ -35,7 +38,7 @@ pub mod loadbalance; pub mod router; pub trait Directory: Debug { - fn list(&self, service_name: String) -> Vec; + fn list(&self, inv: Arc) -> Vec; } type BoxDirectory = Box; @@ -134,7 +137,7 @@ impl Service> for FailoverCluster { let inv = inv.unwrap(); let service_name = inv.get_target_service_unique_name(); - let invokers = self.dir.list(service_name.clone()); + let invokers = self.dir.list(Arc::new(inv.clone())); Box::pin(async move { let mut current_req = req; @@ -172,55 +175,36 @@ impl Invoker> for FailoverCluster { #[derive(Debug, Default)] pub struct MockDirectory { - // router_chain: RouterChain, + router_chain: RouterChain, } impl MockDirectory { - pub fn new() -> MockDirectory { - // let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation); - Self { - // router_chain - } + pub fn new(service_name: String) -> MockDirectory { + let router_chain = get_global_router_manager() + .read() + .unwrap() + .get_router_chain(service_name); + Self { router_chain } } } impl Directory for MockDirectory { - fn list(&self, service_name: String) -> Vec { - // tracing::info!("MockDirectory: {}", meta); + fn list(&self, inv: Arc) -> Vec { let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); - vec![Box::new(TripleInvoker::new(u))] - // self.router_chain.route(u, invo); + let mut urls = vec![u]; + // tracing::info!("MockDirectory: {}", meta); + urls = self.router_chain.route(urls, inv); + let mut result = Vec::new(); + for url in urls { + result.push(Box::new(TripleInvoker::new(url)) as BoxInvoker); + } + result } } - -// #[derive(Debug, Default)] -// pub struct RouterChain { -// router: HashMap, -// invokers: Arc>, -// } - -// impl RouterChain { -// pub fn route(&mut self, url: Url, invo: Arc) -> Arc> { -// let r = self.router.get("mock").unwrap(); -// r.route(self.invokers.clone(), url, invo) -// } -// } - -// pub trait Router: Debug { -// fn route( -// &self, -// invokers: Arc>, -// url: Url, -// invo: Arc, -// ) -> Arc>; -// } - -// pub type BoxRouter = Box; - #[cfg(test)] pub mod tests { - use std::task::Poll; + use std::{sync::Arc, task::Poll}; use bytes::{Buf, BufMut, BytesMut}; use dubbo_base::Url; @@ -250,8 +234,11 @@ pub mod tests { struct MockDirectory; impl Directory for MockDirectory { - fn list(&self, service_name: String) -> Vec { - println!("get invoker list for {}", service_name); + fn list(&self, inv: Arc) -> Vec { + println!( + "get invoker list for {}", + inv.get_target_service_unique_name() + ); vec![ Box::new(MockInvoker(1)), diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs index f39cd1c7..73aca005 100644 --- a/dubbo/src/cluster/router/condition/condition_router.rs +++ b/dubbo/src/cluster/router/condition/condition_router.rs @@ -17,19 +17,18 @@ pub struct ConditionRouter { } impl Router for ConditionRouter { - fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec { - let mut invokers_result = invokers.clone(); - if let Some(routers) = self.application_routers.clone() { + fn route(&self, mut invokers: Vec, url: Url, invo: Arc) -> Vec { + if let Some(routers) = &self.application_routers { for router in &routers.read().unwrap().routers { - invokers_result = router.route(invokers_result, url.clone(), invo.clone()) + invokers = router.route(invokers, url.clone(), invo.clone()); } } - if let Some(routers) = self.service_routers.clone() { + if let Some(routers) = &self.service_routers { for router in &routers.read().unwrap().routers { - invokers_result = router.route(invokers_result, url.clone(), invo.clone()) + invokers = router.route(invokers, url.clone(), invo.clone()); } } - invokers_result + invokers } } diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs index 8c9177e8..92bbe2da 100644 --- a/dubbo/src/cluster/router/condition/matcher.rs +++ b/dubbo/src/cluster/router/condition/matcher.rs @@ -1,6 +1,5 @@ -use crate::codegen::RpcInvocation; use regex::Regex; -use std::{collections::HashSet, error::Error, option::Option, sync::Arc}; +use std::{collections::HashSet, error::Error, option::Option}; #[derive(Clone, Debug, Default)] pub struct ConditionMatcher { @@ -18,50 +17,25 @@ impl ConditionMatcher { } } - pub fn is_match( - &self, - value: Option, - invocation: Arc, - _is_when: bool, - ) -> Result> { + pub fn is_match(&self, value: Option) -> Result> { match value { - None => { - // if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false. - Ok(false) - } + None => Ok(false), Some(val) => { - if !self.matches.is_empty() && self.mismatches.is_empty() { - for match_ in self.matches.iter() { - if self.do_pattern_match(match_, &val, invocation.clone())? { - return Ok(true); - } - } - Ok(false) - } else if !self.mismatches.is_empty() && self.matches.is_empty() { - for mismatch in self.mismatches.iter() { - if self.do_pattern_match(mismatch, &val, invocation.clone())? { - return Ok(false); - } + for match_ in self.matches.iter() { + if self.do_pattern_match(match_, &val) { + return Ok(true); } - Ok(true) - } else if !self.matches.is_empty() && !self.mismatches.is_empty() { - for mismatch in self.mismatches.iter() { - if self.do_pattern_match(mismatch, &val, invocation.clone())? { - return Ok(false); - } - } - for match_ in self.matches.iter() { - if self.do_pattern_match(match_, &val, invocation.clone())? { - return Ok(true); - } + } + for mismatch in self.mismatches.iter() { + if !self.do_pattern_match(mismatch, &val) { + return Ok(true); } - Ok(false) - } else { - Ok(false) } + Ok(false) } } } + pub fn get_matches(&mut self) -> &mut HashSet { &mut self.matches } @@ -69,16 +43,26 @@ impl ConditionMatcher { &mut self.mismatches } - fn do_pattern_match( - &self, - pattern: &String, - value: &String, - _invocation: Arc, - ) -> Result> { - if pattern.contains("*") { - return Ok(star_matcher(pattern, value)); + fn do_pattern_match(&self, pattern: &str, value: &str) -> bool { + if pattern.contains('*') { + return star_matcher(pattern, value); + } + + if pattern.contains('~') { + let parts: Vec<&str> = pattern.split('~').collect(); + + if parts.len() == 2 { + if let (Ok(left), Ok(right), Ok(val)) = ( + parts[0].parse::(), + parts[1].parse::(), + value.parse::(), + ) { + return range_matcher(val, left, right); + } + } + return false; } - Ok(pattern.eq(value)) + pattern == value } } @@ -89,6 +73,6 @@ pub fn star_matcher(pattern: &str, input: &str) -> bool { regex.is_match(input) } -pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool { +pub fn range_matcher(val: i32, min: i32, max: i32) -> bool { min <= val && val <= max } diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs index e290b150..5f06aa8f 100644 --- a/dubbo/src/cluster/router/condition/single_router.rs +++ b/dubbo/src/cluster/router/condition/single_router.rs @@ -169,20 +169,16 @@ impl ConditionSingleRouter { pub fn match_when(&self, url: Url, invocation: Arc) -> bool { if self.when_condition.is_empty() { - true - } else { - false - }; - self.do_match(url, &self.when_condition, invocation, true) + return true; + } + self.do_match(url, &self.when_condition, invocation) } pub fn match_then(&self, url: Url, invocation: Arc) -> bool { - if self.when_condition.is_empty() { - true - } else { - false - }; - self.do_match(url, &self.then_condition, invocation, false) + if self.then_condition.is_empty() { + return false; + } + self.do_match(url, &self.then_condition, invocation) } pub fn do_match( @@ -190,25 +186,19 @@ impl ConditionSingleRouter { url: Url, conditions: &HashMap>>, invocation: Arc, - is_when: bool, ) -> bool { let sample: HashMap = to_original_map(url); - for (key, condition_matcher) in conditions { + conditions.iter().all(|(key, condition_matcher)| { let matcher = condition_matcher.read().unwrap(); let value = get_value(key, &sample, invocation.clone()); - match matcher.is_match(value, invocation.clone(), is_when) { - Ok(result) => { - if !result { - return false; - } - } + match matcher.is_match(value) { + Ok(result) => result, Err(error) => { info!("Error occurred: {:?}", error); - return false; + false } } - } - true + }) } } diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs index 4207b79b..7ad5e1b6 100644 --- a/dubbo/src/cluster/router/manager/condition_manager.rs +++ b/dubbo/src/cluster/router/manager/condition_manager.rs @@ -16,28 +16,30 @@ pub struct ConditionRouterManager { } impl ConditionRouterManager { - pub fn get_router(&self, service_name: String) -> Option { - let routers_service = self.routers_service.get(&service_name); - match routers_service { - Some(routers_service) => { - if self.routers_application.read().unwrap().is_null() { - return Some(ConditionRouter::new(Some(routers_service.clone()), None)); - } - Some(ConditionRouter::new( + pub fn get_router(&self, service_name: &String) -> Option { + let routers_application_is_null = self.routers_application.read().unwrap().is_null(); + self.routers_service + .get(service_name) + .map(|routers_service| { + ConditionRouter::new( Some(routers_service.clone()), - Some(self.routers_application.clone()), - )) - } - None => { - if self.routers_application.read().unwrap().is_null() { - return None; + if routers_application_is_null { + None + } else { + Some(self.routers_application.clone()) + }, + ) + }) + .or_else(|| { + if routers_application_is_null { + None + } else { + Some(ConditionRouter::new( + None, + Some(self.routers_application.clone()), + )) } - Some(ConditionRouter::new( - None, - Some(self.routers_application.clone()), - )) - } - } + }) } pub fn update(&mut self, config: ConditionRouterConfig) { @@ -45,31 +47,26 @@ impl ConditionRouterManager { let scope = config.scope; let key = config.key; let enable = config.enabled; - let mut routers = Vec::new(); - for condition in config.conditions { - routers.push(ConditionSingleRouter::new(condition, force, enable)); - } + + let routers = config + .conditions + .into_iter() + .map(|condition| ConditionSingleRouter::new(condition, force, enable)) + .collect::>(); + match scope.as_str() { "application" => { self.routers_application.write().unwrap().routers = routers; } "service" => { - if let Some(x) = self.routers_service.get(&key) { - x.write().unwrap().routers = routers - } else { - self.routers_service.insert( - key, - Arc::new(RwLock::new(ConditionSingleRouters::new(routers))), - ); - } + self.routers_service + .entry(key) + .or_insert_with(|| Arc::new(RwLock::new(ConditionSingleRouters::new(vec![])))) + .write() + .unwrap() + .routers = routers; } _ => {} } } - - pub fn _parse_rules(&mut self, configs: Vec) { - for config in configs { - self.update(config) - } - } } diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs index a2a66586..e963181e 100644 --- a/dubbo/src/cluster/router/manager/router_manager.rs +++ b/dubbo/src/cluster/router/manager/router_manager.rs @@ -1,10 +1,7 @@ -use crate::{ - cluster::router::{ - manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager}, - nacos_config_center::nacos_client::NacosClient, - router_chain::RouterChain, - }, - invocation::{Invocation, RpcInvocation}, +use crate::cluster::router::{ + manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager}, + nacos_config_center::nacos_client::NacosClient, + router_chain::RouterChain, }; use dubbo_base::Url; use dubbo_config::{ @@ -19,7 +16,8 @@ use std::{ }; pub static GLOBAL_ROUTER_MANAGER: OnceCell>> = OnceCell::new(); - +const TAG: &str = "tag"; +const CONDITION: &str = "condition"; pub struct RouterManager { pub condition_router_manager: ConditionRouterManager, pub tag_router_manager: TagRouterManager, @@ -28,30 +26,28 @@ pub struct RouterManager { } impl RouterManager { - pub fn get_router_chain(&self, invocation: Arc) -> RouterChain { - let service = invocation.get_target_service_unique_name().clone(); - let condition_router = self.condition_router_manager.get_router(service.clone()); - let tag_router = self.tag_router_manager.get_router(); + pub fn get_router_chain(&self, service: String) -> RouterChain { let mut chain = RouterChain::new(); - match self.consumer.get(service.as_str()) { - None => {} - Some(url) => { - chain.set_condition_router(condition_router); - chain.set_tag_router(tag_router); - chain.self_url = url.clone(); + if let Some(url) = self.consumer.get(service.as_str()) { + if let Some(tag_router) = self.tag_router_manager.get_router(&service) { + chain.add_router(TAG.to_string(), Box::new(tag_router)); + } + if let Some(condition_router) = self.condition_router_manager.get_router(&service) { + chain.add_router(CONDITION.to_string(), Box::new(condition_router)); } + chain.self_url = url.clone(); } chain } pub fn notify(&mut self, event: RouterConfigChangeEvent) { match event.router_kind.as_str() { - "condition" => { + CONDITION => { let config: ConditionRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap(); self.condition_router_manager.update(config) } - "tag" => { + TAG => { let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap(); self.tag_router_manager.update(config) } @@ -66,76 +62,71 @@ impl RouterManager { self.init_router_managers_for_nacos(); } - pub fn init_router_managers_for_nacos(&mut self) { - let config = self + fn init_router_managers_for_nacos(&mut self) { + if let Some(tag_config) = self .nacos .as_ref() - .unwrap() - .get_tag_config("application".to_string()); - match config { - None => {} - Some(tag_config) => { - self.tag_router_manager.init(); - self.tag_router_manager.update(tag_config) - } + .and_then(|n| n.get_config("application", TAG, TAG)) + { + self.tag_router_manager.update(tag_config); } + + if let Some(condition_app_config) = self + .nacos + .as_ref() + .and_then(|n| n.get_config("application", CONDITION, TAG)) + { + self.condition_router_manager.update(condition_app_config); + } + for (service_name, _) in &self.consumer { - let config = self + if let Some(condition_config) = self .nacos .as_ref() - .unwrap() - .get_condition_config(service_name.clone()); - match config { - None => {} - Some(condition_config) => self.condition_router_manager.update(condition_config), + .and_then(|n| n.get_config(service_name, CONDITION, CONDITION)) + { + self.condition_router_manager.update(condition_config); } } } pub fn init(&mut self) { let config = get_global_config().routers.clone(); + self.init_consumer_configs(); + if let Some(nacos_config) = &config.nacos { + self.init_nacos(nacos_config.clone()); + } else { + trace!("Nacos not configured, using local YAML configuration for routing"); + if let Some(condition_configs) = &config.conditions { + for condition_config in condition_configs { + self.condition_router_manager + .update(condition_config.clone()); + } + } else { + info!("Unconfigured Condition Router") + } + if let Some(tag_config) = &config.tags { + self.tag_router_manager.update(tag_config.clone()); + } else { + info!("Unconfigured Tag Router") + } + } + } + + fn init_consumer_configs(&mut self) { let consumer_configs = get_global_config() .routers .consumer .clone() - .unwrap_or(Vec::new()); + .unwrap_or_else(Vec::new); + for consumer_config in consumer_configs { - self.consumer.insert( - consumer_config.service.clone(), - Url::from_url( - format!("{}/{}", consumer_config.url, consumer_config.service).as_str(), - ) - .expect("consumer配置出错!Url生成错误"), - ); - } - match &config.nacos { - None => { - trace!("Nacos not configured, using local YAML configuration for routing"); - let condition = config.conditions.clone(); - match condition { - None => { - info!("Unconfigured Condition Router") - } - Some(cons) => { - for con in cons { - self.condition_router_manager.update(con) - } - } - } - let tag = config.tags.clone(); - match tag { - None => { - info!("Unconfigured Tag Router") - } - Some(ta) => { - self.tag_router_manager.init(); - self.tag_router_manager.update(ta) - } - } - } - Some(config) => { - self.init_nacos(config.clone()); - } + let service_url = Url::from_url( + format!("{}/{}", consumer_config.url, consumer_config.service).as_str(), + ) + .expect("Consumer config error"); + + self.consumer.insert(consumer_config.service, service_url); } } } diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs index ce30c92e..8dc24999 100644 --- a/dubbo/src/cluster/router/manager/tag_manager.rs +++ b/dubbo/src/cluster/router/manager/tag_manager.rs @@ -1,27 +1,20 @@ -use crate::cluster::router::tag::tag_router::TagRouter; +use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner}; use dubbo_config::router::TagRouterConfig; use std::sync::{Arc, RwLock}; #[derive(Debug, Clone, Default)] pub struct TagRouterManager { - pub tag_router: Option>>, + pub tag_router: Arc>, } impl TagRouterManager { - pub fn init(&mut self) { - self.tag_router = Some(Arc::new(RwLock::new(TagRouter::default()))) - } - - pub fn get_router(&self) -> Option>> { - self.tag_router.clone() + pub fn get_router(&self, _service_name: &String) -> Option { + Some(TagRouter { + inner: self.tag_router.clone(), + }) } pub fn update(&mut self, config: TagRouterConfig) { - self.tag_router - .as_ref() - .unwrap() - .write() - .unwrap() - .parse_config(config) + self.tag_router.write().unwrap().parse_config(config); } } diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs index 84ceae15..17c9aec2 100644 --- a/dubbo/src/cluster/router/mod.rs +++ b/dubbo/src/cluster/router/mod.rs @@ -9,17 +9,17 @@ use crate::invocation::RpcInvocation; use dubbo_base::Url; use std::{fmt::Debug, sync::Arc}; -pub trait Router: Debug + Clone { - fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec; +pub trait Router: Debug { + fn route(&self, invokers: Vec, url: Url, invocation: Arc) -> Vec; } -// pub type BoxRouter = Box; +pub type BoxRouter = Box; #[derive(Debug, Default, Clone)] pub struct MockRouter {} impl Router for MockRouter { - fn route(&self, invokers: Vec, _url: Url, _invo: Arc) -> Vec { + fn route(&self, invokers: Vec, _url: Url, _invocation: Arc) -> Vec { invokers } } diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs index 1600e9b0..ce72641a 100644 --- a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs +++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs @@ -1,7 +1,7 @@ use crate::cluster::router::manager::router_manager::{ get_global_router_manager, RouterConfigChangeEvent, }; -use dubbo_config::router::{ConditionRouterConfig, NacosConfig, TagRouterConfig}; +use dubbo_config::router::NacosConfig; use dubbo_logger::{tracing, tracing::info}; use nacos_sdk::api::{ config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder}, @@ -24,94 +24,86 @@ impl NacosClient { let server_addr = config.addr; let namespace = config.namespace; let app = config.app; - match config.enable_auth { - None => { - info!("disable nacos auth!"); - info!("nacos init,addr:{}", server_addr); - let client = Arc::new(RwLock::new( - ConfigServiceBuilder::new( - ClientProps::new() - .server_addr(server_addr) - .namespace(namespace) - .app_name(app), - ) - .build() - .expect("NacosClient build failed!Please check NacosConfig"), - )); - Self { client } - } - Some(auth) => { - info!("enable nacos auth!"); - info!("nacos init,addr:{}", server_addr); - let client = Arc::new(RwLock::new( - ConfigServiceBuilder::new( - ClientProps::new() - .server_addr(server_addr) - .namespace(namespace) - .app_name(app) - .auth_username(auth.auth_username) - .auth_password(auth.auth_password), - ) - // .enable_auth_plugin_http() - .build() - .expect("NacosClient build failed!Please check NacosConfig"), - )); - return Self { client }; - } + let enable_auth = config.enable_auth; + + let mut props = ClientProps::new() + .server_addr(server_addr) + .namespace(namespace) + .app_name(app); + + if enable_auth.is_some() { + info!("enable nacos auth!"); + } else { + info!("disable nacos auth!"); } - } - pub fn get_condition_config(&self, data_id: String) -> Option { - let config_resp = self - .client - .read() - .unwrap() - .get_config(data_id.clone(), "condition".to_string()); - return match config_resp { - Ok(config_resp) => { - self.add_listener(data_id.clone(), "condition".to_string()); - let string = config_resp.content(); - Some(serde_yaml::from_str(string).unwrap()) - } - Err(_err) => None, - }; + + if let Some(auth) = enable_auth { + props = props + .auth_username(auth.auth_username) + .auth_password(auth.auth_password); + } + + let client = Arc::new(RwLock::new( + ConfigServiceBuilder::new(props) + .build() + .expect("NacosClient build failed! Please check NacosConfig"), + )); + + Self { client } } - pub fn get_tag_config(&self, data_id: String) -> Option { + pub fn get_config(&self, data_id: &str, group: &str, config_type: &str) -> Option + where + T: serde::de::DeserializeOwned, + { let config_resp = self .client .read() .unwrap() - .get_config(data_id.clone(), "tag".to_string()); - return match config_resp { + .get_config(data_id.to_string(), group.to_string()); + + match config_resp { Ok(config_resp) => { - self.add_listener(data_id.clone(), "tag".to_string()); + self.add_listener(data_id, group); let string = config_resp.content(); let result = serde_yaml::from_str(string); + match result { Ok(config) => { - info!("success to get TagRouter config and parse success"); + info!( + "success to get {}Router config and parse success", + config_type + ); Some(config) } - _ => { - info!("failed to parse TagRouter rule"); + Err(_) => { + info!("failed to parse {}Router rule", config_type); None } } } - Err(_err) => None, - }; + Err(_) => None, + } } - pub fn add_listener(&self, data_id: String, group: String) { - let res_listener = self + + pub fn add_listener(&self, data_id: &str, group: &str) { + if let Err(err) = self .client .write() - .expect("failed to create nacos config listener") - .add_listener(data_id, group, Arc::new(ConfigChangeListenerImpl {})); - match res_listener { - Ok(_) => { - info!("listening the config success"); - } - Err(err) => tracing::error!("listen config error {:?}", err), + .map_err(|e| format!("failed to create nacos config listener: {}", e)) + .and_then(|client| { + client + .add_listener( + data_id.to_string(), + group.to_string(), + Arc::new(ConfigChangeListenerImpl {}), + ) + .map_err(|e| format!("failed to add nacos config listener: {}", e)) + }) + { + tracing::error!("{}", err); + } else { + info!("listening the config success"); } } } @@ -120,13 +112,15 @@ impl ConfigChangeListener for ConfigChangeListenerImpl { fn notify(&self, config_resp: ConfigResponse) { let content_type = config_resp.content_type(); let event = RouterConfigChangeEvent { - service_name: config_resp.data_id().clone(), - router_kind: config_resp.group().clone(), - content: config_resp.content().clone(), + service_name: config_resp.data_id().to_string(), + router_kind: config_resp.group().to_string(), + content: config_resp.content().to_string(), }; + if content_type == "yaml" { get_global_router_manager().write().unwrap().notify(event); } - info!("notify config={:?}", config_resp); + + info!("notify config: {:?}", config_resp); } } diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs index 930f0f29..42d5826f 100644 --- a/dubbo/src/cluster/router/router_chain.rs +++ b/dubbo/src/cluster/router/router_chain.rs @@ -1,52 +1,30 @@ -use crate::{ - cluster::router::{ - condition::condition_router::ConditionRouter, tag::tag_router::TagRouter, Router, - }, - invocation::RpcInvocation, -}; +use crate::{cluster::router::BoxRouter, invocation::RpcInvocation}; use dubbo_base::Url; -use std::sync::{Arc, RwLock}; +use std::{collections::HashMap, sync::Arc}; -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default)] pub struct RouterChain { - pub condition_router: Option, - pub tag_router: Option>>, + pub routers: HashMap, pub self_url: Url, } impl RouterChain { pub fn new() -> Self { RouterChain { - condition_router: None, - tag_router: None, + routers: HashMap::new(), self_url: Url::new(), } } - pub fn set_condition_router(&mut self, router: Option) { - self.condition_router = router; - } - pub fn set_tag_router(&mut self, router: Option>>) { - self.tag_router = router; - } - pub fn route(&self, invokers: Vec, invocation: Arc) -> Vec { - let mut result = invokers.clone(); - match &self.tag_router { - None => {} - Some(router) => { - result = - router - .read() - .unwrap() - .route(result, self.self_url.clone(), invocation.clone()) - } - } - match &self.condition_router { - Some(router) => { - result = router.route(result, self.self_url.clone(), invocation.clone()) - } - None => {} + + pub fn route(&self, mut invokers: Vec, invocation: Arc) -> Vec { + for (_, value) in self.routers.iter() { + invokers = value.route(invokers, self.self_url.clone(), invocation.clone()) } - result + invokers + } + + pub fn add_router(&mut self, key: String, router: BoxRouter) { + self.routers.insert(key, router); } } @@ -54,13 +32,16 @@ impl RouterChain { fn test() { use crate::cluster::router::manager::router_manager::get_global_router_manager; - let u1 = Url::from_url("triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); - let u2 = Url::from_url("triple://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap(); - let u3 = Url::from_url("triple://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap(); - let u4 = Url::from_url("triple://127.0.2.1:880/org.apache.dubbo.sample.tri.Greeter").unwrap(); - let u5 = Url::from_url("triple://127.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); - let invos = vec![u1, u2, u3, u4, u5]; - let invo = Arc::new( + let u1 = Url::from_url("tri://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u2 = Url::from_url("tri://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u3 = Url::from_url("tri://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u4 = Url::from_url("tri://127.0.2.1:8880/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u5 = Url::from_url("tri://127.0.1.1:8882/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u6 = Url::from_url("tri://213.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u7 = Url::from_url("tri://169.0.1.1:8887/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let invs = vec![u1, u2, u3, u4, u5, u6, u7]; + let len = invs.len().clone(); + let inv = Arc::new( RpcInvocation::default() .with_method_name("greet".to_string()) .with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()), @@ -68,7 +49,8 @@ fn test() { let x = get_global_router_manager() .read() .unwrap() - .get_router_chain(invo.clone()); - let result = x.route(invos, invo.clone()); + .get_router_chain(inv.get_target_service_unique_name()); + let result = x.route(invs, inv.clone()); + println!("total:{},result:{}", len, result.len().clone()); dbg!(result); } diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs index d1a962e7..7a83ea57 100644 --- a/dubbo/src/cluster/router/tag/tag_router.rs +++ b/dubbo/src/cluster/router/tag/tag_router.rs @@ -4,16 +4,30 @@ use crate::{ }; use dubbo_base::Url; use dubbo_config::router::TagRouterConfig; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{ + collections::HashMap, + fmt::Debug, + sync::{Arc, RwLock}, +}; #[derive(Debug, Clone, Default)] -pub struct TagRouter { +pub struct TagRouterInner { pub tag_rules: HashMap>, pub force: bool, pub enabled: bool, } -impl TagRouter { +#[derive(Debug, Clone, Default)] +pub struct TagRouter { + pub(crate) inner: Arc>, +} +impl Router for TagRouter { + fn route(&self, invokers: Vec, url: Url, invocation: Arc) -> Vec { + return self.inner.read().unwrap().route(invokers, url, invocation); + } +} + +impl TagRouterInner { pub fn parse_config(&mut self, config: TagRouterConfig) { self.tag_rules = HashMap::new(); self.force = config.force; @@ -43,26 +57,28 @@ impl TagRouter { } tag_result } -} -impl Router for TagRouter { - fn route(&self, invokers: Vec, url: Url, _invocation: Arc) -> Vec { + pub fn route(&self, invokers: Vec, url: Url, _invocation: Arc) -> Vec { if !self.enabled { return invokers; }; let self_param = to_original_map(url); let invocation_tag = self.match_tag(self_param); let mut invokers_result = Vec::new(); + let mut invokers_no_tag = Vec::new(); for invoker in &invokers { let invoker_param = to_original_map(invoker.clone()); let invoker_tag = self.match_tag(invoker_param); + if invoker_tag == None { + invokers_no_tag.push(invoker.clone()); + } if invoker_tag == invocation_tag { - invokers_result.push(invoker.clone()) + invokers_result.push(invoker.clone()); } } if invokers_result.is_empty() { if !self.force { - return invokers; + return invokers_no_tag; } } invokers_result diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index d3eb8ad0..57503379 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -196,7 +196,7 @@ pub trait Invocation { fn get_method_name(&self) -> String; } -#[derive(Default,Clone)] +#[derive(Default, Clone)] pub struct RpcInvocation { target_service_unique_name: String, method_name: String, @@ -224,4 +224,4 @@ impl Invocation for RpcInvocation { fn get_method_name(&self) -> String { self.method_name.clone() } -} \ No newline at end of file +} diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 50080555..c941ae5d 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -21,12 +21,11 @@ use std::{ }; use async_trait::async_trait; -use aws_smithy_http::body::SdkBody; use tower_service::Service; use dubbo_base::Url; -use crate::triple::client::replay::{ClonedBytesStream, ClonedBody}; +use crate::triple::client::replay::ClonedBody; pub mod server_desc; pub mod triple; diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index af174451..9f9c7698 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -24,7 +24,10 @@ use tower_service::Service; use crate::{ protocol::Invoker, - triple::{client::{builder::ClientBoxService, replay::ClonedBody}, transport::connection::Connection}, + triple::{ + client::{builder::ClientBoxService, replay::ClonedBody}, + transport::connection::Connection, + }, utils::boxed_clone::BoxCloneService, }; diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 32b26a55..a9756f7e 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::{ cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory}, - codegen::{RegistryDirectory, RpcInvocation, TripleInvoker}, + codegen::{RegistryDirectory, TripleInvoker}, protocol::BoxInvoker, utils::boxed_clone::BoxCloneService, }; @@ -101,14 +101,14 @@ impl ClientBuilder { Self { direct, ..self } } - pub fn build(self) -> Option { + pub fn build(self, service_name: String) -> Option { if self.direct { return Some(Box::new(TripleInvoker::new( Url::from_url(&self.host).unwrap(), ))); } - let cluster = MockCluster::default().join(Box::new(MockDirectory::new())); + let cluster = MockCluster::default().join(Box::new(MockDirectory::new(service_name))); return Some(cluster); } diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs index 97be85eb..013d9e30 100644 --- a/dubbo/src/triple/client/mod.rs +++ b/dubbo/src/triple/client/mod.rs @@ -16,6 +16,6 @@ */ pub mod builder; -pub mod triple; pub mod replay; +pub mod triple; pub use triple::TripleClient; diff --git a/dubbo/src/triple/client/replay.rs b/dubbo/src/triple/client/replay.rs index 4ba8b930..195d7509 100644 --- a/dubbo/src/triple/client/replay.rs +++ b/dubbo/src/triple/client/replay.rs @@ -253,7 +253,7 @@ impl Body for ClonedBody { fn poll_trailers( self: Pin<&mut Self>, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll, Self::Error>> { Poll::Ready(Ok(None)) } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 22273415..ace7aafd 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -21,12 +21,11 @@ use futures_util::{future, stream, StreamExt, TryStreamExt}; use http::HeaderValue; -use super::builder::ClientBuilder; -use super::replay::ClonedBody; +use super::{builder::ClientBuilder, replay::ClonedBody}; use crate::codegen::RpcInvocation; use crate::{ - invocation::{IntoStreamingRequest, Metadata, Request, Response}, + invocation::{IntoStreamingRequest, Invocation, Metadata, Request, Response}, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; @@ -153,7 +152,7 @@ impl TripleClient { .builder .clone() .unwrap() - .build() + .build(invocation.get_target_service_unique_name()) .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); @@ -212,14 +211,13 @@ impl TripleClient { ) .into_stream(); - let body = ClonedBody::new(en); let mut conn = self .builder .clone() .unwrap() - .build() + .build(invocation.get_target_service_unique_name()) .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); @@ -267,7 +265,7 @@ impl TripleClient { .builder .clone() .unwrap() - .build() + .build(invocation.get_target_service_unique_name()) .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); @@ -333,7 +331,7 @@ impl TripleClient { .builder .clone() .unwrap() - .build() + .build(invocation.get_target_service_unique_name()) .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index e756d5c7..0701ec3b 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -36,16 +36,12 @@ pub mod echo_client { &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static( - "/grpc.examples.echo.Echo/UnaryEcho", - ); + let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -53,51 +49,51 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner.server_streaming(request, codec, path, invocation).await + self.inner + .server_streaming(request, codec, path, invocation) + .await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner.client_streaming(request, codec, path, invocation).await + self.inner + .client_streaming(request, codec, path, invocation) + .await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner.bidi_streaming(request, codec, path, invocation).await + self.inner + .bidi_streaming(request, codec, path, invocation) + .await } } } @@ -114,9 +110,7 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type ServerStreamingEchoStream: futures_util::Stream> + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -130,19 +124,14 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type BidirectionalStreamingEchoStream: futures_util::Stream> + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result< - Response, - dubbo::status::Status, - >; + ) -> Result, dubbo::status::Status>; } /// Echo is the echo service. #[derive(Debug)] @@ -172,10 +161,7 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -188,26 +174,18 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -218,32 +196,22 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc - for ServerStreamingEchoServer { + impl ServerStreamingSvc for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = + BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.server_streaming_echo(request).await - }; + let fut = async move { inner.server_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -256,31 +224,23 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc - for ClientStreamingEchoServer { + impl ClientStreamingSvc for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.client_streaming_echo(request).await - }; + let fut = async move { inner.client_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -293,56 +253,41 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc - for BidirectionalStreamingEchoServer { + impl StreamingSvc for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = + BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.bidirectional_streaming_echo(request).await - }; + let fut = + async move { inner.bidirectional_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server - .bidi_streaming( - BidirectionalStreamingEchoServer { - inner, - }, - req, - ) + .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) .await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } }