Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
616c588
feat(dubbo): add filter mod
yang20150702 Sep 21, 2022
1d16d83
Merge branch 'main' of github.com:yang20150702/dubbo-rust
yang20150702 Sep 21, 2022
ef6de15
docs: add helloworld tutorial
yang20150702 Sep 21, 2022
daa2b1f
refactor(triple): redesign connector based on Service
yang20150702 Sep 28, 2022
aab30c5
Merge branch 'apache:main' into main
yang20150702 Sep 29, 2022
d47862d
Merge branch 'apache:main' into main
yang20150702 Sep 29, 2022
ee14b34
refactor(triple): redesign triple client
yang20150702 Sep 30, 2022
629a93d
refactor(dubbo-build): update client api
yang20150702 Sep 30, 2022
d195f90
refactor(dubbo): update logs
yang20150702 Sep 30, 2022
4132b14
refactor(examples): adapt to dubbo-build changes
yang20150702 Sep 30, 2022
549dd05
style(dubbo): cargo fmt
yang20150702 Sep 30, 2022
72e89e9
style: update license header
yang20150702 Sep 30, 2022
11bcbb8
refactor(triple): update tripleClient to support filter trait
yang20150702 Oct 2, 2022
da1924f
refactor(dubbo-build): update client mod
yang20150702 Oct 2, 2022
9010654
refactor(example): add filter example
yang20150702 Oct 2, 2022
8ea2d57
refactor(dubbo): update log
yang20150702 Oct 2, 2022
8d1f7df
Rft(dubbo): use boxBody as response of Service in Client side
yang20150702 Oct 4, 2022
9293547
Rft(dubb-build): update client impl
yang20150702 Oct 4, 2022
2a8e740
Rft(dubbo): impl filter in server
yang20150702 Oct 4, 2022
597834e
Merge branch 'apache:main' into main
yang20150702 Oct 4, 2022
601defe
Rft(dubbo): update impl of protocol mod
yang20150702 Oct 7, 2022
0af4034
Merge branch 'main' of github.com:yang20150702/dubbo-rust
yang20150702 Oct 7, 2022
6b73428
Rft(dubbo): redesign the api of protocol mod
yang20150702 Oct 9, 2022
1a662a2
Rem(protocol): remove tonic as plugin
yang20150702 Oct 14, 2022
8b0ab45
Rem(dubbo-build): remove unuse Invoker in server side
yang20150702 Oct 14, 2022
94c51a1
Rft(protocol): redesign Invoker trait, Invoker=Connection
yang20150702 Oct 14, 2022
1d78481
Rft(dubbo): impl memory_protocol, update registry config
yang20150702 Oct 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct RootConfig {
pub name: String,
pub service: HashMap<String, ServiceConfig>,
pub protocols: HashMap<String, ProtocolConfig>,
pub registries: HashMap<String, String>,

#[serde(skip_serializing, skip_deserializing)]
pub data: HashMap<String, String>,
Expand All @@ -63,6 +64,7 @@ impl RootConfig {
name: "dubbo".to_string(),
service: HashMap::new(),
protocols: HashMap::new(),
registries: HashMap::new(),
data: HashMap::new(),
}
}
Expand All @@ -83,6 +85,7 @@ impl RootConfig {
}
};

tracing::info!("current path: {:?}", env::current_dir());
let data = fs::read(config_path)?;
let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
tracing::debug!("origin config: {:?}", conf);
Expand Down
37 changes: 26 additions & 11 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,38 @@ pub fn generate<T: Service>(
#service_doc
#(#struct_attributes)*
#[derive(Debug, Clone, Default)]
pub struct #service_ident {
inner: TripleClient,
uri: String,
pub struct #service_ident<T> {
inner: TripleClient<T>,
}

impl #service_ident {
pub fn new() -> Self {
impl #service_ident<Connection> {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
#service_ident {
inner: cli,
}
}
}

impl<T> #service_ident<T>
where
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
T::Error: Into<StdError>,
{
pub fn new(inner: T) -> Self {
Self {
inner: TripleClient::new(),
uri: "".to_string(),
inner: TripleClient::new(inner, None),
}
}

pub fn with_uri(mut self, uri: String) -> Self {
self.uri = uri.clone();
self.inner = self.inner.with_host(uri);
self
pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
where
F: Filter,
{
let inner = self.inner.with_filter(filter);
#service_ident {
inner,
}
}

#methods
Expand Down
21 changes: 12 additions & 9 deletions dubbo-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,33 @@ pub fn generate<T: Service>(
#service_doc
#(#struct_attributes)*
#[derive(Debug)]
pub struct #server_service<T: #server_trait, I = TripleInvoker> {
pub struct #server_service<T: #server_trait> {
inner: _Inner<T>,
invoker: Option<I>,
}

struct _Inner<T>(Arc<T>);

impl<T: #server_trait, I> #server_service<T, I> {
impl<T: #server_trait> #server_service<T> {
pub fn new(inner: T) -> Self {
Self {
inner: _Inner(Arc::new(inner)),
invoker: None,
}
}

pub fn with_filter<F>(inner: T, filter: F) -> FilterService<Self, F>
where
F: Filter,
{
FilterService::new(Self::new(inner), filter)
}

}

impl<T, I, B> Service<http::Request<B>> for #server_service<T, I>
impl<T, B> Service<http::Request<B>> for #server_service<T>
where
T: #server_trait,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
I: Invoker + Send + 'static,
{
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
Expand Down Expand Up @@ -126,12 +130,11 @@ pub fn generate<T: Service>(
}
}

impl<T: #server_trait, I: Invoker + Send + 'static> Clone for #server_service<T, I> {
impl<T: #server_trait> Clone for #server_service<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
invoker: None,
}
}
}
Expand All @@ -149,7 +152,7 @@ pub fn generate<T: Service>(
}

pub fn register_server<T: #server_trait>(server: T) {
let s = #server_service::<_, TripleInvoker>::new(server);
let s = #server_service::new(server);
dubbo::protocol::triple::TRIPLE_SERVICES
.write()
.unwrap()
Expand Down
1 change: 0 additions & 1 deletion dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ repository = "https://github.com/apache/dubbo-rust.git"
[dependencies]
hyper = { version = "0.14.19", features = ["full"]}
http = "0.2"
tonic = {version ="0.7.2", features = ["compression",]}
tower-service = "0.3.1"
http-body = "0.4.4"
tower = "0.4.12"
Expand Down
5 changes: 5 additions & 0 deletions dubbo/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ pub use std::sync::Arc;
pub use std::task::{Context, Poll};

pub use async_trait::async_trait;
pub use bytes::Bytes;
pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;

pub use super::invocation::{IntoStreamingRequest, Request, Response};
Expand All @@ -34,3 +36,6 @@ pub use super::triple::server::service::{
};
pub use super::triple::server::TripleServer;
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
pub use crate::filter::service::FilterService;
pub use crate::filter::Filter;
pub use crate::triple::client::connection::Connection;
18 changes: 18 additions & 0 deletions dubbo/src/common/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub const REGISTRY_PROTOCOL: &str = "registry";
1 change: 1 addition & 0 deletions dubbo/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
* limitations under the License.
*/

pub mod consts;
pub mod url;
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,10 @@
* limitations under the License.
*/

use crate::protocol::{Exporter, Invoker};
pub mod service;

pub struct GrpcExporter<T> {
invoker: T,
}

impl<T> GrpcExporter<T> {
pub fn new(_key: String, invoker: T) -> GrpcExporter<T> {
Self { invoker }
}
}

impl<T: Invoker + Clone> Exporter for GrpcExporter<T> {
type InvokerType = T;

fn unexport(&self) {}

fn get_invoker(&self) -> Self::InvokerType {
self.invoker.clone()
}
}
use crate::invocation::Request;

impl<T: Invoker + Clone> Clone for GrpcExporter<T> {
fn clone(&self) -> Self {
Self {
invoker: self.invoker.clone(),
}
}
pub trait Filter {
fn call(&mut self, req: Request<()>) -> Result<Request<()>, crate::status::Status>;
}
84 changes: 84 additions & 0 deletions dubbo/src/filter/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use tower_service::Service;

use super::Filter;
use crate::invocation::Metadata;
use crate::invocation::Request;

#[derive(Clone)]
pub struct FilterService<S, F> {
inner: S,
f: F,
}

impl<S, F> FilterService<S, F> {
pub fn new(inner: S, f: F) -> Self
where
F: Filter,
{
Self { inner, f }
}
}

impl<S, F, ReqBody> Service<http::Request<ReqBody>> for FilterService<S, F>
where
F: Filter,
S: Service<http::Request<ReqBody>, Response = http::Response<crate::BoxBody>>,
S::Error: Into<crate::Error>,
S::Future: Send + 'static,
{
type Response = http::Response<crate::BoxBody>;

type Error = S::Error;

type Future = crate::BoxFuture<Self::Response, Self::Error>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
let uri = req.uri().clone();
let method = req.method().clone();
let version = req.version();
let (parts, msg) = req.into_parts();

let res = self.f.call(Request::from_parts(
Metadata::from_headers(parts.headers),
(),
));
match res {
Ok(req) => {
let (metadata, _) = req.into_parts();
let req = Request::from_parts(Metadata::from_headers(metadata.into_headers()), msg);
let http_req = req.into_http(uri, method, version);

let resp = self.inner.call(http_req);
Box::pin(resp)
}
Err(err) => {
let fut = async move { Ok(err.to_http()) };
Box::pin(fut)
}
}
}
}
21 changes: 11 additions & 10 deletions dubbo/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@ use futures::Future;
use futures::FutureExt;

use crate::common::url::Url;
use crate::protocol::triple::triple_invoker::TripleInvoker;
use crate::protocol::triple::triple_protocol::TripleProtocol;
use crate::protocol::{Exporter, Protocol};
use crate::protocol::{BoxExporter, Protocol};
use dubbo_config::{get_global_config, RootConfig};

pub type BoxExporter = Box<dyn Exporter<InvokerType = TripleInvoker>>;
// Invoker是否可以基于hyper写一个通用的

#[derive(Default)]
pub struct Dubbo {
protocols: HashMap<String, Vec<Url>>,
registries: HashMap<String, Url>,
config: Option<RootConfig>,
}

impl Dubbo {
pub fn new() -> Dubbo {
tracing_subscriber::fmt::init();
Self {
protocols: HashMap::new(),
registries: HashMap::new(),
config: None,
}
}
Expand All @@ -51,14 +52,18 @@ impl Dubbo {
}

pub fn init(&mut self) {
tracing_subscriber::fmt::init();

if self.config.is_none() {
self.config = Some(get_global_config())
}

let conf = self.config.as_ref().unwrap();
tracing::debug!("global conf: {:?}", conf);

for (name, url) in conf.registries.iter() {
self.registries
.insert(name.to_string(), Url::from_url(url).unwrap());
}

for (_, c) in conf.service.iter() {
let u = if c.protocol_configs.is_empty() {
let protocol = match conf.protocols.get(&c.protocol) {
Expand Down Expand Up @@ -106,11 +111,7 @@ impl Dubbo {
"triple" => {
let pro = Box::new(TripleProtocol::new());
for u in c.iter() {
let tri_fut = pro
.clone()
.export(u.clone())
.map(|res| Box::new(res) as BoxExporter)
.boxed();
let tri_fut = pro.clone().export(u.clone()).boxed();
async_vec.push(tri_fut);
}
}
Expand Down
11 changes: 9 additions & 2 deletions dubbo/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,16 @@ impl<T> Request<T> {
}
}

pub fn into_http(self) -> http::Request<T> {
pub fn into_http(
self,
uri: http::Uri,
method: http::Method,
version: http::Version,
) -> http::Request<T> {
let mut http_req = http::Request::new(self.message);
*http_req.version_mut() = http::Version::HTTP_2;
*http_req.version_mut() = version;
*http_req.uri_mut() = uri;
*http_req.method_mut() = method;
*http_req.headers_mut() = self.metadata.into_headers();

http_req
Expand Down
1 change: 1 addition & 0 deletions dubbo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub mod codegen;
pub mod common;
pub mod filter;
mod framework;
pub mod invocation;
pub mod protocol;
Expand Down
Loading