diff --git a/Cargo.toml b/Cargo.toml index 5270ba42..0abff00d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "2" members = [ "common/logger", "common/utils", diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs index b97b342f..cb3bc09a 100644 --- a/common/base/src/lib.rs +++ b/common/base/src/lib.rs @@ -23,4 +23,4 @@ pub mod node; pub mod url; pub use node::Node; -pub use url::Url; +pub use url::Url; \ No newline at end of file diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index bfcfe45b..418bc10e 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -78,12 +78,6 @@ pub fn generate( } } - // pub fn build(builder: ClientBuilder) -> Self { - // Self { - // inner: TripleClient::new(builder), - // } - // } - pub fn new(builder: ClientBuilder) -> Self { Self { inner: TripleClient::new(builder), diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 43be50b9..84874748 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -14,13 +14,15 @@ hyper = { version = "0.14.26", features = ["full"] } http = "0.2" tower-service.workspace = true http-body = "0.4.4" -tower = { workspace = true, features = ["timeout", "ready-cache","discover"] } +tower = { workspace = true, features = ["timeout", "ready-cache","discover","retry"] } futures-util = "0.3.23" futures-core ="0.3.23" argh = "0.1" rustls-pemfile = "1.0.0" tokio-rustls="0.23.4" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal", "full" ] } +tokio-util = "0.7.9" +tokio-stream = "0.1" prost = "0.10.4" async-trait = "0.1.56" tower-layer.workspace = true diff --git a/dubbo/src/cluster/clone_body.rs b/dubbo/src/cluster/clone_body.rs new file mode 100644 index 00000000..1d6b65a8 --- /dev/null +++ b/dubbo/src/cluster/clone_body.rs @@ -0,0 +1,382 @@ +use std::{ + collections::VecDeque, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use futures_core::ready; + +use http::HeaderMap; +use http_body::Body; +use pin_project::pin_project; +use thiserror::Error; + +use crate::StdError; + + +#[derive(Error, Debug)] +#[error("buffered body reach max capacity.")] +pub struct ReachMaxCapacityError; + +pub struct BufferedBody { + shared: Arc>>>, + owned: Option>, + replay_body: bool, + replay_trailers: bool, + is_empty: bool, + size_hint: http_body::SizeHint, +} + +pub struct OwnedBufferedBody { + body: B, + trailers: Option, + buf: InnerBuffer, +} + + + +impl BufferedBody { + + pub fn new(body: B, buf_size: usize) -> Self { + let size_hint = body.size_hint(); + let is_empty = body.is_end_stream(); + BufferedBody { + shared: Default::default(), + owned: Some(OwnedBufferedBody { + body, + trailers: None, + buf: InnerBuffer { + bufs: Default::default(), + capacity: buf_size, + }, + }), + replay_body: false, + replay_trailers: false, + is_empty, + size_hint, + } + } + +} + +impl Clone for BufferedBody { + + fn clone(&self) -> Self { + Self { + shared: self.shared.clone(), + owned: None, + replay_body: true, + replay_trailers: true, + is_empty: self.is_empty, + size_hint: self.size_hint.clone(), + } + } +} + +impl Drop for BufferedBody { + fn drop(&mut self) { + if let Some(owned) = self.owned.take() { + let lock = self.shared.lock(); + if let Ok(mut lock) = lock { + *lock = Some(owned); + } + } + } +} + +impl Body for BufferedBody +where + B: http_body::Body + Unpin, + B::Error: Into, +{ + type Data = BytesData; + type Error = StdError; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut_self = self.get_mut(); + + let owned_body = mut_self.owned.get_or_insert_with(|| { + let lock = mut_self.shared.lock(); + if let Err(e) = lock { + panic!("buffered body get shared data lock failed. {}", e); + } + let mut data = lock.unwrap(); + + data.take().expect("cannot get shared buffered body.") + }); + + + + if mut_self.replay_body { + mut_self.replay_body = false; + if owned_body.buf.has_remaining() { + return Poll::Ready(Some(Ok(BytesData::BufferedBytes( + owned_body.buf.clone(), + )))); + } + + if owned_body.buf.is_capped() { + return Poll::Ready(Some(Err(ReachMaxCapacityError.into()))); + } + } + + if mut_self.is_empty { + return Poll::Ready(None); + } + + let mut data = { + let pin = Pin::new(&mut owned_body.body); + let data = ready!(pin.poll_data(cx)); + match data { + Some(Ok(data)) => data, + Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + None => { + mut_self.is_empty = true; + return Poll::Ready(None); + } + } + }; + + let len = data.remaining(); + + owned_body.buf.capacity = owned_body.buf.capacity.saturating_sub(len); + + let data = if owned_body.buf.is_capped() { + if owned_body.buf.has_remaining() { + owned_body.buf.bufs = VecDeque::default(); + } + data.copy_to_bytes(len) + } else { + owned_body.buf.push_bytes(data.copy_to_bytes(len)) + }; + + Poll::Ready(Some(Ok(BytesData::OriginBytes(data)))) + + + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let mut_self = self.get_mut(); + let owned_body = mut_self.owned.get_or_insert_with(|| { + let lock = mut_self.shared.lock(); + if let Err(e) = lock { + panic!("buffered body get shared data lock failed. {}", e); + } + let mut data = lock.unwrap(); + + data.take().expect("cannot get shared buffered body.") + }); + + if mut_self.replay_trailers { + mut_self.replay_trailers = false; + if let Some(ref trailers) = owned_body.trailers { + return Poll::Ready(Ok(Some(trailers.clone()))); + } + } + + let mut_body = &mut owned_body.body; + if !mut_body.is_end_stream() { + let trailers = ready!(Pin::new(mut_body).poll_trailers(cx)).map(|trailers| { + owned_body.trailers = trailers.clone(); + trailers + }); + return Poll::Ready(trailers.map_err(|e|e.into())); + } + + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + if self.is_empty { + return true; + } + + let is_end = self.owned.as_ref() + .map(|owned|owned.body.is_end_stream()) + .unwrap_or(false); + + !self.replay_body && !self.replay_trailers && is_end + } + + fn size_hint(&self) -> http_body::SizeHint { + self.size_hint.clone() + } + + +} + + + +#[derive(Clone)] +pub struct InnerBuffer { + bufs: VecDeque, + capacity: usize, +} + +impl InnerBuffer { + pub fn push_bytes(&mut self, bytes: Bytes) -> Bytes { + self.bufs.push_back(bytes.clone()); + bytes + } + + pub fn is_capped(&self) -> bool { + self.capacity == 0 + } +} + +impl Buf for InnerBuffer { + fn remaining(&self) -> usize { + self.bufs.iter().map(|bytes| bytes.remaining()).sum() + } + + fn chunk(&self) -> &[u8] { + self.bufs.front().map(Buf::chunk).unwrap_or(&[]) + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { + if dst.is_empty() { + return 0; + } + + let mut filled = 0; + + for bytes in self.bufs.iter() { + filled += bytes.chunks_vectored(&mut dst[filled..]) + } + + filled + } + + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + let first = self.bufs.front_mut(); + if first.is_none() { + break; + } + let first = first.unwrap(); + let first_remaining = first.remaining(); + if first_remaining > cnt { + first.advance(cnt); + break; + } + + first.advance(first_remaining); + cnt = cnt - first_remaining; + self.bufs.pop_front(); + } + } + + fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { + match self.bufs.front_mut() { + Some(buf) if len <= buf.remaining() => { + let bytes = buf.copy_to_bytes(len); + if buf.remaining() == 0 { + self.bufs.pop_front(); + } + bytes + } + _ => { + let mut bytes = BytesMut::with_capacity(len); + bytes.put(self.take(len)); + bytes.freeze() + } + } + } +} + +pub enum BytesData { + BufferedBytes(InnerBuffer), + OriginBytes(Bytes), +} + +impl Buf for BytesData { + fn remaining(&self) -> usize { + match self { + BytesData::BufferedBytes(bytes) => bytes.remaining(), + BytesData::OriginBytes(bytes) => bytes.remaining(), + } + } + + fn chunk(&self) -> &[u8] { + match self { + BytesData::BufferedBytes(bytes) => bytes.chunk(), + BytesData::OriginBytes(bytes) => bytes.chunk(), + } + } + + fn advance(&mut self, cnt: usize) { + match self { + BytesData::BufferedBytes(bytes) => bytes.advance(cnt), + BytesData::OriginBytes(bytes) => bytes.advance(cnt), + } + } + + fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { + match self { + BytesData::BufferedBytes(bytes) => bytes.copy_to_bytes(len), + BytesData::OriginBytes(bytes) => bytes.copy_to_bytes(len), + } + } +} + +#[pin_project] +pub struct CloneBody(#[pin] BufferedBody); + +impl CloneBody +where + B: http_body::Body + Unpin, + B::Error: Into, +{ + pub fn new(inner_body: B) -> Self { + let inner_body = BufferedBody::new(inner_body, 1024 * 64); + CloneBody(inner_body) + } +} + +impl Body for CloneBody +where + B: http_body::Body + Unpin, + B::Error: Into, +{ + + type Data = BytesData; + + type Error = StdError; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().0.poll_data(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + self.project().0.poll_trailers(cx) + } + + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() + } +} + + +impl Clone for CloneBody +where + B: http_body::Body + Unpin, + B::Error: Into, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} \ No newline at end of file diff --git a/dubbo/src/cluster/clone_invoker.rs b/dubbo/src/cluster/clone_invoker.rs new file mode 100644 index 00000000..20e18118 --- /dev/null +++ b/dubbo/src/cluster/clone_invoker.rs @@ -0,0 +1,248 @@ +use std::{task::Poll, pin::Pin, mem}; + +use dubbo_logger::tracing::debug; +use futures_core::{Future, TryFuture, ready, future::BoxFuture}; +use futures_util::FutureExt; +use pin_project::pin_project; +use thiserror::Error; +use tokio::{task::JoinHandle, sync::{watch::{Sender, Receiver}, self}}; +use tokio_util::sync::ReusableBoxFuture; +use tower::{ServiceExt, buffer::Buffer}; +use tower_service::Service; + +use crate::StdError; + +enum Inner { + Invalid, + Ready(S), + Pending(JoinHandle>), +} + +#[derive(Debug, Error)] +#[error("the inner service has not got ready yet!")] +struct InnerServiceNotReadyErr; + + + +#[pin_project(project = InnerServiceCallingResponseProj)] +enum InnerServiceCallingResponse { + Call(#[pin] Fut), + Fail +} + +impl Future for InnerServiceCallingResponse +where + Fut: TryFuture, + Fut::Error: Into +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + match self.project() { + InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into), + InnerServiceCallingResponseProj::Fail => Poll::Ready(Err(InnerServiceNotReadyErr.into())) + } + } +} + +#[derive(Clone)] +enum ObserveState { + Ready, + Pending, +} + + +struct ReadyService { + inner: Inner, + tx: Sender +} + + +impl ReadyService { + + fn new(inner: S) -> (Self, Receiver) { + let (tx, rx) = sync::watch::channel(ObserveState::Ready); + let ready_service = Self { inner: Inner::Ready(inner), tx}; + (ready_service, rx) + } + +} + +impl Service for ReadyService +where + S: Service + Send + 'static, + >::Error: Into, +{ + type Response = S::Response; + + type Error = StdError; + + type Future = InnerServiceCallingResponse; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + loop { + match mem::replace(&mut self.inner, Inner::Invalid) { + Inner::Ready(mut svc) => { + let poll_ready = svc.poll_ready(cx); + match poll_ready { + Poll::Pending => { + self.inner = Inner::Pending(tokio::spawn(async move { + let poll_ready = svc.ready().await; + match poll_ready { + Ok(_) => Ok(svc), + Err(err) => { + Err((svc, err.into())) + } + } + })); + + let _ = self.tx.send(ObserveState::Pending); + continue; + } + Poll::Ready(ret) => { + self.inner = Inner::Ready(svc); + + let _ = self.tx.send(ObserveState::Ready); + return Poll::Ready(ret.map_err(Into::into)); + } + } + }, + Inner::Pending(mut join_handle) => { + if let Poll::Ready(res) = join_handle.poll_unpin(cx) { + let (svc, res) = match res { + Err(join_err) => panic!("ReadyService panicked: {join_err}"), + Ok(Err((svc, err))) => (svc, Poll::Ready(Err(err))), + Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))) + }; + + self.inner = Inner::Ready(svc); + + let _ = self.tx.send(ObserveState::Ready); + return res; + } else { + self.inner = Inner::Pending(join_handle); + + let _ = self.tx.send(ObserveState::Pending); + return Poll::Pending; + } + + }, + Inner::Invalid => panic!("ReadyService panicked: inner state is invalid") + } + } + } + + fn call(&mut self, req: Req) -> Self::Future { + match self.inner { + Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)), + _ => InnerServiceCallingResponse::Fail + } + } +} + + +impl Drop for ReadyService { + fn drop(&mut self) { + if let Inner::Pending(ref handler) = self.inner { + handler.abort(); + } + } +} + +pub struct CloneInvoker +where + Inv: Service + Send + 'static, + Inv::Error: Into + Send + Sync + 'static, + Inv::Future: Send, + Req: Send +{ + inner: Buffer, Req>, + rx: Receiver, + poll: ReusableBoxFuture<'static, ObserveState>, + polling: bool, +} + +impl CloneInvoker +where + Inv: Service + Send + 'static, + Inv::Error: Into + Send + Sync + 'static, + Inv::Future: Send, + Req: Send + 'static +{ + + pub fn new(invoker: Inv) -> Self { + + let (ready_service, rx) = ReadyService::new(invoker); + + let buffer: Buffer, Req> = Buffer::new(ready_service, 1024); + + Self { inner: buffer, rx, polling: false, poll: ReusableBoxFuture::new(futures::future::pending()) } + } +} + +impl Service for CloneInvoker +where + Inv: Service + Send + 'static, + Inv::Error: Into + Send + Sync + 'static, + Inv::Future: Send, + Req: Send + 'static +{ + type Response = Inv::Response; + + type Error = StdError; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + loop { + if !self.polling { + match self.rx.borrow().clone() { + ObserveState::Ready => return self.inner.poll_ready(cx), + ObserveState::Pending => { + self.polling = true; + let mut rx = self.rx.clone(); + self.poll.set(async move { + loop { + let current_state = rx.borrow_and_update().clone(); + if matches!(current_state, ObserveState::Ready) { + return current_state; + } + if let Err(_) = rx.changed().await { + debug!("the readyService has already shutdown!"); + futures::future::pending::().await; + } + } + }); + } + } + + } + + let state = ready!(self.poll.poll_unpin(cx)); + self.polling = false; + + if matches!(state, ObserveState::Pending) { + continue; + } + + return self.inner.poll_ready(cx); + } + } + + fn call(&mut self, req: Req) -> Self::Future { + Box::pin(self.inner.call(req)) + } +} + + +impl Clone for CloneInvoker +where + Inv: Service + Send + 'static, + Inv::Error: Into + Send + Sync + 'static, + Inv::Future: Send, + Req: Send +{ + fn clone(&self) -> Self { + Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, poll: ReusableBoxFuture::new(futures::future::pending())} + } +} \ No newline at end of file diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs deleted file mode 100644 index 66a3c43a..00000000 --- a/dubbo/src/cluster/directory.rs +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::{ - collections::HashMap, - fmt::Debug, - pin::Pin, - str::FromStr, - sync::{Arc, RwLock}, - task::{Context, Poll}, -}; - -use crate::{ - codegen::TripleInvoker, - protocol::BoxInvoker, - registry::{memory_registry::MemoryNotifyListener, BoxRegistry}, - triple::client::replay::ClonedBody, - StdError, -}; -use dubbo_base::Url; -use dubbo_logger::tracing; -use futures_core::ready; -use tower::{ - discover::{Change, Discover}, - ready_cache::ReadyCache, -}; - -use crate::{cluster::Directory, codegen::RpcInvocation, invocation::Invocation}; - -/// Directory. -/// -/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service) - -#[derive(Debug, Clone)] -pub struct StaticDirectory { - uri: http::Uri, -} - -impl StaticDirectory { - pub fn new(host: &str) -> StaticDirectory { - let uri = match http::Uri::from_str(host) { - Ok(v) => v, - Err(err) => { - tracing::error!("http uri parse error: {}, host: {}", err, host); - panic!("http uri parse error: {}, host: {}", err, host) - } - }; - StaticDirectory { uri } - } - - pub fn from_uri(uri: &http::Uri) -> StaticDirectory { - StaticDirectory { uri: uri.clone() } - } -} - -impl Directory for StaticDirectory { - fn list(&self, inv: Arc) -> Vec { - let url = Url::from_url(&format!( - "tri://{}:{}/{}", - self.uri.host().unwrap(), - self.uri.port().unwrap(), - inv.get_target_service_unique_name(), - )) - .unwrap(); - let invoker = Box::new(TripleInvoker::new(url)); - vec![invoker] - } -} - -#[derive(Debug, Clone)] -pub struct RegistryDirectory { - registry: Arc, - service_instances: Arc>>>, -} - -impl RegistryDirectory { - pub fn new(registry: BoxRegistry) -> RegistryDirectory { - RegistryDirectory { - registry: Arc::new(registry), - service_instances: Arc::new(RwLock::new(HashMap::new())), - } - } -} - -impl Directory for RegistryDirectory { - fn list(&self, inv: Arc) -> Vec { - let service_name = inv.get_target_service_unique_name(); - let url = Url::from_url(&format!( - "triple://{}:{}/{}", - "127.0.0.1", "8888", service_name - )) - .unwrap(); - - self.registry - .subscribe( - url, - Arc::new(MemoryNotifyListener { - service_instances: Arc::clone(&self.service_instances), - }), - ) - .expect("subscribe"); - - let map = self - .service_instances - .read() - .expect("service_instances.read"); - let binding = Vec::new(); - let url_vec = map.get(&service_name).unwrap_or(&binding); - // url_vec.to_vec() - let mut invokers: Vec = vec![]; - for item in url_vec.iter() { - invokers.push(Box::new(TripleInvoker::new(item.clone()))); - } - - invokers - } -} - -pub struct ServiceNameDirectory { - cache: ReadyCache>, - discover: D, -} - -impl ServiceNameDirectory -where - D: Discover + Unpin, - D::Error: Into, -{ - pub fn new(discover: D) -> Self { - Self { - cache: ReadyCache::default(), - discover, - } - } - - fn update_cache(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - let discover = Pin::new(&mut self.discover); - let update = ready!(discover.poll_discover(cx)); - match update { - Some(update) => { - let change = match update { - Err(_) => continue, - Ok(change) => change, - }; - - match change { - Change::Insert(key, invoker) => { - self.cache.push(key, invoker); - } - Change::Remove(key) => { - self.cache.evict(&key); - } - } - } - None => break, - } - } - - // poll pending - let _ = ready!(self.cache.poll_pending(cx))?; - - Poll::Ready(Ok(())) - } - - pub fn list(&mut self, cx: &mut Context<'_>) -> Poll, StdError>> { - let _ = self.update_cache(cx)?; - - let ready_len = self.cache.ready_len(); - - if ready_len == 0 { - return Poll::Pending; - } - - let mut invoker_list = Vec::with_capacity(ready_len); - - for idx in 0..ready_len { - let check = self.cache.check_ready_index(cx, idx); - let is_ready = match check { - Ok(is_ready) => is_ready, - Err(_) => false, - }; - if !is_ready { - continue; - } - - let invoker_url = match self.cache.get_ready_index(idx) { - None => continue, - Some((k, _)) => k.clone(), - }; - - invoker_list.push(invoker_url); - } - - Poll::Ready(Ok(invoker_list)) - } -} - -#[cfg(test)] -pub mod tests { - use std::{ - convert::Infallible, - pin::Pin, - task::{Context, Poll}, - }; - - use crate::{boxed, cluster::directory::ServiceNameDirectory}; - use bytes::Buf; - use futures_core::Stream; - use futures_util::future::poll_fn; - use http::StatusCode; - use http_body::Body; - use tower::{discover::Change, Service}; - - use dubbo_base::Url; - - use crate::{codegen::Invoker, protocol::BoxInvoker, triple::client::replay::ClonedBody}; - - pub struct MockStaticServiceList - where - T: IntoIterator, - { - iter: T::IntoIter, - } - - impl MockStaticServiceList - where - T: IntoIterator, - { - pub fn new(list: T) -> Self { - let iter = list.into_iter(); - - Self { iter } - } - } - - impl Stream for MockStaticServiceList - where - T: IntoIterator, - T::IntoIter: Unpin, - { - type Item = Result, Infallible>; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let mut_self_ref = self.get_mut(); - let mut_iter_ref = &mut mut_self_ref.iter; - - match mut_iter_ref.next() { - Some(next) => { - let invoker_url = next.get_url(); - let raw_url_string = invoker_url.raw_url_string(); - return Poll::Ready(Some(Ok(Change::Insert(raw_url_string, next)))); - } - None => Poll::Ready(None), - } - } - } - - #[derive(Debug)] - struct MockInvoker(u8, bool); - - impl Invoker> for MockInvoker { - fn get_url(&self) -> dubbo_base::Url { - let str = format!( - "triple://127.0.0.1:8888/failover_cluster_service/directory/{}", - self.0 - ); - Url::from_url(str.as_str()).unwrap() - } - } - - impl Service> for MockInvoker { - type Response = http::Response; - - type Error = crate::Error; - - type Future = crate::BoxFuture, crate::Error>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - if !self.1 { - return Poll::Pending; - } - - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: http::Request) -> Self::Future { - Box::pin(async move { - let mut body = req.into_body(); - - let mut body_data = poll_fn(|cx| Pin::new(&mut body).poll_data(cx)) - .await - .unwrap() - .unwrap(); - - let len = body_data.remaining(); - let bytes = body_data.copy_to_bytes(len); - - let req_str = String::from_utf8(bytes.to_vec()).unwrap(); - println!("req: {}", req_str); - let echo_data = format!("echo: {}", req_str); - let response = http::Response::builder() - .status(StatusCode::OK) - .body(boxed(echo_data)) - .unwrap(); - - return Ok(response); - }) - } - } - - #[tokio::test] - async fn test_directory() { - let invoker_list = vec![ - Box::new(MockInvoker(1, true)) as BoxInvoker, - Box::new(MockInvoker(2, true)) as BoxInvoker, - Box::new(MockInvoker(3, true)) as BoxInvoker, - Box::new(MockInvoker(4, true)) as BoxInvoker, - Box::new(MockInvoker(5, true)) as BoxInvoker, - Box::new(MockInvoker(6, true)) as BoxInvoker, - ]; - let service_list = MockStaticServiceList::new(invoker_list); - - let mut service_name_directory = ServiceNameDirectory::new(service_list); - - let list = poll_fn(|cx| service_name_directory.list(cx)).await; - - assert!(list.is_ok()); - - let list = list.unwrap(); - - assert!(!list.is_empty()); - - assert_eq!(6, list.len()); - - for uri in list { - println!("invoker uri: {}", uri); - } - } - - #[tokio::test] - async fn test_ready_in_any_order() { - let invoker_list = vec![ - Box::new(MockInvoker(1, true)) as BoxInvoker, - Box::new(MockInvoker(2, false)) as BoxInvoker, - Box::new(MockInvoker(3, false)) as BoxInvoker, - Box::new(MockInvoker(4, true)) as BoxInvoker, - Box::new(MockInvoker(5, false)) as BoxInvoker, - Box::new(MockInvoker(6, true)) as BoxInvoker, - ]; - let service_list = MockStaticServiceList::new(invoker_list); - - let mut service_name_directory = ServiceNameDirectory::new(service_list); - - let list = poll_fn(|cx| service_name_directory.list(cx)).await; - - assert!(list.is_ok()); - - let list = list.unwrap(); - - assert!(!list.is_empty()); - - assert_eq!(3, list.len()); - - for uri in list { - println!("invoker uri: {}", uri); - } - } -} diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs new file mode 100644 index 00000000..1a8149c6 --- /dev/null +++ b/dubbo/src/cluster/failover.rs @@ -0,0 +1,76 @@ +use std::task::Poll; + +use futures_util::future; +use http::Request; +use tower::{ServiceExt, retry::Retry, util::Oneshot}; +use tower_service::Service; + +use crate::StdError; + +pub struct Failover { + inner: N // loadbalancer service +} + +#[derive(Clone)] +pub struct FailoverPolicy; + + +impl Failover { + pub fn new(inner: N) -> Self { + Self { inner } + } +} + +impl tower::retry::Policy, Res, E> for FailoverPolicy +where + B: http_body::Body + Clone, +{ + + type Future = future::Ready; + + fn retry(&self, req: &Request, result: Result<&Res, &E>) -> Option { + //TODO some error handling or logging + match result { + Ok(_) => None, + Err(_) => Some(future::ready(self.clone())) + } + } + + fn clone_request(&self, req: &Request) -> Option> { + let mut clone = http::Request::new(req.body().clone()); + *clone.method_mut() = req.method().clone(); + *clone.uri_mut() = req.uri().clone(); + *clone.headers_mut() = req.headers().clone(); + *clone.version_mut() = req.version(); + + + Some(clone) + } +} + + + +impl Service> for Failover +where + // B is CloneBody + B: http_body::Body + Clone, + // loadbalancer service + N: Service> + Clone + 'static , + N::Error: Into, + N::Future: Send +{ + type Response = N::Response; + + type Error = N::Error; + + type Future = Oneshot, Request>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let retry = Retry::new(FailoverPolicy, self.inner.clone()); + retry.oneshot(req) + } +} \ No newline at end of file diff --git a/dubbo/src/cluster/loadbalance/impls/mod.rs b/dubbo/src/cluster/loadbalance/impls/mod.rs deleted file mode 100644 index 5a84af8c..00000000 --- a/dubbo/src/cluster/loadbalance/impls/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -pub mod random; -pub mod roundrobin; diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs deleted file mode 100644 index 3e1cf651..00000000 --- a/dubbo/src/cluster/loadbalance/impls/random.rs +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -use dubbo_base::Url; -use std::{ - fmt::{Debug, Formatter}, - sync::Arc, -}; - -use crate::{ - cluster::loadbalance::types::{LoadBalance, Metadata}, - codegen::RpcInvocation, -}; - -pub struct RandomLoadBalance { - pub metadata: Metadata, -} - -impl Default for RandomLoadBalance { - fn default() -> Self { - RandomLoadBalance { - metadata: Metadata::new("random"), - } - } -} - -impl Debug for RandomLoadBalance { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RandomLoadBalance") - } -} - -impl LoadBalance for RandomLoadBalance { - fn select( - &self, - invokers: Arc>, - _url: Option, - _invocation: Arc, - ) -> Option { - if invokers.is_empty() { - return None; - } - let index = rand::random::() % invokers.len(); - Some(invokers[index].clone()) - } -} diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs deleted file mode 100644 index 5fd0ed49..00000000 --- a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -use dubbo_base::Url; -use std::{ - collections::HashMap, - fmt::{Debug, Formatter}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, RwLock, - }, -}; - -use crate::{ - cluster::loadbalance::types::{LoadBalance, Metadata}, - codegen::RpcInvocation, -}; - -pub struct RoundRobinLoadBalance { - pub metadata: Metadata, - pub counter_map: RwLock>, -} - -impl Default for RoundRobinLoadBalance { - fn default() -> Self { - RoundRobinLoadBalance { - metadata: Metadata::new("roundrobin"), - counter_map: RwLock::new(HashMap::new()), - } - } -} - -impl Debug for RoundRobinLoadBalance { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RoundRobinLoadBalance") - } -} - -impl RoundRobinLoadBalance { - fn guarantee_counter_key(&self, key: &str) { - let contained = self.counter_map.try_read().unwrap().contains_key(key); - if !contained { - self.counter_map - .try_write() - .unwrap() - .insert(key.to_string(), AtomicUsize::new(0)); - } - } -} - -impl LoadBalance for RoundRobinLoadBalance { - fn select( - &self, - invokers: Arc>, - _url: Option, - invocation: Arc, - ) -> Option { - if invokers.is_empty() { - return None; - } - let fingerprint = invocation.unique_fingerprint(); - self.guarantee_counter_key(fingerprint.as_str()); - let index = self - .counter_map - .try_read() - .unwrap() - .get(fingerprint.as_str())? - .fetch_add(1, Ordering::SeqCst) - % invokers.len(); - Some(invokers[index].clone()) - } -} diff --git a/dubbo/src/cluster/loadbalance/mod.rs b/dubbo/src/cluster/loadbalance/mod.rs deleted file mode 100644 index 1d04f7fc..00000000 --- a/dubbo/src/cluster/loadbalance/mod.rs +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -use std::collections::HashMap; - -use lazy_static::lazy_static; - -use crate::cluster::loadbalance::{ - impls::{random::RandomLoadBalance, roundrobin::RoundRobinLoadBalance}, - types::BoxLoadBalance, -}; - -pub mod impls; -pub mod types; - -lazy_static! { - pub static ref LOAD_BALANCE_EXTENSIONS: HashMap = - init_loadbalance_extensions(); -} - -fn init_loadbalance_extensions() -> HashMap { - let mut loadbalance_map: HashMap = HashMap::new(); - loadbalance_map.insert("random".to_string(), Box::new(RandomLoadBalance::default())); - loadbalance_map.insert( - "roundrobin".to_string(), - Box::new(RoundRobinLoadBalance::default()), - ); - loadbalance_map -} diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/loadbalance/types.rs deleted file mode 100644 index 9273d07e..00000000 --- a/dubbo/src/cluster/loadbalance/types.rs +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use dubbo_base::Url; -use std::{fmt::Debug, sync::Arc}; - -use crate::codegen::RpcInvocation; - -pub type BoxLoadBalance = Box; - -pub trait LoadBalance: Debug { - fn select( - &self, - invokers: Arc>, - url: Option, - invocation: Arc, - ) -> Option; -} - -pub struct Metadata { - pub name: &'static str, -} - -impl Metadata { - pub fn new(name: &'static str) -> Self { - Metadata { name } - } -} diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 964150d5..47394e2e 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,434 +15,77 @@ * limitations under the License. */ -use std::{fmt::Debug, sync::Arc}; -use dubbo_base::Url; -use thiserror::Error; -use tower::{ready_cache::ReadyCache, ServiceExt}; +use http::Request; use tower_service::Service; -use crate::{ - cluster::router::{ - manager::router_manager::get_global_router_manager, router_chain::RouterChain, - }, - codegen::RpcInvocation, - invocation::Invocation, - protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker}, - triple::client::replay::ClonedBody, - StdError, -}; +use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param}; -pub mod directory; -pub mod loadbalance; -pub mod router; +use self::{failover::Failover, clone_body::CloneBody}; + +mod clone_body; +mod clone_invoker; +mod failover; -pub trait Directory: Debug { - fn list(&self, inv: Arc) -> Vec; +pub struct NewCluster { + inner: N, // new loadbalancer service } -type BoxDirectory = Box; - -pub trait Cluster { - fn join(&self, dir: BoxDirectory) -> BoxInvoker; -} - -#[derive(Debug, Default)] -pub struct MockCluster {} - -impl Cluster for MockCluster { - fn join(&self, dir: BoxDirectory) -> BoxInvoker { - Box::new(FailoverCluster::new(dir)) - } -} - -// 在Cluster上进行缓存Service -#[derive(Debug)] -pub struct FailoverCluster { - dir: Arc, - caches: ReadyCache>, -} - -impl FailoverCluster { - pub fn new(dir: BoxDirectory) -> FailoverCluster { - Self { - dir: Arc::new(dir), - caches: ReadyCache::default(), - } - } -} - -#[derive(Error, Debug)] -#[error("no available service for {0}")] -pub struct NoAvailableServiceErr(String); - -#[derive(Error, Debug)] -#[error("invalid service name {0}")] -pub struct InvalidServiceNameErr(String); - -impl FailoverCluster { - async fn invoke( - req: http::Request, - mut invoker: BoxInvoker, - ) -> Result, (StdError, http::Request)> { - let clone_request = FailoverCluster::clone_request(&req); - let invoker = invoker - .ready() - .await - .map_err(|e| (e, FailoverCluster::clone_request(&req)))?; - let ret = invoker.call(req).await.map_err(|e| (e, clone_request))?; - - Ok(ret) - } - - fn clone_request(req: &http::Request) -> http::Request { - let mut clone = http::Request::new(req.body().clone()); - *clone.method_mut() = req.method().clone(); - *clone.uri_mut() = req.uri().clone(); - *clone.headers_mut() = req.headers().clone(); - *clone.version_mut() = req.version(); - - if let Some(inv) = req.extensions().get::().cloned() { - clone.extensions_mut().insert(inv); - } - - clone - } +pub struct Cluster { + inner: S // failover service } -impl Service> for FailoverCluster { - type Response = http::Response; - - type Error = crate::Error; - - type Future = crate::BoxFuture; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.caches.poll_pending(cx).map_err(|e| e.into()) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inv = req.extensions().get::(); - if inv.is_none() { - return Box::pin(async move { - return Err( - InvalidServiceNameErr("service name must not be null".to_owned()).into(), - ); - }); - } - - let inv = inv.unwrap(); - let service_name = inv.get_target_service_unique_name(); - - let invokers = self.dir.list(Arc::new(inv.clone())); - - Box::pin(async move { - let mut current_req = req; - let mut last_err = None; - - let is_empty = invokers.is_empty(); - if is_empty { - return Err(NoAvailableServiceErr(service_name).into()); - } - - for invoker in invokers { - match FailoverCluster::invoke(current_req, invoker).await { - Ok(resp) => return Ok(resp), - Err((e, cloned_request)) => { - current_req = cloned_request; - last_err = Some(e); - } - } +impl NewCluster { + + pub fn layer() -> impl tower_layer::Layer { + tower_layer::layer_fn(|inner: N| { + NewCluster { + inner, // new loadbalancer service } - - if last_err.is_none() { - return Err(NoAvailableServiceErr(service_name).into()); - } - - return Err(last_err.unwrap()); }) } -} -impl Invoker> for FailoverCluster { - fn get_url(&self) -> dubbo_base::Url { - Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap() - } -} +} -#[derive(Debug, Default)] -pub struct MockDirectory { - router_chain: RouterChain, -} +impl NewService for NewCluster +where + T: Param, + // new loadbalancer service + S: NewService, +{ -impl MockDirectory { - pub fn new(service_name: String) -> MockDirectory { - let router_chain = get_global_router_manager() - .read() - .unwrap() - .get_router_chain(service_name); - Self { router_chain } - } -} + type Service = Cluster>; + + fn new_service(&self, target: T) -> Self::Service { -impl Directory for MockDirectory { - fn list(&self, inv: Arc) -> Vec { - let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); - let mut urls = vec![u]; - // tracing::info!("MockDirectory: {}", meta); - urls = self.router_chain.route(urls, inv); - let mut result = Vec::new(); - for url in urls { - result.push(Box::new(TripleInvoker::new(url)) as BoxInvoker); + Cluster { + inner: Failover::new(self.inner.new_service(target)) } - result } } + +impl Service> for Cluster +where + S: Service>>, + B: http_body::Body + Unpin, + B::Error: Into, +{ -#[cfg(test)] -pub mod tests { - use std::{sync::Arc, task::Poll}; - - use bytes::{Buf, BufMut, BytesMut}; - use dubbo_base::Url; - use futures_util::future::poll_fn; - use http::StatusCode; - use http_body::Body; - use thiserror::Error; - use tower::ServiceExt; - use tower_service::Service; - - use crate::{ - boxed, - cluster::FailoverCluster, - codegen::{Invoker, RpcInvocation}, - empty_body, - invocation::Invocation, - triple::client::replay::ClonedBody, - }; - - use super::Directory; + type Response = S::Response; - #[derive(Error, Debug)] - #[error("{0}")] - struct NoResponseErr(String); + type Error = S::Error; - #[derive(Debug)] - struct MockDirectory; + type Future = S::Future; - impl Directory for MockDirectory { - fn list(&self, inv: Arc) -> Vec { - println!( - "get invoker list for {}", - inv.get_target_service_unique_name() - ); - - vec![ - Box::new(MockInvoker(1)), - Box::new(MockInvoker(2)), - Box::new(MockInvoker(3)), - Box::new(MockInvoker(4)), - Box::new(MockInvoker(5)), - ] - } - } - - #[derive(Debug)] - struct MockInvoker(u8); - - impl Invoker> for MockInvoker { - fn get_url(&self) -> dubbo_base::Url { - let str = format!( - "triple://127.0.0.1:8888/failover_cluster_service/{}", - self.0 - ); - Url::from_url(str.as_str()).unwrap() - } + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx) } - - impl Service> for MockInvoker { - type Response = http::Response; - - type Error = crate::Error; - - type Future = crate::BoxFuture, crate::Error>; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: http::Request) -> Self::Future { - let inv = req.extensions().get::(); - if inv.is_none() { - return Box::pin(async move { - let response = http::Response::builder() - .status(StatusCode::OK) - .body(empty_body()) - .unwrap(); - - return Ok(response); - }); - } - - let inv = inv.unwrap(); - let method_name = inv.get_method_name(); - if method_name.eq("invoker_request") { - return Box::pin(async move { - let response = http::Response::builder() - .status(StatusCode::OK) - .body(boxed("invoker response".to_owned())) - .unwrap(); - - return Ok(response); - }); - } - - let self_index = self.0; - if method_name.eq("failover_request") { - return Box::pin(async move { - let body = req.into_body(); - let mut pin_body = Box::pin(body); - let ret = poll_fn(|cx| pin_body.as_mut().poll_data(cx)).await; - - if ret.is_none() { - #[derive(Error, Debug)] - #[error("{0}")] - struct BodyIsNoneErr(&'static str); - - return Err(BodyIsNoneErr("body must not be null").into()); - } - - let ret = ret.unwrap(); - - if ret.is_err() { - #[derive(Error, Debug)] - #[error("{0}")] - struct BodyIsErr(&'static str); - return Err(BodyIsErr("body must be ok").into()); - } - - let mut ret = ret.unwrap(); - - let index = ret.get_u8(); - - if index == self_index { - let ret_msg = format!("failover cluster index: {} was invoked", self_index); - let response = http::Response::builder() - .status(StatusCode::OK) - .body(boxed(ret_msg)) - .unwrap(); - - return Ok(response); - } - - #[derive(Error, Debug)] - #[error("{0}")] - struct NotTargetInvoker(String); - - let ret_msg = format!( - "failover cluster index: {} was invoked, but is not target invoker {}", - self_index, index - ); - - println!("{}", ret_msg); - return Err(NotTargetInvoker(ret_msg).into()); - }); - } - - return Box::pin(async move { return Err(NoResponseErr(method_name).into()) }); - } - } - - #[tokio::test] - async fn test_failover_cluster() { - let mut cluster = FailoverCluster::new(Box::new(MockDirectory)); - let cluster = cluster.ready().await; - assert!(cluster.is_ok()); - - let cluster = cluster.unwrap(); - - let empty_stream = futures::stream::empty(); - let cloned_body = ClonedBody::new(empty_stream); - - let rpc_inv = RpcInvocation::default() - .with_service_unique_name("failover_cluster_service".to_owned()) - .with_method_name("invoker_request".to_owned()); - - let req = http::Request::builder() - .extension(rpc_inv) - .body(cloned_body) - .unwrap(); - - let ret = cluster.call(req).await; - assert!(ret.is_ok()); - - let ret = ret.unwrap(); - - assert_eq!(ret.status(), StatusCode::OK); - - let body = ret.into_body(); - - let mut pin = Box::pin(body); - let data = poll_fn(|cx| pin.as_mut().poll_data(cx)).await; - assert!(data.is_some()); - let data = data.unwrap(); - assert!(data.is_ok()); - let data = data.unwrap(); - - assert_eq!( - String::from_utf8(data.to_vec()).unwrap(), - "invoker response" - ) - } - - #[tokio::test] - async fn test_failover_request() { - let mut cluster = FailoverCluster::new(Box::new(MockDirectory)); - let cluster = cluster.ready().await; - assert!(cluster.is_ok()); - - let cluster = cluster.unwrap(); - - let once_stream = futures::stream::once(async { - let mut mut_bytes = BytesMut::default(); - mut_bytes.put_u8(5); - return Ok(mut_bytes.freeze()); - }); - let cloned_body = ClonedBody::new(once_stream); - - let rpc_inv = RpcInvocation::default() - .with_service_unique_name("failover_cluster_service".to_owned()) - .with_method_name("failover_request".to_owned()); - - let req = http::Request::builder() - .extension(rpc_inv) - .body(cloned_body) - .unwrap(); - - let ret = cluster.call(req).await; - assert!(ret.is_ok()); - - let ret = ret.unwrap(); - - assert_eq!(ret.status(), StatusCode::OK); - - let body = ret.into_body(); - - let mut pin = Box::pin(body); - let data = poll_fn(|cx| pin.as_mut().poll_data(cx)).await; - assert!(data.is_some()); - let data = data.unwrap(); - assert!(data.is_ok()); - let data = data.unwrap(); - - let resp_str = String::from_utf8(data.to_vec()).unwrap(); - println!("{}", resp_str); - assert_eq!(resp_str, "failover cluster index: 5 was invoked") + + fn call(&mut self, req: Request) -> Self::Future { + let (parts, body) = req.into_parts(); + let clone_body = CloneBody::new(body); + let req = Request::from_parts(parts, clone_body); + self.inner.call(req) } } diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 412feb91..38f8f1dc 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -27,11 +27,9 @@ pub use hyper::Body as hyperBody; pub use tower_service::Service; pub use super::{ - cluster::directory::RegistryDirectory, empty_body, invocation::{IntoStreamingRequest, Request, Response, RpcInvocation}, protocol::{triple::triple_invoker::TripleInvoker, Invoker}, - registry::{BoxRegistry, Registry}, triple::{ client::TripleClient, codec::{prost::ProstCodec, Codec}, @@ -46,7 +44,7 @@ pub use super::{ pub use crate::{ filter::{service::FilterService, Filter}, triple::{ - client::builder::{ClientBoxService, ClientBuilder}, + client::builder::ClientBuilder, server::builder::ServerBuilder, transport::connection::Connection, }, diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs new file mode 100644 index 00000000..0861d32a --- /dev/null +++ b/dubbo/src/directory/mod.rs @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + use core::panic; +use std::{ + hash::Hash, + task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, +}; + +use crate::{StdError, codegen::RpcInvocation, invocation::Invocation, registry::n_registry::Registry, invoker::NewInvoker, svc::NewService, param::Param}; +use futures_util::future::{poll_fn, self}; +use tokio::{sync::{watch, Notify, mpsc::channel}, select}; +use tokio_stream::wrappers::ReceiverStream; +use tower::{ + discover::{Change, Discover}, buffer::Buffer, +}; + +use tower_service::Service; + +type BufferedDirectory = Buffer, StdError>>>, ()>; + +pub struct NewCachedDirectory +where + N: Registry + Clone + Send + Sync + 'static, +{ + inner: CachedDirectory, RpcInvocation> +} + + +pub struct CachedDirectory +where + // NewDirectory + N: NewService +{ + inner: N, + cache: Arc>> +} + + +pub struct NewDirectory { + // registry + inner: N, +} + + + +#[derive(Clone)] +pub struct Directory +where + D: Discover +{ + rx: watch::Receiver>, + close: Arc +} + + + +impl NewCachedDirectory +where + N: Registry + Clone + Send + Sync + 'static, +{ + + pub fn layer() -> impl tower_layer::Layer { + tower_layer::layer_fn(|inner: N| { + NewCachedDirectory { + // inner is registry + inner: CachedDirectory::new(NewDirectory::new(inner)), + } + }) + } +} + + +impl NewService for NewCachedDirectory +where + T: Param, + // service registry + N: Registry + Clone + Send + Sync + 'static, +{ + type Service = BufferedDirectory; + + fn new_service(&self, target: T) -> Self::Service { + + self.inner.new_service(target.param()) + } +} + + +impl CachedDirectory +where + N: NewService +{ + + pub fn new(inner: N) -> Self { + CachedDirectory { + inner, + cache: Default::default() + } + } +} + + +impl NewService for CachedDirectory +where + T: Param, + // NewDirectory + N: NewService, + // Buffered directory + N::Service: Clone +{ + type Service = N::Service; + + fn new_service(&self, target: T) -> Self::Service { + let rpc_invocation = target.param(); + let service_name = rpc_invocation.get_target_service_unique_name(); + let mut cache = self.cache.lock().expect("cached directory lock failed."); + let value = cache.get(&service_name).map(|val|val.clone()); + match value { + None => { + let new_service = self.inner.new_service(target); + cache.insert(service_name, new_service.clone()); + new_service + }, + Some(value) => value + } + } +} + + +impl NewDirectory { + + pub fn new(inner: N) -> Self { + NewDirectory { + inner + } + } +} + +impl NewService for NewDirectory +where + T: Param, + // service registry + N: Registry + Clone + Send + Sync + 'static, +{ + type Service = BufferedDirectory; + + fn new_service(&self, target: T) -> Self::Service { + + let service_name = target.param().get_target_service_unique_name(); + + let registry = self.inner.clone(); + + let (tx, rx) = channel(1024); + + tokio::spawn(async move { + + let receiver = registry.subscribe(service_name).await; + match receiver { + Err(e) => { + // error!("discover stream error: {}", e); + + }, + Ok(mut receiver) => { + loop { + let change = receiver.recv().await; + match change { + None => { + // debug!("discover stream closed."); + break; + }, + Some(change) => { + let _ = tx.send(change); + } + } + } + } + } + + }); + + Buffer::new(Directory::new(ReceiverStream::new(rx)), 1024) + } + +} + + +impl Directory +where + // Discover + D: Discover + Unpin + Send + 'static, + // the key may be dubbo url + D::Key: Hash + Eq + Clone + Send, + // invoker new service + D::Service: NewService<()> + Clone + Send + Sync, +{ + + pub fn new(discover: D) -> Self { + + let mut discover = Box::pin(discover); + + let (tx, rx) = watch::channel(Vec::new()); + let close = Arc::new(Notify::new()); + let close_clone = close.clone(); + + tokio::spawn(async move { + let mut cache: HashMap = HashMap::new(); + + loop { + let changed = select! { + _ = close_clone.notified() => { + // info!("discover stream closed.") + return; + }, + changed = poll_fn(|cx| discover.as_mut().poll_discover(cx)) => { + changed + } + }; + let Some(changed) = changed else { + // debug!("discover stream closed."); + break; + }; + + match changed { + Err(e) => { + // error!("discover stream error: {}", e); + continue; + }, + Ok(changed) => match changed { + Change::Insert(k, v) => { + cache.insert(k, v); + }, + Change::Remove(k) => { + cache.remove(&k); + } + } + } + + let vec: Vec = cache.values().map(|v|v.clone()).collect(); + let _ = tx.send(vec); + } + + }); + Directory { + rx, + close + } + } +} + + +impl Service<()> for Directory +where + // Discover + D: Discover + Unpin + Send, + // the key may be dubbo url + D::Key: Hash + Eq + Clone + Send, + // invoker new service + D::Service: NewService<()> + Clone + Send + Sync, +{ + type Response = watch::Receiver>; + + type Error = StdError; + + type Future = future::Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + future::ok(self.rx.clone()) + } +} + +impl Drop for Directory +where + D: Discover, +{ + fn drop(&mut self) { + if Arc::strong_count(&self.close) == 1 { + self.close.notify_one(); + } + } +} \ No newline at end of file diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs new file mode 100644 index 00000000..81bcb68f --- /dev/null +++ b/dubbo/src/invoker/mod.rs @@ -0,0 +1,75 @@ +use dubbo_base::Url; +use tower_service::Service; + +use crate::{codegen::TripleInvoker, param::Param, svc::NewService}; + +#[derive(Clone)] +pub struct NewInvoker { + url: Url +} + +pub enum InvokerComponent { + TripleInvoker(TripleInvoker) +} + + +impl NewInvoker { + pub fn new(url: Url) -> Self { + Self { + url + } + } +} + +impl From for NewInvoker { + fn from(url: String) -> Self { + Self { + url: Url::from_url(&url).unwrap() + } + } +} + +impl Param for NewInvoker { + fn param(&self) -> Url { + self.url.clone() + } +} + +impl NewService<()> for NewInvoker { + type Service = InvokerComponent; + fn new_service(&self, _: ()) -> Self::Service { + // todo create another invoker + InvokerComponent::TripleInvoker(TripleInvoker::new(self.url.clone())) + } +} + + +impl Service> for InvokerComponent +where + B: http_body::Body + Unpin + Send + 'static, + B::Error: Into, + B::Data: Send + Unpin, +{ + type Response = http::Response; + + type Error = crate::Error; + + type Future = crate::BoxFuture; + + fn call(&mut self, req: http::Request) -> Self::Future { + match self { + InvokerComponent::TripleInvoker(invoker) => invoker.call(req), + } + } + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self { + InvokerComponent::TripleInvoker(invoker) => >>::poll_ready(invoker, cx), + } + } +} + +// InvokerComponent::TripleInvoker(invoker) => >>::poll_ready(invoker, cx), \ No newline at end of file diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 63c09d3a..252e01c7 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -26,6 +26,12 @@ pub mod registry; pub mod status; pub mod triple; pub mod utils; +pub mod directory; +pub mod route; +pub mod loadbalancer; +pub mod invoker; +pub mod param; +pub mod svc; use http_body::Body; use std::{future::Future, pin::Pin}; diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs new file mode 100644 index 00000000..822794cc --- /dev/null +++ b/dubbo/src/loadbalancer/mod.rs @@ -0,0 +1,102 @@ +use futures_core::future::BoxFuture; +use tower::{discover::ServiceList, ServiceExt}; +use tower_service::Service; + +use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param}; + +pub struct NewLoadBalancer { + inner: N, +} + +#[derive(Clone)] +pub struct LoadBalancer { + inner: S, // Routes service +} + +impl NewLoadBalancer { + + pub fn layer() -> impl tower_layer::Layer{ + + tower_layer::layer_fn(|inner| { + + NewLoadBalancer { + inner // NewRoutes + } + }) + } +} + +impl NewService for NewLoadBalancer +where + T: Param + Clone, + // NewRoutes + N: NewService, +{ + + type Service = LoadBalancer; + + fn new_service(&self, target: T) -> Self::Service { + // Routes service + let svc = self.inner.new_service(target); + + LoadBalancer { + inner: svc + } + + } +} + +impl Service for LoadBalancer +where + Req: Send + 'static, + // Routes service + N: Service<(), Response = Vec> + Clone, + N::Error: Into + Send, + N::Future: Send + 'static, + // new invoker + Nsv: NewService<()> + Send, + Nsv::Service: Service + Send, + // invoker + >::Error: Into + Send, + >::Future: Send + 'static, +{ + + type Response = >::Response; + + type Error = StdError; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + + } + + fn call(&mut self, req: Req) -> Self::Future { + let routes = self.inner.call(()); + + let fut = async move { + let routes = routes.await; + + let routes: Vec = match routes { + Err(e) => return Err(Into::::into(e)), + Ok(routes) => routes + }; + + let service_list: Vec<_> = routes.iter().map(|inv| { + let invoker = inv.new_service(()); + tower::load::Constant::new(invoker, 1) + + }).collect(); + + let service_list = ServiceList::new(service_list); + + let p2c = tower::balance::p2c::Balance::new(service_list); + + + p2c.oneshot(req).await + }; + + Box::pin(fut) + } +} diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs new file mode 100644 index 00000000..b57f98eb --- /dev/null +++ b/dubbo/src/param.rs @@ -0,0 +1,12 @@ +pub trait Param { + + fn param(&self) -> T; +} + + +impl Param for T { + + fn param(&self) -> T::Owned { + self.to_owned() + } +} diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index c941ae5d..9d35a525 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -16,17 +16,15 @@ */ use std::{ - fmt::Debug, task::{Context, Poll}, }; use async_trait::async_trait; +use aws_smithy_http::body::SdkBody; use tower_service::Service; use dubbo_base::Url; -use crate::triple::client::replay::ClonedBody; - pub mod server_desc; pub mod triple; @@ -43,18 +41,19 @@ pub trait Exporter { fn unexport(&self); } -pub trait Invoker: Debug + Service { +pub trait Invoker: Service { fn get_url(&self) -> Url; } pub type BoxExporter = Box; pub type BoxInvoker = Box< dyn Invoker< - http::Request, + http::Request, Response = http::Response, Error = crate::Error, Future = crate::BoxFuture, crate::Error>, - > + Send, + > + Send + + Sync, >; pub struct WrapperInvoker(T); diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 9f9c7698..685c4344 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -22,19 +22,12 @@ use std::{ }; use tower_service::Service; -use crate::{ - protocol::Invoker, - triple::{ - client::{builder::ClientBoxService, replay::ClonedBody}, - transport::connection::Connection, - }, - utils::boxed_clone::BoxCloneService, -}; +use crate::triple::transport::{connection::Connection, self}; #[derive(Clone)] pub struct TripleInvoker { url: Url, - conn: ClientBoxService, + conn: Connection, } impl TripleInvoker { @@ -42,7 +35,7 @@ impl TripleInvoker { let uri = http::Uri::from_str(&url.to_url()).unwrap(); Self { url, - conn: BoxCloneService::new(Connection::new().with_host(uri)), + conn: Connection::new().with_host(uri), } } } @@ -53,14 +46,19 @@ impl Debug for TripleInvoker { } } -impl Service> for TripleInvoker { +impl Service> for TripleInvoker +where + B: http_body::Body + Unpin + Send + 'static, + B::Error: Into, + B::Data: Send + Unpin, +{ type Response = http::Response; type Error = crate::Error; type Future = crate::BoxFuture; - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { self.conn.call(req) } @@ -68,12 +66,7 @@ impl Service> for TripleInvoker { &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.conn.poll_ready(cx) + >>::poll_ready(&mut self.conn, cx) } } -impl Invoker> for TripleInvoker { - fn get_url(&self) -> Url { - self.url.clone() - } -} diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index 2a95452a..ab2297f3 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -20,6 +20,7 @@ pub mod integration; pub mod memory_registry; pub mod protocol; pub mod types; +pub mod n_registry; use std::{ fmt::{Debug, Formatter}, @@ -28,6 +29,7 @@ use std::{ use dubbo_base::Url; + pub type RegistryNotifyListener = Arc; pub trait Registry { fn register(&mut self, url: Url) -> Result<(), crate::StdError>; diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs new file mode 100644 index 00000000..69300f2a --- /dev/null +++ b/dubbo/src/registry/n_registry.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use dubbo_base::Url; +use tokio::sync::mpsc::{Receiver, channel}; +use tower::discover::Change; + + +use crate::{StdError, invoker::NewInvoker}; + +type DiscoverStream = Receiver, StdError>>; + +#[async_trait] +pub trait Registry { + + async fn register(&self, url: Url) -> Result<(), StdError>; + + async fn unregister(&self, url: Url) -> Result<(), StdError>; + + // todo service_name change to url + async fn subscribe(&self, service_name: String) -> Result; + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError>; +} + +#[derive(Clone)] +pub struct ArcRegistry { + inner: Arc +} + + +pub enum RegistryComponent { + NacosRegistry, + ZookeeperRegistry, + StaticRegistry(StaticRegistry), +} + + +pub struct StaticRegistry { + urls: Vec +} + +impl ArcRegistry { + + pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self { + Self { inner: Arc::new(registry) } + } +} + +#[async_trait] +impl Registry for ArcRegistry { + + async fn register(&self, url: Url) -> Result<(), StdError> { + self.register(url).await + } + + async fn unregister(&self, url: Url) -> Result<(), StdError> { + self.unregister(url).await + } + + async fn subscribe(&self, service_name: String) -> Result { + self.subscribe(service_name).await + } + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { + self.unsubscribe(url).await + } +} + + + + +#[async_trait] +impl Registry for RegistryComponent { + async fn register(&self, url: Url) -> Result<(), StdError> { + todo!() + } + + async fn unregister(&self, url: Url) -> Result<(), StdError> { + todo!() + } + + async fn subscribe(&self, service_name: String) -> Result { + todo!() + } + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { + todo!() + } +} + + +impl StaticRegistry { + + pub fn new(urls: Vec) -> Self { + Self { + urls + } + } +} + + +#[async_trait] +impl Registry for StaticRegistry { + async fn register(&self, url: Url) -> Result<(), StdError> { + todo!() + } + + async fn unregister(&self, url: Url) -> Result<(), StdError> { + todo!() + } + + async fn subscribe(&self, service_name: String) -> Result { + let (tx, rx) = channel(self.urls.len()); + for url in self.urls.iter() { + let invoker = NewInvoker::new(url.clone()); + let change = Ok(Change::Insert(service_name.clone(), invoker)); + tx.send(change).await?; + } + + Ok(rx) + } + + async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { + todo!() + } +} \ No newline at end of file diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs new file mode 100644 index 00000000..dfbe73e3 --- /dev/null +++ b/dubbo/src/route/mod.rs @@ -0,0 +1,196 @@ +use std::{sync::{Arc, Mutex}, collections::HashMap}; + +use futures_core::{Future, ready}; +use futures_util::future::Ready; +use pin_project::pin_project; +use tokio::{sync::watch, pin}; +use tower::{util::FutureService, buffer::Buffer}; +use tower_service::Service; + +use crate::{StdError, codegen::RpcInvocation, svc::NewService, param::Param, invocation::Invocation}; + +pub struct NewRoutes { + inner: N, +} + +pub struct NewRoutesCache +where + N: NewService +{ + inner: N, + cache: Arc>>, +} + +#[pin_project] +pub struct NewRoutesFuture { + #[pin] + inner: N, + target: T, +} + +#[derive(Clone)] +pub struct Routes { + target: T, + new_invokers: Vec, + invokers_receiver: watch::Receiver>, +} + +impl NewRoutes { + pub fn new(inner: N) -> Self { + Self { + inner, + } + } +} + + +impl NewService for NewRoutes +where + T: Param + Clone + Send + 'static, + // NewDirectory + N: NewService, + // Directory + N::Service: Service<(), Response = watch::Receiver>> + Unpin + Send + 'static, + >::Error: Into, + // new invoker service + Nsv: NewService<()> + Clone + Send + Sync + 'static, +{ + + type Service = Buffer>::Service, T>, Routes>, ()>; + + fn new_service(&self, target: T) -> Self::Service { + let inner = self.inner.new_service(target.clone()); + + Buffer::new(FutureService::new(NewRoutesFuture { + inner, + target, + }), 1024) + } +} +impl NewRoutesCache +where + N: NewService, + >::Service: Service<(), Response = watch::Receiver>> + Unpin + Send + 'static, + >::Error: Into, + Nsv: NewService<()> + Clone + Send + Sync + 'static, + +{ + pub fn layer() -> impl tower_layer::Layer>> { + tower_layer::layer_fn(|inner: N| { + NewRoutesCache::new(NewRoutes::new(inner)) + }) + } + + +} + + +impl NewRoutesCache +where + N: NewService +{ + pub fn new(inner: N) -> Self { + Self { + inner, + cache: Default::default(), + } + } +} + +impl NewService for NewRoutesCache +where + T: Param, + N: NewService, + N::Service: Clone, +{ + type Service = N::Service; + + fn new_service(&self, target: T) -> Self::Service { + let rpc_inv = target.param(); + let service_name = rpc_inv.get_target_service_unique_name(); + + let mut cache = self.cache.lock().expect("RoutesCache get lock failed"); + + let service = cache.get(&service_name); + match service { + Some(service) => service.clone(), + None => { + let service = self.inner.new_service(rpc_inv); + cache.insert(service_name, service.clone()); + service + } + } + } +} + + +impl Future for NewRoutesFuture +where + T: Param + Clone, + // Directory + N: Service<(), Response = watch::Receiver>> + Unpin, + N::Error: Into, + // new invoker service + Nsv: NewService<()> + Clone, +{ + type Output = Result, StdError>; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + + let this = self.get_mut(); + + let target = this.target.clone(); + + let _ = ready!(this.inner.poll_ready(cx)).map_err(Into::into)?; + + + let call = this.inner.call(()); + pin!(call); + + let mut invokers_receiver = ready!(call.poll(cx).map_err(Into::into))?; + let new_invokers = { + let wait_for = invokers_receiver.wait_for(|invs|!invs.is_empty()); + pin!(wait_for); + + let changed = ready!(wait_for.poll(cx))?; + + changed.clone() + }; + + + std::task::Poll::Ready(Ok(Routes { + invokers_receiver, + new_invokers, + target, + })) + } +} + + + +impl Service<()> for Routes +where + T: Param + Clone, + // new invoker service + Nsv: NewService<()> + Clone, +{ + type Response = Vec; + + type Error = StdError; + + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll> { + let has_change = self.invokers_receiver.has_changed()?; + if has_change { + self.new_invokers = self.invokers_receiver.borrow_and_update().clone(); + } + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + // some router operator + // if new_invokers changed, send new invokers to routes_rx after router operator + futures_util::future::ok(self.new_invokers.clone()) + } +} \ No newline at end of file diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs new file mode 100644 index 00000000..56dc3304 --- /dev/null +++ b/dubbo/src/svc.rs @@ -0,0 +1,88 @@ +use std::{sync::Arc, pin::Pin, marker::PhantomData}; + +use futures_core::Future; +use tower::ServiceExt; +use tower_service::Service; + +pub trait NewService { + + type Service; + + fn new_service(&self, target: T) -> Self::Service; + +} + + +pub struct ArcNewService { + inner: Arc + Send + Sync>, +} + + +impl ArcNewService { + pub fn layer() -> impl tower_layer::Layer + Clone + Copy + where + N: NewService + Send + Sync + 'static, + S: Send + 'static, + { + tower_layer::layer_fn(Self::new) + } + + pub fn new(inner: N) -> Self + where + N: NewService + Send + Sync + 'static, + S: Send + 'static, + { + Self { + inner: Arc::new(inner), + } + } +} + +impl Clone for ArcNewService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl NewService for ArcNewService { + type Service = S; + + fn new_service(&self, t: T) -> S { + self.inner.new_service(t) + } +} + +// inner: Box> + Send>>> + Send>, +pub struct BoxedService { + inner: N, + _mark: PhantomData +} + +impl BoxedService { + + pub fn layer() -> impl tower_layer::Layer{ + tower_layer::layer_fn(|inner: N| { + Self { + inner, + _mark: PhantomData + } + }) + } +} + + +// impl NewService for BoxedService +// where +// N: NewService, +// N::Service: Service + Send, +// >::Future: Send + 'static, +// { +// type Service = Box>::Response, Error = >::Error, Future = Pin>::Response, >::Error>> + Send>>> + Send>; + +// fn new_service(&self, target: T) -> Self::Service { +// let service = self.inner.new_service(target); +// Box::new(service.map_future(|f|Box::pin(f) as _)) +// } +// } diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index a9756f7e..89d7a00a 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -15,27 +15,28 @@ * limitations under the License. */ + use std::sync::Arc; use crate::{ - cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory}, - codegen::{RegistryDirectory, TripleInvoker}, - protocol::BoxInvoker, - utils::boxed_clone::BoxCloneService, + utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::{NewRoutes, NewRoutesCache}, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody, }; -use dubbo_base::Url; -use super::replay::ClonedBody; +use aws_smithy_http::body::SdkBody; +use dubbo_base::Url; +use tower::ServiceBuilder; pub type ClientBoxService = - BoxCloneService, http::Response, crate::Error>; + BoxCloneService, http::Response, crate::Error>; + -#[allow(dead_code)] -#[derive(Clone, Debug, Default)] +pub type ServiceMK = Arc>>>>>; + +#[derive(Default)] pub struct ClientBuilder { pub timeout: Option, pub connector: &'static str, - directory: Option>>, + registry: Option, pub direct: bool, host: String, } @@ -45,7 +46,7 @@ impl ClientBuilder { ClientBuilder { timeout: None, connector: "", - directory: None, + registry: None, direct: false, host: "".to_string(), } @@ -55,7 +56,7 @@ impl ClientBuilder { Self { timeout: None, connector: "", - directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), direct: true, host: host.to_string(), } @@ -68,31 +69,23 @@ impl ClientBuilder { } } - /// host: http://0.0.0.0:8888 - pub fn with_directory(self, directory: Box) -> Self { - Self { - directory: Some(Arc::new(directory)), - ..self - } - } - - pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self { - Self { - directory: Some(Arc::new(Box::new(registry))), + pub fn with_registry(self, registry: RegistryComponent) -> Self { + Self { + registry: Some(ArcRegistry::new(registry)), ..self } } pub fn with_host(self, host: &'static str) -> Self { Self { - directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), ..self } } pub fn with_connector(self, connector: &'static str) -> Self { Self { - connector: connector, + connector, ..self } } @@ -101,15 +94,18 @@ impl ClientBuilder { Self { direct, ..self } } - pub fn build(self, service_name: String) -> Option { - if self.direct { - return Some(Box::new(TripleInvoker::new( - Url::from_url(&self.host).unwrap(), - ))); - } + pub fn build(mut self) -> ServiceMK { + + + let registry = self.registry.take().expect("registry must not be empty"); - let cluster = MockCluster::default().join(Box::new(MockDirectory::new(service_name))); + let mk_service = ServiceBuilder::new() + .layer(NewCluster::layer()) + .layer(NewLoadBalancer::layer()) + .layer(NewRoutesCache::layer()) + .layer(NewCachedDirectory::layer()) + .service(registry); - return Some(cluster); + Arc::new(mk_service) } } diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs index 013d9e30..7a8e0131 100644 --- a/dubbo/src/triple/client/mod.rs +++ b/dubbo/src/triple/client/mod.rs @@ -16,6 +16,6 @@ */ pub mod builder; -pub mod replay; pub mod triple; + pub use triple::TripleClient; diff --git a/dubbo/src/triple/client/replay.rs b/dubbo/src/triple/client/replay.rs deleted file mode 100644 index 195d7509..00000000 --- a/dubbo/src/triple/client/replay.rs +++ /dev/null @@ -1,441 +0,0 @@ -use std::{ - collections::VecDeque, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; - -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_core::{ready, stream::BoxStream, Stream}; -use futures_util::StreamExt; -use http_body::Body; -use pin_project::pin_project; - -use crate::status::Status; - -type BoxBytesStream = BoxStream<'static, Result>; -pub struct ClonedBytesStream { - shared: Arc>>, - owned: Option, - replay: bool, -} - -pub struct OwnedBytesStream { - bytes_stream: BoxBytesStream, - buf: InnerBuffer, -} - -#[derive(Clone)] -pub struct InnerBuffer { - bufs: VecDeque, - capacity: usize, -} - -impl ClonedBytesStream { - pub fn new(buffer_capacity: usize, stream: BoxBytesStream) -> Self { - Self { - shared: Arc::new(Mutex::new(None)), - owned: Some(OwnedBytesStream { - bytes_stream: stream, - buf: InnerBuffer { - bufs: Default::default(), - capacity: buffer_capacity, - }, - }), - replay: false, - } - } -} - -impl Clone for ClonedBytesStream { - fn clone(&self) -> Self { - Self { - shared: self.shared.clone(), - owned: None, - replay: true, - } - } -} - -impl Drop for ClonedBytesStream { - fn drop(&mut self) { - if let Some(owned) = self.owned.take() { - let lock = self.shared.lock(); - if let Ok(mut lock) = lock { - *lock = Some(owned); - } - } - } -} - -impl InnerBuffer { - pub fn push_bytes(&mut self, bytes: Bytes) -> Bytes { - self.bufs.push_back(bytes.clone()); - bytes - } - - pub fn is_capped(&self) -> bool { - self.capacity == 0 - } -} - -impl Buf for InnerBuffer { - fn remaining(&self) -> usize { - self.bufs.iter().map(|bytes| bytes.remaining()).sum() - } - - fn chunk(&self) -> &[u8] { - self.bufs.front().map(Buf::chunk).unwrap_or(&[]) - } - - fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { - if dst.is_empty() { - return 0; - } - - let mut filled = 0; - - for bytes in self.bufs.iter() { - filled += bytes.chunks_vectored(&mut dst[filled..]) - } - - filled - } - - fn advance(&mut self, mut cnt: usize) { - while cnt > 0 { - let first = self.bufs.front_mut(); - if first.is_none() { - break; - } - let first = first.unwrap(); - let first_remaining = first.remaining(); - if first_remaining > cnt { - first.advance(cnt); - break; - } - - first.advance(first_remaining); - cnt = cnt - first_remaining; - self.bufs.pop_front(); - } - } - - fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { - match self.bufs.front_mut() { - Some(buf) if len <= buf.remaining() => { - let bytes = buf.copy_to_bytes(len); - if buf.remaining() == 0 { - self.bufs.pop_front(); - } - bytes - } - _ => { - let mut bytes = BytesMut::with_capacity(len); - bytes.put(self.take(len)); - bytes.freeze() - } - } - } -} - -pub enum BytesData { - BufferedBytes(InnerBuffer), - OriginBytes(Bytes), -} - -impl Buf for BytesData { - fn remaining(&self) -> usize { - match self { - BytesData::BufferedBytes(bytes) => bytes.remaining(), - BytesData::OriginBytes(bytes) => bytes.remaining(), - } - } - - fn chunk(&self) -> &[u8] { - match self { - BytesData::BufferedBytes(bytes) => bytes.chunk(), - BytesData::OriginBytes(bytes) => bytes.chunk(), - } - } - - fn advance(&mut self, cnt: usize) { - match self { - BytesData::BufferedBytes(bytes) => bytes.advance(cnt), - BytesData::OriginBytes(bytes) => bytes.advance(cnt), - } - } - - fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { - match self { - BytesData::BufferedBytes(bytes) => bytes.copy_to_bytes(len), - BytesData::OriginBytes(bytes) => bytes.copy_to_bytes(len), - } - } -} - -impl Stream for ClonedBytesStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut_self = self.get_mut(); - - let owned_bytes_stream = mut_self.owned.get_or_insert_with(|| { - let lock = mut_self.shared.lock(); - if let Err(e) = lock { - panic!("bytes streams get shared data lock failed. {}", e); - } - let mut data = lock.unwrap(); - - data.take().expect("cannot get shared bytes streams.") - }); - - if mut_self.replay { - mut_self.replay = false; - if owned_bytes_stream.buf.has_remaining() { - return Poll::Ready(Some(Ok(BytesData::BufferedBytes( - owned_bytes_stream.buf.clone(), - )))); - } - } - - let next = owned_bytes_stream.bytes_stream.poll_next_unpin(cx); - - let next = match ready!(next) { - Some(next) => match next { - Ok(next) => next, - Err(e) => return Poll::Ready(Some(Err(e))), - }, - _ => return Poll::Ready(None), - }; - - let len = next.len(); - owned_bytes_stream.buf.capacity = owned_bytes_stream.buf.capacity.saturating_sub(len); - let next = if owned_bytes_stream.buf.is_capped() { - if owned_bytes_stream.buf.has_remaining() { - owned_bytes_stream.buf.bufs = VecDeque::default(); - } - next - } else { - owned_bytes_stream.buf.push_bytes(next) - }; - - return Poll::Ready(Some(Ok(BytesData::OriginBytes(next)))); - } -} - -#[pin_project] -#[derive(Clone)] -pub struct ClonedBody(#[pin] ClonedBytesStream); - -impl ClonedBody { - pub fn new(inner_body: T) -> Self - where - T: Stream> + Send + 'static, - { - let inner_body = Box::pin(inner_body); - let inner_body = ClonedBytesStream::new(1024 * 64, inner_body); - ClonedBody(inner_body) - } -} - -impl Body for ClonedBody { - type Data = BytesData; - - type Error = Status; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - self.project().0.poll_next(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } - - fn size_hint(&self) -> http_body::SizeHint { - http_body::SizeHint::default() - } -} - -#[cfg(test)] -pub mod tests { - use super::*; - - #[tokio::test] - async fn test_cloned_bytes_stream() { - let bytes1 = Ok(Bytes::from("hello".as_bytes())); - let bytes2 = Ok(Bytes::from(" world".as_bytes())); - let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); - - let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); - - let mut origin_stream = ClonedBytesStream::new(64 * 1024, Box::pin(stream)); - let hello_bytes = origin_stream.next().await; - - assert!(hello_bytes.is_some()); - - let hello_bytes = hello_bytes.unwrap(); - - assert!(hello_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(hello_bytes.unwrap()), "hello"); - - let world_bytes = origin_stream.next().await; - - assert!(world_bytes.is_some()); - - let world_bytes = world_bytes.unwrap(); - - assert!(world_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(world_bytes.unwrap()), " world"); - - let end_bytes = origin_stream.next().await; - - assert!(end_bytes.is_some()); - - let end_bytes = end_bytes.unwrap(); - - assert!(end_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(end_bytes.unwrap()), ", end test!"); - - let none_bytes = origin_stream.next().await; - - assert!(none_bytes.is_none()); - } - - #[tokio::test] - async fn test_cloned_bytes_stream_and_replay() { - let bytes1 = Ok(Bytes::from("hello".as_bytes())); - let bytes2 = Ok(Bytes::from(" world".as_bytes())); - let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); - - let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); - - let mut origin_stream = ClonedBytesStream::new(64 * 1024, Box::pin(stream)); - origin_stream.next().await; - origin_stream.next().await; - origin_stream.next().await; - - let none_bytes = origin_stream.next().await; - assert!(none_bytes.is_none()); - - let mut clone_origin_stream = origin_stream.clone(); - - drop(origin_stream); - - let cached_bytes = clone_origin_stream.next().await; - assert!(cached_bytes.is_some()); - - let cached_bytes = cached_bytes.unwrap(); - - assert!(cached_bytes.is_ok()); - - assert_eq!( - bytes_data_to_str(cached_bytes.unwrap()), - "hello world, end test!" - ); - - let none_bytes = clone_origin_stream.next().await; - assert!(none_bytes.is_none()); - } - - #[tokio::test] - async fn test_replay_stream_continue_poll_next() { - let bytes1 = Ok(Bytes::from("hello".as_bytes())); - let bytes2 = Ok(Bytes::from(" world".as_bytes())); - let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); - - let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); - - let mut origin_stream = ClonedBytesStream::new(1024 * 64, Box::pin(stream)); - origin_stream.next().await; - origin_stream.next().await; - - let mut clone_origin_stream = origin_stream.clone(); - - drop(origin_stream); - - let cached_bytes = clone_origin_stream.next().await; - assert!(cached_bytes.is_some()); - - let cached_bytes = cached_bytes.unwrap(); - - assert!(cached_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(cached_bytes.unwrap()), "hello world"); - - let next_bytes = clone_origin_stream.next().await; - - assert!(next_bytes.is_some()); - - let next_bytes = next_bytes.unwrap(); - - assert!(next_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(next_bytes.unwrap()), ", end test!"); - - let none_bytes = clone_origin_stream.next().await; - assert!(none_bytes.is_none()); - } - - #[tokio::test] - async fn test_cloned_bytes_stream_reached_max_capacity() { - let bytes1 = Ok(Bytes::from("hello".as_bytes())); - let bytes2 = Ok(Bytes::from(" world".as_bytes())); - let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); - - let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); - - let mut origin_stream = ClonedBytesStream::new(5, Box::pin(stream)); - let hello_bytes = origin_stream.next().await; - - assert!(hello_bytes.is_some()); - - let hello_bytes = hello_bytes.unwrap(); - - assert!(hello_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(hello_bytes.unwrap()), "hello"); - - let world_bytes = origin_stream.next().await; - - assert!(world_bytes.is_some()); - - let world_bytes = world_bytes.unwrap(); - - assert!(world_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(world_bytes.unwrap()), " world"); - - let mut cloned_origin_stream = origin_stream.clone(); - - drop(origin_stream); - - let end_bytes = cloned_origin_stream.next().await; - - assert!(end_bytes.is_some()); - - let end_bytes = end_bytes.unwrap(); - - assert!(end_bytes.is_ok()); - - assert_eq!(bytes_data_to_str(end_bytes.unwrap()), ", end test!"); - - let none_bytes = cloned_origin_stream.next().await; - assert!(none_bytes.is_none()); - } - - fn bytes_data_to_str(mut bytes_data: BytesData) -> String { - let len = bytes_data.remaining(); - let bytes = bytes_data.copy_to_bytes(len); - String::from_utf8(bytes.to_vec()).unwrap() - } -} diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index ace7aafd..091e0207 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -15,40 +15,43 @@ * limitations under the License. */ -use std::str::FromStr; use futures_util::{future, stream, StreamExt, TryStreamExt}; +use aws_smithy_http::body::SdkBody; use http::HeaderValue; +use tower_service::Service; -use super::{builder::ClientBuilder, replay::ClonedBody}; +use super::builder::{ClientBuilder, ServiceMK}; use crate::codegen::RpcInvocation; +use crate::svc::NewService; use crate::{ - invocation::{IntoStreamingRequest, Invocation, Metadata, Request, Response}, + invocation::{IntoStreamingRequest, Metadata, Request, Response}, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; -#[derive(Debug, Default, Clone)] +#[derive(Clone)] pub struct TripleClient { pub(crate) send_compression_encoding: Option, - pub(crate) builder: Option, + pub(crate) mk: ServiceMK, } impl TripleClient { pub fn connect(host: String) -> Self { let builder = ClientBuilder::from_static(&host).with_direct(true); + let mk = builder.build(); TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), - builder: Some(builder), + mk, } } pub fn new(builder: ClientBuilder) -> Self { TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), - builder: Some(builder), + mk: builder.build(), } } @@ -56,9 +59,8 @@ impl TripleClient { &self, uri: http::Uri, path: http::uri::PathAndQuery, - body: ClonedBody, - invocation: RpcInvocation, - ) -> http::Request { + body: SdkBody, + ) -> http::Request { let mut parts = uri.into_parts(); parts.path_and_query = Some(path); @@ -110,8 +112,6 @@ impl TripleClient { http::HeaderValue::from_static("gzip"), ); - req.extensions_mut().insert(invocation); - // const ( // TripleContentType = "application/grpc+proto" // TripleUserAgent = "grpc-go/1.35.0-dev" @@ -140,26 +140,25 @@ impl TripleClient { M2: Send + Sync + 'static, { let req = req.map(|m| stream::once(future::ready(m))); - let en = encode( + let body_stream = encode( codec.encoder(), req.into_inner().map(Ok), self.send_compression_encoding, ) .into_stream(); - let body = ClonedBody::new(en); + let body = hyper::Body::wrap_stream(body_stream); + + + let mut invoker = self.mk.new_service(invocation); - let mut conn = self - .builder - .clone() - .unwrap() - .build(invocation.get_target_service_unique_name()) - .unwrap(); - let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, body, invocation); + let request = http::Request::builder() + .body(body).unwrap(); - let response = conn - .call(req) + + + let response = invoker + .call(request) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -210,21 +209,17 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); + let body = hyper::Body::wrap_stream(en); + + let mut invoker = self.mk.new_service(invocation); - let body = ClonedBody::new(en); - let mut conn = self - .builder - .clone() - .unwrap() - .build(invocation.get_target_service_unique_name()) - .unwrap(); + let request = http::Request::builder() + .body(body).unwrap(); - let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, body, invocation); - let response = conn - .call(req) + let response = invoker + .call(request) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -259,21 +254,18 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); - let body = ClonedBody::new(en); + let body = hyper::Body::wrap_stream(en); + let mut invoker = self.mk.new_service(invocation); + + + let request = http::Request::builder() + .body(body).unwrap(); - let mut conn = self - .builder - .clone() - .unwrap() - .build(invocation.get_target_service_unique_name()) - .unwrap(); - let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, body, invocation); // let mut conn = Connection::new().with_host(http_uri); - let response = conn - .call(req) + let response = invoker + .call(request) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -324,21 +316,16 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); + let body = hyper::Body::wrap_stream(en); + let mut invoker = self.mk.new_service(invocation); - let body = ClonedBody::new(en); - let mut conn = self - .builder - .clone() - .unwrap() - .build(invocation.get_target_service_unique_name()) - .unwrap(); + let request = http::Request::builder() + .body(body).unwrap(); - let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, body, invocation); - let response = conn - .call(req) + let response = invoker + .call(request) .await .map_err(|err| crate::status::Status::from_error(err.into())); diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index 0701ec3b..e756d5c7 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -36,12 +36,16 @@ pub mod echo_client { &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -49,51 +53,51 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner - .server_streaming(request, codec, path, invocation) - .await + self.inner.server_streaming(request, codec, path, invocation).await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner - .client_streaming(request, codec, path, invocation) - .await + self.inner.client_streaming(request, codec, path, invocation).await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner - .bidi_streaming(request, codec, path, invocation) - .await + self.inner.bidi_streaming(request, codec, path, invocation).await } } } @@ -110,7 +114,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -124,14 +130,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -161,7 +172,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -174,18 +188,26 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -196,22 +218,32 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -224,23 +256,31 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -253,41 +293,56 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index eed3e522..1d59cdfa 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -35,20 +35,7 @@ async fn main() { dubbo_logger::init(); let mut builder = ClientBuilder::new(); - - if let Ok(zk_servers) = env::var("ZOOKEEPER_SERVERS") { - let zkr = ZookeeperRegistry::new(&zk_servers); - let directory = RegistryDirectory::new(Box::new(zkr)); - builder = builder.with_directory(Box::new(directory)); - } else if let Ok(nacos_url_str) = env::var("NACOS_URL") { - // NACOS_URL=nacos://mse-96efa264-p.nacos-ans.mse.aliyuncs.com - let nacos_url = Url::from_url(&nacos_url_str).unwrap(); - let registry = NacosRegistry::new(nacos_url); - let directory = RegistryDirectory::new(Box::new(registry)); - builder = builder.with_directory(Box::new(directory)); - } else { - builder = builder.with_host("http://127.0.0.1:8888"); - } + builder.with_host("http://127.0.0.1:8888"); let mut cli = GreeterClient::new(builder); diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml index 2437bc0b..798253a6 100644 --- a/registry/nacos/Cargo.toml +++ b/registry/nacos/Cargo.toml @@ -9,7 +9,7 @@ repository = "https://github.com/apache/dubbo-rust.git" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -nacos-sdk = { version = "0.2.3", features = ["naming", "auth-by-http"] } +nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http"] } dubbo.workspace = true serde_json.workspace = true serde = { workspace = true, features = ["derive"] }