Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ on:
push:
branches: ["*"]
pull_request:
branches: ["*"]
branches:
- '*'
- 'refact/*'


jobs:
Expand Down
2 changes: 1 addition & 1 deletion common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ pub mod node;
pub mod url;

pub use node::Node;
pub use url::Url;
pub use url::Url;
25 changes: 10 additions & 15 deletions dubbo/src/cluster/failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ use std::task::Poll;

use futures_util::future;
use http::Request;
use tower::{ServiceExt, retry::Retry, util::Oneshot};
use tower::{retry::Retry, util::Oneshot, ServiceExt};
use tower_service::Service;

use crate::StdError;

pub struct Failover<N> {
inner: N // loadbalancer service
inner: N, // loadbalancer service
}

#[derive(Clone)]
pub struct FailoverPolicy;


impl<N> Failover<N> {
pub fn new(inner: N) -> Self {
Expand All @@ -25,14 +24,13 @@ impl<B, Res, E> tower::retry::Policy<Request<B>, Res, E> for FailoverPolicy
where
B: http_body::Body + Clone,
{

type Future = future::Ready<Self>;

fn retry(&self, req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> {
fn retry(&self, _req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> {
//TODO some error handling or logging
match result {
Ok(_) => None,
Err(_) => Some(future::ready(self.clone()))
Err(_) => Some(future::ready(self.clone())),
}
}

Expand All @@ -43,21 +41,18 @@ where
*clone.headers_mut() = req.headers().clone();
*clone.version_mut() = req.version();


Some(clone)
}
}



impl<N, B> Service<Request<B>> for Failover<N>
impl<N, B> Service<Request<B>> for Failover<N>
where
// B is CloneBody<B>
B: http_body::Body + Clone,
// loadbalancer service
N: Service<Request<B>> + Clone + 'static ,
N: Service<Request<B>> + Clone + 'static,
N::Error: Into<StdError>,
N::Future: Send
N::Future: Send,
{
type Response = N::Response;

Expand All @@ -68,9 +63,9 @@ where
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
let retry = Retry::new(FailoverPolicy, self.inner.clone());
retry.oneshot(req)
}
}
}
38 changes: 18 additions & 20 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* limitations under the License.
*/


use http::Request;
use tower_service::Service;

use crate::{codegen::RpcInvocation, svc::NewService, param::Param, invoker::clone_body::CloneBody};
use crate::{
codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService,
};

use self::failover::Failover;

Expand All @@ -30,58 +31,55 @@ pub struct NewCluster<N> {
}

pub struct Cluster<S> {
inner: S // failover service
inner: S, // failover service
}


impl<N> NewCluster<N> {

pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
tower_layer::layer_fn(|inner: N| {
NewCluster {
inner, // new loadbalancer service
}
})
}
}

}

impl<S, T> NewService<T> for NewCluster<S>
impl<S, T> NewService<T> for NewCluster<S>
where
T: Param<RpcInvocation>,
T: Param<RpcInvocation>,
// new loadbalancer service
S: NewService<T>,
{

{
type Service = Cluster<Failover<S::Service>>;

fn new_service(&self, target: T) -> Self::Service {

fn new_service(&self, target: T) -> Self::Service {
Cluster {
inner: Failover::new(self.inner.new_service(target))
inner: Failover::new(self.inner.new_service(target)),
}
}
}
impl<S> Service<Request<hyper::Body>> for Cluster<S>

impl<S> Service<Request<hyper::Body>> for Cluster<S>
where
S: Service<Request<CloneBody>>,
{

type Response = S::Response;

type Error = S::Error;

type Future = S::Future;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
let (parts, body) = req.into_parts();
let clone_body = CloneBody::new(body);
let req = Request::from_parts(parts, clone_body);
let req = Request::from_parts(parts, clone_body);
self.inner.call(req)
}
}
3 changes: 1 addition & 2 deletions dubbo/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ pub use super::{
pub use crate::{
filter::{service::FilterService, Filter},
triple::{
client::builder::ClientBuilder,
server::builder::ServerBuilder,
client::builder::ClientBuilder, server::builder::ServerBuilder,
transport::connection::Connection,
},
};
Loading