diff --git a/Cargo.toml b/Cargo.toml index 1d173a38..735eda40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,8 @@ members = [ "dubbo-build", "remoting/net", "remoting/http", - "remoting/h2", "remoting/zookeeper", - "remoting/exchange", + "remoting/base", "remoting/xds", "protocol/dubbo2", "protocol/base", @@ -28,6 +27,8 @@ members = [ pin-project = "1" tokio = "1.0" tower = "0.4" +tower-service = "0.3.1" +tower-layer = "0.3" tokio-stream = "0.1" tokio-util = "0.7" socket2 = "0.4" @@ -42,16 +43,20 @@ logger = {path="./common/logger"} utils = {path="./common/utils"} base = {path="./common/base"} remoting-net = {path="./remoting/net"} -protocol = {path= "protocol/base" } +remoting-base = {path="./remoting/base"} +protocol-base = {path= "protocol/base" } protocol-dubbo2 = {path="./protocol/dubbo2"} protocol-triple = {path="./protocol/triple"} registry-zookeeper = {path="./registry/zookeeper"} registry-nacos = {path="./registry/nacos"} anyhow = "1.0.66" +thiserror = "1.0.30" dubbo = { path = "./dubbo/" } bb8 = "0.8.0" # A connecton pool based on tokio serde_yaml = "0.9.4" # yaml file parser once_cell = "1.16.0" itertools = "0.10.1" +bytes = "1.0" + diff --git a/common/base/src/url.rs b/common/base/src/url.rs index 81a72c2b..48fbc1eb 100644 --- a/common/base/src/url.rs +++ b/common/base/src/url.rs @@ -161,6 +161,10 @@ impl Url { pub fn protocol(&self) -> String { self.scheme.clone() } + + pub fn get_ip_port(&self) -> String { + format!("{}:{}", self.ip, self.port) + } } impl Display for Url { diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 3b66523b..a814bd0d 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -12,23 +12,21 @@ repository = "https://github.com/apache/dubbo-rust.git" [dependencies] hyper = { version = "0.14.19", features = ["full"] } http = "0.2" -tower-service = "0.3.1" +tower-service.workspace = true http-body = "0.4.4" -tower = { version = "0.4.12", features = ["timeout"] } +tower = { workspace = true, features = ["timeout"] } futures-util = "0.3.23" futures-core = "0.3.23" -tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] } +tokio = { workspace = true, features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] } prost = "0.10.4" async-trait = "0.1.56" -tower-layer = "0.3" -bytes = "1.0" +tower-layer.workspace = true +bytes.workspace = true pin-project.workspace = true rand = "0.8.5" serde_json.workspace = true serde = { workspace = true, features = ["derive"] } -futures = "0.3" -tracing = "0.1" -tracing-subscriber = "0.3.15" +futures.workspace = true axum = "0.5.9" async-stream = "0.3" flate2 = "1.0" @@ -36,7 +34,8 @@ aws-smithy-http = "0.54.1" itertools.workspace = true urlencoding.workspace = true lazy_static.workspace = true -base.workspace=true +base.workspace = true +logger.workspace = true dubbo-config = { path = "../config", version = "0.3.0" } diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index d92bb20c..8c2536c0 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -27,6 +27,7 @@ use crate::{ registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper}, }; use base::Url; +use logger::tracing; /// Directory. /// diff --git a/dubbo/src/context.rs b/dubbo/src/context.rs index 6def1bf4..e94eb26a 100644 --- a/dubbo/src/context.rs +++ b/dubbo/src/context.rs @@ -21,6 +21,7 @@ use std::{ thread, }; +use logger::tracing; use serde_json::Value; use state::Container; diff --git a/dubbo/src/filter/context.rs b/dubbo/src/filter/context.rs index 3f73fd00..b17168bf 100644 --- a/dubbo/src/filter/context.rs +++ b/dubbo/src/filter/context.rs @@ -17,6 +17,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; +use logger::tracing; use serde_json::Value; use crate::{ diff --git a/dubbo/src/filter/timeout.rs b/dubbo/src/filter/timeout.rs index 353585d7..ea14183f 100644 --- a/dubbo/src/filter/timeout.rs +++ b/dubbo/src/filter/timeout.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use logger::tracing; use std::time::{SystemTime, UNIX_EPOCH}; use crate::{ diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 546e2d0f..f91ee434 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -22,10 +22,6 @@ use std::{ sync::{Arc, Mutex}, }; -use base::Url; -use futures::{future, Future}; -use tracing::{debug, info}; - use crate::{ protocol::{BoxExporter, Protocol}, registry::{ @@ -34,7 +30,10 @@ use crate::{ BoxRegistry, Registry, }, }; +use base::Url; use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig}; +use futures::{future, Future}; +use logger::tracing; // Invoker是否可以基于hyper写一个通用的 @@ -78,10 +77,10 @@ impl Dubbo { } let root_config = self.config.as_ref().unwrap(); - debug!("global conf: {:?}", root_config); + tracing::debug!("global conf: {:?}", root_config); // env::set_var("ZOOKEEPER_SERVERS",root_config); for (_, service_config) in root_config.provider.services.iter() { - info!("init service name: {}", service_config.interface); + tracing::info!("init service name: {}", service_config.interface); let url = if root_config .protocols .contains_key(service_config.protocol.as_str()) @@ -91,12 +90,12 @@ impl Dubbo { .get_protocol_or_default(service_config.protocol.as_str()); let protocol_url = format!("{}/{}", protocol.to_url(), service_config.interface.clone(),); - info!("protocol_url: {:?}", protocol_url); + tracing::info!("protocol_url: {:?}", protocol_url); Url::from_url(&protocol_url) } else { return Err(format!("base {:?} not exists", service_config.protocol).into()); }; - info!("url: {:?}", url); + tracing::info!("url: {:?}", url); if url.is_none() { continue; } @@ -116,7 +115,7 @@ impl Dubbo { pub async fn start(&mut self) { self.init().unwrap(); - info!("starting..."); + tracing::info!("starting..."); // TODO: server registry let mem_reg = Box::new( RegistryProtocol::new() @@ -126,7 +125,7 @@ impl Dubbo { let mut async_vec: Vec + Send>>> = Vec::new(); for (name, items) in self.protocols.iter() { for url in items.iter() { - info!("base: {:?}, service url: {:?}", name, url); + tracing::info!("base: {:?}, service url: {:?}", name, url); let exporter = mem_reg.clone().export(url.to_owned()); async_vec.push(exporter); //TODO multiple registry diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs index 7f900f63..38d08e26 100644 --- a/dubbo/src/registry/memory_registry.rs +++ b/dubbo/src/registry/memory_registry.rs @@ -17,11 +17,11 @@ #![allow(unused_variables, dead_code, missing_docs)] +use logger::tracing::debug; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tracing::debug; use base::Url; diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs index d28e43de..4f86fd35 100644 --- a/dubbo/src/registry/protocol.rs +++ b/dubbo/src/registry/protocol.rs @@ -16,6 +16,7 @@ */ use base::Url; +use logger::tracing; use std::{ collections::HashMap, fmt::{Debug, Formatter}, diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs index a55e72c2..16b9063d 100644 --- a/dubbo/src/registry/types.rs +++ b/dubbo/src/registry/types.rs @@ -22,7 +22,7 @@ use std::{ use base::Url; use itertools::Itertools; -use tracing::info; +use logger::tracing::info; use crate::{ registry::{BoxRegistry, Registry}, diff --git a/dubbo/src/triple/decode.rs b/dubbo/src/triple/decode.rs index 26c4cd64..efdbee6b 100644 --- a/dubbo/src/triple/decode.rs +++ b/dubbo/src/triple/decode.rs @@ -20,6 +20,7 @@ use std::{pin::Pin, task::Poll}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures_util::{future, ready, Stream}; use http_body::Body; +use logger::tracing; use super::compression::{decompress, CompressionEncoding}; use crate::{ diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs index e82ff058..1623540a 100644 --- a/dubbo/src/triple/server/builder.rs +++ b/dubbo/src/triple/server/builder.rs @@ -23,6 +23,7 @@ use std::{ use base::Url; use http::{Request, Response, Uri}; use hyper::body::Body; +use logger::tracing; use tower_service::Service; use crate::{triple::transport::DubboServer, BoxBody}; diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index 360149bb..2188f2bd 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -18,8 +18,8 @@ use std::task::Poll; use hyper::client::{conn::Builder, service::Connect}; +use logger::tracing::debug; use tower_service::Service; -use tracing::debug; use crate::{boxed, triple::transport::connector::get_connector}; diff --git a/dubbo/src/triple/transport/connector/http_connector.rs b/dubbo/src/triple/transport/connector/http_connector.rs index f324466b..255d64e0 100644 --- a/dubbo/src/triple/transport/connector/http_connector.rs +++ b/dubbo/src/triple/transport/connector/http_connector.rs @@ -22,6 +22,7 @@ use std::{ use http::Uri; use hyper::client::connect::dns::Name; +use logger::tracing; use tokio::net::TcpStream; use tower_service::Service; diff --git a/dubbo/src/triple/transport/connector/unix_connector.rs b/dubbo/src/triple/transport/connector/unix_connector.rs index 491ba23a..70ceda99 100644 --- a/dubbo/src/triple/transport/connector/unix_connector.rs +++ b/dubbo/src/triple/transport/connector/unix_connector.rs @@ -22,6 +22,7 @@ use std::{ use http::Uri; use hyper::client::connect::dns::Name; +use logger::tracing; use tokio::net::UnixStream; use tower_service::Service; diff --git a/dubbo/src/triple/transport/listener/mod.rs b/dubbo/src/triple/transport/listener/mod.rs index 31356194..e3a70d76 100644 --- a/dubbo/src/triple/transport/listener/mod.rs +++ b/dubbo/src/triple/transport/listener/mod.rs @@ -22,6 +22,7 @@ pub mod unix_listener; use std::net::SocketAddr; use async_trait::async_trait; +use logger::tracing; use tokio::io::{AsyncRead, AsyncWrite}; use super::io::BoxIO; diff --git a/dubbo/src/triple/transport/listener/tcp_listener.rs b/dubbo/src/triple/transport/listener/tcp_listener.rs index a7c94879..2ebbf4f3 100644 --- a/dubbo/src/triple/transport/listener/tcp_listener.rs +++ b/dubbo/src/triple/transport/listener/tcp_listener.rs @@ -21,6 +21,7 @@ use super::Listener; use async_trait::async_trait; use futures_core::Stream; use hyper::server::accept::Accept; +use logger::tracing; use tokio::net::{TcpListener as tokioTcpListener, TcpStream}; pub struct TcpListener { diff --git a/dubbo/src/triple/transport/listener/unix_listener.rs b/dubbo/src/triple/transport/listener/unix_listener.rs index 5034de56..add071a8 100644 --- a/dubbo/src/triple/transport/listener/unix_listener.rs +++ b/dubbo/src/triple/transport/listener/unix_listener.rs @@ -21,6 +21,7 @@ use super::Listener; use async_trait::async_trait; use futures_core::Stream; use hyper::server::accept::Accept; +use logger::tracing; use tokio::net::{UnixListener as tokioUnixListener, UnixStream}; pub struct UnixListener { diff --git a/dubbo/src/triple/transport/service.rs b/dubbo/src/triple/transport/service.rs index 9698276d..b306085d 100644 --- a/dubbo/src/triple/transport/service.rs +++ b/dubbo/src/triple/transport/service.rs @@ -20,6 +20,7 @@ use std::net::SocketAddr; use futures_core::Future; use http::{Request, Response}; use hyper::body::Body; +use logger::tracing; use tokio::time::Duration; use tower_service::Service; diff --git a/protocol/base/Cargo.toml b/protocol/base/Cargo.toml index 379c3425..e3e8d86c 100644 --- a/protocol/base/Cargo.toml +++ b/protocol/base/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "protocol" +name = "protocol-base" version = "0.1.0" edition = "2021" @@ -7,4 +7,5 @@ edition = "2021" [dependencies] dashmap.workspace = true -base.workspace = true \ No newline at end of file +base.workspace = true +thiserror.workspace = true \ No newline at end of file diff --git a/protocol/base/src/error.rs b/protocol/base/src/error.rs index 0ad0c26c..e6c424a5 100644 --- a/protocol/base/src/error.rs +++ b/protocol/base/src/error.rs @@ -14,24 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use thiserror::Error; -use std::{ - error::Error, - fmt::{Debug, Display, Formatter}, -}; - -pub struct InvokerError(String); - -impl Debug for InvokerError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(self.0.as_str()) - } +#[derive(Error, Debug)] +pub enum InvokerError { + #[error("unknown invoker error.")] + Unknown, } - -impl Display for InvokerError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(self.0.as_str()) - } -} - -impl Error for InvokerError {} diff --git a/protocol/base/src/lib.rs b/protocol/base/src/lib.rs index 6928742a..751af40c 100644 --- a/protocol/base/src/lib.rs +++ b/protocol/base/src/lib.rs @@ -19,3 +19,5 @@ pub mod error; pub mod invocation; pub mod invoker; pub mod output; + +pub type ProtocolName = &'static str; diff --git a/protocol/dubbo2/Cargo.toml b/protocol/dubbo2/Cargo.toml index 09e950ca..1f39f12e 100644 --- a/protocol/dubbo2/Cargo.toml +++ b/protocol/dubbo2/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -remoting-net.workspace = true -protocol.workspace = true \ No newline at end of file +remoting-base.workspace = true +protocol-base.workspace = true diff --git a/protocol/triple/Cargo.toml b/protocol/triple/Cargo.toml index b342a3b1..7b4dd9f9 100644 --- a/protocol/triple/Cargo.toml +++ b/protocol/triple/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] remoting-net.workspace = true -protocol.workspace = true +protocol-base.workspace = true base.workspace = true diff --git a/protocol/triple/src/triple_invoker.rs b/protocol/triple/src/triple_invoker.rs index 4a2e74f1..6756dce4 100644 --- a/protocol/triple/src/triple_invoker.rs +++ b/protocol/triple/src/triple_invoker.rs @@ -15,7 +15,8 @@ * limitations under the License. */ use base::{Node, Url}; -use protocol::{ + +use protocol_base::{ invocation::BoxInvocation, invoker::{BaseInvoker, Invoker}, }; diff --git a/remoting/base/Cargo.toml b/remoting/base/Cargo.toml new file mode 100644 index 00000000..f5f2efcd --- /dev/null +++ b/remoting/base/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "remoting-base" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes.workspace = true +base.workspace = true +thiserror.workspace = true +dashmap.workspace = true +protocol-base.workspace = true +anyhow.workspace = true \ No newline at end of file diff --git a/remoting/exchange/LICENSE b/remoting/base/LICENSE similarity index 100% rename from remoting/exchange/LICENSE rename to remoting/base/LICENSE diff --git a/remoting/h2/src/lib.rs b/remoting/base/src/builder/client.rs similarity index 78% rename from remoting/h2/src/lib.rs rename to remoting/base/src/builder/client.rs index d64452d5..4dded803 100644 --- a/remoting/h2/src/lib.rs +++ b/remoting/base/src/builder/client.rs @@ -14,17 +14,5 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +// for tower ServiceBuilder; the input starts from Bytes +// exchange is a part of LayerStack diff --git a/remoting/exchange/src/lib.rs b/remoting/base/src/builder/mod.rs similarity index 78% rename from remoting/exchange/src/lib.rs rename to remoting/base/src/builder/mod.rs index d64452d5..069b5308 100644 --- a/remoting/exchange/src/lib.rs +++ b/remoting/base/src/builder/mod.rs @@ -14,17 +14,5 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +pub mod client; +pub mod server; diff --git a/remoting/base/src/builder/server.rs b/remoting/base/src/builder/server.rs new file mode 100644 index 00000000..4dded803 --- /dev/null +++ b/remoting/base/src/builder/server.rs @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// for tower ServiceBuilder; the input starts from Bytes +// exchange is a part of LayerStack diff --git a/remoting/base/src/codec.rs b/remoting/base/src/codec.rs new file mode 100644 index 00000000..d53383d4 --- /dev/null +++ b/remoting/base/src/codec.rs @@ -0,0 +1,124 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +use std::sync::Arc; + +use anyhow::{anyhow, Error}; +use bytes::{self, Bytes}; +use dashmap::DashMap; +use protocol_base::ProtocolName; + +use crate::{error::CodecError, Response}; + +#[derive(Clone)] +pub struct BoxedCodec(Arc); + +impl BoxedCodec { + pub fn new(codec: Arc) -> Self { + BoxedCodec(codec) + } +} + +pub trait Codec: Sync + Send { + fn encode_request(&self) -> Result; + fn encode_response(&self) -> Result; + fn decode(&self, bytes: Bytes) -> Result; +} + +pub struct CodecRegistry { + registry: DashMap, +} + +#[derive(Default)] +pub struct CodecResult { + is_request: bool, // heartbeat flag + result: Option, +} + +impl Default for CodecRegistry { + fn default() -> Self { + CodecRegistry { + registry: DashMap::new(), + } + } +} +impl CodecRegistry { + pub fn get_codec(&self, protocol: ProtocolName) -> Option { + let registry_map = &self.registry; + if let true = registry_map.contains_key(protocol) { + let option = registry_map.get(protocol); + let codec = option.as_deref().unwrap(); + Some(codec.clone()) + } else { + None + } + } + pub fn set_codec( + &mut self, + protocol: ProtocolName, + codec: BoxedCodec, + ) -> anyhow::Result<(), CodecError> { + if let true = self.registry.contains_key(protocol) { + return Err(CodecError::RegistryExistsProtocol(protocol)); + } else { + self.registry.insert(protocol, codec); + } + Ok(()) + } + + pub fn is_registered(&self, protocol: ProtocolName) -> bool { + self.registry.contains_key(protocol) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use bytes::Bytes; + + use crate::{ + codec::{BoxedCodec, CodecRegistry, CodecResult}, + error::CodecError, + Codec, + }; + + #[derive(Default)] + struct TestCodec; + impl Codec for TestCodec { + fn encode_request(&self) -> Result { + Ok(Bytes::new()) + } + + fn encode_response(&self) -> Result { + Ok(Bytes::new()) + } + + fn decode(&self, bytes: Bytes) -> Result { + Ok(CodecResult::default()) + } + } + + #[test] + fn test_registry() { + let mut codec_registry = CodecRegistry::default(); + codec_registry + .set_codec("test", BoxedCodec(Arc::new(TestCodec::default()))) + .unwrap(); + assert!(codec_registry.is_registered("test")); + } +} diff --git a/remoting/base/src/error.rs b/remoting/base/src/error.rs new file mode 100644 index 00000000..4351b755 --- /dev/null +++ b/remoting/base/src/error.rs @@ -0,0 +1,32 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +use protocol_base::ProtocolName; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum CodecError { + #[error("unknown codec error.")] + Unknown, + #[error("protocol {0} is registered.")] + RegistryExistsProtocol(ProtocolName), +} + +#[derive(Error, Debug)] +pub enum ClientError { + #[error("unknown client error")] + Unknown, +} diff --git a/remoting/base/src/exchange/client.rs b/remoting/base/src/exchange/client.rs new file mode 100644 index 00000000..4fceca7d --- /dev/null +++ b/remoting/base/src/exchange/client.rs @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use base::Url; +use std::{ + sync::{ + atomic::{AtomicBool, AtomicI32, Ordering}, + Arc, + }, + time, + time::Duration, +}; + +use crate::{ + error::ClientError, + exchange::{Request, Response}, +}; + +pub struct BoxedClient(Arc); + +pub trait Client: Sync + Send { + fn connect(&self, url: Url) -> Result<(), ClientError>; + fn request(&self, request: Request, timeout: Duration) -> Result; + fn close(&mut self) -> Result<(), ClientError>; + fn is_available(&self) -> bool; +} + +pub struct ExchangeClient { + connection_timeout: Duration, // timeout when connecting to server + address: String, // listening ip:port + client: Option, // dealing with the transports + init: AtomicBool, // whether the client is initialized + active: AtomicI32, // the number of active service bind to this client +} + +impl ExchangeClient { + pub fn new(url: Url, client: BoxedClient, connection_timeout: Duration) -> Self { + ExchangeClient { + connection_timeout, + address: url.get_ip_port(), + client: None, + init: AtomicBool::new(false), + active: AtomicI32::new(0), + } + } +} + +impl Client for ExchangeClient { + fn connect(&self, url: Url) -> Result<(), ClientError> { + if self.init.load(Ordering::SeqCst) { + return Ok(()); + } + Ok(()) + } + + fn request(&self, request: Request, timeout: Duration) -> Result { + todo!() + } + + fn close(&mut self) -> Result<(), ClientError> { + self.init.store(false, Ordering::SeqCst); + Ok(()) + } + + fn is_available(&self) -> bool { + let client = self.client.as_ref().unwrap(); + client.0.is_available() + } +} diff --git a/remoting/base/src/exchange/mod.rs b/remoting/base/src/exchange/mod.rs new file mode 100644 index 00000000..968b6cd1 --- /dev/null +++ b/remoting/base/src/exchange/mod.rs @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::CodecError; +use std::{any::Any, sync::Arc}; +pub mod client; +pub mod server; + +pub type BoxedExchangeBody = Arc; + +pub struct Request { + id: u64, + version: String, // protocol version + serial_id: u8, // serial ID (ignore) + body: Option, + two_way: bool, + event: bool, +} + +pub struct Response { + id: u64, + version: String, // protocol version + serial_id: u8, // serial ID (ignore) + status: u8, + body: Option, // mean result + event: bool, + error: Option, +} + +impl Response { + fn is_heart_beat(&self) -> bool { + self.event && self.body.is_none() + } +} diff --git a/remoting/base/src/exchange/server.rs b/remoting/base/src/exchange/server.rs new file mode 100644 index 00000000..26b1ecc5 --- /dev/null +++ b/remoting/base/src/exchange/server.rs @@ -0,0 +1,48 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +use anyhow::{Error, Result}; +use base::Url; +use std::sync::Arc; + +pub struct BoxedServer(Arc); + +pub trait Server: Sync + Send { + fn start(&self) -> Result<(), Error>; + fn stop(&self) -> Result<(), Error>; +} + +pub struct ExchangeServer { + server: BoxedServer, + url: Url, +} + +impl ExchangeServer { + pub fn new(url: Url, server: BoxedServer) -> Self { + ExchangeServer { server, url } + } +} + +impl Server for ExchangeServer { + fn start(&self) -> Result<(), Error> { + self.server.0.start() + } + + fn stop(&self) -> Result<(), Error> { + self.server.0.stop() + } +} diff --git a/remoting/base/src/lib.rs b/remoting/base/src/lib.rs new file mode 100644 index 00000000..0417afc6 --- /dev/null +++ b/remoting/base/src/lib.rs @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#![cfg_attr( + debug_assertions, + allow(dead_code, unused_imports, unused_variables, unused_mut) +)] +pub use codec::Codec; +pub mod builder; +pub mod codec; +pub mod error; +pub mod exchange; + +pub use exchange::{BoxedExchangeBody, Request, Response}; diff --git a/remoting/exchange/Cargo.toml b/remoting/exchange/Cargo.toml deleted file mode 100644 index ec14668a..00000000 --- a/remoting/exchange/Cargo.toml +++ /dev/null @@ -1,8 +0,0 @@ -[package] -name = "remoting-exchange" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/remoting/h2/Cargo.toml b/remoting/h2/Cargo.toml deleted file mode 100644 index 12a28045..00000000 --- a/remoting/h2/Cargo.toml +++ /dev/null @@ -1,8 +0,0 @@ -[package] -name = "remoting-h2" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/remoting/h2/LICENSE b/remoting/h2/LICENSE deleted file mode 100644 index d6456956..00000000 --- a/remoting/h2/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License.