From 01367b694e028483f90702201f6945c2cbf4673e Mon Sep 17 00:00:00 2001 From: luyanbo Date: Thu, 19 Jan 2023 17:48:00 +0800 Subject: [PATCH 1/4] implement service discovery --- Cargo.toml | 1 + config/src/config.rs | 2 +- dubbo-build/src/client.rs | 100 ++++- dubbo/Cargo.toml | 1 + dubbo/src/cluster/directory.rs | 146 +++++++ dubbo/src/cluster/mod.rs | 18 + dubbo/src/codegen.rs | 6 + dubbo/src/invocation.rs | 32 ++ dubbo/src/lib.rs | 1 + dubbo/src/registry/memory_registry.rs | 12 +- dubbo/src/registry/mod.rs | 25 +- dubbo/src/triple/client/triple.rs | 76 +++- examples/echo/Cargo.toml | 3 + examples/echo/src/echo/grpc.examples.echo.rs | 396 +++++++++++++++++++ examples/echo/src/protos/hello_echo.rs | 35 +- examples/greeter/Cargo.toml | 3 + examples/greeter/proto/greeter.proto | 2 +- examples/greeter/src/greeter/client.rs | 32 +- registry-zookeeper/Cargo.toml | 14 + registry-zookeeper/LICENSE | 202 ++++++++++ registry-zookeeper/src/lib.rs | 27 ++ registry-zookeeper/src/zookeeper_registry.rs | 306 ++++++++++++++ 22 files changed, 1388 insertions(+), 52 deletions(-) create mode 100644 dubbo/src/cluster/directory.rs create mode 100644 dubbo/src/cluster/mod.rs create mode 100644 examples/echo/src/echo/grpc.examples.echo.rs create mode 100644 registry-zookeeper/Cargo.toml create mode 100644 registry-zookeeper/LICENSE create mode 100644 registry-zookeeper/src/lib.rs create mode 100644 registry-zookeeper/src/zookeeper_registry.rs diff --git a/Cargo.toml b/Cargo.toml index 140c0797..d3942a69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "xds", "registry", + "registry-zookeeper", "metadata", "common", "config", diff --git a/config/src/config.rs b/config/src/config.rs index 672319a4..500fd33d 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -77,7 +77,7 @@ impl RootConfig { v } Err(err) => { - tracing::error!( + tracing::warn!( "error loading config_path: {:?}, use default path: {:?}", err, DUBBO_CONFIG_PATH diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 8d58ab16..0a3ff474 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -61,7 +61,7 @@ pub fn generate( // will trigger if compression is disabled clippy::let_unit_value, )] - use dubbo::codegen::*; + use dubbo::{codegen::*, cluster::directory::StaticDirectory}; #service_doc #(#struct_attributes)* @@ -72,7 +72,8 @@ pub fn generate( impl #service_ident { pub fn connect(host: String) -> Self { - let cli = TripleClient::connect(host); + let mut cli = TripleClient::connect(host.clone()); + cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); #service_ident { inner: cli, } @@ -106,6 +107,11 @@ pub fn generate( } } + pub fn with_directory(mut self, directory: Box) -> Self { + self.inner = self.inner.with_directory(directory); + self + } + #methods } @@ -123,6 +129,12 @@ fn generate_methods( let package = if emit_package { service.package() } else { "" }; for method in service.methods() { + let service_unique_name = format!( + "{}{}{}", + package, + if package.is_empty() { "" } else { "." }, + service.identifier() + ); let path = format!( "/{}{}{}/{}", package, @@ -134,14 +146,34 @@ fn generate_methods( stream.extend(generate_doc_comments(method.comment())); let method = match (method.client_streaming(), method.server_streaming()) { - (false, false) => generate_unary(&method, proto_path, compile_well_known_types, path), - (false, true) => { - generate_server_streaming(&method, proto_path, compile_well_known_types, path) - } - (true, false) => { - generate_client_streaming(&method, proto_path, compile_well_known_types, path) - } - (true, true) => generate_streaming(&method, proto_path, compile_well_known_types, path), + (false, false) => generate_unary( + service_unique_name, + &method, + proto_path, + compile_well_known_types, + path, + ), + (false, true) => generate_server_streaming( + service_unique_name, + &method, + proto_path, + compile_well_known_types, + path, + ), + (true, false) => generate_client_streaming( + service_unique_name, + &method, + proto_path, + compile_well_known_types, + path, + ), + (true, true) => generate_streaming( + service_unique_name, + &method, + proto_path, + compile_well_known_types, + path, + ), }; stream.extend(method); @@ -151,6 +183,7 @@ fn generate_methods( } fn generate_unary( + service_unique_name: String, method: &T, proto_path: &str, compile_well_known_types: bool, @@ -159,6 +192,7 @@ fn generate_unary( let codec_name = syn::parse_str::(CODEC_PATH).unwrap(); let ident = format_ident!("{}", method.name()); let (request, response) = method.request_response_name(proto_path, compile_well_known_types); + let method_name = method.identifier(); quote! { pub async fn #ident( @@ -166,19 +200,22 @@ fn generate_unary( request: Request<#request>, ) -> Result, dubbo::status::Status> { let codec = #codec_name::<#request, #response>::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from(#service_unique_name)) + .with_method_name(String::from(#method_name)); let path = http::uri::PathAndQuery::from_static(#path); - self.inner - .unary( + self.inner.unary( request, codec, path, - ) - .await + invocation, + ).await } } } fn generate_server_streaming( + service_unique_name: String, method: &T, proto_path: &str, compile_well_known_types: bool, @@ -188,6 +225,7 @@ fn generate_server_streaming( let ident = format_ident!("{}", method.name()); let (request, response) = method.request_response_name(proto_path, compile_well_known_types); + let method_name = method.identifier(); quote! { pub async fn #ident( @@ -196,13 +234,22 @@ fn generate_server_streaming( ) -> Result>, dubbo::status::Status> { let codec = #codec_name::<#request, #response>::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from(#service_unique_name)) + .with_method_name(String::from(#method_name)); let path = http::uri::PathAndQuery::from_static(#path); - self.inner.server_streaming(request, codec, path).await + self.inner.server_streaming( + request, + codec, + path, + invocation, + ).await } } } fn generate_client_streaming( + service_unique_name: String, method: &T, proto_path: &str, compile_well_known_types: bool, @@ -212,6 +259,7 @@ fn generate_client_streaming( let ident = format_ident!("{}", method.name()); let (request, response) = method.request_response_name(proto_path, compile_well_known_types); + let method_name = method.identifier(); quote! { pub async fn #ident( @@ -219,13 +267,22 @@ fn generate_client_streaming( request: impl IntoStreamingRequest ) -> Result, dubbo::status::Status> { let codec = #codec_name::<#request, #response>::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from(#service_unique_name)) + .with_method_name(String::from(#method_name)); let path = http::uri::PathAndQuery::from_static(#path); - self.inner.client_streaming(request, codec, path).await + self.inner.client_streaming( + request, + codec, + path, + invocation, + ).await } } } fn generate_streaming( + service_unique_name: String, method: &T, proto_path: &str, compile_well_known_types: bool, @@ -235,6 +292,7 @@ fn generate_streaming( let ident = format_ident!("{}", method.name()); let (request, response) = method.request_response_name(proto_path, compile_well_known_types); + let method_name = method.identifier(); quote! { pub async fn #ident( @@ -242,8 +300,16 @@ fn generate_streaming( request: impl IntoStreamingRequest ) -> Result>, dubbo::status::Status> { let codec = #codec_name::<#request, #response>::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from(#service_unique_name)) + .with_method_name(String::from(#method_name)); let path = http::uri::PathAndQuery::from_static(#path); - self.inner.bidi_streaming(request, codec, path).await + self.inner.bidi_streaming( + request, + codec, + path, + invocation, + ).await } } } diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 145b4a0e..6a51730e 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -24,6 +24,7 @@ async-trait = "0.1.56" tower-layer = "0.3" bytes = "1.0" pin-project = "1.0" +rand = "0.8.5" serde_json = "1.0.82" serde = {version="1.0.138", features = ["derive"]} futures = "0.3" diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs new file mode 100644 index 00000000..0e25afc2 --- /dev/null +++ b/dubbo/src/cluster/directory.rs @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; +use std::fmt::Debug; +use std::str::FromStr; +use std::sync::{Arc, RwLock}; + +use crate::common::url::Url; +use crate::invocation::{Invocation, RpcInvocation}; +use crate::registry::memory_registry::MemoryNotifyListener; +use crate::registry::{BoxRegistry, RegistryWrapper}; + +/// Directory. +/// +/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service) +pub trait Directory: Debug + DirectoryClone { + fn list(&self, invocation: RpcInvocation) -> Vec; +} + +pub trait DirectoryClone { + fn clone_box(&self) -> Box; +} + +impl DirectoryClone for T +where + T: 'static + Directory + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +#[derive(Debug)] +pub struct StaticDirectory { + uri: http::Uri, +} + +impl StaticDirectory { + pub fn new(host: &str) -> StaticDirectory { + let uri = match http::Uri::from_str(host) { + Ok(v) => v, + Err(err) => { + tracing::error!("http uri parse error: {}, host: {}", err, host); + panic!("http uri parse error: {}, host: {}", err, host) + } + }; + StaticDirectory { uri: uri } + } +} + +impl Directory for StaticDirectory { + fn list(&self, invocation: RpcInvocation) -> Vec { + let url = Url::from_url(&format!( + "triple://{}:{}/{}", + self.uri.host().unwrap(), + self.uri.port().unwrap(), + invocation.get_target_service_unique_name(), + )) + .unwrap(); + vec![url] + } +} + +impl DirectoryClone for StaticDirectory { + fn clone_box(&self) -> Box { + Box::new(StaticDirectory { + uri: self.uri.clone(), + }) + } +} + +#[derive(Debug)] +pub struct RegistryDirectory { + registry: RegistryWrapper, + service_instances: Arc>>>, +} + +impl RegistryDirectory { + pub fn new(registry: BoxRegistry) -> RegistryDirectory { + RegistryDirectory { + registry: RegistryWrapper { + registry: Some(registry), + }, + service_instances: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl DirectoryClone for RegistryDirectory { + fn clone_box(&self) -> Box { + todo!() + } +} + +impl Directory for RegistryDirectory { + fn list(&self, invocation: RpcInvocation) -> Vec { + let service_name = invocation.get_target_service_unique_name(); + + let url = Url::from_url(&format!( + "triple://{}:{}/{}", + "127.0.0.1", "8888", service_name + )) + .unwrap(); + + self.registry + .registry + .as_ref() + .expect("msg") + .subscribe( + url, + MemoryNotifyListener { + service_instances: Arc::clone(&self.service_instances), + }, + ) + .expect("subscribe"); + + let map = self + .service_instances + .read() + .expect("service_instances.read"); + let binding = Vec::new(); + let url_vec = map.get(&service_name).unwrap_or(&binding); + url_vec.to_vec() + } +} diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs new file mode 100644 index 00000000..84980ea8 --- /dev/null +++ b/dubbo/src/cluster/mod.rs @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod directory; diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index ef656469..3535fbf0 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -24,9 +24,15 @@ pub use http_body::Body; pub use hyper::Body as hyperBody; pub use tower_service::Service; +pub use super::cluster::directory::Directory; +pub use super::cluster::directory::RegistryDirectory; +pub use super::invocation::RpcInvocation; pub use super::invocation::{IntoStreamingRequest, Request, Response}; pub use super::protocol::triple::triple_invoker::TripleInvoker; pub use super::protocol::Invoker; +pub use super::registry::BoxRegistry; +pub use super::registry::Registry; +pub use super::registry::RegistryWrapper; pub use super::triple::client::TripleClient; pub use super::triple::codec::prost::ProstCodec; pub use super::triple::codec::Codec; diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index 5a80e9a7..423e9c74 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -189,3 +189,35 @@ impl Metadata { header } } + +pub trait Invocation { + fn get_target_service_unique_name(&self) -> String; + fn get_method_name(&self) -> String; +} + +#[derive(Default)] +pub struct RpcInvocation { + target_service_unique_name: String, + method_name: String, +} + +impl RpcInvocation { + pub fn with_servie_unique_name(mut self, service_unique_name: String) -> Self { + self.target_service_unique_name = service_unique_name; + self + } + pub fn with_method_name(mut self, method_name: String) -> Self { + self.method_name = method_name; + self + } +} + +impl Invocation for RpcInvocation { + fn get_target_service_unique_name(&self) -> String { + self.target_service_unique_name.clone() + } + + fn get_method_name(&self) -> String { + self.method_name.clone() + } +} diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 68bbea56..bf97ef75 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +pub mod cluster; pub mod codegen; pub mod common; pub mod context; diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs index 67df5c7a..4d0350e6 100644 --- a/dubbo/src/registry/memory_registry.rs +++ b/dubbo/src/registry/memory_registry.rs @@ -20,6 +20,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; +use crate::common::url::Url; + use super::{NotifyListener, Registry}; // 从url中获取服务注册的元数据 @@ -100,11 +102,17 @@ impl Registry for MemoryRegistry { } } -pub struct MemoryNotifyListener {} +pub struct MemoryNotifyListener { + pub service_instances: Arc>>>, +} impl NotifyListener for MemoryNotifyListener { fn notify(&self, event: super::ServiceEvent) { - todo!() + let mut map = self.service_instances.write().expect("msg"); + match event.action.as_str() { + "ADD" => map.insert(event.key, event.service), + &_ => todo!(), + }; } fn notify_all(&self, event: super::ServiceEvent) { diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index cbae16b4..8c566922 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -19,6 +19,8 @@ pub mod memory_registry; pub mod protocol; +use std::fmt::Debug; + use crate::common::url::Url; pub trait Registry { @@ -37,10 +39,27 @@ pub trait NotifyListener { } pub struct ServiceEvent { - key: String, - action: String, - service: Url, + pub key: String, + pub action: String, + pub service: Vec, } pub type BoxRegistry = Box + Send + Sync>; + +#[derive(Default)] +pub struct RegistryWrapper { + pub registry: Option>>, +} + +impl Clone for RegistryWrapper { + fn clone(&self) -> Self { + Self { registry: None } + } +} + +impl Debug for RegistryWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RegistryWrapper").finish() + } +} diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index d4864efd..b6037f2a 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -18,13 +18,18 @@ use std::str::FromStr; use futures_util::{future, stream, StreamExt, TryStreamExt}; + use http::HeaderValue; +use rand::prelude::SliceRandom; use tower_service::Service; +use super::super::transport::connection::Connection; use super::builder::{ClientBoxService, ClientBuilder}; +use crate::codegen::{Directory, RpcInvocation}; use crate::filter::service::FilterService; use crate::filter::Filter; use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; + use crate::triple::codec::Codec; use crate::triple::compression::CompressionEncoding; use crate::triple::decode::Decoding; @@ -35,6 +40,7 @@ pub struct TripleClient { builder: Option, inner: T, send_compression_encoding: Option, + directory: Option>, } impl TripleClient { @@ -53,6 +59,7 @@ impl TripleClient { builder: Some(builder.clone()), inner: builder.connect(), send_compression_encoding: Some(CompressionEncoding::Gzip), + directory: None, } } @@ -61,6 +68,7 @@ impl TripleClient { builder: Some(builder.clone()), inner: builder.connect(), send_compression_encoding: Some(CompressionEncoding::Gzip), + directory: None, } } } @@ -71,6 +79,7 @@ impl TripleClient { builder: Some(builder), inner, send_compression_encoding: Some(CompressionEncoding::Gzip), + directory: None, } } @@ -83,6 +92,14 @@ impl TripleClient { self.builder.unwrap(), ) } + + /// host: http://0.0.0.0:8888 + pub fn with_directory(self, directory: Box) -> Self { + TripleClient { + directory: Some(directory), + ..self + } + } } impl TripleClient @@ -92,16 +109,11 @@ where { fn map_request( &self, + uri: http::Uri, path: http::uri::PathAndQuery, body: hyper::Body, ) -> http::Request { - let mut parts = match self.builder.as_ref() { - Some(v) => v.to_owned().uri.into_parts(), - None => { - tracing::error!("client host is empty"); - return http::Request::new(hyper::Body::empty()); - } - }; + let mut parts = uri.into_parts(); parts.path_and_query = Some(path); let uri = http::Uri::from_parts(parts).unwrap(); @@ -127,7 +139,7 @@ where ); req.headers_mut().insert( "content-type", - HeaderValue::from_static("application/grpc+json"), + HeaderValue::from_static("application/grpc+proto"), ); req.headers_mut() .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0")); @@ -172,6 +184,7 @@ where req: Request, mut codec: C, path: http::uri::PathAndQuery, + invocation: RpcInvocation, ) -> Result, crate::status::Status> where C: Codec, @@ -187,10 +200,15 @@ where .into_stream(); let body = hyper::Body::wrap_stream(body_stream); - let req = self.map_request(path, body); + let url_list = self.directory.as_ref().expect("msg").list(invocation); + let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); + let http_uri = + http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); + + let req = self.map_request(http_uri.clone(), path, body); - let response = self - .inner + let mut conn = Connection::new().with_host(http_uri); + let response = conn .call(req) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -228,6 +246,7 @@ where req: impl IntoStreamingRequest, mut codec: C, path: http::uri::PathAndQuery, + invocation: RpcInvocation, ) -> Result>, crate::status::Status> where C: Codec, @@ -243,10 +262,15 @@ where .into_stream(); let body = hyper::Body::wrap_stream(en); - let req = self.map_request(path, body); + let url_list = self.directory.as_ref().expect("msg").list(invocation); + let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); + let http_uri = + http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - let response = self - .inner + let req = self.map_request(http_uri.clone(), path, body); + + let mut conn = Connection::new().with_host(http_uri); + let response = conn .call(req) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -268,6 +292,7 @@ where req: impl IntoStreamingRequest, mut codec: C, path: http::uri::PathAndQuery, + invocation: RpcInvocation, ) -> Result, crate::status::Status> where C: Codec, @@ -283,10 +308,15 @@ where .into_stream(); let body = hyper::Body::wrap_stream(en); - let req = self.map_request(path, body); + let url_list = self.directory.as_ref().expect("msg").list(invocation); + let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); + let http_uri = + http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); + + let req = self.map_request(http_uri.clone(), path, body); - let response = self - .inner + let mut conn = Connection::new().with_host(http_uri); + let response = conn .call(req) .await .map_err(|err| crate::status::Status::from_error(err.into())); @@ -324,6 +354,7 @@ where req: Request, mut codec: C, path: http::uri::PathAndQuery, + invocation: RpcInvocation, ) -> Result>, crate::status::Status> where C: Codec, @@ -339,10 +370,15 @@ where .into_stream(); let body = hyper::Body::wrap_stream(en); - let req = self.map_request(path, body); + let url_list = self.directory.as_ref().expect("msg").list(invocation); + let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); + let http_uri = + http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); + + let req = self.map_request(http_uri.clone(), path, body); - let response = self - .inner + let mut conn = Connection::new().with_host(http_uri); + let response = conn .call(req) .await .map_err(|err| crate::status::Status::from_error(err.into())); diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml index 9e794b09..f82ef800 100644 --- a/examples/echo/Cargo.toml +++ b/examples/echo/Cargo.toml @@ -23,8 +23,11 @@ prost = "0.10.4" async-trait = "0.1.56" tokio-stream = "0.1" +hyper = { version = "0.14.19", features = ["full"]} + dubbo = {path = "../../dubbo", version = "0.2.0"} dubbo-config = {path = "../../config", version = "0.2.0"} +dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"} [build-dependencies] dubbo-build = {path = "../../dubbo-build", version = "0.2.0"} diff --git a/examples/echo/src/echo/grpc.examples.echo.rs b/examples/echo/src/echo/grpc.examples.echo.rs new file mode 100644 index 00000000..40b7b486 --- /dev/null +++ b/examples/echo/src/echo/grpc.examples.echo.rs @@ -0,0 +1,396 @@ +/// EchoRequest is the request for echo. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EchoRequest { + #[prost(string, tag="1")] + pub message: ::prost::alloc::string::String, +} +/// EchoResponse is the response for echo. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EchoResponse { + #[prost(string, tag="1")] + pub message: ::prost::alloc::string::String, +} +/// Generated client implementations. +pub mod echo_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use dubbo::{codegen::*, cluster::directory::StaticDirectory}; + /// Echo is the echo service. + #[derive(Debug, Clone, Default)] + pub struct EchoClient { + inner: TripleClient, + } + impl EchoClient { + pub fn connect(host: String) -> Self { + let mut cli = TripleClient::connect(host.clone()); + cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + EchoClient { inner: cli } + } + pub fn build(builder: ClientBuilder) -> Self { + Self { + inner: TripleClient::with_builder(builder), + } + } + } + impl EchoClient + where + T: Service, Response = http::Response>, + T::Error: Into, + { + pub fn new(inner: T, builder: ClientBuilder) -> Self { + Self { + inner: TripleClient::new(inner, builder), + } + } + pub fn with_filter(self, filter: F) -> EchoClient> + where + F: Filter, + { + let inner = self.inner.with_filter(filter); + EchoClient { inner } + } + pub fn with_directory(mut self, directory: Box) -> Self { + self.inner = self.inner.with_directory(directory); + self + } + /// UnaryEcho is unary echo. + pub async fn unary_echo( + &mut self, + request: Request, + ) -> Result, dubbo::status::Status> { + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("UnaryEcho")); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); + self.inner.unary(request, codec, path, invocation).await + } + /// ServerStreamingEcho is server side streaming. + pub async fn server_streaming_echo( + &mut self, + request: Request, + ) -> Result>, dubbo::status::Status> { + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("ServerStreamingEcho")); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/ServerStreamingEcho", + ); + self.inner.server_streaming(request, codec, path, invocation).await + } + /// ClientStreamingEcho is client side streaming. + pub async fn client_streaming_echo( + &mut self, + request: impl IntoStreamingRequest, + ) -> Result, dubbo::status::Status> { + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("ClientStreamingEcho")); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/ClientStreamingEcho", + ); + self.inner.client_streaming(request, codec, path, invocation).await + } + /// BidirectionalStreamingEcho is bidi streaming. + pub async fn bidirectional_streaming_echo( + &mut self, + request: impl IntoStreamingRequest, + ) -> Result>, dubbo::status::Status> { + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("BidirectionalStreamingEcho")); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", + ); + self.inner.bidi_streaming(request, codec, path, invocation).await + } + } +} +/// Generated server implementations. +pub mod echo_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use dubbo::codegen::*; + ///Generated trait containing gRPC methods that should be implemented for use with EchoServer. + #[async_trait] + pub trait Echo: Send + Sync + 'static { + /// UnaryEcho is unary echo. + async fn unary_echo( + &self, + request: Request, + ) -> Result, dubbo::status::Status>; + ///Server streaming response type for the ServerStreamingEcho method. + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + + Send + + 'static; + /// ServerStreamingEcho is server side streaming. + async fn server_streaming_echo( + &self, + request: Request, + ) -> Result, dubbo::status::Status>; + /// ClientStreamingEcho is client side streaming. + async fn client_streaming_echo( + &self, + request: Request>, + ) -> Result, dubbo::status::Status>; + ///Server streaming response type for the BidirectionalStreamingEcho method. + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + + Send + + 'static; + /// BidirectionalStreamingEcho is bidi streaming. + async fn bidirectional_streaming_echo( + &self, + request: Request>, + ) -> Result< + Response, + dubbo::status::Status, + >; + } + /// Echo is the echo service. + #[derive(Debug)] + pub struct EchoServer { + inner: _Inner, + } + struct _Inner(Arc); + impl EchoServer { + pub fn new(inner: T) -> Self { + Self { + inner: _Inner(Arc::new(inner)), + } + } + pub fn with_filter(inner: T, filter: F) -> FilterService + where + F: Filter, + { + FilterService::new(Self::new(inner), filter) + } + } + impl Service> for EchoServer + where + T: Echo, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/grpc.examples.echo.Echo/UnaryEcho" => { + #[allow(non_camel_case_types)] + struct UnaryEchoServer { + inner: _Inner, + } + impl UnarySvc for UnaryEchoServer { + type Response = super::EchoResponse; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { inner.unary_echo(request).await }; + Box::pin(fut) + } + } + let fut = async move { + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); + let res = server.unary(UnaryEchoServer { inner }, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/ServerStreamingEcho" => { + #[allow(non_camel_case_types)] + struct ServerStreamingEchoServer { + inner: _Inner, + } + impl ServerStreamingSvc + for ServerStreamingEchoServer { + type Response = super::EchoResponse; + type ResponseStream = T::ServerStreamingEchoStream; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { + inner.server_streaming_echo(request).await + }; + Box::pin(fut) + } + } + let fut = async move { + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); + let res = server + .server_streaming(ServerStreamingEchoServer { inner }, req) + .await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/ClientStreamingEcho" => { + #[allow(non_camel_case_types)] + struct ClientStreamingEchoServer { + inner: _Inner, + } + impl ClientStreamingSvc + for ClientStreamingEchoServer { + type Response = super::EchoResponse; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request>, + ) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { + inner.client_streaming_echo(request).await + }; + Box::pin(fut) + } + } + let fut = async move { + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); + let res = server + .client_streaming(ClientStreamingEchoServer { inner }, req) + .await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/BidirectionalStreamingEcho" => { + #[allow(non_camel_case_types)] + struct BidirectionalStreamingEchoServer { + inner: _Inner, + } + impl StreamingSvc + for BidirectionalStreamingEchoServer { + type Response = super::EchoResponse; + type ResponseStream = T::BidirectionalStreamingEchoStream; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request>, + ) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; + Box::pin(fut) + } + } + let fut = async move { + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); + let res = server + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) + .await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for EchoServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { inner } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + pub fn register_server(server: T) { + let s = EchoServer::new(server); + dubbo::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + "grpc.examples.echo.Echo".to_string(), + dubbo::utils::boxed_clone::BoxCloneService::new(s), + ); + } +} diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index 08a1d299..d29446a8 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -30,7 +30,7 @@ pub struct EchoResponse { /// Generated client implementations. pub mod echo_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use dubbo::codegen::*; + use dubbo::{cluster::directory::StaticDirectory, codegen::*}; /// Echo is the echo service. #[derive(Debug, Clone, Default)] pub struct EchoClient { @@ -38,7 +38,8 @@ pub mod echo_client { } impl EchoClient { pub fn connect(host: String) -> Self { - let cli = TripleClient::connect(host); + let mut cli = TripleClient::connect(host.clone()); + cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); EchoClient { inner: cli } } pub fn build(builder: ClientBuilder) -> Self { @@ -64,6 +65,10 @@ pub mod echo_client { let inner = self.inner.with_filter(filter); EchoClient { inner } } + pub fn with_directory(mut self, directory: Box) -> Self { + self.inner = self.inner.with_directory(directory); + self + } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, @@ -71,8 +76,11 @@ pub mod echo_client { ) -> Result, dubbo::status::Status> { let codec = dubbo::codegen::ProstCodec::::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("UnaryEcho")); let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); - self.inner.unary(request, codec, path).await + self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. pub async fn server_streaming_echo( @@ -81,10 +89,15 @@ pub mod echo_client { ) -> Result>, dubbo::status::Status> { let codec = dubbo::codegen::ProstCodec::::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner.server_streaming(request, codec, path).await + self.inner + .server_streaming(request, codec, path, invocation) + .await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( @@ -93,10 +106,15 @@ pub mod echo_client { ) -> Result, dubbo::status::Status> { let codec = dubbo::codegen::ProstCodec::::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner.client_streaming(request, codec, path).await + self.inner + .client_streaming(request, codec, path, invocation) + .await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( @@ -105,10 +123,15 @@ pub mod echo_client { ) -> Result>, dubbo::status::Status> { let codec = dubbo::codegen::ProstCodec::::default(); + let invocation = RpcInvocation::default() + .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) + .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner.bidi_streaming(request, codec, path).await + self.inner + .bidi_streaming(request, codec, path, invocation) + .await } } } diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml index b698bc71..c4b20cc2 100644 --- a/examples/greeter/Cargo.toml +++ b/examples/greeter/Cargo.toml @@ -22,9 +22,12 @@ prost-derive = {version = "0.10", optional = true} prost = "0.10.4" async-trait = "0.1.56" tokio-stream = "0.1" +tracing = "0.1" +tracing-subscriber = "0.2.0" dubbo = {path = "../../dubbo", version = "0.2.0"} dubbo-config = {path = "../../config", version = "0.2.0"} +dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"} [build-dependencies] dubbo-build = {path = "../../dubbo-build", version = "0.2.0"} diff --git a/examples/greeter/proto/greeter.proto b/examples/greeter/proto/greeter.proto index 0d8be79e..a0c466a6 100644 --- a/examples/greeter/proto/greeter.proto +++ b/examples/greeter/proto/greeter.proto @@ -29,7 +29,7 @@ message GreeterReply { string message = 1; } -service Greeter{ +service Greeter { // unary rpc greet(GreeterRequest) returns (GreeterReply); diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index 2004b6f9..0abd5201 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -19,14 +19,42 @@ pub mod protos { #![allow(non_camel_case_types)] include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs")); } +use std::str::FromStr; -use dubbo::codegen::*; +use dubbo::{cluster::directory::StaticDirectory, codegen::*}; use futures_util::StreamExt; +use http; use protos::{greeter_client::GreeterClient, GreeterRequest}; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; #[tokio::main] async fn main() { - let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string()); + // a builder for `FmtSubscriber`. + let subscriber = FmtSubscriber::builder() + // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.) + // will be written to stdout. + .with_max_level(Level::INFO) + // completes the builder. + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap(); + + let mut cli = GreeterClient::new(Connection::new().with_host(http_uri), ClientBuilder::new()); + let directory = StaticDirectory::new("http://127.0.0.1:8888"); + cli = cli.with_directory(Box::new(directory)); + //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string()); + + // Here is example for zk + // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") { + // Ok(val) => val, + // Err(_) => "localhost:2181".to_string(), + // }; + // let zkr = ZookeeperRegistry::new(&zk_connect_string); + // let directory = RegistryDirectory::new(Box::new(zkr)); + // cli = cli.with_directory(Box::new(directory)); println!("# unary call"); let resp = cli diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml new file mode 100644 index 00000000..cf040fa6 --- /dev/null +++ b/registry-zookeeper/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "dubbo-registry-zookeeper" +version = "0.2.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +zookeeper = "0.6.1" +dubbo = {path = "../dubbo/", version = "0.2.0"} +serde_json = "1.0" +serde = {version = "1.0.145",features = ["derive"]} +tracing = "0.1" diff --git a/registry-zookeeper/LICENSE b/registry-zookeeper/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/registry-zookeeper/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs new file mode 100644 index 00000000..ccfce104 --- /dev/null +++ b/registry-zookeeper/src/lib.rs @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod zookeeper_registry; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs new file mode 100644 index 00000000..813f4927 --- /dev/null +++ b/registry-zookeeper/src/zookeeper_registry.rs @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#![allow(unused_variables, dead_code, missing_docs)] + +use dubbo::common::url::Url; +use dubbo::registry::memory_registry::MemoryNotifyListener; +use dubbo::registry::NotifyListener; +use dubbo::registry::Registry; +use dubbo::registry::ServiceEvent; +use dubbo::StdError; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::Duration; +use tracing::info; + +use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper}; + +// 从url中获取服务注册的元数据 +/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) +/// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER]) + +pub const REGISTRY_GROUP_KEY: &str = "registry.group"; + +struct LoggingWatcher; +impl Watcher for LoggingWatcher { + fn handle(&self, e: WatchedEvent) { + println!("{:?}", e) + } +} + +//#[derive(Debug)] +pub struct ZookeeperRegistry { + root_path: String, + zk_client: Arc, + + listeners: RwLock::NotifyListener>>>, +} + +pub struct MyNotifyListener {} + +impl NotifyListener for MyNotifyListener { + fn notify(&self, event: dubbo::registry::ServiceEvent) { + todo!() + } + + fn notify_all(&self, event: dubbo::registry::ServiceEvent) { + todo!() + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ZkServiceInstance { + name: String, + address: String, + port: i32, +} + +impl ZkServiceInstance { + pub fn get_service_name(&self) -> &str { + self.name.as_str() + } + + pub fn get_host(&self) -> &str { + self.address.as_str() + } + + pub fn get_port(&self) -> i32 { + self.port + } +} + +impl ZookeeperRegistry { + pub fn new(connect_string: &str) -> ZookeeperRegistry { + let zk_client = + ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap(); + ZookeeperRegistry { + root_path: "/services".to_string(), + zk_client: Arc::new(zk_client), + + listeners: RwLock::new(HashMap::new()), + } + } + + fn create_listener( + &self, + path: String, + service_name: String, + listener: Arc<::NotifyListener>, + ) -> ServiceInstancesChangedListener { + let mut service_names = HashSet::new(); + service_names.insert(service_name.clone()); + return ServiceInstancesChangedListener { + zk_client: Arc::clone(&self.zk_client), + path: path, + + service_name: service_name.clone(), + listener: listener, + }; + } + + fn get_app_name(&self, service_name: String) -> String { + let res = self + .zk_client + .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false); + + let x = res.unwrap().0; + let s = match std::str::from_utf8(&x) { + Ok(v) => v, + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + }; + s.to_string() + } +} + +impl Registry for ZookeeperRegistry { + type NotifyListener = MemoryNotifyListener; + + fn register(&mut self, url: Url) -> Result<(), StdError> { + todo!(); + } + + fn unregister(&mut self, url: Url) -> Result<(), StdError> { + todo!(); + } + + fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + let binding = url.get_service_name(); + let service_name = binding.get(0).unwrap(); + let app_name = self.get_app_name(service_name.clone()); + let path = self.root_path.clone() + "/" + &app_name; + if self.listeners.read().unwrap().get(service_name).is_some() { + return Ok(()); + } + + let arc_listener = Arc::new(listener); + self.listeners + .write() + .unwrap() + .insert(service_name.to_string(), Arc::clone(&arc_listener)); + + let zk_listener = self.create_listener( + path.clone(), + service_name.to_string(), + Arc::clone(&arc_listener), + ); + + let res = self.zk_client.get_children_w(&path, zk_listener); + let result: Vec = res + .unwrap() + .iter() + .map(|node_key| { + let zk_res = self.zk_client.get_data( + &(self.root_path.clone() + "/" + &app_name + "/" + &node_key), + false, + ); + let vec_u8 = zk_res.unwrap().0; + let sstr = std::str::from_utf8(&vec_u8).unwrap(); + let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap(); + let url = Url::from_url(&format!( + "triple://{}:{}/{}", + instance.get_host(), + instance.get_port(), + service_name + )) + .unwrap(); + url + }) + .collect(); + + info!("notifing {}->{:?}", service_name, result); + arc_listener.notify(ServiceEvent { + key: service_name.to_string(), + action: String::from("ADD"), + service: result, + }); + Ok(()) + } + + fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + todo!() + } +} + +pub struct ServiceInstancesChangedListener { + zk_client: Arc, + path: String, + + service_name: String, + listener: Arc, +} + +impl Watcher for ServiceInstancesChangedListener { + fn handle(&self, event: WatchedEvent) { + if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path) + { + let event_path = path.clone(); + let dirs = self + .zk_client + .get_children(&event_path.clone(), false) + .expect("msg"); + let result: Vec = dirs + .iter() + .map(|node_key| { + let zk_res = self + .zk_client + .get_data(&(event_path.clone() + "/" + node_key), false); + let vec_u8 = zk_res.unwrap().0; + let sstr = std::str::from_utf8(&vec_u8).unwrap(); + let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap(); + let url = Url::from_url(&format!( + "triple://{}:{}/{}", + instance.get_host(), + instance.get_port(), + self.service_name + )) + .unwrap(); + url + }) + .collect(); + + let res = self.zk_client.get_children_w( + &path, + ServiceInstancesChangedListener { + zk_client: Arc::clone(&self.zk_client), + path: path.clone(), + + service_name: self.service_name.clone(), + listener: Arc::clone(&self.listener), + }, + ); + + info!("notifing {}->{:?}", self.service_name, result); + self.listener.notify(ServiceEvent { + key: self.service_name.clone(), + action: String::from("ADD"), + service: result, + }); + } + } +} + +impl NotifyListener for ServiceInstancesChangedListener { + fn notify(&self, event: ServiceEvent) { + todo!() + } + + fn notify_all(&self, event: ServiceEvent) { + todo!() + } +} + +#[test] +fn it_works() { + let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181"; + let zk_client = + ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap(); + let watcher = Arc::new(Some(TestZkWatcher { + watcher: Arc::new(None), + })); + watcher.as_ref().expect("").watcher = Arc::clone(&watcher); + let x = watcher.as_ref().expect(""); + zk_client.get_children_w("/test", x); + zk_client.delete("/test/a", None); + zk_client.delete("/test/b", None); + let zk_res = zk_client.create( + "/test/a", + vec![1, 3], + Acl::open_unsafe().clone(), + CreateMode::Ephemeral, + ); + let zk_res = zk_client.create( + "/test/b", + vec![1, 3], + Acl::open_unsafe().clone(), + CreateMode::Ephemeral, + ); + zk_client.close(); +} + +struct TestZkWatcher { + pub watcher: Arc>, +} + +impl Watcher for TestZkWatcher { + fn handle(&self, event: WatchedEvent) { + println!("event: {:?}", event); + } +} From 3e09d500c60a40001a0c2e5448e373321342dbd0 Mon Sep 17 00:00:00 2001 From: luyanbo Date: Thu, 19 Jan 2023 17:55:44 +0800 Subject: [PATCH 2/4] update github-actions.yml --- .github/workflows/github-actions.yml | 42 +++++++++++++++------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index e69ee90b..d6c85bb0 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -23,6 +23,18 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: stable + - name: Set up cargo cache + uses: actions/cache@v3 + continue-on-error: false + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- - name: setup protoc run: | mkdir $HOME/protoc/ -p && @@ -32,19 +44,9 @@ jobs: unzip /tmp/protoc-21.9-linux-x86_64.zip && echo "$HOME/protoc/bin" >> $GITHUB_PATH shell: bash - - run: cargo check - - fmt: - name: Rustfmt - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@main - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - run: rustup component add rustfmt - run: cargo fmt --all -- --check + - run: cargo check example-greeter: name: example/greeter @@ -54,15 +56,6 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: stable - - name: setup protoc - run: | - mkdir $HOME/protoc/ -p && - cd $HOME/protoc/ && - curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \ - https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip && - unzip /tmp/protoc-21.9-linux-x86_64.zip && - echo "$HOME/protoc/bin" >> $GITHUB_PATH - shell: bash - name: Set up cargo cache uses: actions/cache@v3 continue-on-error: false @@ -75,6 +68,15 @@ jobs: target/ key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} restore-keys: ${{ runner.os }}-cargo- + - name: setup protoc + run: | + mkdir $HOME/protoc/ -p && + cd $HOME/protoc/ && + curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \ + https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip && + unzip /tmp/protoc-21.9-linux-x86_64.zip && + echo "$HOME/protoc/bin" >> $GITHUB_PATH + shell: bash - run: cargo build working-directory: examples/greeter - name: example greeter From 18d87ae6ce9fc549519ae842003d9a991d6da996 Mon Sep 17 00:00:00 2001 From: luyanbo Date: Thu, 9 Feb 2023 00:53:19 +0800 Subject: [PATCH 3/4] use new pattern --- dubbo-build/src/client.rs | 54 ++--- dubbo/src/cluster/directory.rs | 4 + dubbo/src/protocol/triple/triple_invoker.rs | 18 +- dubbo/src/protocol/triple/triple_protocol.rs | 5 +- dubbo/src/registry/protocol.rs | 4 +- dubbo/src/triple/client/builder.rs | 67 +++--- dubbo/src/triple/client/triple.rs | 77 +----- examples/echo/src/echo/client.rs | 9 +- examples/echo/src/echo/grpc.examples.echo.rs | 237 +++++++------------ examples/echo/src/protos/hello_echo.rs | 37 +-- examples/greeter/src/greeter/client.rs | 11 +- 11 files changed, 184 insertions(+), 339 deletions(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 0a3ff474..05e8e891 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -61,56 +61,44 @@ pub fn generate( // will trigger if compression is disabled clippy::let_unit_value, )] - use dubbo::{codegen::*, cluster::directory::StaticDirectory}; + use dubbo::codegen::*; #service_doc #(#struct_attributes)* #[derive(Debug, Clone, Default)] - pub struct #service_ident { - inner: TripleClient, + pub struct #service_ident { + inner: TripleClient, } - impl #service_ident { + impl #service_ident { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); #service_ident { inner: cli, } } - pub fn build(builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::with_builder(builder), - } - } - } + // pub fn build(builder: ClientBuilder) -> Self { + // Self { + // inner: TripleClient::new(builder), + // } + // } - impl #service_ident - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::new(inner, builder), + inner: TripleClient::new(builder), } } - pub fn with_filter(self, filter: F) -> #service_ident> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - #service_ident { - inner, - } - } - - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } + // pub fn with_filter(self, filter: F) -> #service_ident> + // where + // F: Filter, + // { + // let inner = self.inner.with_filter(filter); + // #service_ident { + // inner, + // } + // } #methods diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index 0e25afc2..4952de77 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -67,6 +67,10 @@ impl StaticDirectory { }; StaticDirectory { uri: uri } } + + pub fn from_uri(uri: &http::Uri) -> StaticDirectory { + StaticDirectory { uri: uri.clone() } + } } impl Directory for StaticDirectory { diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 1cbe2ed7..2c03fa72 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -15,13 +15,11 @@ * limitations under the License. */ -use std::str::FromStr; - use tower_service::Service; use crate::common::url::Url; use crate::protocol::Invoker; -use crate::triple::client::builder::{ClientBoxService, ClientBuilder}; +use crate::triple::client::builder::ClientBoxService; pub struct TripleInvoker { url: Url, @@ -29,13 +27,13 @@ pub struct TripleInvoker { } impl TripleInvoker { - pub fn new(url: Url) -> TripleInvoker { - let uri = http::Uri::from_str(&url.to_url()).unwrap(); - Self { - url, - conn: ClientBuilder::from(uri).connect(), - } - } + // pub fn new(url: Url) -> TripleInvoker { + // let uri = http::Uri::from_str(&url.to_url()).unwrap(); + // Self { + // url, + // conn: ClientBuilder::from_uri(&uri).build()connect(), + // } + // } } impl Invoker> for TripleInvoker { diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs index 037bfb7a..4d56b7ae 100644 --- a/dubbo/src/protocol/triple/triple_protocol.rs +++ b/dubbo/src/protocol/triple/triple_protocol.rs @@ -68,8 +68,9 @@ impl Protocol for TripleProtocol { Box::new(TripleExporter::new()) } - async fn refer(self, url: Url) -> Self::Invoker { - TripleInvoker::new(url) + async fn refer(self, _url: Url) -> Self::Invoker { + todo!() + // TripleInvoker::new(url) // Self::Invoker } } diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs index 933975cb..bf6a8a59 100644 --- a/dubbo/src/registry/protocol.rs +++ b/dubbo/src/registry/protocol.rs @@ -20,7 +20,6 @@ use std::sync::{Arc, RwLock}; use super::memory_registry::MemoryRegistry; use super::BoxRegistry; -use crate::codegen::TripleInvoker; use crate::common::url::Url; use crate::protocol::triple::triple_exporter::TripleExporter; use crate::protocol::triple::triple_protocol::TripleProtocol; @@ -104,6 +103,7 @@ impl Protocol for RegistryProtocol { // get Registry from registry_url // init directory based on registry_url and Registry // init Cluster based on Directory generates Invoker - Box::new(TripleInvoker::new(url)) + todo!() + //Box::new(TripleInvoker::new(url)) } } diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 21afc0cb..40a6ffb1 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -15,35 +15,46 @@ * limitations under the License. */ -use http::Uri; -use hyper::client::conn::Builder; -use tokio::time::Duration; -use tower::ServiceBuilder; - -use crate::triple::transport::connection::Connection; +use crate::cluster::directory::StaticDirectory; +use crate::codegen::Directory; +use crate::triple::compression::CompressionEncoding; use crate::utils::boxed::BoxService; +use super::TripleClient; + pub type ClientBoxService = BoxService, http::Response, crate::Error>; #[derive(Clone, Debug, Default)] pub struct ClientBuilder { - pub uri: Uri, pub timeout: Option, pub connector: &'static str, + directory: Option>, } impl ClientBuilder { pub fn new() -> ClientBuilder { ClientBuilder { - uri: Uri::builder().build().unwrap(), timeout: None, connector: "", + directory: None, + } + } + + pub fn from_static(host: &str) -> ClientBuilder { + Self { + timeout: None, + connector: "", + directory: Some(Box::new(StaticDirectory::new(&host))), } } - pub fn from_static(s: &'static str) -> ClientBuilder { - Self::from(Uri::from_static(s)) + pub fn from_uri(uri: &http::Uri) -> ClientBuilder { + Self { + timeout: None, + connector: "", + directory: Some(Box::new(StaticDirectory::from_uri(&uri))), + } } pub fn with_timeout(self, timeout: u64) -> Self { @@ -53,9 +64,17 @@ impl ClientBuilder { } } + /// host: http://0.0.0.0:8888 + pub fn with_directory(self, directory: Box) -> Self { + Self { + directory: Some(directory), + ..self + } + } + pub fn with_host(self, host: &'static str) -> Self { Self { - uri: Uri::from_static(host), + directory: Some(Box::new(StaticDirectory::new(&host))), ..self } } @@ -67,28 +86,10 @@ impl ClientBuilder { } } - pub fn connect(self) -> ClientBoxService { - let builder = ServiceBuilder::new(); - let timeout = self.timeout.unwrap_or(5); - let builder = builder.timeout(Duration::from_secs(timeout)); - - let mut b = Builder::new(); - let hyper_builder = b.http2_only(true); - let conn = Connection::new() - .with_host(self.uri.clone()) - .with_connector(self.connector) - .with_builder(hyper_builder.to_owned()); - - BoxService::new(builder.service(conn)) - } -} - -impl From for ClientBuilder { - fn from(u: Uri) -> Self { - Self { - uri: u, - timeout: None, - connector: "tcp", + pub fn build(self) -> TripleClient { + TripleClient { + send_compression_encoding: Some(CompressionEncoding::Gzip), + directory: self.directory, } } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index b6037f2a..b9f64928 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -24,89 +24,32 @@ use rand::prelude::SliceRandom; use tower_service::Service; use super::super::transport::connection::Connection; -use super::builder::{ClientBoxService, ClientBuilder}; +use super::builder::ClientBuilder; use crate::codegen::{Directory, RpcInvocation}; -use crate::filter::service::FilterService; -use crate::filter::Filter; -use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; +use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response}; use crate::triple::codec::Codec; use crate::triple::compression::CompressionEncoding; use crate::triple::decode::Decoding; use crate::triple::encode::encode; #[derive(Debug, Clone, Default)] -pub struct TripleClient { - builder: Option, - inner: T, - send_compression_encoding: Option, - directory: Option>, +pub struct TripleClient { + pub(crate) send_compression_encoding: Option, + pub(crate) directory: Option>, } -impl TripleClient { +impl TripleClient { pub fn connect(host: String) -> Self { - let uri = match http::Uri::from_str(&host) { - Ok(v) => v, - Err(err) => { - tracing::error!("http uri parse error: {}, host: {}", err, host); - panic!("http uri parse error: {}, host: {}", err, host) - } - }; - - let builder = ClientBuilder::from(uri); - - TripleClient { - builder: Some(builder.clone()), - inner: builder.connect(), - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } - } + let builder = ClientBuilder::from_static(&host); - pub fn with_builder(builder: ClientBuilder) -> Self { - TripleClient { - builder: Some(builder.clone()), - inner: builder.connect(), - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } + builder.build() } -} -impl TripleClient { - pub fn new(inner: T, builder: ClientBuilder) -> Self { - TripleClient { - builder: Some(builder), - inner, - send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: None, - } + pub fn new(builder: ClientBuilder) -> Self { + builder.build() } - pub fn with_filter(self, filter: F) -> TripleClient> - where - F: Filter, - { - TripleClient::new( - FilterService::new(self.inner, filter), - self.builder.unwrap(), - ) - } - - /// host: http://0.0.0.0:8888 - pub fn with_directory(self, directory: Box) -> Self { - TripleClient { - directory: Some(directory), - ..self - } - } -} - -impl TripleClient -where - T: Service, Response = http::Response>, - T::Error: Into, -{ fn map_request( &self, uri: http::Uri, diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 060a17f2..7312f722 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -31,10 +31,11 @@ impl Filter for FakeFilter { #[tokio::main] async fn main() { - let builder = ClientBuilder::new() - .with_connector("unix") - .with_host("unix://127.0.0.1:8888"); - let mut cli = EchoClient::build(builder); + // let builder = ClientBuilder::new() + // .with_connector("unix") + // .with_host("unix://127.0.0.1:8888"); + let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000); + let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); let resp = cli diff --git a/examples/echo/src/echo/grpc.examples.echo.rs b/examples/echo/src/echo/grpc.examples.echo.rs index 40b7b486..9c487e6b 100644 --- a/examples/echo/src/echo/grpc.examples.echo.rs +++ b/examples/echo/src/echo/grpc.examples.echo.rs @@ -1,72 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /// EchoRequest is the request for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub message: ::prost::alloc::string::String, } /// EchoResponse is the response for echo. #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoResponse { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub message: ::prost::alloc::string::String, } /// Generated client implementations. pub mod echo_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use dubbo::{codegen::*, cluster::directory::StaticDirectory}; + use dubbo::codegen::*; /// Echo is the echo service. #[derive(Debug, Clone, Default)] - pub struct EchoClient { - inner: TripleClient, + pub struct EchoClient { + inner: TripleClient, } - impl EchoClient { + impl EchoClient { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); EchoClient { inner: cli } } - pub fn build(builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::with_builder(builder), + inner: TripleClient::new(builder), } } - } - impl EchoClient - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::new(inner, builder), - } - } - pub fn with_filter(self, filter: F) -> EchoClient> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - EchoClient { inner } - } - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static( - "/grpc.examples.echo.Echo/UnaryEcho", - ); + let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -74,51 +64,51 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner.server_streaming(request, codec, path, invocation).await + self.inner + .server_streaming(request, codec, path, invocation) + .await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner.client_streaming(request, codec, path, invocation).await + self.inner + .client_streaming(request, codec, path, invocation) + .await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = dubbo::codegen::ProstCodec::< - super::EchoRequest, - super::EchoResponse, - >::default(); + let codec = + dubbo::codegen::ProstCodec::::default(); let invocation = RpcInvocation::default() .with_servie_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner.bidi_streaming(request, codec, path, invocation).await + self.inner + .bidi_streaming(request, codec, path, invocation) + .await } } } @@ -135,9 +125,7 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type ServerStreamingEchoStream: futures_util::Stream> + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -151,19 +139,14 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type BidirectionalStreamingEchoStream: futures_util::Stream> + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result< - Response, - dubbo::status::Status, - >; + ) -> Result, dubbo::status::Status>; } /// Echo is the echo service. #[derive(Debug)] @@ -193,10 +176,7 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -209,26 +189,18 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -239,32 +211,22 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc - for ServerStreamingEchoServer { + impl ServerStreamingSvc for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = + BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.server_streaming_echo(request).await - }; + let fut = async move { inner.server_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -277,31 +239,23 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc - for ClientStreamingEchoServer { + impl ClientStreamingSvc for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.client_streaming_echo(request).await - }; + let fut = async move { inner.client_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -314,56 +268,41 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc - for BidirectionalStreamingEchoServer { + impl StreamingSvc for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = + BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.bidirectional_streaming_echo(request).await - }; + let fut = + async move { inner.bidirectional_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new( - dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default(), - ); + let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default()); let res = server - .bidi_streaming( - BidirectionalStreamingEchoServer { - inner, - }, - req, - ) + .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) .await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs index d29446a8..9c487e6b 100644 --- a/examples/echo/src/protos/hello_echo.rs +++ b/examples/echo/src/protos/hello_echo.rs @@ -30,45 +30,22 @@ pub struct EchoResponse { /// Generated client implementations. pub mod echo_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use dubbo::{cluster::directory::StaticDirectory, codegen::*}; + use dubbo::codegen::*; /// Echo is the echo service. #[derive(Debug, Clone, Default)] - pub struct EchoClient { - inner: TripleClient, + pub struct EchoClient { + inner: TripleClient, } - impl EchoClient { + impl EchoClient { pub fn connect(host: String) -> Self { - let mut cli = TripleClient::connect(host.clone()); - cli = cli.with_directory(Box::new(StaticDirectory::new(&host))); + let cli = TripleClient::connect(host); EchoClient { inner: cli } } - pub fn build(builder: ClientBuilder) -> Self { - Self { - inner: TripleClient::with_builder(builder), - } - } - } - impl EchoClient - where - T: Service, Response = http::Response>, - T::Error: Into, - { - pub fn new(inner: T, builder: ClientBuilder) -> Self { + pub fn new(builder: ClientBuilder) -> Self { Self { - inner: TripleClient::new(inner, builder), + inner: TripleClient::new(builder), } } - pub fn with_filter(self, filter: F) -> EchoClient> - where - F: Filter, - { - let inner = self.inner.with_filter(filter); - EchoClient { inner } - } - pub fn with_directory(mut self, directory: Box) -> Self { - self.inner = self.inner.with_directory(directory); - self - } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index 0abd5201..c493d606 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -19,11 +19,9 @@ pub mod protos { #![allow(non_camel_case_types)] include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs")); } -use std::str::FromStr; -use dubbo::{cluster::directory::StaticDirectory, codegen::*}; +use dubbo::codegen::*; use futures_util::StreamExt; -use http; use protos::{greeter_client::GreeterClient, GreeterRequest}; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -40,12 +38,7 @@ async fn main() { tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap(); - - let mut cli = GreeterClient::new(Connection::new().with_host(http_uri), ClientBuilder::new()); - let directory = StaticDirectory::new("http://127.0.0.1:8888"); - cli = cli.with_directory(Box::new(directory)); - //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string()); + let mut cli = GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888")); // Here is example for zk // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") { From 5fd148fb6d6e60c7a34bdf47eeb672665fdc0adb Mon Sep 17 00:00:00 2001 From: Robert LU Date: Thu, 9 Feb 2023 20:09:50 +0800 Subject: [PATCH 4/4] Update zookeeper_registry.rs --- registry-zookeeper/src/zookeeper_registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs index 813f4927..4c82634b 100644 --- a/registry-zookeeper/src/zookeeper_registry.rs +++ b/registry-zookeeper/src/zookeeper_registry.rs @@ -33,7 +33,7 @@ use tracing::info; use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper}; -// 从url中获取服务注册的元数据 +// extract service registry metadata from url /// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) /// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])