diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 94a2bd86..91b19d6f 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -34,6 +34,7 @@ axum = "0.5.9" async-stream = "0.3" flate2 = "1.0" aws-smithy-http = "0.54.1" +dyn-clone = "1.0.11" itertools.workspace = true urlencoding.workspace = true lazy_static.workspace = true diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index b6d8a7c2..4f73d2f5 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,24 +15,41 @@ * limitations under the License. */ -use std::{sync::Arc, task::Poll}; +use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll}; use aws_smithy_http::body::SdkBody; -use tower_service::Service; +use dubbo_base::Url; -use crate::{empty_body, protocol::BoxInvoker}; +use crate::{ + empty_body, + invocation::RpcInvocation, + protocol::{BoxInvoker, Invoker}, +}; pub mod directory; pub mod loadbalance; pub mod support; -pub trait Directory { - fn list(&self, meta: String) -> Vec; +pub trait Directory: Debug { + fn list(&self, invocation: Arc) -> Vec; fn is_empty(&self) -> bool; } -type BoxDirectory = Box; +type BoxDirectory = Box; +pub trait Cluster { + fn join(&self, dir: BoxDirectory) -> BoxInvoker; +} + +#[derive(Debug, Default)] +pub struct MockCluster {} + +impl Cluster for MockCluster { + fn join(&self, dir: BoxDirectory) -> BoxInvoker { + Box::new(FailoverCluster::new(dir)) + } +} +#[derive(Clone, Debug)] pub struct FailoverCluster { dir: Arc, } @@ -43,7 +60,7 @@ impl FailoverCluster { } } -impl Service> for FailoverCluster { +impl Invoker> for FailoverCluster { type Response = http::Response; type Error = crate::Error; @@ -66,7 +83,11 @@ impl Service> for FailoverCluster { .method(req.method().clone()); *clone_req.headers_mut().unwrap() = req.headers().clone(); let r = clone_req.body(clone_body).unwrap(); - let invokers = self.dir.list("service_name".to_string()); + let invokers = self.dir.list( + RpcInvocation::default() + .with_service_unique_name("hello".to_string()) + .into(), + ); for mut invoker in invokers { let fut = async move { let res = invoker.call(r).await; @@ -83,19 +104,75 @@ impl Service> for FailoverCluster { .unwrap()) }) } + + fn get_url(&self) -> dubbo_base::Url { + Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap() + } +} + +#[derive(Debug, Default)] +pub struct MockDirectory { + // router_chain: RouterChain, + invokers: Vec, } -pub struct MockDirectory {} +impl MockDirectory { + pub fn new(invokers: Vec) -> MockDirectory { + Self { + // router_chain: RouterChain::default(), + invokers, + } + } +} impl Directory for MockDirectory { - fn list(&self, _meta: String) -> Vec { + fn list(&self, _invo: Arc) -> Vec { // tracing::info!("MockDirectory: {}", meta); - // let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); + let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); // vec![Box::new(TripleInvoker::new(u))] - todo!() + // self.router_chain.route(u, invo); + self.invokers.clone() } fn is_empty(&self) -> bool { false } } + +#[derive(Debug, Default)] +pub struct RouterChain { + router: HashMap, + invokers: Vec, +} + +impl RouterChain { + pub fn route(&self, url: Url, invo: Arc) -> Vec { + let r = self.router.get("mock").unwrap(); + r.route(self.invokers.clone(), url, invo) + } +} + +pub trait Router: Debug { + fn route( + &self, + invokers: Vec, + url: Url, + invo: Arc, + ) -> Vec; +} + +pub type BoxRouter = Box; + +#[derive(Debug, Default)] +pub struct MockRouter {} + +impl Router for MockRouter { + fn route( + &self, + invokers: Vec, + _url: Url, + _invo: Arc, + ) -> Vec { + invokers + } +} diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 58dca5fd..145bcc8e 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -23,6 +23,7 @@ use std::{ use async_trait::async_trait; use aws_smithy_http::body::SdkBody; +use dyn_clone::DynClone; use tower_service::Service; use dubbo_base::Url; @@ -43,7 +44,7 @@ pub trait Exporter { fn unexport(&self); } -pub trait Invoker: Debug { +pub trait Invoker: Debug + DynClone { type Response; type Error; @@ -68,6 +69,15 @@ pub type BoxInvoker = Box< + Sync, >; +dyn_clone::clone_trait_object!( + Invoker< + http::Request, + Response = http::Response, + Error = crate::Error, + Future = crate::BoxFuture, crate::Error>, + > +); + pub struct WrapperInvoker(T); impl Service> for WrapperInvoker diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 6139cc91..fb661f9e 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -17,24 +17,32 @@ use aws_smithy_http::body::SdkBody; use dubbo_base::Url; -use std::fmt::{Debug, Formatter}; +use std::{ + fmt::{Debug, Formatter}, + str::FromStr, +}; use tower_service::Service; -use crate::{protocol::Invoker, triple::client::builder::ClientBoxService}; +use crate::{ + protocol::Invoker, + triple::{client::builder::ClientBoxService, transport::connection::Connection}, + utils::boxed_clone::BoxCloneService, +}; +#[derive(Clone)] pub struct TripleInvoker { url: Url, conn: ClientBoxService, } impl TripleInvoker { - // pub fn new(url: Url) -> TripleInvoker { - // let uri = http::Uri::from_str(&url.to_url()).unwrap(); - // Self { - // url, - // conn: ClientBuilder::from_uri(&uri).build()connect(), - // } - // } + pub fn new(url: Url) -> TripleInvoker { + let uri = http::Uri::from_str(&url.to_url()).unwrap(); + Self { + url, + conn: BoxCloneService::new(Connection::new().with_host(uri)), + } + } } impl Debug for TripleInvoker { diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index cf667ccd..29957a6f 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -16,18 +16,19 @@ */ use crate::{ - cluster::directory::StaticDirectory, - codegen::{ClusterInvoker, Directory, RegistryDirectory}, + cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory}, + codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker}, triple::compression::CompressionEncoding, - utils::boxed::BoxService, + utils::boxed_clone::BoxCloneService, }; use aws_smithy_http::body::SdkBody; +use dubbo_base::Url; use super::TripleClient; pub type ClientBoxService = - BoxService, http::Response, crate::Error>; + BoxCloneService, http::Response, crate::Error>; #[derive(Clone, Debug, Default)] pub struct ClientBuilder { @@ -35,6 +36,8 @@ pub struct ClientBuilder { pub connector: &'static str, directory: Option>, cluster_invoker: Option, + pub direct: bool, + host: String, } impl ClientBuilder { @@ -44,6 +47,8 @@ impl ClientBuilder { connector: "", directory: None, cluster_invoker: None, + direct: false, + host: "".to_string(), } } @@ -53,15 +58,8 @@ impl ClientBuilder { connector: "", directory: Some(Box::new(StaticDirectory::new(&host))), cluster_invoker: None, - } - } - - pub fn from_uri(uri: &http::Uri) -> ClientBuilder { - Self { - timeout: None, - connector: "", - directory: Some(Box::new(StaticDirectory::from_uri(&uri))), - cluster_invoker: None, + direct: true, + host: host.clone().to_string(), } } @@ -104,11 +102,29 @@ impl ClientBuilder { } } + pub fn with_direct(self, direct: bool) -> Self { + Self { direct, ..self } + } + pub fn build(self) -> TripleClient { - TripleClient { + let mut cli = TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), directory: self.directory, cluster_invoker: self.cluster_invoker, + invoker: None, + }; + if self.direct { + cli.invoker = Some(Box::new(TripleInvoker::new( + Url::from_url(&self.host).unwrap(), + ))); + return cli; } + + let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new( + TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()), + )]))); + + cli.invoker = Some(cluster); + cli } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 56edb96b..eb7934dd 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -30,6 +30,7 @@ use crate::codegen::{ClusterInvoker, Directory, RpcInvocation}; use crate::{ cluster::support::cluster_invoker::ClusterRequestBuilder, invocation::{IntoStreamingRequest, Metadata, Request, Response}, + protocol::BoxInvoker, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; @@ -38,11 +39,12 @@ pub struct TripleClient { pub(crate) send_compression_encoding: Option, pub(crate) directory: Option>, pub(crate) cluster_invoker: Option, + pub invoker: Option, } impl TripleClient { pub fn connect(host: String) -> Self { - let builder = ClientBuilder::from_static(&host); + let builder = ClientBuilder::from_static(&host).with_direct(true); builder.build() } @@ -135,7 +137,7 @@ impl TripleClient { req: Request, mut codec: C, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + _invocation: RpcInvocation, ) -> Result, crate::status::Status> where C: Codec, @@ -150,27 +152,14 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(body_stream); - let sdk_body = SdkBody::from(body); - let arc_invocation = Arc::new(invocation); - let req; - let http_uri; - if self.cluster_invoker.is_some() { - let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone(); - req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body); - http_uri = req.uri().clone(); - } else { - let url_list = self - .directory - .as_ref() - .expect("msg") - .list(arc_invocation.clone()); - let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); - http_uri = - http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - req = self.map_request(http_uri.clone(), path, sdk_body); - } + let bytes = hyper::body::to_bytes(body).await.unwrap(); + let sdk_body = SdkBody::from(bytes); + + // let mut conn = Connection::new().with_host(http_uri); + let mut conn = self.invoker.clone().unwrap(); + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); + let req = self.map_request(http_uri.clone(), path, sdk_body); - let mut conn = Connection::new().with_host(http_uri); let response = conn .call(req) .await