diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 19d8c4d0..878a5e13 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -6,7 +6,9 @@ on: push: branches: ["*"] pull_request: - branches: ["*"] + branches: + - '*' + - 'refact/*' jobs: diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs index cb3bc09a..b97b342f 100644 --- a/common/base/src/lib.rs +++ b/common/base/src/lib.rs @@ -23,4 +23,4 @@ pub mod node; pub mod url; pub use node::Node; -pub use url::Url; \ No newline at end of file +pub use url::Url; diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs index 1a8149c6..8a00c9fb 100644 --- a/dubbo/src/cluster/failover.rs +++ b/dubbo/src/cluster/failover.rs @@ -2,18 +2,17 @@ use std::task::Poll; use futures_util::future; use http::Request; -use tower::{ServiceExt, retry::Retry, util::Oneshot}; +use tower::{retry::Retry, util::Oneshot, ServiceExt}; use tower_service::Service; - + use crate::StdError; pub struct Failover { - inner: N // loadbalancer service + inner: N, // loadbalancer service } #[derive(Clone)] pub struct FailoverPolicy; - impl Failover { pub fn new(inner: N) -> Self { @@ -25,14 +24,13 @@ impl tower::retry::Policy, Res, E> for FailoverPolicy where B: http_body::Body + Clone, { - type Future = future::Ready; - fn retry(&self, req: &Request, result: Result<&Res, &E>) -> Option { + fn retry(&self, _req: &Request, result: Result<&Res, &E>) -> Option { //TODO some error handling or logging match result { Ok(_) => None, - Err(_) => Some(future::ready(self.clone())) + Err(_) => Some(future::ready(self.clone())), } } @@ -43,21 +41,18 @@ where *clone.headers_mut() = req.headers().clone(); *clone.version_mut() = req.version(); - Some(clone) } } - - -impl Service> for Failover +impl Service> for Failover where // B is CloneBody B: http_body::Body + Clone, // loadbalancer service - N: Service> + Clone + 'static , + N: Service> + Clone + 'static, N::Error: Into, - N::Future: Send + N::Future: Send, { type Response = N::Response; @@ -68,9 +63,9 @@ where fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - + fn call(&mut self, req: Request) -> Self::Future { let retry = Retry::new(FailoverPolicy, self.inner.clone()); retry.oneshot(req) } -} \ No newline at end of file +} diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 0b1654a0..1a20c160 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,11 +15,12 @@ * limitations under the License. */ - use http::Request; use tower_service::Service; -use crate::{codegen::RpcInvocation, svc::NewService, param::Param, invoker::clone_body::CloneBody}; +use crate::{ + codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService, +}; use self::failover::Failover; @@ -30,12 +31,10 @@ pub struct NewCluster { } pub struct Cluster { - inner: S // failover service + inner: S, // failover service } - impl NewCluster { - pub fn layer() -> impl tower_layer::Layer { tower_layer::layer_fn(|inner: N| { NewCluster { @@ -43,45 +42,44 @@ impl NewCluster { } }) } +} -} - -impl NewService for NewCluster +impl NewService for NewCluster where - T: Param, + T: Param, // new loadbalancer service S: NewService, -{ - +{ type Service = Cluster>; - - fn new_service(&self, target: T) -> Self::Service { + fn new_service(&self, target: T) -> Self::Service { Cluster { - inner: Failover::new(self.inner.new_service(target)) + inner: Failover::new(self.inner.new_service(target)), } } } - -impl Service> for Cluster + +impl Service> for Cluster where S: Service>, { - type Response = S::Response; type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_ready(cx) } - + fn call(&mut self, req: Request) -> Self::Future { let (parts, body) = req.into_parts(); let clone_body = CloneBody::new(body); - let req = Request::from_parts(parts, clone_body); + let req = Request::from_parts(parts, clone_body); self.inner.call(req) } } diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 38f8f1dc..452f560d 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -44,8 +44,7 @@ pub use super::{ pub use crate::{ filter::{service::FilterService, Filter}, triple::{ - client::builder::ClientBuilder, - server::builder::ServerBuilder, + client::builder::ClientBuilder, server::builder::ServerBuilder, transport::connection::Connection, }, }; diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs index a4cf4669..84900aab 100644 --- a/dubbo/src/directory/mod.rs +++ b/dubbo/src/directory/mod.rs @@ -16,107 +16,109 @@ */ use std::{ - task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin, + collections::HashMap, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use crate::{ + codegen::{RpcInvocation, TripleInvoker}, + invocation::Invocation, + invoker::{clone_invoker::CloneInvoker, NewInvoker}, + param::Param, + registry::n_registry::Registry, + svc::NewService, + StdError, }; - -use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, invocation::Invocation, registry::n_registry::Registry, invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, param::Param}; use dubbo_logger::tracing::debug; use futures_core::ready; use futures_util::future; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tower::{ - discover::{Change, Discover}, buffer::Buffer, + buffer::Buffer, + discover::{Change, Discover}, }; use tower_service::Service; -type BufferedDirectory = Buffer, StdError>>>, ()>; +type BufferedDirectory = + Buffer, StdError>>>, ()>; pub struct NewCachedDirectory where N: Registry + Clone + Send + Sync + 'static, { - inner: CachedDirectory, RpcInvocation> + inner: CachedDirectory, RpcInvocation>, } - -pub struct CachedDirectory +pub struct CachedDirectory where - // NewDirectory - N: NewService + // NewDirectory + N: NewService, { inner: N, - cache: Arc>> + cache: Arc>>, } - pub struct NewDirectory { // registry inner: N, } - pub struct Directory { directory: HashMap>, discover: D, new_invoker: NewInvoker, } - - -impl NewCachedDirectory +impl NewCachedDirectory where N: Registry + Clone + Send + Sync + 'static, { - pub fn layer() -> impl tower_layer::Layer { tower_layer::layer_fn(|inner: N| { NewCachedDirectory { - // inner is registry - inner: CachedDirectory::new(NewDirectory::new(inner)), + // inner is registry + inner: CachedDirectory::new(NewDirectory::new(inner)), } }) } } - impl NewService for NewCachedDirectory where T: Param, // service registry N: Registry + Clone + Send + Sync + 'static, { - type Service = BufferedDirectory; + type Service = BufferedDirectory; fn new_service(&self, target: T) -> Self::Service { - self.inner.new_service(target.param()) } -} - - -impl CachedDirectory +} + +impl CachedDirectory where - N: NewService + N: NewService, { - pub fn new(inner: N) -> Self { CachedDirectory { inner, - cache: Default::default() + cache: Default::default(), } - } -} - - -impl NewService for CachedDirectory + } +} + +impl NewService for CachedDirectory where T: Param, // NewDirectory N: NewService, // Buffered directory - N::Service: Clone + N::Service: Clone, { type Service = N::Service; @@ -124,44 +126,35 @@ where let rpc_invocation = target.param(); let service_name = rpc_invocation.get_target_service_unique_name(); let mut cache = self.cache.lock().expect("cached directory lock failed."); - let value = cache.get(&service_name).map(|val|val.clone()); + let value = cache.get(&service_name).map(|val| val.clone()); match value { None => { let new_service = self.inner.new_service(target); cache.insert(service_name, new_service.clone()); new_service - }, - Some(value) => value - } + } + Some(value) => value, + } } } - impl NewDirectory { - const MAX_DIRECTORY_BUFFER_SIZE: usize = 16; pub fn new(inner: N) -> Self { - NewDirectory { - inner - } + NewDirectory { inner } } -} - - +} -impl NewService for NewDirectory +impl NewService for NewDirectory where T: Param, // service registry - N: Registry + Clone + Send + Sync + 'static, + N: Registry + Clone + Send + Sync + 'static, { - type Service = BufferedDirectory; + type Service = BufferedDirectory; - - fn new_service(&self, target: T) -> Self::Service { - let service_name = target.param().get_target_service_unique_name(); let registry = self.inner.clone(); @@ -172,39 +165,35 @@ where let receiver = registry.subscribe(service_name).await; debug!("discover start!"); match receiver { - Err(e) => { + Err(_e) => { // error!("discover stream error: {}", e); debug!("discover stream error"); - }, - Ok(mut receiver) => { - loop { - let change = receiver.recv().await; - debug!("receive change: {:?}", change); - match change { - None => { - debug!("discover stream closed."); - break; - }, - Some(change) => { - let _ = tx.send(change).await; - } + } + Ok(mut receiver) => loop { + let change = receiver.recv().await; + debug!("receive change: {:?}", change); + match change { + None => { + debug!("discover stream closed."); + break; + } + Some(change) => { + let _ = tx.send(change).await; } } - } + }, } + }); - }); - - Buffer::new(Directory::new(ReceiverStream::new(rx)), Self::MAX_DIRECTORY_BUFFER_SIZE) - } - -} - - -impl Directory { + Buffer::new( + Directory::new(ReceiverStream::new(rx)), + Self::MAX_DIRECTORY_BUFFER_SIZE, + ) + } +} +impl Directory { pub fn new(discover: D) -> Self { - Directory { directory: Default::default(), discover, @@ -213,13 +202,12 @@ impl Directory { } } - -impl Service<()> for Directory +impl Service<()> for Directory where // Discover - D: Discover + Unpin + Send, - D::Error: Into -{ + D: Discover + Unpin + Send, + D::Error: Into, +{ type Response = Vec>; type Error = StdError; @@ -227,21 +215,22 @@ where type Future = future::Ready>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { let pin_discover = Pin::new(&mut self.discover); - let change = ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?; + let change = ready!(pin_discover.poll_discover(cx)) + .transpose() + .map_err(|e| e.into())?; match change { Some(Change::Remove(key)) => { debug!("remove key: {}", key); self.directory.remove(&key); - }, + } Some(Change::Insert(key, _)) => { debug!("insert key: {}", key); let invoker = self.new_invoker.new_service(key.clone()); self.directory.insert(key, invoker); - }, - None => { + } + None => { debug!("stream closed"); return Poll::Ready(Ok(())); } @@ -250,7 +239,11 @@ where } fn call(&mut self, _: ()) -> Self::Future { - let vec = self.directory.values().map(|val|val.clone()).collect::>>(); + let vec = self + .directory + .values() + .map(|val| val.clone()) + .collect::>>(); future::ok(vec) } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs index 5ce2e1f9..4de8f899 100644 --- a/dubbo/src/invoker/clone_body.rs +++ b/dubbo/src/invoker/clone_body.rs @@ -7,20 +7,20 @@ use std::{ use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures_core::ready; - + use http::HeaderMap; use http_body::Body; use pin_project::pin_project; use thiserror::Error; use crate::StdError; - + #[derive(Error, Debug)] -#[error("buffered body reach max capacity.")] +#[error("buffered body reach max capacity.")] pub struct ReachMaxCapacityError; pub struct BufferedBody { - shared: Arc>>, + shared: Arc>>, owned: Option, replay_body: bool, replay_trailers: bool, @@ -34,10 +34,7 @@ pub struct OwnedBufferedBody { buf: InnerBuffer, } - - impl BufferedBody { - pub fn new(body: hyper::Body, buf_size: usize) -> Self { let size_hint = body.size_hint(); let is_empty = body.is_end_stream(); @@ -57,11 +54,9 @@ impl BufferedBody { size_hint, } } - } impl Clone for BufferedBody { - fn clone(&self) -> Self { Self { shared: self.shared.clone(), @@ -86,7 +81,6 @@ impl Drop for BufferedBody { } impl Body for BufferedBody { - type Data = BytesData; type Error = StdError; @@ -106,14 +100,10 @@ impl Body for BufferedBody { data.take().expect("cannot get shared buffered body.") }); - - if mut_self.replay_body { mut_self.replay_body = false; if owned_body.buf.has_remaining() { - return Poll::Ready(Some(Ok(BytesData::BufferedBytes( - owned_body.buf.clone(), - )))); + return Poll::Ready(Some(Ok(BytesData::BufferedBytes(owned_body.buf.clone())))); } if owned_body.buf.is_capped() { @@ -150,10 +140,8 @@ impl Body for BufferedBody { } else { owned_body.buf.push_bytes(data.copy_to_bytes(len)) }; - - Poll::Ready(Some(Ok(BytesData::OriginBytes(data)))) - + Poll::Ready(Some(Ok(BytesData::OriginBytes(data)))) } fn poll_trailers( @@ -170,7 +158,7 @@ impl Body for BufferedBody { data.take().expect("cannot get shared buffered body.") }); - + if mut_self.replay_trailers { mut_self.replay_trailers = false; if let Some(ref trailers) = owned_body.trailers { @@ -184,7 +172,7 @@ impl Body for BufferedBody { owned_body.trailers = trailers.clone(); trailers }); - return Poll::Ready(trailers.map_err(|e|e.into())); + return Poll::Ready(trailers.map_err(|e| e.into())); } Poll::Ready(Ok(None)) @@ -195,9 +183,11 @@ impl Body for BufferedBody { return true; } - let is_end = self.owned.as_ref() - .map(|owned|owned.body.is_end_stream()) - .unwrap_or(false); + let is_end = self + .owned + .as_ref() + .map(|owned| owned.body.is_end_stream()) + .unwrap_or(false); !self.replay_body && !self.replay_trailers && is_end } @@ -205,12 +195,8 @@ impl Body for BufferedBody { fn size_hint(&self) -> http_body::SizeHint { self.size_hint.clone() } - - } - - #[derive(Clone)] pub struct InnerBuffer { bufs: VecDeque, @@ -328,13 +314,12 @@ pub struct CloneBody(#[pin] BufferedBody); impl CloneBody { pub fn new(inner_body: hyper::Body) -> Self { - let inner_body = BufferedBody::new(inner_body, 1024 * 64); + let inner_body = BufferedBody::new(inner_body, 1024 * 64); CloneBody(inner_body) } } -impl Body for CloneBody{ - +impl Body for CloneBody { type Data = BytesData; type Error = StdError; @@ -350,7 +335,7 @@ impl Body for CloneBody{ self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - self.project().0.poll_trailers(cx) + self.project().0.poll_trailers(cx) } fn size_hint(&self) -> http_body::SizeHint { @@ -358,9 +343,8 @@ impl Body for CloneBody{ } } - impl Clone for CloneBody { fn clone(&self) -> Self { Self(self.0.clone()) } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs index fe621b87..c1fa00d8 100644 --- a/dubbo/src/invoker/clone_invoker.rs +++ b/dubbo/src/invoker/clone_invoker.rs @@ -1,76 +1,81 @@ -use std::{task::Poll, pin::Pin, mem}; +use std::{mem, pin::Pin, task::Poll}; use dubbo_logger::tracing::debug; -use futures_core::{Future, TryFuture, ready, future::BoxFuture}; +use futures_core::{future::BoxFuture, ready, Future, TryFuture}; use futures_util::FutureExt; use pin_project::pin_project; use thiserror::Error; -use tokio::{task::JoinHandle, sync::{watch::{Sender, Receiver}, self}}; +use tokio::{ + sync::{ + self, + watch::{Receiver, Sender}, + }, + task::JoinHandle, +}; use tokio_util::sync::ReusableBoxFuture; -use tower::{ServiceExt, buffer::Buffer}; +use tower::{buffer::Buffer, ServiceExt}; use tower_service::Service; use crate::StdError; use super::clone_body::CloneBody; - + enum Inner { Invalid, Ready(S), Pending(JoinHandle>), -} +} #[derive(Debug, Error)] #[error("the inner service has not got ready yet!")] struct InnerServiceNotReadyErr; - - #[pin_project(project = InnerServiceCallingResponseProj)] enum InnerServiceCallingResponse { Call(#[pin] Fut), - Fail + Fail, } -impl Future for InnerServiceCallingResponse +impl Future for InnerServiceCallingResponse where Fut: TryFuture, - Fut::Error: Into + Fut::Error: Into, { type Output = Result; - + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - match self.project() { - InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into), - InnerServiceCallingResponseProj::Fail => Poll::Ready(Err(InnerServiceNotReadyErr.into())) - } + match self.project() { + InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into), + InnerServiceCallingResponseProj::Fail => { + Poll::Ready(Err(InnerServiceNotReadyErr.into())) + } + } } } - + #[derive(Clone)] enum ObserveState { Ready, Pending, } - struct ReadyService { inner: Inner, - tx: Sender + tx: Sender, } - impl ReadyService { - fn new(inner: S) -> (Self, Receiver) { let (tx, rx) = sync::watch::channel(ObserveState::Ready); - let ready_service = Self { inner: Inner::Ready(inner), tx}; + let ready_service = Self { + inner: Inner::Ready(inner), + tx, + }; (ready_service, rx) } - } -impl Service for ReadyService +impl Service for ReadyService where S: Service + Send + 'static, >::Error: Into, @@ -85,16 +90,14 @@ where loop { match mem::replace(&mut self.inner, Inner::Invalid) { Inner::Ready(mut svc) => { - let poll_ready = svc.poll_ready(cx); + let poll_ready = svc.poll_ready(cx); match poll_ready { Poll::Pending => { self.inner = Inner::Pending(tokio::spawn(async move { let poll_ready = svc.ready().await; match poll_ready { Ok(_) => Ok(svc), - Err(err) => { - Err((svc, err.into())) - } + Err(err) => Err((svc, err.into())), } })); @@ -108,17 +111,17 @@ where return Poll::Ready(ret.map_err(Into::into)); } } - }, + } Inner::Pending(mut join_handle) => { if let Poll::Ready(res) = join_handle.poll_unpin(cx) { let (svc, res) = match res { Err(join_err) => panic!("ReadyService panicked: {join_err}"), Ok(Err((svc, err))) => (svc, Poll::Ready(Err(err))), - Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))) + Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))), }; self.inner = Inner::Ready(svc); - + let _ = self.tx.send(ObserveState::Ready); return res; } else { @@ -127,31 +130,29 @@ where let _ = self.tx.send(ObserveState::Pending); return Poll::Pending; } - - }, - Inner::Invalid => panic!("ReadyService panicked: inner state is invalid") + } + Inner::Invalid => panic!("ReadyService panicked: inner state is invalid"), } } } fn call(&mut self, req: Req) -> Self::Future { - match self.inner { - Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)), - _ => InnerServiceCallingResponse::Fail - } + match self.inner { + Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)), + _ => InnerServiceCallingResponse::Fail, + } } } - impl Drop for ReadyService { fn drop(&mut self) { - if let Inner::Pending(ref handler) = self.inner { + if let Inner::Pending(ref handler) = self.inner { handler.abort(); - } + } } } -pub struct CloneInvoker +pub struct CloneInvoker where Inv: Service> + Send + 'static, Inv::Error: Into + Send + Sync + 'static, @@ -163,40 +164,44 @@ where polling: bool, } -impl CloneInvoker +impl CloneInvoker where Inv: Service> + Send + 'static, Inv::Error: Into + Send + Sync + 'static, Inv::Future: Send, { - const MAX_INVOKER_BUFFER_SIZE: usize = 16; - + pub fn new(invoker: Inv) -> Self { - let (ready_service, rx) = ReadyService::new(invoker); - let buffer: Buffer, http::Request> = Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE); + let buffer: Buffer, http::Request> = + Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE); - Self { inner: buffer, rx, polling: false, poll: ReusableBoxFuture::new(futures::future::pending()) } + Self { + inner: buffer, + rx, + polling: false, + poll: ReusableBoxFuture::new(futures::future::pending()), + } } } -impl Service> for CloneInvoker +impl Service> for CloneInvoker where Inv: Service> + Send + 'static, Inv::Error: Into + Send + Sync + 'static, Inv::Future: Send, { type Response = Inv::Response; - + type Error = StdError; - + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { loop { - if !self.polling { + if !self.polling { match self.rx.borrow().clone() { ObserveState::Ready => return self.inner.poll_ready(cx), ObserveState::Pending => { @@ -206,7 +211,7 @@ where loop { let current_state = rx.borrow_and_update().clone(); if matches!(current_state, ObserveState::Ready) { - return current_state; + return current_state; } if let Err(_) = rx.changed().await { debug!("the readyService has already shutdown!"); @@ -216,7 +221,6 @@ where }); } } - } let state = ready!(self.poll.poll_unpin(cx)); @@ -235,14 +239,18 @@ where } } - -impl Clone for CloneInvoker +impl Clone for CloneInvoker where Inv: Service> + Send + 'static, Inv::Error: Into + Send + Sync + 'static, Inv::Future: Send, { fn clone(&self) -> Self { - Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, poll: ReusableBoxFuture::new(futures::future::pending())} + Self { + inner: self.inner.clone(), + rx: self.rx.clone(), + polling: false, + poll: ReusableBoxFuture::new(futures::future::pending()), + } } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs index a8179eee..92b8b462 100644 --- a/dubbo/src/invoker/mod.rs +++ b/dubbo/src/invoker/mod.rs @@ -1,14 +1,12 @@ use dubbo_base::Url; -use crate::{codegen::TripleInvoker, svc::NewService, invoker::clone_invoker::CloneInvoker}; +use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService}; pub mod clone_body; pub mod clone_invoker; - pub struct NewInvoker; - impl NewService for NewInvoker { type Service = CloneInvoker; @@ -18,4 +16,4 @@ impl NewService for NewInvoker { let url = Url::from_url(&url).unwrap(); CloneInvoker::new(TripleInvoker::new(url)) } -} \ No newline at end of file +} diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 252e01c7..d397b42b 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -18,20 +18,20 @@ pub mod cluster; pub mod codegen; pub mod context; +pub mod directory; pub mod filter; mod framework; pub mod invocation; +pub mod invoker; +pub mod loadbalancer; +pub mod param; pub mod protocol; pub mod registry; +pub mod route; pub mod status; +pub mod svc; pub mod triple; pub mod utils; -pub mod directory; -pub mod route; -pub mod loadbalancer; -pub mod invoker; -pub mod param; -pub mod svc; use http_body::Body; use std::{future::Future, pin::Pin}; diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs index 334350e3..4e26781d 100644 --- a/dubbo/src/loadbalancer/mod.rs +++ b/dubbo/src/loadbalancer/mod.rs @@ -1,16 +1,20 @@ use futures_core::future::BoxFuture; -use tower::ServiceExt; -use tower::discover::ServiceList; +use tower::{discover::ServiceList, ServiceExt}; use tower_service::Service; -use crate::invoker::clone_body::CloneBody; -use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; +use crate::{ + codegen::RpcInvocation, + invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker}, + param::Param, + svc::NewService, + StdError, +}; use crate::protocol::triple::triple_invoker::TripleInvoker; - + pub struct NewLoadBalancer { inner: N, -} +} #[derive(Clone)] pub struct LoadBalancer { @@ -18,80 +22,74 @@ pub struct LoadBalancer { } impl NewLoadBalancer { - - pub fn layer() -> impl tower_layer::Layer{ - + pub fn layer() -> impl tower_layer::Layer { tower_layer::layer_fn(|inner| { - NewLoadBalancer { - inner // NewRoutes + inner, // NewRoutes } }) } } -impl NewService for NewLoadBalancer +impl NewService for NewLoadBalancer where T: Param + Clone, // NewRoutes - N: NewService, + N: NewService, { - type Service = LoadBalancer; fn new_service(&self, target: T) -> Self::Service { // Routes service let svc = self.inner.new_service(target); - LoadBalancer { - inner: svc, - } - + LoadBalancer { inner: svc } } } -impl Service> for LoadBalancer +impl Service> for LoadBalancer where // Routes service N: Service<(), Response = Vec>> + Clone, N::Error: Into + Send, - N::Future: Send + 'static, + N::Future: Send + 'static, { - type Response = as Service>>::Response; type Error = StdError; type Future = BoxFuture<'static, Result>; - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { self.inner.poll_ready(cx).map_err(Into::into) - } fn call(&mut self, req: http::Request) -> Self::Future { - let routes = self.inner.call(()); + let routes = self.inner.call(()); let fut = async move { let routes = routes.await; let routes: Vec> = match routes { Err(e) => return Err(Into::::into(e)), - Ok(routes) => routes + Ok(routes) => routes, }; - - let service_list: Vec<_> = routes.into_iter().map(|invoker| { - tower::load::Constant::new(invoker, 1) - }).collect(); + + let service_list: Vec<_> = routes + .into_iter() + .map(|invoker| tower::load::Constant::new(invoker, 1)) + .collect(); let service_list = ServiceList::new(service_list); - + let p2c = tower::balance::p2c::Balance::new(service_list); - p2c.oneshot(req).await }; - + Box::pin(fut) } } diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs index b57f98eb..bef50419 100644 --- a/dubbo/src/param.rs +++ b/dubbo/src/param.rs @@ -1,12 +1,9 @@ pub trait Param { - fn param(&self) -> T; } - impl Param for T { - fn param(&self) -> T::Owned { - self.to_owned() + self.to_owned() } } diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 9d35a525..7dbf1f3f 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -15,9 +15,7 @@ * limitations under the License. */ -use std::{ - task::{Context, Poll}, -}; +use std::task::{Context, Poll}; use async_trait::async_trait; use aws_smithy_http::body::SdkBody; diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 71eae903..c8451e3c 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -16,14 +16,17 @@ */ use dubbo_base::Url; -use http::{Uri, HeaderValue}; +use http::{HeaderValue, Uri}; use std::{ fmt::{Debug, Formatter}, str::FromStr, }; use tower_service::Service; -use crate::{triple::transport::{connection::Connection, self}, invoker::clone_body::CloneBody}; +use crate::{ + invoker::clone_body::CloneBody, + triple::transport::{self, connection::Connection}, +}; pub struct TripleInvoker { url: Url, @@ -47,18 +50,19 @@ impl Debug for TripleInvoker { } impl TripleInvoker { - pub fn map_request( - &self, - req: http::Request, - ) -> http::Request { - + pub fn map_request(&self, req: http::Request) -> http::Request { let (parts, body) = req.into_parts(); let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap(); - let authority = self.url.clone().get_ip_port(); - - let uri = Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap(); + let authority = self.url.clone().get_ip_port(); + + let uri = Uri::builder() + .scheme("http") + .authority(authority) + .path_and_query(path_and_query) + .build() + .unwrap(); let mut req = hyper::Request::builder() .version(http::Version::HTTP_2) @@ -99,12 +103,12 @@ impl TripleInvoker { HeaderValue::from_static("dubbo-rust/0.1.0"), ); // if let Some(_encoding) = self.send_compression_encoding { - + // } req.headers_mut() - .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); - + .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); + req.headers_mut().insert( "grpc-accept-encoding", http::HeaderValue::from_static("gzip"), @@ -137,7 +141,10 @@ impl Service> for TripleInvoker { &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - >>::poll_ready(&mut self.conn, cx) + >>::poll_ready( + &mut self.conn, + cx, + ) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -146,4 +153,3 @@ impl Service> for TripleInvoker { self.conn.call(req) } } - diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index ab2297f3..d8ff6ef3 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -18,9 +18,9 @@ #![allow(unused_variables, dead_code, missing_docs)] pub mod integration; pub mod memory_registry; +pub mod n_registry; pub mod protocol; pub mod types; -pub mod n_registry; use std::{ fmt::{Debug, Formatter}, @@ -29,7 +29,6 @@ use std::{ use dubbo_base::Url; - pub type RegistryNotifyListener = Arc; pub trait Registry { fn register(&mut self, url: Url) -> Result<(), crate::StdError>; diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs index 9b6dca58..928c9b3c 100644 --- a/dubbo/src/registry/n_registry.rs +++ b/dubbo/src/registry/n_registry.rs @@ -2,19 +2,17 @@ use std::sync::Arc; use async_trait::async_trait; use dubbo_base::Url; -use tokio::sync::mpsc::{Receiver, channel}; +use tokio::sync::mpsc::{channel, Receiver}; use tower::discover::Change; - use crate::StdError; type DiscoverStream = Receiver, StdError>>; #[async_trait] pub trait Registry { - async fn register(&self, url: Url) -> Result<(), StdError>; - + async fn unregister(&self, url: Url) -> Result<(), StdError>; // todo service_name change to url @@ -25,31 +23,29 @@ pub trait Registry { #[derive(Clone)] pub struct ArcRegistry { - inner: Arc + inner: Arc, } - pub enum RegistryComponent { NacosRegistry, ZookeeperRegistry, StaticRegistry(StaticRegistry), } - pub struct StaticRegistry { - urls: Vec + urls: Vec, } impl ArcRegistry { - pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self { - Self { inner: Arc::new(registry) } + Self { + inner: Arc::new(registry), + } } } #[async_trait] impl Registry for ArcRegistry { - async fn register(&self, url: Url) -> Result<(), StdError> { self.inner.register(url).await } @@ -67,9 +63,6 @@ impl Registry for ArcRegistry { } } - - - #[async_trait] impl Registry for RegistryComponent { async fn register(&self, url: Url) -> Result<(), StdError> { @@ -93,17 +86,12 @@ impl Registry for RegistryComponent { } } - impl StaticRegistry { - pub fn new(urls: Vec) -> Self { - Self { - urls - } + Self { urls } } } - #[async_trait] impl Registry for StaticRegistry { async fn register(&self, url: Url) -> Result<(), StdError> { @@ -119,7 +107,7 @@ impl Registry for StaticRegistry { for url in self.urls.iter() { let change = Ok(Change::Insert(url.to_url(), ())); tx.send(change).await?; - } + } Ok(rx) } @@ -127,4 +115,4 @@ impl Registry for StaticRegistry { async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { todo!() } -} \ No newline at end of file +} diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs index cff3ccee..c2448642 100644 --- a/dubbo/src/route/mod.rs +++ b/dubbo/src/route/mod.rs @@ -1,42 +1,51 @@ use std::pin::Pin; use dubbo_logger::tracing::debug; -use futures_core::{Future, ready}; +use futures_core::{ready, Future}; use futures_util::{future::Ready, FutureExt, TryFutureExt}; -use tower::{util::FutureService, buffer::Buffer}; +use tower::{buffer::Buffer, util::FutureService}; use tower_service::Service; - -use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; + +use crate::{ + codegen::{RpcInvocation, TripleInvoker}, + invoker::clone_invoker::CloneInvoker, + param::Param, + svc::NewService, + StdError, +}; pub struct NewRoutes { inner: N, -} - +} pub struct NewRoutesFuture { inner: RoutesFutureInnerState, target: T, } - pub enum RoutesFutureInnerState { Service(S), - Future(Pin>, StdError>> + Send + 'static>>), + Future( + Pin< + Box< + dyn Future>, StdError>> + + Send + + 'static, + >, + >, + ), Ready(Vec>), } - #[derive(Clone)] pub struct Routes { target: T, - invokers: Vec> + invokers: Vec>, } impl NewRoutes { pub fn new(inner: N) -> Self { - Self { - inner, - } + Self { inner } } } @@ -44,40 +53,39 @@ impl NewRoutes { const MAX_ROUTE_BUFFER_SIZE: usize = 16; pub fn layer() -> impl tower_layer::Layer { - tower_layer::layer_fn(|inner: N| { - NewRoutes::new(inner) - }) + tower_layer::layer_fn(|inner: N| NewRoutes::new(inner)) } } - -impl NewService for NewRoutes +impl NewService for NewRoutes where - T: Param + Clone + Send + Unpin + 'static, + T: Param + Clone + Send + Unpin + 'static, // NewDirectory N: NewService, - // Directory + // Directory N::Service: Service<(), Response = Vec>> + Unpin + Send + 'static, - >::Error: Into, + >::Error: Into, >::Future: Send + 'static, -{ - - type Service = Buffer>::Service, T>, Routes>, ()>; +{ + type Service = + Buffer>::Service, T>, Routes>, ()>; fn new_service(&self, target: T) -> Self::Service { let inner = self.inner.new_service(target.clone()); - Buffer::new(FutureService::new(NewRoutesFuture { - inner: RoutesFutureInnerState::Service(inner), - target, - }), Self::MAX_ROUTE_BUFFER_SIZE) + Buffer::new( + FutureService::new(NewRoutesFuture { + inner: RoutesFutureInnerState::Service(inner), + target, + }), + Self::MAX_ROUTE_BUFFER_SIZE, + ) } } - -impl Future for NewRoutesFuture +impl Future for NewRoutesFuture where - T: Param + Clone + Unpin, + T: Param + Clone + Unpin, // Directory N: Service<(), Response = Vec>> + Unpin, N::Error: Into, @@ -85,8 +93,10 @@ where { type Output = Result, StdError>; - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { let this = self.get_mut(); loop { @@ -94,14 +104,14 @@ where RoutesFutureInnerState::Service(ref mut service) => { debug!("RoutesFutureInnerState::Service"); let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?; - let fut = service.call(()).map_err(|e|e.into()).boxed(); + let fut = service.call(()).map_err(|e| e.into()).boxed(); this.inner = RoutesFutureInnerState::Future(fut); - }, + } RoutesFutureInnerState::Future(ref mut futures) => { debug!("RoutesFutureInnerState::Future"); let invokers = ready!(futures.as_mut().poll(cx))?; this.inner = RoutesFutureInnerState::Ready(invokers); - }, + } RoutesFutureInnerState::Ready(ref invokers) => { debug!("RoutesFutureInnerState::Ready"); let target = this.target.clone(); @@ -109,32 +119,32 @@ where invokers: invokers.clone(), target, })); - }, + } } } - } } - - -impl Service<()> for Routes +impl Service<()> for Routes where - T: Param + Clone, -{ + T: Param + Clone, +{ type Response = Vec>; - + type Error = StdError; - - type Future = Ready>; - fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll> { + type Future = Ready>; + + fn poll_ready( + &mut self, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { // some router operator // if new_invokers changed, send new invokers to routes_rx after router operator - futures_util::future::ok(self.invokers.clone()) + futures_util::future::ok(self.invokers.clone()) } -} \ No newline at end of file +} diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs index 56dc3304..df4c071a 100644 --- a/dubbo/src/svc.rs +++ b/dubbo/src/svc.rs @@ -1,23 +1,19 @@ -use std::{sync::Arc, pin::Pin, marker::PhantomData}; +use std::{marker::PhantomData, sync::Arc}; + + + -use futures_core::Future; -use tower::ServiceExt; -use tower_service::Service; pub trait NewService { - type Service; fn new_service(&self, target: T) -> Self::Service; - } - pub struct ArcNewService { inner: Arc + Send + Sync>, } - impl ArcNewService { pub fn layer() -> impl tower_layer::Layer + Clone + Copy where @@ -57,23 +53,19 @@ impl NewService for ArcNewService { // inner: Box> + Send>>> + Send>, pub struct BoxedService { inner: N, - _mark: PhantomData + _mark: PhantomData, } impl BoxedService { - - pub fn layer() -> impl tower_layer::Layer{ - tower_layer::layer_fn(|inner: N| { - Self { - inner, - _mark: PhantomData - } + pub fn layer() -> impl tower_layer::Layer { + tower_layer::layer_fn(|inner: N| Self { + inner, + _mark: PhantomData, }) } } - -// impl NewService for BoxedService +// impl NewService for BoxedService // where // N: NewService, // N::Service: Service + Send, diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index ed080217..c4e6e62c 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -15,11 +15,15 @@ * limitations under the License. */ - use std::sync::Arc; use crate::{ - utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody, + cluster::NewCluster, + directory::NewCachedDirectory, + loadbalancer::NewLoadBalancer, + registry::n_registry::{ArcRegistry, RegistryComponent, StaticRegistry}, + route::NewRoutes, + utils::boxed_clone::BoxCloneService, }; use aws_smithy_http::body::SdkBody; @@ -29,7 +33,6 @@ use tower::ServiceBuilder; pub type ClientBoxService = BoxCloneService, http::Response, crate::Error>; - pub type ServiceMK = Arc>>>>; #[derive(Default)] @@ -56,7 +59,9 @@ impl ClientBuilder { Self { timeout: None, connector: "", - registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry( + StaticRegistry::new(vec![Url::from_url(host).unwrap()]), + ))), direct: true, host: host.to_string(), } @@ -70,24 +75,23 @@ impl ClientBuilder { } pub fn with_registry(self, registry: RegistryComponent) -> Self { - Self { - registry: Some(ArcRegistry::new(registry)), + Self { + registry: Some(ArcRegistry::new(registry)), ..self } } pub fn with_host(self, host: &'static str) -> Self { Self { - registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry( + StaticRegistry::new(vec![Url::from_url(host).unwrap()]), + ))), ..self } } pub fn with_connector(self, connector: &'static str) -> Self { - Self { - connector, - ..self - } + Self { connector, ..self } } pub fn with_direct(self, direct: bool) -> Self { @@ -95,16 +99,14 @@ impl ClientBuilder { } pub fn build(mut self) -> ServiceMK { - - let registry = self.registry.take().expect("registry must not be empty"); let mk_service = ServiceBuilder::new() - .layer(NewCluster::layer()) - .layer(NewLoadBalancer::layer()) - .layer(NewRoutes::layer()) - .layer(NewCachedDirectory::layer()) - .service(registry); + .layer(NewCluster::layer()) + .layer(NewLoadBalancer::layer()) + .layer(NewRoutes::layer()) + .layer(NewCachedDirectory::layer()) + .service(registry); Arc::new(mk_service) } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 9bcf144d..377dc435 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -15,7 +15,6 @@ * limitations under the License. */ - use futures_util::{future, stream, StreamExt, TryStreamExt}; use aws_smithy_http::body::SdkBody; @@ -25,9 +24,9 @@ use tower_service::Service; use super::builder::{ClientBuilder, ServiceMK}; use crate::codegen::RpcInvocation; -use crate::svc::NewService; use crate::{ invocation::{IntoStreamingRequest, Metadata, Request, Response}, + svc::NewService, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; @@ -148,15 +147,12 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(body_stream); - let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - - + .body(body) + .unwrap(); let response = invoker .call(request) @@ -211,14 +207,13 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); - - let mut invoker = self.mk.new_service(invocation); + let mut invoker = self.mk.new_service(invocation); let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - + .body(body) + .unwrap(); let response = invoker .call(request) @@ -259,12 +254,10 @@ impl TripleClient { let body = hyper::Body::wrap_stream(en); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - - + .body(body) + .unwrap(); // let mut conn = Connection::new().with_host(http_uri); let response = invoker @@ -322,11 +315,10 @@ impl TripleClient { let body = hyper::Body::wrap_stream(en); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - + .body(body) + .unwrap(); let response = invoker .call(request) diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index 3bbaa179..cb0b9d71 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -18,9 +18,15 @@ use hyper::client::{conn::Builder, service::Connect}; use tower_service::Service; -use crate::{boxed, triple::transport::connector::get_connector, StdError, invoker::clone_body::CloneBody}; +use crate::{ + boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector, StdError, +}; -type HyperConnect = Connect, CloneBody, http::Uri>; +type HyperConnect = Connect< + crate::utils::boxed_clone::BoxCloneService, + CloneBody, + http::Uri, +>; pub struct Connection { host: hyper::Uri, @@ -65,13 +71,10 @@ impl Connection { let hyper_connect: HyperConnect = Connect::new(get_connector(self.connector), builder); self.connect = Some(hyper_connect); self - } } impl Service> for Connection { - - type Response = http::Response; type Error = crate::Error; @@ -85,30 +88,28 @@ impl Service> for Connection { match self.connect { None => { panic!("connection must be built before use") - }, - Some(ref mut connect) => { - connect.poll_ready(cx).map_err(|e|e.into()) } + Some(ref mut connect) => connect.poll_ready(cx).map_err(|e| e.into()), } } fn call(&mut self, req: http::Request) -> Self::Future { - match self.connect { None => { panic!("connection must be built before use") - }, + } Some(ref mut connect) => { let uri = self.host.clone(); let call_fut = connect.call(uri); let fut = async move { - let mut con = call_fut.await.unwrap(); - con.call(req).await - .map_err(|err| err.into()) - .map(|res| res.map(boxed)) + let mut con = call_fut.await.unwrap(); + con.call(req) + .await + .map_err(|err| err.into()) + .map(|res| res.map(boxed)) }; - return Box::pin(fut) + return Box::pin(fut); } } }