Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Set up cargo cache
uses: actions/cache@v3
continue-on-error: false
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-
- name: setup protoc
run: |
mkdir $HOME/protoc/ -p &&
Expand All @@ -32,19 +44,9 @@ jobs:
unzip /tmp/protoc-21.9-linux-x86_64.zip &&
echo "$HOME/protoc/bin" >> $GITHUB_PATH
shell: bash
- run: cargo check

fmt:
name: Rustfmt
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: rustup component add rustfmt
- run: cargo fmt --all -- --check
- run: cargo check

example-greeter:
name: example/greeter
Expand All @@ -54,15 +56,6 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: setup protoc
run: |
mkdir $HOME/protoc/ -p &&
cd $HOME/protoc/ &&
curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip &&
unzip /tmp/protoc-21.9-linux-x86_64.zip &&
echo "$HOME/protoc/bin" >> $GITHUB_PATH
shell: bash
- name: Set up cargo cache
uses: actions/cache@v3
continue-on-error: false
Expand All @@ -75,6 +68,15 @@ jobs:
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-
- name: setup protoc
run: |
mkdir $HOME/protoc/ -p &&
cd $HOME/protoc/ &&
curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip &&
unzip /tmp/protoc-21.9-linux-x86_64.zip &&
echo "$HOME/protoc/bin" >> $GITHUB_PATH
shell: bash
- run: cargo build
working-directory: examples/greeter
- name: example greeter
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"xds",
"registry",
"registry-zookeeper",
"metadata",
"common",
"config",
Expand Down
2 changes: 1 addition & 1 deletion config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl RootConfig {
v
}
Err(err) => {
tracing::error!(
tracing::warn!(
"error loading config_path: {:?}, use default path: {:?}",
err,
DUBBO_CONFIG_PATH
Expand Down
134 changes: 94 additions & 40 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,45 +66,39 @@ pub fn generate<T: Service>(
#service_doc
#(#struct_attributes)*
#[derive(Debug, Clone, Default)]
pub struct #service_ident<T> {
inner: TripleClient<T>,
pub struct #service_ident {
inner: TripleClient,
}

impl #service_ident<ClientBoxService> {
impl #service_ident {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
#service_ident {
inner: cli,
}
}

pub fn build(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::with_builder(builder),
}
}
}
// pub fn build(builder: ClientBuilder) -> Self {
// Self {
// inner: TripleClient::new(builder),
// }
// }
Comment on lines +81 to +85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


impl<T> #service_ident<T>
where
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
T::Error: Into<StdError>,
{
pub fn new(inner: T, builder: ClientBuilder) -> Self {
pub fn new(builder: ClientBuilder) -> Self {
Self {
inner: TripleClient::new(inner, builder),
inner: TripleClient::new(builder),
}
}

pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
where
F: Filter,
{
let inner = self.inner.with_filter(filter);
#service_ident {
inner,
}
}
// pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
// where
// F: Filter,
// {
// let inner = self.inner.with_filter(filter);
// #service_ident {
// inner,
// }
// }

#methods

Expand All @@ -123,6 +117,12 @@ fn generate_methods<T: Service>(
let package = if emit_package { service.package() } else { "" };

for method in service.methods() {
let service_unique_name = format!(
"{}{}{}",
package,
if package.is_empty() { "" } else { "." },
service.identifier()
);
let path = format!(
"/{}{}{}/{}",
package,
Expand All @@ -134,14 +134,34 @@ fn generate_methods<T: Service>(
stream.extend(generate_doc_comments(method.comment()));

let method = match (method.client_streaming(), method.server_streaming()) {
(false, false) => generate_unary(&method, proto_path, compile_well_known_types, path),
(false, true) => {
generate_server_streaming(&method, proto_path, compile_well_known_types, path)
}
(true, false) => {
generate_client_streaming(&method, proto_path, compile_well_known_types, path)
}
(true, true) => generate_streaming(&method, proto_path, compile_well_known_types, path),
(false, false) => generate_unary(
service_unique_name,
&method,
proto_path,
compile_well_known_types,
path,
),
(false, true) => generate_server_streaming(
service_unique_name,
&method,
proto_path,
compile_well_known_types,
path,
),
(true, false) => generate_client_streaming(
service_unique_name,
&method,
proto_path,
compile_well_known_types,
path,
),
(true, true) => generate_streaming(
service_unique_name,
&method,
proto_path,
compile_well_known_types,
path,
),
};

stream.extend(method);
Expand All @@ -151,6 +171,7 @@ fn generate_methods<T: Service>(
}

fn generate_unary<T: Method>(
service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
Expand All @@ -159,26 +180,30 @@ fn generate_unary<T: Method>(
let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path, compile_well_known_types);
let method_name = method.identifier();

quote! {
pub async fn #ident(
&mut self,
request: Request<#request>,
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner
.unary(
self.inner.unary(
request,
codec,
path,
)
.await
invocation,
).await
}
}
}

fn generate_server_streaming<T: Method>(
service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
Expand All @@ -188,6 +213,7 @@ fn generate_server_streaming<T: Method>(
let ident = format_ident!("{}", method.name());

let (request, response) = method.request_response_name(proto_path, compile_well_known_types);
let method_name = method.identifier();

quote! {
pub async fn #ident(
Expand All @@ -196,13 +222,22 @@ fn generate_server_streaming<T: Method>(
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {

let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.server_streaming(request, codec, path).await
self.inner.server_streaming(
request,
codec,
path,
invocation,
).await
}
}
}

fn generate_client_streaming<T: Method>(
service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
Expand All @@ -212,20 +247,30 @@ fn generate_client_streaming<T: Method>(
let ident = format_ident!("{}", method.name());

let (request, response) = method.request_response_name(proto_path, compile_well_known_types);
let method_name = method.identifier();

quote! {
pub async fn #ident(
&mut self,
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.client_streaming(request, codec, path).await
self.inner.client_streaming(
request,
codec,
path,
invocation,
).await
}
}
}

fn generate_streaming<T: Method>(
service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
Expand All @@ -235,15 +280,24 @@ fn generate_streaming<T: Method>(
let ident = format_ident!("{}", method.name());

let (request, response) = method.request_response_name(proto_path, compile_well_known_types);
let method_name = method.identifier();

quote! {
pub async fn #ident(
&mut self,
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.bidi_streaming(request, codec, path).await
self.inner.bidi_streaming(
request,
codec,
path,
invocation,
).await
}
}
}
1 change: 1 addition & 0 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async-trait = "0.1.56"
tower-layer = "0.3"
bytes = "1.0"
pin-project = "1.0"
rand = "0.8.5"
serde_json = "1.0.82"
serde = {version="1.0.138", features = ["derive"]}
futures = "0.3"
Expand Down
Loading