From 616c588635fb5133b44d4e49413f8fb39c84673d Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Wed, 21 Sep 2022 23:32:55 +0800 Subject: [PATCH 01/22] feat(dubbo): add filter mod --- dubbo/src/filter/mod.rs | 24 ++++++ dubbo/src/filter/service.rs | 144 ++++++++++++++++++++++++++++++++++++ dubbo/src/invocation.rs | 11 ++- dubbo/src/lib.rs | 1 + 4 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 dubbo/src/filter/mod.rs create mode 100644 dubbo/src/filter/service.rs diff --git a/dubbo/src/filter/mod.rs b/dubbo/src/filter/mod.rs new file mode 100644 index 00000000..cc2a59a2 --- /dev/null +++ b/dubbo/src/filter/mod.rs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod service; + +use crate::invocation::Request; + +pub trait Filter { + fn call(&mut self, _req: Request<()>) -> Result, crate::status::Status>; +} diff --git a/dubbo/src/filter/service.rs b/dubbo/src/filter/service.rs new file mode 100644 index 00000000..03ee2915 --- /dev/null +++ b/dubbo/src/filter/service.rs @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{future::Future, task::Poll}; + +use pin_project::pin_project; +use tower_service::Service; + +use super::Filter; +use crate::boxed; +use crate::invocation::Metadata; +use crate::invocation::Request; +use crate::BoxBody; + +pub struct FilterService { + inner: S, + f: F, +} + +impl FilterService { + pub fn new(inner: S, f: F) -> Self + where + F: Filter, + { + Self { inner, f } + } +} + +impl Service> for FilterService +where + RespBody: Default + http_body::Body + Send + 'static, + RespBody::Error: Into, + F: Filter, + S: Service, Response = http::Response>, + S::Error: Into, +{ + type Response = http::Response; + + type Error = S::Error; + + type Future = ResponseFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let uri = req.uri().clone(); + let method = req.method().clone(); + let version = req.version(); + let (parts, msg) = req.into_parts(); + + let res = self.f.call(Request::from_parts( + Metadata::from_headers(parts.headers), + (), + )); + match res { + Ok(req) => { + let (metadata, _) = req.into_parts(); + let req = Request::from_parts(Metadata::from_headers(metadata.into_headers()), msg); + let http_req = req.into_http(uri, method, version); + + let resp = self.inner.call(http_req); + ResponseFuture::future(resp) + } + Err(err) => ResponseFuture::status(err), + } + } +} + +#[pin_project] +#[derive(Debug)] +pub struct ResponseFuture { + #[pin] + kind: Kind, +} + +impl ResponseFuture { + fn status(status: crate::status::Status) -> Self { + Self { + kind: Kind::Status(Some(status)), + } + } + + fn future(future: F) -> Self { + Self { + kind: Kind::Future(future), + } + } +} + +#[pin_project(project = KindProj)] +#[derive(Debug)] +enum Kind { + Future(#[pin] F), + Status(Option), +} + +impl Future for ResponseFuture +where + F: Future, E>>, + E: Into, + B: Default + http_body::Body + Send + 'static, + B::Error: Into, +{ + type Output = Result, E>; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.project().kind.project() { + KindProj::Future(future) => future + .poll(cx) + .map(|result| result.map(|res| res.map(boxed))), + KindProj::Status(status) => { + let response = status + .take() + .unwrap() + .to_http() + .map(|_| B::default()) + .map(boxed); + Poll::Ready(Ok(response)) + } + } + } +} diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index ad6c6f42..5a80e9a7 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -51,9 +51,16 @@ impl Request { } } - pub fn into_http(self) -> http::Request { + pub fn into_http( + self, + uri: http::Uri, + method: http::Method, + version: http::Version, + ) -> http::Request { let mut http_req = http::Request::new(self.message); - *http_req.version_mut() = http::Version::HTTP_2; + *http_req.version_mut() = version; + *http_req.uri_mut() = uri; + *http_req.method_mut() = method; *http_req.headers_mut() = self.metadata.into_headers(); http_req diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 3ce295e4..8ad9fa04 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -17,6 +17,7 @@ pub mod codegen; pub mod common; +pub mod filter; mod framework; pub mod invocation; pub mod protocol; From ef6de15ed905c6c0675c5de285d10f4b9d84bf1f Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Wed, 21 Sep 2022 23:36:20 +0800 Subject: [PATCH 02/22] docs: add helloworld tutorial --- examples/example-tutorial.md | 0 examples/helloworld-tutorial.md | 194 ++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+) delete mode 100644 examples/example-tutorial.md create mode 100644 examples/helloworld-tutorial.md diff --git a/examples/example-tutorial.md b/examples/example-tutorial.md deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/helloworld-tutorial.md b/examples/helloworld-tutorial.md new file mode 100644 index 00000000..672170f9 --- /dev/null +++ b/examples/helloworld-tutorial.md @@ -0,0 +1,194 @@ +# Dubbo-rust quick start + +本文介绍如何使用Rust快速开发Dubbo服务。 + +主要内容分为: ++ 环境准备 ++ 使用Protobuf IDL定义Dubbo服务 ++ 使用dubbo-build来编译IDL ++ 编写Dubbo业务代码 + +## 环境准备 + +首先需要安装Rust开发环境,具体步骤为: +``` +``` + +其次还需要下载protoc可执行文件,并将protoc添加到Path环境变量中: +下载地址为:`https://github.com/protocolbuffers/protobuf/releases` + +``` +export Path = "" +``` + +## 使用Protobuf IDL定义Dubbo服务 + +## 使用dubbo-build来编译IDL + +首先创建一个项目: +``` +cargo new --lib dubbo-example +``` + +在`cargo.toml`中添加需要的依赖 +``` +[dependencies] +http = "0.2" +http-body = "0.4.4" +futures-util = {version = "0.3", default-features = false} +tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] } +prost-derive = {version = "0.10", optional = true} +prost = "0.10.4" +prost-types = { version = "0.8", default-features = false } +async-trait = "0.1.56" +tokio-stream = "0.1" + +dubbo = {git = "https://github.com/yang20150702/dubbo-rust.git"} + +[build-dependencies] +dubbo-build = {git = "https://github.com/yang20150702/dubbo-rust.git"} +``` + +将写好的`helloworld.proto`放在proto/目录下。 + +编写`build.rs`文件来生成`helloworld.proto`对应的rust代码: +``` +use std::path::PathBuf; + +fn main() { + let path = PathBuf::from("./src"); + dubbo_build::prost::configure() + .output_dir(path) + .compile(&["proto/helloworld.proto"], &["proto/"]) + .unwrap(); +} +``` + +此时在src目录下会有一个名为`helloworld.rs`的文件,这就是生成的rust代码。 + +> `helloworld.rs`可以放在src目录下的任一位置,只要它能够被业务代码引用到即可~ + +## 编写Dubbo业务代码 + +Dubbo业务代码分为server和client两部分。 + +首先在lib.rs目录下引入`helloworld.rs`mod: +``` +pub mod helloworld; +``` + +然后,编写`dubbo.yaml`配置文件,将该文件放在src目录的同级目录中: +``` +name: dubbo +service: + helloworld.Greeter: + version: 1.0.0 + group: test + protocol: triple + registry: '' + serializer: json + protocol_configs: {} +protocols: + triple: + ip: 0.0.0.0 + port: '8888' + name: triple +``` + +> 另外,可以通过环境变量`DUBBO_CONFIG_PATH`来自定义配置文件的路径。 + +### 编写 Dubbo Server + +在src目录下新建server文件夹,创建`main.rs`文件,并在`Cargo.toml`中增加如下配置: +``` +[[bin]] +name = "hello-server" +path = "src/server/main.rs" +``` + +首先引入需要的模块: +``` +use dubbo_rust_examples::helloworld::greeter_server::{register_server, Greeter}; +use dubbo_rust_examples::helloworld::{HelloRequest, HelloReply}; +use async_trait::async_trait; +use dubbo::codegen::{Request, Response}; +use dubbo::Dubbo; +``` + +接下来,实现IDL中定义的服务trait: +``` +#[derive(Debug, Clone, Default)] +pub struct GreeterImpl {} + +impl GreeterImpl { + pub fn new() -> Self { + GreeterImpl { } + } +} + +#[async_trait] +impl Greeter for GreeterImpl { + async fn say_hello( + &self, + request: Request, + ) -> Result, dubbo::status::Status> { + println!("request: {:?}", request.into_inner()); + + Ok(Response::new(HelloReply{ + message: "hello dubbo-rust!".to_string(), + })) + } +} +``` + +接下来进行服务注册,并启动Dubbo框架: +``` +#[tokio::main] +async fn main() { + register_server(GreeterImpl::new()); + + Dubbo::new().start().await; +} +``` + +通过如下命令启动服务: +``` +cargo run --bin hello-server +``` + +### 编写 Dubbo Client + +在src目录下新建client文件夹,创建`main.rs`文件,并在`Cargo.toml`中增加如下配置: +``` +[[bin]] +name = "hello-client" +path = "src/client/main.rs" +``` + +首先引入需要的模块: +``` +use dubbo_rust_examples::helloworld::greeter_client::GreeterClient; +use dubbo_rust_examples::helloworld::HelloRequest; +use dubbo::codegen::Request; +``` + +接下来,实现IDL中定义的服务trait: +``` +#[tokio::main] +async fn main() { + let mut cli = GreeterClient::new().with_uri("http://127.0.0.1:8888".to_string()); + let resp = cli.say_hello(Request::new( + HelloRequest{ + name: "hello, I'm client".to_string(), + } + )).await.unwrap(); + + let (_, msg) = resp.into_parts(); + println!("response: {:?}", msg); +} +``` + +通过如下命令启动服务: +``` +cargo run --bin hello-client +``` \ No newline at end of file From daa2b1ff3a3156d134e59efcd701b672bb7cd548 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Thu, 29 Sep 2022 00:25:34 +0800 Subject: [PATCH 03/22] refactor(triple): redesign connector based on Service --- dubbo/src/triple/client/connection.rs | 94 +++++++++++++++++++++ dubbo/src/triple/client/mod.rs | 1 + dubbo/src/triple/client/triple.rs | 61 ++++--------- dubbo/src/triple/transport/connector/mod.rs | 25 +++++- 4 files changed, 135 insertions(+), 46 deletions(-) create mode 100644 dubbo/src/triple/client/connection.rs diff --git a/dubbo/src/triple/client/connection.rs b/dubbo/src/triple/client/connection.rs new file mode 100644 index 00000000..fe522b41 --- /dev/null +++ b/dubbo/src/triple/client/connection.rs @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::task::Poll; + +use hyper::client::conn::Builder; +use hyper::client::service::Connect; +use tower_service::Service; + +use crate::triple::transport::connector::get_connector; + +#[derive(Debug, Clone)] +pub struct Connection { + host: hyper::Uri, + connector: String, + builder: Builder, +} + +impl Default for Connection { + fn default() -> Self { + Self::new() + } +} + +impl Connection { + pub fn new() -> Self { + Connection { + host: hyper::Uri::default(), + connector: "http".to_string(), + builder: Builder::new(), + } + } + + pub fn with_connector(mut self, connector: String) -> Self { + self.connector = connector; + self + } + + pub fn with_host(mut self, uri: hyper::Uri) -> Self { + self.host = uri; + self + } + + pub fn with_builder(mut self, builder: Builder) -> Self { + self.builder = builder; + self + } +} + +impl Service> for Connection +where + ReqBody: http_body::Body + Unpin + Send + 'static, + ReqBody::Data: Send + Unpin, + ReqBody::Error: Into, +{ + type Response = http::Response; + + type Error = crate::Error; + + type Future = crate::BoxFuture; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let builder = self.builder.clone().http2_only(true).to_owned(); + let mut connector = Connect::new(get_connector(self.connector.clone()), builder); + let uri = self.host.clone(); + let fut = async move { + let mut con = connector.call(uri).await.unwrap(); + con.call(req).await.map_err(|err| err.into()) + }; + + Box::pin(fut) + } +} diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs index cbc3b129..333a60a4 100644 --- a/dubbo/src/triple/client/mod.rs +++ b/dubbo/src/triple/client/mod.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +pub mod connection; pub mod triple; pub use triple::TripleClient; diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index bb2e59a7..51029eec 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -19,54 +19,27 @@ use std::str::FromStr; use futures_util::{future, stream, StreamExt, TryStreamExt}; use http::HeaderValue; -use hyper::client::connect::Connect; +use tower_service::Service; +use super::connection::Connection; use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; use crate::triple::codec::Codec; use crate::triple::compression::CompressionEncoding; use crate::triple::decode::Decoding; use crate::triple::encode::encode; -use crate::triple::transport::connector::http_connector::HttpConnector; #[derive(Debug, Clone, Default)] pub struct TripleClient { host: Option, - inner: ConnectionPool, + inner: Connection, send_compression_encoding: Option, } -#[derive(Debug, Default, Clone)] -pub struct ConnectionPool { - http2_only: bool, -} - -impl ConnectionPool { - pub fn new() -> Self { - ConnectionPool { http2_only: true } - } - - pub fn builder(self) -> hyper::Client { - hyper::Client::builder() - .http2_only(self.http2_only) - .build_http() - } - - pub fn builder_with_connector(self, connector: C) -> hyper::Client - where - C: Connect + Clone, - { - hyper::Client::builder() - .http2_only(self.http2_only) - .build(connector) - } -} - -// TODO: initial connection pool impl TripleClient { pub fn new() -> Self { TripleClient { host: None, - inner: ConnectionPool::new(), + inner: Connection::new(), send_compression_encoding: Some(CompressionEncoding::Gzip), } } @@ -80,7 +53,12 @@ impl TripleClient { None } }; - TripleClient { host: uri, ..self } + + TripleClient { + inner: self.inner.with_host(uri.clone().unwrap()), + host: uri, + ..self + } } } @@ -163,7 +141,7 @@ impl TripleClient { } pub async fn unary( - &self, + &mut self, req: Request, mut codec: C, path: http::uri::PathAndQuery, @@ -183,11 +161,8 @@ impl TripleClient { let body = hyper::Body::wrap_stream(body_stream); let req = self.map_request(path, body); - let cli = self - .inner - .clone() - .builder_with_connector(HttpConnector::new()); - let response = cli.request(req).await; + + let response = self.inner.call(req).await; match response { Ok(v) => { @@ -242,8 +217,7 @@ impl TripleClient { let req = self.map_request(path, body); - let cli = self.inner.clone().builder(); - let response = cli.request(req).await; + let response = self.inner.call(req).await; match response { Ok(v) => { @@ -281,8 +255,8 @@ impl TripleClient { let body = hyper::Body::wrap_stream(en); let req = self.map_request(path, body); - let cli = self.inner.clone().builder(); - let response = cli.request(req).await; + + let response = self.inner.call(req).await; match response { Ok(v) => { @@ -337,8 +311,7 @@ impl TripleClient { let req = self.map_request(path, body); - let cli = self.inner.clone().builder(); - let response = cli.request(req).await; + let response = self.inner.call(req).await; match response { Ok(v) => { diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs index 3dd3fef8..3139fff1 100644 --- a/dubbo/src/triple/transport/connector/mod.rs +++ b/dubbo/src/triple/transport/connector/mod.rs @@ -17,18 +17,26 @@ pub mod http_connector; -use hyper::Uri; +use hyper::{client::connect::Connection, Uri}; +use tokio::io::{AsyncRead, AsyncWrite}; use tower::make::MakeConnection; use tower_service::Service; use super::io::BoxIO; +use crate::utils::boxed_clone::BoxCloneService; +#[derive(Clone)] pub struct Connector { inner: C, } impl Connector { - pub fn new(inner: C) -> Connector { + pub fn new(inner: C) -> Connector + where + C: Service, + C::Error: Into, + C::Response: AsyncRead + AsyncWrite + Connection + Send + 'static, + { Self { inner } } } @@ -62,3 +70,16 @@ where }) } } + +pub fn get_connector(connector: String) -> BoxCloneService { + match connector.as_str() { + "http" => { + let c = http_connector::HttpConnector::new(); + BoxCloneService::new(Connector::new(c)) + } + _ => { + let c = http_connector::HttpConnector::new(); + BoxCloneService::new(Connector::new(c)) + } + } +} From ee14b3420cbc6fa194a2e99a4a60362144ea1386 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 10:56:50 +0800 Subject: [PATCH 04/22] refactor(triple): redesign triple client --- dubbo/src/codegen.rs | 5 ++ dubbo/src/triple/client/triple.rs | 94 +++++++++++++++++++------------ 2 files changed, 62 insertions(+), 37 deletions(-) diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index e6a77276..9ffce899 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -19,7 +19,9 @@ pub use std::sync::Arc; pub use std::task::{Context, Poll}; pub use async_trait::async_trait; +pub use bytes::Bytes; pub use http_body::Body; +pub use hyper::Body as hyperBody; pub use tower_service::Service; pub use super::invocation::{IntoStreamingRequest, Request, Response}; @@ -34,3 +36,6 @@ pub use super::triple::server::service::{ }; pub use super::triple::server::TripleServer; pub use super::{empty_body, BoxBody, BoxFuture, StdError}; +pub use crate::filter::service::FilterService; +pub use crate::filter::Filter; +pub use crate::triple::client::connection::Connection; diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 51029eec..9fb3bec1 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -22,6 +22,8 @@ use http::HeaderValue; use tower_service::Service; use super::connection::Connection; +use crate::filter::service::FilterService; +use crate::filter::Filter; use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; use crate::triple::codec::Codec; use crate::triple::compression::CompressionEncoding; @@ -29,23 +31,14 @@ use crate::triple::decode::Decoding; use crate::triple::encode::encode; #[derive(Debug, Clone, Default)] -pub struct TripleClient { +pub struct TripleClient { host: Option, - inner: Connection, + inner: T, send_compression_encoding: Option, } -impl TripleClient { - pub fn new() -> Self { - TripleClient { - host: None, - inner: Connection::new(), - send_compression_encoding: Some(CompressionEncoding::Gzip), - } - } - - /// host: http://0.0.0.0:8888 - pub fn with_host(self, host: String) -> Self { +impl TripleClient { + pub fn connect(host: String) -> Self { let uri = match http::Uri::from_str(&host) { Ok(v) => Some(v), Err(err) => { @@ -55,14 +48,37 @@ impl TripleClient { }; TripleClient { - inner: self.inner.with_host(uri.clone().unwrap()), - host: uri, - ..self + host: uri.clone(), + inner: Connection::new().with_host(uri.unwrap()), + send_compression_encoding: Some(CompressionEncoding::Gzip), + } + } +} + +impl TripleClient { + pub fn new(inner: T) -> Self { + TripleClient { + host: None, + inner, + send_compression_encoding: Some(CompressionEncoding::Gzip), } } + + pub fn with_filter(self, filter: F) -> TripleClient> + where + F: Filter, + { + TripleClient::new(FilterService::new(self.inner, filter)) + } } -impl TripleClient { +impl TripleClient +where + RespBody: http_body::Body + Send + Sync + 'static, + RespBody::Error: Into + Send + Sync + 'static, + T: Service, Response = http::Response>, + T::Error: Into, +{ fn map_request( &self, path: http::uri::PathAndQuery, @@ -162,7 +178,11 @@ impl TripleClient { let req = self.map_request(path, body); - let response = self.inner.call(req).await; + let response = self + .inner + .call(req) + .await + .map_err(|err| crate::status::Status::from_error(err.into())); match response { Ok(v) => { @@ -188,10 +208,7 @@ impl TripleClient { Ok(Response::from_parts(parts, message)) } - Err(err) => Err(crate::status::Status::new( - crate::status::Code::Internal, - err.to_string(), - )), + Err(err) => Err(err), } } @@ -217,7 +234,11 @@ impl TripleClient { let req = self.map_request(path, body); - let response = self.inner.call(req).await; + let response = self + .inner + .call(req) + .await + .map_err(|err| crate::status::Status::from_error(err.into())); match response { Ok(v) => { @@ -227,10 +248,7 @@ impl TripleClient { Ok(Response::from_http(resp)) } - Err(err) => Err(crate::status::Status::new( - crate::status::Code::Internal, - err.to_string(), - )), + Err(err) => Err(err), } } @@ -256,7 +274,11 @@ impl TripleClient { let req = self.map_request(path, body); - let response = self.inner.call(req).await; + let response = self + .inner + .call(req) + .await + .map_err(|err| crate::status::Status::from_error(err.into())); match response { Ok(v) => { @@ -282,10 +304,7 @@ impl TripleClient { Ok(Response::from_parts(parts, message)) } - Err(err) => Err(crate::status::Status::new( - crate::status::Code::Internal, - err.to_string(), - )), + Err(err) => Err(err), } } @@ -311,7 +330,11 @@ impl TripleClient { let req = self.map_request(path, body); - let response = self.inner.call(req).await; + let response = self + .inner + .call(req) + .await + .map_err(|err| crate::status::Status::from_error(err.into())); match response { Ok(v) => { @@ -321,10 +344,7 @@ impl TripleClient { Ok(Response::from_http(resp)) } - Err(err) => Err(crate::status::Status::new( - crate::status::Code::Internal, - err.to_string(), - )), + Err(err) => Err(err), } } } From 629a93d39ec83921b6bde1b1a2bc65de1f9fd3c9 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 11:20:49 +0800 Subject: [PATCH 05/22] refactor(dubbo-build): update client api --- dubbo-build/src/client.rs | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 084c398e..0f56c2cd 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -66,23 +66,40 @@ pub fn generate( #service_doc #(#struct_attributes)* #[derive(Debug, Clone, Default)] - pub struct #service_ident { - inner: TripleClient, - uri: String, + pub struct #service_ident { + inner: TripleClient, } - impl #service_ident { - pub fn new() -> Self { + impl #service_ident { + pub fn connect(host: String) -> Self { + let cli = TripleClient::connect(host); + #service_ident { + inner: cli, + } + } + } + + impl #service_ident + where + RespBody: http_body::Body + Send + Sync + 'static, + RespBody::Error: Into + Send + Sync + 'static, + T: Service, Response = http::Response>, + T::Error: Into, + { + pub fn new(inner: T) -> Self { Self { - inner: TripleClient::new(), - uri: "".to_string(), + inner: TripleClient::new(inner), } } - pub fn with_uri(mut self, uri: String) -> Self { - self.uri = uri.clone(); - self.inner = self.inner.with_host(uri); - self + pub fn with_filter(self, filter: F) -> #service_ident> + where + F: Filter, + { + let inner = self.inner.with_filter(filter); + #service_ident { + inner, + } } #methods From d195f902c719a060ea3870c3aec65728906967e3 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 11:51:12 +0800 Subject: [PATCH 06/22] refactor(dubbo): update logs --- config/src/config.rs | 1 + dubbo/src/framework.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/config/src/config.rs b/config/src/config.rs index 2e577b18..68cace28 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -83,6 +83,7 @@ impl RootConfig { } }; + tracing::info!("current path: {:?}", env::current_dir()); let data = fs::read(config_path)?; let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap(); tracing::debug!("origin config: {:?}", conf); diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index a494bdd7..9c8938f2 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -39,6 +39,7 @@ pub struct Dubbo { impl Dubbo { pub fn new() -> Dubbo { + tracing_subscriber::fmt::init(); Self { protocols: HashMap::new(), config: None, @@ -51,7 +52,6 @@ impl Dubbo { } pub fn init(&mut self) { - tracing_subscriber::fmt::init(); if self.config.is_none() { self.config = Some(get_global_config()) From 4132b1403bb08c1468e2e378507d60d9ae34ac53 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 11:55:19 +0800 Subject: [PATCH 07/22] refactor(examples): adapt to dubbo-build changes --- examples/echo/build.rs | 8 ++-- examples/echo/proto/echo/echo.proto | 43 ++++++++++++++++++++++ examples/echo/src/echo/client.rs | 2 +- examples/echo/src/protos/hello_echo.rs | 51 ++++++++++++-------------- examples/greeter/src/greeter/client.rs | 2 +- 5 files changed, 72 insertions(+), 34 deletions(-) create mode 100644 examples/echo/proto/echo/echo.proto diff --git a/examples/echo/build.rs b/examples/echo/build.rs index db27746b..ecec1ab3 100644 --- a/examples/echo/build.rs +++ b/examples/echo/build.rs @@ -20,8 +20,8 @@ use std::path::PathBuf; fn main() { let path = PathBuf::from("./src/echo"); println!("path: {:?}", path); - // dubbo_build::prost::configure() - // .output_dir(path) - // .compile(&["proto/echo/echo.proto"], &["proto/"]) - // .unwrap(); + dubbo_build::prost::configure() + .output_dir(path) + .compile(&["proto/echo/echo.proto"], &["proto/"]) + .unwrap(); } diff --git a/examples/echo/proto/echo/echo.proto b/examples/echo/proto/echo/echo.proto new file mode 100644 index 00000000..49adc342 --- /dev/null +++ b/examples/echo/proto/echo/echo.proto @@ -0,0 +1,43 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + syntax = "proto3"; + + package grpc.examples.echo; + + // EchoRequest is the request for echo. + message EchoRequest { + string message = 1; + } + + // EchoResponse is the response for echo. + message EchoResponse { + string message = 1; + } + + // Echo is the echo service. + service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} + } \ No newline at end of file diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 1d09a0a9..6ac5d5a9 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -22,7 +22,7 @@ use futures_util::StreamExt; #[tokio::main] async fn main() { - let mut cli = EchoClient::new().with_uri("http://127.0.0.1:8888".to_string()); + let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string()); let resp = cli .unary_echo(Request::new(EchoRequest { message: "message from client".to_string(), diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index c80565a4..8d6b74ff 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -1,20 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - /// EchoRequest is the request for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { @@ -33,21 +16,33 @@ pub mod echo_client { use dubbo::codegen::*; /// Echo is the echo service. #[derive(Debug, Clone, Default)] - pub struct EchoClient { - inner: TripleClient, - uri: String, + pub struct EchoClient { + inner: TripleClient, } - impl EchoClient { - pub fn new() -> Self { + impl EchoClient { + pub fn connect(host: String) -> Self { + let cli = TripleClient::connect(host); + EchoClient { inner: cli } + } + } + impl EchoClient + where + RespBody: http_body::Body + Send + Sync + 'static, + RespBody::Error: Into + Send + Sync + 'static, + T: Service, Response = http::Response>, + T::Error: Into, + { + pub fn new(inner: T) -> Self { Self { - inner: TripleClient::new(), - uri: "".to_string(), + inner: TripleClient::new(inner), } } - pub fn with_uri(mut self, uri: String) -> Self { - self.uri = uri.clone(); - self.inner = self.inner.with_host(uri); - self + pub fn with_filter(self, filter: F) -> EchoClient> + where + F: Filter, + { + let inner = self.inner.with_filter(filter); + EchoClient { inner } } /// UnaryEcho is unary echo. pub async fn unary_echo( diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index ba6b5244..2004b6f9 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -26,7 +26,7 @@ use protos::{greeter_client::GreeterClient, GreeterRequest}; #[tokio::main] async fn main() { - let mut cli = GreeterClient::new().with_uri("http://127.0.0.1:8888".to_string()); + let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string()); println!("# unary call"); let resp = cli From 549dd05984930e2a701eb752bce5afc1bfdd742d Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 11:57:32 +0800 Subject: [PATCH 08/22] style(dubbo): cargo fmt --- dubbo/src/framework.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 9c8938f2..98710d77 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -52,7 +52,6 @@ impl Dubbo { } pub fn init(&mut self) { - if self.config.is_none() { self.config = Some(get_global_config()) } From 72e89e9b3a49c5f977b2bb60f87eeafa300a30ff Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 30 Sep 2022 12:12:17 +0800 Subject: [PATCH 09/22] style: update license header --- examples/echo/proto/echo/echo.proto | 14 ++++++-------- examples/echo/src/protos/hello_echo.rs | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/examples/echo/proto/echo/echo.proto b/examples/echo/proto/echo/echo.proto index 49adc342..0931bbb4 100644 --- a/examples/echo/proto/echo/echo.proto +++ b/examples/echo/proto/echo/echo.proto @@ -1,10 +1,10 @@ /* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -13,9 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ - syntax = "proto3"; package grpc.examples.echo; diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 8d6b74ff..7ac71a4b 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /// EchoRequest is the request for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { From 11bcbb838806c0103d710d201ecc07136b814686 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 2 Oct 2022 09:55:19 +0800 Subject: [PATCH 10/22] refactor(triple): update tripleClient to support filter trait --- dubbo/src/filter/mod.rs | 2 +- dubbo/src/filter/service.rs | 79 ++++--------------------------- dubbo/src/status.rs | 24 ++++++++++ dubbo/src/triple/client/triple.rs | 20 ++++---- 4 files changed, 43 insertions(+), 82 deletions(-) diff --git a/dubbo/src/filter/mod.rs b/dubbo/src/filter/mod.rs index cc2a59a2..075781a7 100644 --- a/dubbo/src/filter/mod.rs +++ b/dubbo/src/filter/mod.rs @@ -20,5 +20,5 @@ pub mod service; use crate::invocation::Request; pub trait Filter { - fn call(&mut self, _req: Request<()>) -> Result, crate::status::Status>; + fn call(&mut self, req: Request<()>) -> Result, crate::status::Status>; } diff --git a/dubbo/src/filter/service.rs b/dubbo/src/filter/service.rs index 03ee2915..0ff6bfaa 100644 --- a/dubbo/src/filter/service.rs +++ b/dubbo/src/filter/service.rs @@ -15,16 +15,11 @@ * limitations under the License. */ -use std::{future::Future, task::Poll}; - -use pin_project::pin_project; use tower_service::Service; use super::Filter; -use crate::boxed; use crate::invocation::Metadata; use crate::invocation::Request; -use crate::BoxBody; pub struct FilterService { inner: S, @@ -40,19 +35,18 @@ impl FilterService { } } -impl Service> for FilterService +impl Service> for FilterService where - RespBody: Default + http_body::Body + Send + 'static, - RespBody::Error: Into, F: Filter, - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, S::Error: Into, + S::Future: Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = S::Error; - type Future = ResponseFuture; + type Future = crate::BoxFuture; fn poll_ready( &mut self, @@ -78,66 +72,11 @@ where let http_req = req.into_http(uri, method, version); let resp = self.inner.call(http_req); - ResponseFuture::future(resp) + Box::pin(resp) } - Err(err) => ResponseFuture::status(err), - } - } -} - -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - kind: Kind, -} - -impl ResponseFuture { - fn status(status: crate::status::Status) -> Self { - Self { - kind: Kind::Status(Some(status)), - } - } - - fn future(future: F) -> Self { - Self { - kind: Kind::Future(future), - } - } -} - -#[pin_project(project = KindProj)] -#[derive(Debug)] -enum Kind { - Future(#[pin] F), - Status(Option), -} - -impl Future for ResponseFuture -where - F: Future, E>>, - E: Into, - B: Default + http_body::Body + Send + 'static, - B::Error: Into, -{ - type Output = Result, E>; - - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - match self.project().kind.project() { - KindProj::Future(future) => future - .poll(cx) - .map(|result| result.map(|res| res.map(boxed))), - KindProj::Status(status) => { - let response = status - .take() - .unwrap() - .to_http() - .map(|_| B::default()) - .map(boxed); - Poll::Ready(Ok(response)) + Err(err) => { + let fut = async move { Ok(err.to_hyper_body()) }; + Box::pin(fut) } } } diff --git a/dubbo/src/status.rs b/dubbo/src/status.rs index 12b6fd6d..926c1d6a 100644 --- a/dubbo/src/status.rs +++ b/dubbo/src/status.rs @@ -305,6 +305,30 @@ impl Status { http::Response::from_parts(parts, crate::empty_body()) } + + pub fn to_hyper_body(&self) -> http::Response { + let (mut parts, _) = http::Response::new(()).into_parts(); + + parts.headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/grpc"), + ); + + parts + .headers + .insert(GRPC_STATUS, self.code.to_http_header_value()); + parts.headers.insert( + GRPC_MESSAGE, + http::HeaderValue::from_str(&self.message).unwrap(), + ); + + parts.headers.insert( + "grpc-accept-encoding", + http::HeaderValue::from_static("gzip,identity"), + ); + + http::Response::from_parts(parts, hyper::Body::empty()) + } } impl From for Status { diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 9fb3bec1..22acae3d 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -56,9 +56,9 @@ impl TripleClient { } impl TripleClient { - pub fn new(inner: T) -> Self { + pub fn new(inner: T, host: Option) -> Self { TripleClient { - host: None, + host, inner, send_compression_encoding: Some(CompressionEncoding::Gzip), } @@ -68,15 +68,13 @@ impl TripleClient { where F: Filter, { - TripleClient::new(FilterService::new(self.inner, filter)) + TripleClient::new(FilterService::new(self.inner, filter), self.host) } } -impl TripleClient +impl TripleClient where - RespBody: http_body::Body + Send + Sync + 'static, - RespBody::Error: Into + Send + Sync + 'static, - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { fn map_request( @@ -87,7 +85,7 @@ where let mut parts = match self.host.as_ref() { Some(v) => v.to_owned().into_parts(), None => { - tracing::warn!("client host is empty"); + tracing::error!("client host is empty"); return http::Request::new(hyper::Body::empty()); } }; @@ -119,18 +117,18 @@ where HeaderValue::from_static("application/grpc+json"), ); req.headers_mut() - .insert("user-agent", HeaderValue::from_static("dubbo-rust/1.0.0")); + .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0")); req.headers_mut() .insert("te", HeaderValue::from_static("trailers")); req.headers_mut().insert( "tri-service-version", - HeaderValue::from_static("dubbo-rust/1.0.0"), + HeaderValue::from_static("dubbo-rust/0.1.0"), ); req.headers_mut() .insert("tri-service-group", HeaderValue::from_static("cluster")); req.headers_mut().insert( "tri-unit-info", - HeaderValue::from_static("dubbo-rust/1.0.0"), + HeaderValue::from_static("dubbo-rust/0.1.0"), ); if let Some(_encoding) = self.send_compression_encoding { req.headers_mut() From da1924f75c5e9be4e8f4db75fdc41cc8549e2979 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 2 Oct 2022 10:07:10 +0800 Subject: [PATCH 11/22] refactor(dubbo-build): update client mod --- dubbo-build/src/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 0f56c2cd..56dec587 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -79,16 +79,14 @@ pub fn generate( } } - impl #service_ident + impl #service_ident where - RespBody: http_body::Body + Send + Sync + 'static, - RespBody::Error: Into + Send + Sync + 'static, - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { pub fn new(inner: T) -> Self { Self { - inner: TripleClient::new(inner), + inner: TripleClient::new(inner, None), } } From 9010654aeff3610484fb5d7d8a8c4e04d7545084 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 2 Oct 2022 10:11:19 +0800 Subject: [PATCH 12/22] refactor(example): add filter example --- examples/echo/src/echo/client.rs | 12 +- examples/echo/src/protos/hello_echo.rs | 184 ++++++++++++++++--------- 2 files changed, 131 insertions(+), 65 deletions(-) diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 6ac5d5a9..8b3cc9bf 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -20,10 +20,20 @@ use example_echo::protos::hello_echo::echo_client::EchoClient; use example_echo::protos::hello_echo::EchoRequest; use futures_util::StreamExt; +pub struct FakeFilter {} + +impl Filter for FakeFilter { + fn call(&mut self, req: Request<()>) -> Result, dubbo::status::Status> { + println!("fake filter: {:?}", req.metadata); + Ok(req) + } +} + #[tokio::main] async fn main() { let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string()); - let resp = cli + let mut unary_cli = cli.clone().with_filter(FakeFilter {}); + let resp = unary_cli .unary_echo(Request::new(EchoRequest { message: "message from client".to_string(), })) diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 7ac71a4b..84f16ea7 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -18,13 +18,13 @@ /// EchoRequest is the request for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub message: ::prost::alloc::string::String, } /// EchoResponse is the response for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoResponse { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub message: ::prost::alloc::string::String, } /// Generated client implementations. @@ -42,16 +42,14 @@ pub mod echo_client { EchoClient { inner: cli } } } - impl EchoClient + impl EchoClient where - RespBody: http_body::Body + Send + Sync + 'static, - RespBody::Error: Into + Send + Sync + 'static, - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { pub fn new(inner: T) -> Self { Self { - inner: TripleClient::new(inner), + inner: TripleClient::new(inner, None), } } pub fn with_filter(self, filter: F) -> EchoClient> @@ -66,9 +64,13 @@ pub mod echo_client { &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, codec, path).await } /// ServerStreamingEcho is server side streaming. @@ -76,8 +78,10 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); @@ -88,8 +92,10 @@ pub mod echo_client { &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); @@ -100,8 +106,10 @@ pub mod echo_client { &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); @@ -122,7 +130,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -136,14 +146,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -170,7 +185,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -183,18 +201,26 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -205,22 +231,32 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -233,23 +269,31 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -262,51 +306,63 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } impl Clone for EchoServer { fn clone(&self) -> Self { let inner = self.inner.clone(); - Self { - inner, - invoker: None, - } + Self { inner, invoker: None } } } impl Clone for _Inner { From 8ea2d57ce2853a307cb2b86ddf49c8ee357d19a1 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 2 Oct 2022 10:11:58 +0800 Subject: [PATCH 13/22] refactor(dubbo): update log --- dubbo/src/protocol/triple/triple_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs index eb0544e2..b85dbe17 100644 --- a/dubbo/src/protocol/triple/triple_server.rs +++ b/dubbo/src/protocol/triple/triple_server.rs @@ -38,7 +38,7 @@ impl TripleServer { let lock = super::TRIPLE_SERVICES.read().unwrap(); for name in self.service_names.iter() { if lock.get(name).is_none() { - tracing::warn!("service {} not register", name); + tracing::warn!("service ({}) not register", name); continue; } let svc = lock.get(name).unwrap(); From 8d1f7dfb84f75bf4e9e1fb6fb1b1f7e48fc78d50 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Tue, 4 Oct 2022 19:07:26 +0800 Subject: [PATCH 14/22] Rft(dubbo): use boxBody as response of Service in Client side --- dubbo/src/filter/service.rs | 6 +++--- dubbo/src/triple/client/connection.rs | 8 ++++++-- dubbo/src/triple/client/triple.rs | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dubbo/src/filter/service.rs b/dubbo/src/filter/service.rs index 0ff6bfaa..29d732df 100644 --- a/dubbo/src/filter/service.rs +++ b/dubbo/src/filter/service.rs @@ -38,11 +38,11 @@ impl FilterService { impl Service> for FilterService where F: Filter, - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, S::Error: Into, S::Future: Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = S::Error; @@ -75,7 +75,7 @@ where Box::pin(resp) } Err(err) => { - let fut = async move { Ok(err.to_hyper_body()) }; + let fut = async move { Ok(err.to_http()) }; Box::pin(fut) } } diff --git a/dubbo/src/triple/client/connection.rs b/dubbo/src/triple/client/connection.rs index fe522b41..ce453096 100644 --- a/dubbo/src/triple/client/connection.rs +++ b/dubbo/src/triple/client/connection.rs @@ -21,6 +21,7 @@ use hyper::client::conn::Builder; use hyper::client::service::Connect; use tower_service::Service; +use crate::boxed; use crate::triple::transport::connector::get_connector; #[derive(Debug, Clone)] @@ -67,7 +68,7 @@ where ReqBody::Data: Send + Unpin, ReqBody::Error: Into, { - type Response = http::Response; + type Response = http::Response; type Error = crate::Error; @@ -86,7 +87,10 @@ where let uri = self.host.clone(); let fut = async move { let mut con = connector.call(uri).await.unwrap(); - con.call(req).await.map_err(|err| err.into()) + con.call(req) + .await + .map_err(|err| err.into()) + .map(|res| res.map(boxed)) }; Box::pin(fut) diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 22acae3d..8021c40d 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -74,7 +74,7 @@ impl TripleClient { impl TripleClient where - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { fn map_request( From 9293547f6dd45482bf3398158da21774f73e163e Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Tue, 4 Oct 2022 19:10:40 +0800 Subject: [PATCH 15/22] Rft(dubb-build): update client impl --- dubbo-build/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 56dec587..10fa4468 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -81,7 +81,7 @@ pub fn generate( impl #service_ident where - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { pub fn new(inner: T) -> Self { From 2a8e740983bd51a6e738fae2987f5dc2dc0838a9 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Tue, 4 Oct 2022 19:24:38 +0800 Subject: [PATCH 16/22] Rft(dubbo): impl filter in server --- dubbo-build/src/server.rs | 7 +++++++ dubbo/src/filter/service.rs | 1 + examples/echo/src/echo/server.rs | 21 ++++++++++++++++++++- examples/echo/src/protos/hello_echo.rs | 8 +++++++- 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/dubbo-build/src/server.rs b/dubbo-build/src/server.rs index c99530d2..2bfb8f32 100644 --- a/dubbo-build/src/server.rs +++ b/dubbo-build/src/server.rs @@ -91,6 +91,13 @@ pub fn generate( } } + pub fn with_filter(inner: T, filter: F) -> FilterService + where + F: Filter, + { + FilterService::new(Self::new(inner), filter) + } + } impl Service> for #server_service diff --git a/dubbo/src/filter/service.rs b/dubbo/src/filter/service.rs index 29d732df..6ca34284 100644 --- a/dubbo/src/filter/service.rs +++ b/dubbo/src/filter/service.rs @@ -21,6 +21,7 @@ use super::Filter; use crate::invocation::Metadata; use crate::invocation::Request; +#[derive(Clone)] pub struct FilterService { inner: S, f: F, diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs index 7c5b8097..2d2f5a44 100644 --- a/examples/echo/src/echo/server.rs +++ b/examples/echo/src/echo/server.rs @@ -28,18 +28,37 @@ use dubbo::codegen::*; use dubbo::Dubbo; use dubbo_config::RootConfig; use example_echo::protos::hello_echo::{ - echo_server::{register_server, Echo}, + echo_server::{register_server, Echo, EchoServer}, EchoRequest, EchoResponse, }; type ResponseStream = Pin> + Send>>; +#[derive(Clone)] +pub struct FakeFilter {} + +impl Filter for FakeFilter { + fn call(&mut self, req: Request<()>) -> Result, dubbo::status::Status> { + println!("server fake filter: {:?}", req.metadata); + Ok(req) + } +} + #[tokio::main] async fn main() { register_server(EchoServerImpl { name: "echo".to_string(), }); + let server = EchoServerImpl::default(); + let s = EchoServer::::with_filter(server, FakeFilter {}); + dubbo::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + "grpc.examples.echo.Echo".to_string(), + dubbo::utils::boxed_clone::BoxCloneService::new(s), + ); // Dubbo::new().start().await; Dubbo::new() diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 84f16ea7..57d2e5db 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -44,7 +44,7 @@ pub mod echo_client { } impl EchoClient where - T: Service, Response = http::Response>, + T: Service, Response = http::Response>, T::Error: Into, { pub fn new(inner: T) -> Self { @@ -174,6 +174,12 @@ pub mod echo_server { invoker: None, } } + pub fn with_filter(inner: T, filter: F) -> FilterService + where + F: Filter, + { + FilterService::new(Self::new(inner), filter) + } } impl Service> for EchoServer where From 601defef707dcda154ef7227237d08af5d64fa2a Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 7 Oct 2022 22:38:25 +0800 Subject: [PATCH 17/22] Rft(dubbo): update impl of protocol mod --- dubbo/src/protocol/grpc/grpc_invoker.rs | 13 -------- dubbo/src/protocol/mod.rs | 2 -- dubbo/src/protocol/triple/triple_invoker.rs | 8 ----- dubbo/src/registry/mod.rs | 3 ++ dubbo/src/registry/protocol.rs | 34 +++++++++++++++++++++ 5 files changed, 37 insertions(+), 23 deletions(-) create mode 100644 dubbo/src/registry/protocol.rs diff --git a/dubbo/src/protocol/grpc/grpc_invoker.rs b/dubbo/src/protocol/grpc/grpc_invoker.rs index f0b7f73e..5a6bacf4 100644 --- a/dubbo/src/protocol/grpc/grpc_invoker.rs +++ b/dubbo/src/protocol/grpc/grpc_invoker.rs @@ -15,8 +15,6 @@ * limitations under the License. */ -use std::sync::Once; - use tonic::client::Grpc; use tonic::transport::Channel; use tonic::transport::Endpoint; @@ -28,7 +26,6 @@ use crate::protocol::Invoker; pub struct GrpcInvoker { client: Grpc, url: Url, - once: Once, } impl GrpcInvoker { @@ -38,20 +35,11 @@ impl GrpcInvoker { Self { url, client: Grpc::new(conn), - once: Once::new(), } } } impl Invoker for GrpcInvoker { - fn is_available(&self) -> bool { - true - } - - fn destroy(&self) { - self.once.call_once(|| println!("destroy...")) - } - fn get_url(&self) -> Url { self.url.to_owned() } @@ -75,7 +63,6 @@ impl Clone for GrpcInvoker { Self { client: self.client.clone(), url: self.url.clone(), - once: Once::new(), } } } diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index a59ad35e..7f2fe8af 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -46,8 +46,6 @@ pub trait Invoker { fn invoke(&self, req: Request) -> Response where M1: Send + 'static; - fn is_available(&self) -> bool; - fn destroy(&self); fn get_url(&self) -> Url; } diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 469e4128..33d3e6c1 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -39,14 +39,6 @@ impl Invoker for TripleInvoker { todo!() } - fn is_available(&self) -> bool { - todo!() - } - - fn destroy(&self) { - todo!() - } - fn get_url(&self) -> Url { todo!() } diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index 9c2d945e..06f921b5 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -17,6 +17,7 @@ #![allow(unused_variables, dead_code, missing_docs)] pub mod memory_registry; +pub mod protocol; use crate::common::url::Url; @@ -40,3 +41,5 @@ pub struct ServiceEvent { action: String, service: Url, } + +pub type BoxRegistry = Box>; diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs new file mode 100644 index 00000000..c9785e67 --- /dev/null +++ b/dubbo/src/registry/protocol.rs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; + +use super::BoxRegistry; +use crate::common::url::Url; + +pub struct RegistryProtocol { + registries: HashMap, +} + +impl RegistryProtocol { + fn destroy(&self) { + todo!() + } + + async fn export(self, url: Url) {} + async fn refer(self, url: Url) {} +} From 6b734286252e48c0fca2538aa563b99c28731bda Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 9 Oct 2022 22:15:21 +0800 Subject: [PATCH 18/22] Rft(dubbo): redesign the api of protocol mod --- dubbo/src/framework.rs | 10 ++------- dubbo/src/protocol/grpc/grpc_exporter.rs | 6 ----- dubbo/src/protocol/grpc/grpc_invoker.rs | 14 ------------ dubbo/src/protocol/grpc/grpc_protocol.rs | 8 +++---- dubbo/src/protocol/grpc/mod.rs | 6 ----- dubbo/src/protocol/mod.rs | 23 +++----------------- dubbo/src/protocol/triple/triple_exporter.rs | 7 ------ dubbo/src/protocol/triple/triple_invoker.rs | 8 ------- dubbo/src/protocol/triple/triple_protocol.rs | 8 +++---- 9 files changed, 11 insertions(+), 79 deletions(-) diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 98710d77..dc3b02f0 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -23,12 +23,10 @@ use futures::Future; use futures::FutureExt; use crate::common::url::Url; -use crate::protocol::triple::triple_invoker::TripleInvoker; use crate::protocol::triple::triple_protocol::TripleProtocol; -use crate::protocol::{Exporter, Protocol}; +use crate::protocol::{BoxExporter, Protocol}; use dubbo_config::{get_global_config, RootConfig}; -pub type BoxExporter = Box>; // Invoker是否可以基于hyper写一个通用的 #[derive(Default)] @@ -105,11 +103,7 @@ impl Dubbo { "triple" => { let pro = Box::new(TripleProtocol::new()); for u in c.iter() { - let tri_fut = pro - .clone() - .export(u.clone()) - .map(|res| Box::new(res) as BoxExporter) - .boxed(); + let tri_fut = pro.clone().export(u.clone()).boxed(); async_vec.push(tri_fut); } } diff --git a/dubbo/src/protocol/grpc/grpc_exporter.rs b/dubbo/src/protocol/grpc/grpc_exporter.rs index 55c41a8b..a5fa3e5e 100644 --- a/dubbo/src/protocol/grpc/grpc_exporter.rs +++ b/dubbo/src/protocol/grpc/grpc_exporter.rs @@ -28,13 +28,7 @@ impl GrpcExporter { } impl Exporter for GrpcExporter { - type InvokerType = T; - fn unexport(&self) {} - - fn get_invoker(&self) -> Self::InvokerType { - self.invoker.clone() - } } impl Clone for GrpcExporter { diff --git a/dubbo/src/protocol/grpc/grpc_invoker.rs b/dubbo/src/protocol/grpc/grpc_invoker.rs index 5a6bacf4..33d81cef 100644 --- a/dubbo/src/protocol/grpc/grpc_invoker.rs +++ b/dubbo/src/protocol/grpc/grpc_invoker.rs @@ -20,7 +20,6 @@ use tonic::transport::Channel; use tonic::transport::Endpoint; use crate::common::url::Url; -use crate::invocation::{Request, Response}; use crate::protocol::Invoker; pub struct GrpcInvoker { @@ -43,19 +42,6 @@ impl Invoker for GrpcInvoker { fn get_url(&self) -> Url { self.url.to_owned() } - - // 根据req中的数据发起req,由Client发起请求,获取响应 - fn invoke(&self, req: Request) -> Response - where - M1: Send + 'static, - { - let (metadata, _) = req.into_parts(); - - let resp = Response::new("string"); - let (_resp_meta, msg) = resp.into_parts(); - - Response::from_parts(metadata, msg.to_string()) - } } impl Clone for GrpcInvoker { diff --git a/dubbo/src/protocol/grpc/grpc_protocol.rs b/dubbo/src/protocol/grpc/grpc_protocol.rs index 31e12261..e0609de6 100644 --- a/dubbo/src/protocol/grpc/grpc_protocol.rs +++ b/dubbo/src/protocol/grpc/grpc_protocol.rs @@ -21,7 +21,7 @@ use super::grpc_exporter::GrpcExporter; use super::grpc_invoker::GrpcInvoker; use super::grpc_server::GrpcServer; use crate::common::url::Url; -use crate::protocol::Protocol; +use crate::protocol::{BoxExporter, Protocol}; pub struct GrpcProtocol { server_map: HashMap, @@ -47,8 +47,6 @@ impl Default for GrpcProtocol { impl Protocol for GrpcProtocol { type Invoker = GrpcInvoker; - type Exporter = GrpcExporter; - fn destroy(&self) { todo!() } @@ -57,7 +55,7 @@ impl Protocol for GrpcProtocol { GrpcInvoker::new(url) } - async fn export(self, url: Url) -> Self::Exporter { + async fn export(self, url: Url) -> BoxExporter { let service_key = url.service_key.join(","); let exporter: GrpcExporter = @@ -71,6 +69,6 @@ impl Protocol for GrpcProtocol { let mut server_map = self.server_map; server_map.insert(service_key.clone(), server.clone()); server.serve(url.clone()).await; - exporter + Box::new(exporter) } } diff --git a/dubbo/src/protocol/grpc/mod.rs b/dubbo/src/protocol/grpc/mod.rs index 7145a92f..743e035e 100644 --- a/dubbo/src/protocol/grpc/mod.rs +++ b/dubbo/src/protocol/grpc/mod.rs @@ -24,9 +24,7 @@ use lazy_static::lazy_static; use std::collections::HashMap; use std::sync::RwLock; -use crate::protocol::DubboGrpcService; use crate::utils::boxed_clone::BoxCloneService; -use grpc_invoker::GrpcInvoker; pub type GrpcBoxCloneService = BoxCloneService< http::Request, @@ -34,11 +32,7 @@ pub type GrpcBoxCloneService = BoxCloneService< std::convert::Infallible, >; -pub type DubboGrpcBox = Box + Send + Sync + 'static>; - lazy_static! { - pub static ref DUBBO_GRPC_SERVICES: RwLock + Send + Sync + 'static>>> = - RwLock::new(HashMap::new()); pub static ref GRPC_SERVICES: RwLock> = RwLock::new(HashMap::new()); } diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 7f2fe8af..5747b51c 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -22,40 +22,23 @@ pub mod triple; use async_trait::async_trait; use crate::common::url::Url; -use crate::invocation::{Request, Response}; -use crate::utils::boxed_clone::BoxCloneService; #[async_trait] pub trait Protocol { type Invoker; - type Exporter; fn destroy(&self); - async fn export(self, url: Url) -> Self::Exporter; + async fn export(self, url: Url) -> BoxExporter; async fn refer(self, url: Url) -> Self::Invoker; } pub trait Exporter { - type InvokerType: Invoker; - fn unexport(&self); - fn get_invoker(&self) -> Self::InvokerType; } pub trait Invoker { - fn invoke(&self, req: Request) -> Response - where - M1: Send + 'static; fn get_url(&self) -> Url; } -pub trait DubboGrpcService { - fn set_proxy_impl(&mut self, invoker: T); - fn service_desc(&self) -> server_desc::ServiceDesc; -} - -pub type GrpcBoxCloneService = BoxCloneService< - http::Request, - http::Response, - std::convert::Infallible, ->; +pub type BoxExporter = Box; +pub type BoxInvoker = Box; diff --git a/dubbo/src/protocol/triple/triple_exporter.rs b/dubbo/src/protocol/triple/triple_exporter.rs index 48ceab3e..7c5b4c0c 100644 --- a/dubbo/src/protocol/triple/triple_exporter.rs +++ b/dubbo/src/protocol/triple/triple_exporter.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -use super::triple_invoker::TripleInvoker; use crate::protocol::Exporter; #[derive(Clone)] @@ -34,13 +33,7 @@ impl Default for TripleExporter { } impl Exporter for TripleExporter { - type InvokerType = TripleInvoker; - fn unexport(&self) { todo!() } - - fn get_invoker(&self) -> Self::InvokerType { - todo!() - } } diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 33d3e6c1..74e9ad1b 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -16,7 +16,6 @@ */ use crate::common::url::Url; -use crate::invocation::{Request, Response}; use crate::protocol::Invoker; #[allow(dead_code)] @@ -32,13 +31,6 @@ impl TripleInvoker { } impl Invoker for TripleInvoker { - fn invoke(&self, _req: Request) -> Response - where - M1: Send + 'static, - { - todo!() - } - fn get_url(&self) -> Url { todo!() } diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs index 1aecbb80..5b713ada 100644 --- a/dubbo/src/protocol/triple/triple_protocol.rs +++ b/dubbo/src/protocol/triple/triple_protocol.rs @@ -24,7 +24,7 @@ use super::triple_exporter::TripleExporter; use super::triple_invoker::TripleInvoker; use super::triple_server::TripleServer; use crate::common::url::Url; -use crate::protocol::Protocol; +use crate::protocol::{BoxExporter, Protocol}; #[derive(Clone)] pub struct TripleProtocol { @@ -55,19 +55,17 @@ impl TripleProtocol { impl Protocol for TripleProtocol { type Invoker = TripleInvoker; - type Exporter = TripleExporter; - fn destroy(&self) { todo!() } - async fn export(mut self, url: Url) -> Self::Exporter { + async fn export(mut self, url: Url) -> BoxExporter { let server = TripleServer::new(url.service_key.clone()); self.servers .insert(url.service_key.join(","), server.clone()); server.serve(url.to_url()).await; - TripleExporter::new() + Box::new(TripleExporter::new()) } async fn refer(self, url: Url) -> Self::Invoker { From 1a662a2f7a3e0a00d5fc2755f6b66c6eb537222d Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 14 Oct 2022 22:13:47 +0800 Subject: [PATCH 19/22] Rem(protocol): remove tonic as plugin --- dubbo/Cargo.toml | 1 - dubbo/src/protocol/grpc/grpc_exporter.rs | 40 ---------- dubbo/src/protocol/grpc/grpc_invoker.rs | 54 -------------- dubbo/src/protocol/grpc/grpc_protocol.rs | 74 ------------------ dubbo/src/protocol/grpc/grpc_server.rs | 95 ------------------------ dubbo/src/protocol/grpc/mod.rs | 38 ---------- 6 files changed, 302 deletions(-) delete mode 100644 dubbo/src/protocol/grpc/grpc_exporter.rs delete mode 100644 dubbo/src/protocol/grpc/grpc_invoker.rs delete mode 100644 dubbo/src/protocol/grpc/grpc_protocol.rs delete mode 100644 dubbo/src/protocol/grpc/grpc_server.rs delete mode 100644 dubbo/src/protocol/grpc/mod.rs diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index d96a72bf..7b38a28f 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -12,7 +12,6 @@ repository = "https://github.com/apache/dubbo-rust.git" [dependencies] hyper = { version = "0.14.19", features = ["full"]} http = "0.2" -tonic = {version ="0.7.2", features = ["compression",]} tower-service = "0.3.1" http-body = "0.4.4" tower = "0.4.12" diff --git a/dubbo/src/protocol/grpc/grpc_exporter.rs b/dubbo/src/protocol/grpc/grpc_exporter.rs deleted file mode 100644 index a5fa3e5e..00000000 --- a/dubbo/src/protocol/grpc/grpc_exporter.rs +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::protocol::{Exporter, Invoker}; - -pub struct GrpcExporter { - invoker: T, -} - -impl GrpcExporter { - pub fn new(_key: String, invoker: T) -> GrpcExporter { - Self { invoker } - } -} - -impl Exporter for GrpcExporter { - fn unexport(&self) {} -} - -impl Clone for GrpcExporter { - fn clone(&self) -> Self { - Self { - invoker: self.invoker.clone(), - } - } -} diff --git a/dubbo/src/protocol/grpc/grpc_invoker.rs b/dubbo/src/protocol/grpc/grpc_invoker.rs deleted file mode 100644 index 33d81cef..00000000 --- a/dubbo/src/protocol/grpc/grpc_invoker.rs +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use tonic::client::Grpc; -use tonic::transport::Channel; -use tonic::transport::Endpoint; - -use crate::common::url::Url; -use crate::protocol::Invoker; - -pub struct GrpcInvoker { - client: Grpc, - url: Url, -} - -impl GrpcInvoker { - pub fn new(url: Url) -> GrpcInvoker { - let endpoint = Endpoint::new(url.to_url()).unwrap(); - let conn = endpoint.connect_lazy(); - Self { - url, - client: Grpc::new(conn), - } - } -} - -impl Invoker for GrpcInvoker { - fn get_url(&self) -> Url { - self.url.to_owned() - } -} - -impl Clone for GrpcInvoker { - fn clone(&self) -> Self { - Self { - client: self.client.clone(), - url: self.url.clone(), - } - } -} diff --git a/dubbo/src/protocol/grpc/grpc_protocol.rs b/dubbo/src/protocol/grpc/grpc_protocol.rs deleted file mode 100644 index e0609de6..00000000 --- a/dubbo/src/protocol/grpc/grpc_protocol.rs +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::collections::HashMap; - -use super::grpc_exporter::GrpcExporter; -use super::grpc_invoker::GrpcInvoker; -use super::grpc_server::GrpcServer; -use crate::common::url::Url; -use crate::protocol::{BoxExporter, Protocol}; - -pub struct GrpcProtocol { - server_map: HashMap, - export_map: HashMap>, -} - -impl GrpcProtocol { - pub fn new() -> Self { - Self { - server_map: HashMap::new(), - export_map: HashMap::new(), - } - } -} - -impl Default for GrpcProtocol { - fn default() -> Self { - Self::new() - } -} - -#[async_trait::async_trait] -impl Protocol for GrpcProtocol { - type Invoker = GrpcInvoker; - - fn destroy(&self) { - todo!() - } - - async fn refer(self, url: Url) -> Self::Invoker { - GrpcInvoker::new(url) - } - - async fn export(self, url: Url) -> BoxExporter { - let service_key = url.service_key.join(","); - - let exporter: GrpcExporter = - GrpcExporter::new(service_key.clone(), GrpcInvoker::new(url.clone())); - let mut export = self.export_map; - export.insert(service_key.clone(), exporter.clone()); - - // 启动服务 - - let server = super::grpc_server::GrpcServer::new(service_key.clone()); - let mut server_map = self.server_map; - server_map.insert(service_key.clone(), server.clone()); - server.serve(url.clone()).await; - Box::new(exporter) - } -} diff --git a/dubbo/src/protocol/grpc/grpc_server.rs b/dubbo/src/protocol/grpc/grpc_server.rs deleted file mode 100644 index e04d5747..00000000 --- a/dubbo/src/protocol/grpc/grpc_server.rs +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::task::Context; -use std::task::Poll; - -use tonic::codegen::BoxFuture; -use tonic::transport; -use tonic::transport::NamedService; -use tower::Service; - -use crate::common::url::Url; -use crate::utils::boxed_clone::BoxCloneService; - -// 每个service对应一个Server -#[derive(Clone)] -pub struct GrpcServer { - inner: transport::Server, - name: String, -} - -impl GrpcServer { - pub fn new(name: String) -> GrpcServer { - Self { - inner: transport::Server::builder(), - name, - } - } - - pub async fn serve(mut self, url: Url) { - let addr = url.to_url().parse().unwrap(); - let svc = super::GRPC_SERVICES - .read() - .unwrap() - .get(self.name.as_str()) - .unwrap() - .clone(); - tracing::info!("server{:?} start...", url); - self.inner - .add_service(MakeSvc::new(svc)) - .serve(addr) - .await - .unwrap(); - } -} - -struct MakeSvc { - inner: BoxCloneService, -} - -impl MakeSvc { - pub fn new(inner: BoxCloneService) -> Self { - Self { inner } - } -} - -impl NamedService for MakeSvc { - const NAME: &'static str = "helloworld.Greeter"; -} - -impl Service for MakeSvc { - type Response = U; - type Error = E; - type Future = BoxFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, request: T) -> BoxFuture { - self.inner.call(request) - } -} - -impl Clone for MakeSvc { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} diff --git a/dubbo/src/protocol/grpc/mod.rs b/dubbo/src/protocol/grpc/mod.rs deleted file mode 100644 index 743e035e..00000000 --- a/dubbo/src/protocol/grpc/mod.rs +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -pub mod grpc_exporter; -pub mod grpc_invoker; -pub mod grpc_protocol; -pub mod grpc_server; - -use lazy_static::lazy_static; -use std::collections::HashMap; -use std::sync::RwLock; - -use crate::utils::boxed_clone::BoxCloneService; - -pub type GrpcBoxCloneService = BoxCloneService< - http::Request, - http::Response, - std::convert::Infallible, ->; - -lazy_static! { - pub static ref GRPC_SERVICES: RwLock> = - RwLock::new(HashMap::new()); -} From 8b0ab45d32dd83736975870b37e312f7d79cd902 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 14 Oct 2022 22:26:15 +0800 Subject: [PATCH 20/22] Rem(dubbo-build): remove unuse Invoker in server side --- dubbo-build/src/server.rs | 14 +++++--------- examples/echo/src/echo/server.rs | 2 +- examples/echo/src/protos/hello_echo.rs | 15 ++++++--------- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/dubbo-build/src/server.rs b/dubbo-build/src/server.rs index 2bfb8f32..4dbbc9d2 100644 --- a/dubbo-build/src/server.rs +++ b/dubbo-build/src/server.rs @@ -76,18 +76,16 @@ pub fn generate( #service_doc #(#struct_attributes)* #[derive(Debug)] - pub struct #server_service { + pub struct #server_service { inner: _Inner, - invoker: Option, } struct _Inner(Arc); - impl #server_service { + impl #server_service { pub fn new(inner: T) -> Self { Self { inner: _Inner(Arc::new(inner)), - invoker: None, } } @@ -100,12 +98,11 @@ pub fn generate( } - impl Service> for #server_service + impl Service> for #server_service where T: #server_trait, B: Body + Send + 'static, B::Error: Into + Send + 'static, - I: Invoker + Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -133,12 +130,11 @@ pub fn generate( } } - impl Clone for #server_service { + impl Clone for #server_service { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { inner, - invoker: None, } } } @@ -156,7 +152,7 @@ pub fn generate( } pub fn register_server(server: T) { - let s = #server_service::<_, TripleInvoker>::new(server); + let s = #server_service::new(server); dubbo::protocol::triple::TRIPLE_SERVICES .write() .unwrap() diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs index 2d2f5a44..2be45b0e 100644 --- a/examples/echo/src/echo/server.rs +++ b/examples/echo/src/echo/server.rs @@ -51,7 +51,7 @@ async fn main() { name: "echo".to_string(), }); let server = EchoServerImpl::default(); - let s = EchoServer::::with_filter(server, FakeFilter {}); + let s = EchoServer::::with_filter(server, FakeFilter {}); dubbo::protocol::triple::TRIPLE_SERVICES .write() .unwrap() diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 57d2e5db..ce8ce09d 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -162,16 +162,14 @@ pub mod echo_server { } /// Echo is the echo service. #[derive(Debug)] - pub struct EchoServer { + pub struct EchoServer { inner: _Inner, - invoker: Option, } struct _Inner(Arc); - impl EchoServer { + impl EchoServer { pub fn new(inner: T) -> Self { Self { inner: _Inner(Arc::new(inner)), - invoker: None, } } pub fn with_filter(inner: T, filter: F) -> FilterService @@ -181,12 +179,11 @@ pub mod echo_server { FilterService::new(Self::new(inner), filter) } } - impl Service> for EchoServer + impl Service> for EchoServer where T: Echo, B: Body + Send + 'static, B::Error: Into + Send + 'static, - I: Invoker + Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -365,10 +362,10 @@ pub mod echo_server { } } } - impl Clone for EchoServer { + impl Clone for EchoServer { fn clone(&self) -> Self { let inner = self.inner.clone(); - Self { inner, invoker: None } + Self { inner } } } impl Clone for _Inner { @@ -382,7 +379,7 @@ pub mod echo_server { } } pub fn register_server(server: T) { - let s = EchoServer::<_, TripleInvoker>::new(server); + let s = EchoServer::new(server); dubbo::protocol::triple::TRIPLE_SERVICES .write() .unwrap() From 94c51a133ecd5e61f91f4214b1f57d54b36b96d8 Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Fri, 14 Oct 2022 22:32:39 +0800 Subject: [PATCH 21/22] Rft(protocol): redesign Invoker trait, Invoker=Connection --- dubbo/src/protocol/mod.rs | 38 +++++++++++++++++++-- dubbo/src/protocol/triple/triple_invoker.rs | 31 +++++++++++++++-- dubbo/src/triple/client/triple.rs | 2 +- 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 5747b51c..c73dc5b0 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -15,11 +15,14 @@ * limitations under the License. */ -pub mod grpc; pub mod server_desc; pub mod triple; +use std::future::Future; +use std::task::{Context, Poll}; + use async_trait::async_trait; +use tower_service::Service; use crate::common::url::Url; @@ -36,9 +39,38 @@ pub trait Exporter { fn unexport(&self); } -pub trait Invoker { +pub trait Invoker { + type Response; + + type Error; + + type Future: Future>; + fn get_url(&self) -> Url; + + fn call(&mut self, req: ReqBody) -> Self::Future; } pub type BoxExporter = Box; -pub type BoxInvoker = Box; + +pub struct WrapperInvoker(T); + +impl Service> for WrapperInvoker +where + T: Invoker, Response = http::Response>, + T::Error: Into, +{ + type Response = T::Response; + + type Error = T::Error; + + type Future = T::Future; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.0.call(req) + } +} diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 74e9ad1b..2ec3f4a8 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -15,23 +15,48 @@ * limitations under the License. */ +use std::str::FromStr; + +use tower_service::Service; + use crate::common::url::Url; use crate::protocol::Invoker; +use crate::triple::client::connection::Connection; #[allow(dead_code)] #[derive(Clone, Default)] pub struct TripleInvoker { url: Url, + conn: Connection, } impl TripleInvoker { pub fn new(url: Url) -> TripleInvoker { - Self { url } + let uri = http::Uri::from_str(&url.to_url()).unwrap(); + Self { + url, + conn: Connection::new().with_host(uri), + } } } -impl Invoker for TripleInvoker { +impl Invoker> for TripleInvoker +where + ReqBody: http_body::Body + Unpin + Send + 'static, + ReqBody::Error: Into, + ReqBody::Data: Send + Unpin, +{ + type Response = http::Response; + + type Error = crate::Error; + + type Future = crate::BoxFuture; + fn get_url(&self) -> Url { - todo!() + self.url.clone() + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.conn.call(req) } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 8021c40d..36c9ce94 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -43,7 +43,7 @@ impl TripleClient { Ok(v) => Some(v), Err(err) => { tracing::error!("http uri parse error: {}, host: {}", err, host); - None + panic!("http uri parse error: {}, host: {}", err, host) } }; From 1d78481704797f83b3ed480cc3b2eaa105f828df Mon Sep 17 00:00:00 2001 From: yangyang <962032265@qq.com> Date: Sun, 16 Oct 2022 01:20:36 +0800 Subject: [PATCH 22/22] Rft(dubbo): impl memory_protocol, update registry config --- config/src/config.rs | 2 ++ dubbo/src/common/consts.rs | 18 ++++++++++ dubbo/src/common/mod.rs | 1 + dubbo/src/framework.rs | 8 +++++ dubbo/src/protocol/mod.rs | 11 +++++- dubbo/src/registry/mod.rs | 3 +- dubbo/src/registry/protocol.rs | 62 ++++++++++++++++++++++++++++++++-- 7 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 dubbo/src/common/consts.rs diff --git a/config/src/config.rs b/config/src/config.rs index 68cace28..a07cb333 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -37,6 +37,7 @@ pub struct RootConfig { pub name: String, pub service: HashMap, pub protocols: HashMap, + pub registries: HashMap, #[serde(skip_serializing, skip_deserializing)] pub data: HashMap, @@ -63,6 +64,7 @@ impl RootConfig { name: "dubbo".to_string(), service: HashMap::new(), protocols: HashMap::new(), + registries: HashMap::new(), data: HashMap::new(), } } diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs new file mode 100644 index 00000000..8d941739 --- /dev/null +++ b/dubbo/src/common/consts.rs @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub const REGISTRY_PROTOCOL: &str = "registry"; diff --git a/dubbo/src/common/mod.rs b/dubbo/src/common/mod.rs index 0bfd0ed6..23284214 100644 --- a/dubbo/src/common/mod.rs +++ b/dubbo/src/common/mod.rs @@ -15,4 +15,5 @@ * limitations under the License. */ +pub mod consts; pub mod url; diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index dc3b02f0..00e46d7b 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -32,6 +32,7 @@ use dubbo_config::{get_global_config, RootConfig}; #[derive(Default)] pub struct Dubbo { protocols: HashMap>, + registries: HashMap, config: Option, } @@ -40,6 +41,7 @@ impl Dubbo { tracing_subscriber::fmt::init(); Self { protocols: HashMap::new(), + registries: HashMap::new(), config: None, } } @@ -56,6 +58,12 @@ impl Dubbo { let conf = self.config.as_ref().unwrap(); tracing::debug!("global conf: {:?}", conf); + + for (name, url) in conf.registries.iter() { + self.registries + .insert(name.to_string(), Url::from_url(url).unwrap()); + } + for (_, c) in conf.service.iter() { let u = if c.protocol_configs.is_empty() { let protocol = match conf.protocols.get(&c.protocol) { diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index c73dc5b0..20042cb5 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -51,7 +51,16 @@ pub trait Invoker { fn call(&mut self, req: ReqBody) -> Self::Future; } -pub type BoxExporter = Box; +pub type BoxExporter = Box; +pub type BoxInvoker = Box< + dyn Invoker< + http::Request, + Response = http::Response, + Error = crate::Error, + Future = crate::BoxFuture, crate::Error>, + > + Send + + Sync, +>; pub struct WrapperInvoker(T); diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index 06f921b5..cbae16b4 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -42,4 +42,5 @@ pub struct ServiceEvent { service: Url, } -pub type BoxRegistry = Box>; +pub type BoxRegistry = + Box + Send + Sync>; diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs index c9785e67..5b0973a3 100644 --- a/dubbo/src/registry/protocol.rs +++ b/dubbo/src/registry/protocol.rs @@ -16,19 +16,75 @@ */ use std::collections::HashMap; +use std::sync::Arc; +use super::memory_registry::MemoryRegistry; use super::BoxRegistry; +use crate::codegen::TripleInvoker; +use crate::common::consts; use crate::common::url::Url; +use crate::protocol::triple::triple_exporter::TripleExporter; +use crate::protocol::BoxExporter; +use crate::protocol::BoxInvoker; +use crate::protocol::Protocol; +#[derive(Clone, Default)] pub struct RegistryProtocol { - registries: HashMap, + // registerAddr: Registry + registries: Arc>, + // providerUrl: Exporter + exporters: Arc>, } impl RegistryProtocol { + pub fn new() -> Self { + RegistryProtocol { + registries: Arc::new(HashMap::new()), + exporters: Arc::new(HashMap::new()), + } + } + + pub fn get_registry(&self, url: Url) -> BoxRegistry { + // self.registries.clone().insert(url.location.clone(), Box::new(MemoryRegistry::default())); + + // *(self.registries.get(&url.location).unwrap()) + Box::new(MemoryRegistry::default()) + } +} + +#[async_trait::async_trait] +impl Protocol for RegistryProtocol { + type Invoker = BoxInvoker; + fn destroy(&self) { todo!() } - async fn export(self, url: Url) {} - async fn refer(self, url: Url) {} + async fn export(self, url: Url) -> BoxExporter { + // getProviderUrl + // getRegisterUrl + // init Exporter based on provider_url + // server registry based on register_url + // start server health check + Box::new(TripleExporter::new()) + } + async fn refer(self, url: Url) -> Self::Invoker { + // getRegisterUrl + // get Registry from registry_url + // init directory based on registry_url and Registry + // init Cluster based on Directory generates Invoker + Box::new(TripleInvoker::new(url)) + } +} + +fn get_registry_url(mut url: Url) -> Url { + if url.protocol == consts::REGISTRY_PROTOCOL { + url.protocol = url.get_param("registry".to_string()).unwrap(); + } + + url +} + +fn get_provider_url(url: Url) -> Url { + url }