From 78047c3431c1f0cd1cb56cf8f0699c9379a4a5be Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Thu, 25 Apr 2024 01:01:54 +0800 Subject: [PATCH 1/2] refact(triple): optimize metadata and header logic --- dubbo/src/codegen.rs | 2 +- dubbo/src/invocation.rs | 33 ++++++++++++- dubbo/src/protocol/triple/triple_invoker.rs | 4 ++ dubbo/src/triple/client/triple.rs | 51 +++++++++++++++++---- examples/echo/src/echo/client.rs | 17 ++++--- 5 files changed, 89 insertions(+), 18 deletions(-) diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 92774141..a0d1ca48 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -29,7 +29,7 @@ pub use tower_service::Service; pub use super::{ empty_body, - invocation::{IntoStreamingRequest, Request, Response, RpcInvocation}, + invocation::{IntoStreamingRequest, Request, Response, RpcInvocation, Metadata}, protocol::{triple::triple_invoker::TripleInvoker, Invoker}, triple::{ client::TripleClient, diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index 57503379..f824c9f4 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -144,11 +144,23 @@ where type Message = T::Item; - fn into_streaming_request(self) -> Request { + fn into_streaming_request(self) -> Request { Request::new(self) } } +impl IntoStreamingRequest for Request +where + T: Stream + Send + 'static, +{ + type Stream = T; + type Message = T::Item; + + fn into_streaming_request(self) -> Self { + self + } +} + // impl sealed::Sealed for T {} // pub mod sealed { @@ -167,6 +179,11 @@ impl Metadata { } } + pub fn insert(mut self, key: String, value: String) -> Self { + self.inner.insert(key, value); + self + } + pub fn from_headers(headers: http::HeaderMap) -> Self { let mut h: HashMap = HashMap::new(); for (k, v) in headers.into_iter() { @@ -196,10 +213,12 @@ pub trait Invocation { fn get_method_name(&self) -> String; } -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct RpcInvocation { target_service_unique_name: String, method_name: String, + + metadata: Metadata } impl RpcInvocation { @@ -211,9 +230,19 @@ impl RpcInvocation { self.method_name = method_name; self } + + pub fn with_metadata(mut self, metadata: Metadata) -> Self { + self.metadata = metadata; + self + } + pub fn unique_fingerprint(&self) -> String { format!("{}#{}", self.target_service_unique_name, self.method_name) } + + pub fn get_metadata(&self) -> Metadata { + self.metadata.clone() + } } impl Invocation for RpcInvocation { diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 29704f5f..516dab2e 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -71,6 +71,10 @@ impl TripleInvoker { .body(body) .unwrap(); + // add header of source + for (k, v) in parts.headers.iter() { + req.headers_mut().insert(k, v.to_owned()); + } // *req.version_mut() = http::Version::HTTP_2; req.headers_mut() .insert("method", HeaderValue::from_static("POST")); diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 46f8e9ca..288bf9dc 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -137,7 +137,7 @@ impl TripleClient { &mut self, req: Request, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -147,6 +147,9 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + + let mt = req.metadata.clone(); + let req = req.map(|m| stream::once(future::ready(m))); let body_stream = encode( encoder, @@ -157,13 +160,18 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(body_stream); + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -200,7 +208,7 @@ impl TripleClient { &mut self, req: impl IntoStreamingRequest, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result>, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -210,7 +218,10 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + let req = req.into_streaming_request(); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -220,13 +231,19 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(en); + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -247,7 +264,7 @@ impl TripleClient { &mut self, req: impl IntoStreamingRequest, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -258,6 +275,8 @@ impl TripleClient { Box + Send + 'static>, ) = get_codec("application/grpc+proto"); let req = req.into_streaming_request(); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -266,14 +285,19 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); + + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); - // let mut conn = Connection::new().with_host(http_uri); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -310,7 +334,7 @@ impl TripleClient { &mut self, req: Request, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result>, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -320,7 +344,10 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + let req = req.map(|m| stream::once(future::ready(m))); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -329,13 +356,19 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); + + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 74dec98e..d5d01d15 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -15,7 +15,7 @@ * limitations under the License. */ -use dubbo::codegen::*; +use dubbo::{codegen::*, invocation::Metadata}; use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest}; use futures_util::StreamExt; @@ -41,10 +41,13 @@ async fn main() { let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("static_tag".to_string(), "red".to_string()); + let req = Request::from_parts(mtdata.clone(), EchoRequest { + message: "message from client".to_string(), + }); let resp = cli - .unary_echo(Request::new(EchoRequest { - message: "message from client".to_string(), - })) + .unary_echo(req) .await; let resp = match resp { Ok(resp) => resp, @@ -64,7 +67,9 @@ async fn main() { message: "msg3 from client streaming".to_string(), }, ]; - let req = futures_util::stream::iter(data); + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("client_streaming".to_string(), "true".to_string()); + let req = Request::from_parts(mtdata, futures_util::stream::iter(data)); let resp = cli.client_streaming_echo(req).await; let client_streaming_resp = match resp { Ok(resp) => resp, @@ -84,7 +89,7 @@ async fn main() { message: "msg3 from client".to_string(), }, ]; - let req = futures_util::stream::iter(data); + let req = Request::new(futures_util::stream::iter(data)); let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap(); From 910a2606703d1f771f569973a101107fb8dc8eb4 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Thu, 25 Apr 2024 01:10:41 +0800 Subject: [PATCH 2/2] style: cargo fmt --- dubbo/src/codegen.rs | 2 +- dubbo/src/invocation.rs | 2 +- dubbo/src/triple/client/triple.rs | 1 - examples/echo/src/echo/client.rs | 13 +++++++------ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index a0d1ca48..baf39299 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -29,7 +29,7 @@ pub use tower_service::Service; pub use super::{ empty_body, - invocation::{IntoStreamingRequest, Request, Response, RpcInvocation, Metadata}, + invocation::{IntoStreamingRequest, Metadata, Request, Response, RpcInvocation}, protocol::{triple::triple_invoker::TripleInvoker, Invoker}, triple::{ client::TripleClient, diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index f824c9f4..7b251835 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -218,7 +218,7 @@ pub struct RpcInvocation { target_service_unique_name: String, method_name: String, - metadata: Metadata + metadata: Metadata, } impl RpcInvocation { diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 288bf9dc..2948dec5 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -239,7 +239,6 @@ impl TripleClient { .body(body) .unwrap(); - for (k, v) in mt.into_headers().iter() { request.headers_mut().insert(k, v.to_owned()); } diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index d5d01d15..030003f8 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -43,12 +43,13 @@ async fn main() { // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); let mut mtdata = Metadata::default(); mtdata = mtdata.insert("static_tag".to_string(), "red".to_string()); - let req = Request::from_parts(mtdata.clone(), EchoRequest { - message: "message from client".to_string(), - }); - let resp = cli - .unary_echo(req) - .await; + let req = Request::from_parts( + mtdata.clone(), + EchoRequest { + message: "message from client".to_string(), + }, + ); + let resp = cli.unary_echo(req).await; let resp = match resp { Ok(resp) => resp, Err(err) => return println!("{:?}", err),