From 0fcb29fde2a79334b8a169f33a55092087f732b5 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Thu, 11 May 2023 22:54:28 +0800 Subject: [PATCH 1/6] refactor(cluster): add Cluster MockImpl --- dubbo/Cargo.toml | 1 + dubbo/src/cluster/mod.rs | 82 ++++++++++++++++++--- dubbo/src/protocol/mod.rs | 10 ++- dubbo/src/protocol/triple/triple_invoker.rs | 19 +++-- dubbo/src/triple/client/builder.rs | 4 +- 5 files changed, 94 insertions(+), 22 deletions(-) diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 94a2bd86..91b19d6f 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -34,6 +34,7 @@ axum = "0.5.9" async-stream = "0.3" flate2 = "1.0" aws-smithy-http = "0.54.1" +dyn-clone = "1.0.11" itertools.workspace = true urlencoding.workspace = true lazy_static.workspace = true diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index b6d8a7c2..e7b0a03a 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,24 +15,38 @@ * limitations under the License. */ -use std::{sync::Arc, task::Poll}; +use std::{sync::Arc, task::Poll, fmt::Debug, collections::HashMap}; use aws_smithy_http::body::SdkBody; +use dubbo_base::Url; use tower_service::Service; -use crate::{empty_body, protocol::BoxInvoker}; +use crate::{empty_body, protocol::{BoxInvoker, Invoker}, invocation::RpcInvocation}; pub mod directory; pub mod loadbalance; pub mod support; -pub trait Directory { - fn list(&self, meta: String) -> Vec; +pub trait Directory: Debug { + fn list(&self, invocation: Arc) -> Vec; fn is_empty(&self) -> bool; } -type BoxDirectory = Box; +type BoxDirectory = Box; +pub trait Cluster { + fn join(&self, dir: BoxDirectory) -> BoxInvoker; +} + +#[derive(Debug)] +pub struct MockCluster {} + +impl Cluster for MockCluster { + fn join(&self, dir: BoxDirectory) -> BoxInvoker { + Box::new(FailoverCluster::new(dir)) + } +} +#[derive(Clone, Debug)] pub struct FailoverCluster { dir: Arc, } @@ -43,7 +57,7 @@ impl FailoverCluster { } } -impl Service> for FailoverCluster { +impl Invoker> for FailoverCluster { type Response = http::Response; type Error = crate::Error; @@ -66,7 +80,9 @@ impl Service> for FailoverCluster { .method(req.method().clone()); *clone_req.headers_mut().unwrap() = req.headers().clone(); let r = clone_req.body(clone_body).unwrap(); - let invokers = self.dir.list("service_name".to_string()); + let invokers = self.dir.list( + RpcInvocation::default().with_service_unique_name("hello".to_string()).into() + ); for mut invoker in invokers { let fut = async move { let res = invoker.call(r).await; @@ -83,19 +99,63 @@ impl Service> for FailoverCluster { .unwrap()) }) } + + fn get_url(&self) -> dubbo_base::Url { + Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap() + } +} + +#[derive(Debug, Default)] +pub struct MockDirectory { + router_chain: RouterChain, + invokers: Vec } -pub struct MockDirectory {} +impl MockDirectory { + pub fn new(invokers: Vec) -> MockDirectory { + Self { router_chain: RouterChain::default(), invokers } + } +} impl Directory for MockDirectory { - fn list(&self, _meta: String) -> Vec { + fn list(&self, invo: Arc) -> Vec { // tracing::info!("MockDirectory: {}", meta); - // let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); + let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); // vec![Box::new(TripleInvoker::new(u))] - todo!() + self.router_chain.route(u, invo); + self.invokers.clone() } fn is_empty(&self) -> bool { false } } + +#[derive(Debug, Default)] +pub struct RouterChain { + router: HashMap, + invokers: Vec +} + +impl RouterChain { + pub fn route(&self, url: Url, invo: Arc) -> Vec { + let r = self.router.get("mock").unwrap(); + r.route(self.invokers.clone(), url, invo) + } +} + + +pub trait Router: Debug{ + fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec; +} + +pub type BoxRouter = Box; + +#[derive(Debug, Default)] +pub struct MockRouter {} + +impl Router for MockRouter { + fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec { + invokers + } +} \ No newline at end of file diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 58dca5fd..629513e6 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -23,6 +23,7 @@ use std::{ use async_trait::async_trait; use aws_smithy_http::body::SdkBody; +use dyn_clone::DynClone; use tower_service::Service; use dubbo_base::Url; @@ -43,7 +44,7 @@ pub trait Exporter { fn unexport(&self); } -pub trait Invoker: Debug { +pub trait Invoker: Debug + DynClone { type Response; type Error; @@ -68,6 +69,13 @@ pub type BoxInvoker = Box< + Sync, >; +dyn_clone::clone_trait_object!(Invoker< + http::Request, + Response = http::Response, + Error = crate::Error, + Future = crate::BoxFuture, crate::Error>, +>); + pub struct WrapperInvoker(T); impl Service> for WrapperInvoker diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 6139cc91..fb47bbed 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -18,23 +18,26 @@ use aws_smithy_http::body::SdkBody; use dubbo_base::Url; use std::fmt::{Debug, Formatter}; +use std::str::FromStr; use tower_service::Service; -use crate::{protocol::Invoker, triple::client::builder::ClientBoxService}; +use crate::{protocol::Invoker, triple::client::builder::ClientBoxService, utils::boxed_clone::BoxCloneService}; +use crate::triple::transport::connection::Connection; +#[derive(Clone)] pub struct TripleInvoker { url: Url, conn: ClientBoxService, } impl TripleInvoker { - // pub fn new(url: Url) -> TripleInvoker { - // let uri = http::Uri::from_str(&url.to_url()).unwrap(); - // Self { - // url, - // conn: ClientBuilder::from_uri(&uri).build()connect(), - // } - // } + pub fn new(url: Url) -> TripleInvoker { + let uri = http::Uri::from_str(&url.to_url()).unwrap(); + Self { + url, + conn: BoxCloneService::new(Connection::new().with_host(uri)), + } + } } impl Debug for TripleInvoker { diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index cf667ccd..ea1fd26d 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -19,7 +19,7 @@ use crate::{ cluster::directory::StaticDirectory, codegen::{ClusterInvoker, Directory, RegistryDirectory}, triple::compression::CompressionEncoding, - utils::boxed::BoxService, + utils::{boxed::BoxService, boxed_clone::BoxCloneService}, }; use aws_smithy_http::body::SdkBody; @@ -27,7 +27,7 @@ use aws_smithy_http::body::SdkBody; use super::TripleClient; pub type ClientBoxService = - BoxService, http::Response, crate::Error>; + BoxCloneService, http::Response, crate::Error>; #[derive(Clone, Debug, Default)] pub struct ClientBuilder { From 2cd76b66caeaf168b4f218d2138b55c84df65b1a Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Thu, 11 May 2023 23:46:44 +0800 Subject: [PATCH 2/6] refactor(triple): use ClientBuilder to init Cluster ability --- dubbo/src/cluster/mod.rs | 43 ++++++++++++++------- dubbo/src/protocol/mod.rs | 14 ++++--- dubbo/src/protocol/triple/triple_invoker.rs | 13 +++++-- dubbo/src/triple/client/builder.rs | 39 +++++++++++++++++-- dubbo/src/triple/client/triple.rs | 31 +++++---------- 5 files changed, 92 insertions(+), 48 deletions(-) diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index e7b0a03a..d2c2adb7 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,13 +15,16 @@ * limitations under the License. */ -use std::{sync::Arc, task::Poll, fmt::Debug, collections::HashMap}; +use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll}; use aws_smithy_http::body::SdkBody; use dubbo_base::Url; -use tower_service::Service; -use crate::{empty_body, protocol::{BoxInvoker, Invoker}, invocation::RpcInvocation}; +use crate::{ + empty_body, + invocation::RpcInvocation, + protocol::{BoxInvoker, Invoker}, +}; pub mod directory; pub mod loadbalance; @@ -38,7 +41,7 @@ pub trait Cluster { fn join(&self, dir: BoxDirectory) -> BoxInvoker; } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockCluster {} impl Cluster for MockCluster { @@ -81,7 +84,9 @@ impl Invoker> for FailoverCluster { *clone_req.headers_mut().unwrap() = req.headers().clone(); let r = clone_req.body(clone_body).unwrap(); let invokers = self.dir.list( - RpcInvocation::default().with_service_unique_name("hello".to_string()).into() + RpcInvocation::default() + .with_service_unique_name("hello".to_string()) + .into(), ); for mut invoker in invokers { let fut = async move { @@ -108,12 +113,15 @@ impl Invoker> for FailoverCluster { #[derive(Debug, Default)] pub struct MockDirectory { router_chain: RouterChain, - invokers: Vec + invokers: Vec, } impl MockDirectory { pub fn new(invokers: Vec) -> MockDirectory { - Self { router_chain: RouterChain::default(), invokers } + Self { + router_chain: RouterChain::default(), + invokers, + } } } @@ -134,7 +142,7 @@ impl Directory for MockDirectory { #[derive(Debug, Default)] pub struct RouterChain { router: HashMap, - invokers: Vec + invokers: Vec, } impl RouterChain { @@ -144,9 +152,13 @@ impl RouterChain { } } - -pub trait Router: Debug{ - fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec; +pub trait Router: Debug { + fn route( + &self, + invokers: Vec, + url: Url, + invo: Arc, + ) -> Vec; } pub type BoxRouter = Box; @@ -155,7 +167,12 @@ pub type BoxRouter = Box; pub struct MockRouter {} impl Router for MockRouter { - fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec { + fn route( + &self, + invokers: Vec, + _url: Url, + _invo: Arc, + ) -> Vec { invokers } -} \ No newline at end of file +} diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 629513e6..145bcc8e 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -69,12 +69,14 @@ pub type BoxInvoker = Box< + Sync, >; -dyn_clone::clone_trait_object!(Invoker< - http::Request, - Response = http::Response, - Error = crate::Error, - Future = crate::BoxFuture, crate::Error>, ->); +dyn_clone::clone_trait_object!( + Invoker< + http::Request, + Response = http::Response, + Error = crate::Error, + Future = crate::BoxFuture, crate::Error>, + > +); pub struct WrapperInvoker(T); diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index fb47bbed..fb661f9e 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -17,12 +17,17 @@ use aws_smithy_http::body::SdkBody; use dubbo_base::Url; -use std::fmt::{Debug, Formatter}; -use std::str::FromStr; +use std::{ + fmt::{Debug, Formatter}, + str::FromStr, +}; use tower_service::Service; -use crate::{protocol::Invoker, triple::client::builder::ClientBoxService, utils::boxed_clone::BoxCloneService}; -use crate::triple::transport::connection::Connection; +use crate::{ + protocol::Invoker, + triple::{client::builder::ClientBoxService, transport::connection::Connection}, + utils::boxed_clone::BoxCloneService, +}; #[derive(Clone)] pub struct TripleInvoker { diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index ea1fd26d..26ab9e9f 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -16,13 +16,14 @@ */ use crate::{ - cluster::directory::StaticDirectory, - codegen::{ClusterInvoker, Directory, RegistryDirectory}, + cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory}, + codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker}, triple::compression::CompressionEncoding, - utils::{boxed::BoxService, boxed_clone::BoxCloneService}, + utils::boxed_clone::BoxCloneService, }; use aws_smithy_http::body::SdkBody; +use dubbo_base::Url; use super::TripleClient; @@ -35,6 +36,9 @@ pub struct ClientBuilder { pub connector: &'static str, directory: Option>, cluster_invoker: Option, + pub direct: bool, + host: String, + uri: Option, } impl ClientBuilder { @@ -44,6 +48,9 @@ impl ClientBuilder { connector: "", directory: None, cluster_invoker: None, + direct: true, + uri: None, + host: "".to_string(), } } @@ -53,6 +60,9 @@ impl ClientBuilder { connector: "", directory: Some(Box::new(StaticDirectory::new(&host))), cluster_invoker: None, + direct: true, + uri: None, + host: host.clone().to_string(), } } @@ -62,6 +72,9 @@ impl ClientBuilder { connector: "", directory: Some(Box::new(StaticDirectory::from_uri(&uri))), cluster_invoker: None, + direct: true, + uri: Some(uri.clone()), + host: "".to_string(), } } @@ -104,11 +117,29 @@ impl ClientBuilder { } } + pub fn with_direct(self, direct: bool) -> Self { + Self { direct, ..self } + } + pub fn build(self) -> TripleClient { - TripleClient { + let mut cli = TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), directory: self.directory, cluster_invoker: self.cluster_invoker, + invoker: None, + }; + if self.direct { + cli.invoker = Some(Box::new(TripleInvoker::new( + Url::from_url(&self.host).unwrap(), + ))); + return cli; } + + let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new( + TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()), + )]))); + + cli.invoker = Some(cluster); + cli } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 56edb96b..3585b486 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -30,6 +30,7 @@ use crate::codegen::{ClusterInvoker, Directory, RpcInvocation}; use crate::{ cluster::support::cluster_invoker::ClusterRequestBuilder, invocation::{IntoStreamingRequest, Metadata, Request, Response}, + protocol::BoxInvoker, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; @@ -38,11 +39,12 @@ pub struct TripleClient { pub(crate) send_compression_encoding: Option, pub(crate) directory: Option>, pub(crate) cluster_invoker: Option, + pub invoker: Option, } impl TripleClient { pub fn connect(host: String) -> Self { - let builder = ClientBuilder::from_static(&host); + let builder = ClientBuilder::from_static(&host).with_direct(true); builder.build() } @@ -150,27 +152,14 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(body_stream); - let sdk_body = SdkBody::from(body); - let arc_invocation = Arc::new(invocation); - let req; - let http_uri; - if self.cluster_invoker.is_some() { - let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone(); - req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body); - http_uri = req.uri().clone(); - } else { - let url_list = self - .directory - .as_ref() - .expect("msg") - .list(arc_invocation.clone()); - let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); - http_uri = - http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - req = self.map_request(http_uri.clone(), path, sdk_body); - } + let bytes = hyper::body::to_bytes(body).await.unwrap(); + let sdk_body = SdkBody::from(bytes); + + // let mut conn = Connection::new().with_host(http_uri); + let mut conn = self.invoker.clone().unwrap(); + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); + let req = self.map_request(http_uri.clone(), path, sdk_body); - let mut conn = Connection::new().with_host(http_uri); let response = conn .call(req) .await From 3cba7dc269a9ea0c171b06a1dfe9a1963bff7527 Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265q@gmail.com> Date: Tue, 16 May 2023 10:51:59 +0800 Subject: [PATCH 3/6] Update builder.rs update default direct value --- dubbo/src/triple/client/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 26ab9e9f..5ec8bda4 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -48,7 +48,7 @@ impl ClientBuilder { connector: "", directory: None, cluster_invoker: None, - direct: true, + direct: false, uri: None, host: "".to_string(), } From 1e7285e799e13e0fd569cb1c81f2f01d5ee72ca1 Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265q@gmail.com> Date: Tue, 16 May 2023 10:57:51 +0800 Subject: [PATCH 4/6] Update triple.rs handle unused var --- dubbo/src/triple/client/triple.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 3585b486..eb7934dd 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -137,7 +137,7 @@ impl TripleClient { req: Request, mut codec: C, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + _invocation: RpcInvocation, ) -> Result, crate::status::Status> where C: Codec, From 2c567a73c08c14b3e9507f1557f07797d7d055ec Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265q@gmail.com> Date: Tue, 16 May 2023 11:17:10 +0800 Subject: [PATCH 5/6] Update mod.rs comment some codes --- dubbo/src/cluster/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index d2c2adb7..4f73d2f5 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -112,25 +112,25 @@ impl Invoker> for FailoverCluster { #[derive(Debug, Default)] pub struct MockDirectory { - router_chain: RouterChain, + // router_chain: RouterChain, invokers: Vec, } impl MockDirectory { pub fn new(invokers: Vec) -> MockDirectory { Self { - router_chain: RouterChain::default(), + // router_chain: RouterChain::default(), invokers, } } } impl Directory for MockDirectory { - fn list(&self, invo: Arc) -> Vec { + fn list(&self, _invo: Arc) -> Vec { // tracing::info!("MockDirectory: {}", meta); - let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); + let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); // vec![Box::new(TripleInvoker::new(u))] - self.router_chain.route(u, invo); + // self.router_chain.route(u, invo); self.invokers.clone() } From 695a880f65c75296f15feedafb64d6c4a99eff6a Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Tue, 16 May 2023 21:22:42 +0800 Subject: [PATCH 6/6] refactor(triple): rm unused var in clientBuilder --- dubbo/src/triple/client/builder.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 5ec8bda4..29957a6f 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -38,7 +38,6 @@ pub struct ClientBuilder { cluster_invoker: Option, pub direct: bool, host: String, - uri: Option, } impl ClientBuilder { @@ -49,7 +48,6 @@ impl ClientBuilder { directory: None, cluster_invoker: None, direct: false, - uri: None, host: "".to_string(), } } @@ -61,23 +59,10 @@ impl ClientBuilder { directory: Some(Box::new(StaticDirectory::new(&host))), cluster_invoker: None, direct: true, - uri: None, host: host.clone().to_string(), } } - pub fn from_uri(uri: &http::Uri) -> ClientBuilder { - Self { - timeout: None, - connector: "", - directory: Some(Box::new(StaticDirectory::from_uri(&uri))), - cluster_invoker: None, - direct: true, - uri: Some(uri.clone()), - host: "".to_string(), - } - } - pub fn with_timeout(self, timeout: u64) -> Self { Self { timeout: Some(timeout),