diff --git a/config/src/config.rs b/config/src/config.rs index a07cb333..b82475cb 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -21,6 +21,7 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use super::protocol::ProtocolConfig; +use super::provider::ProviderConfig; use super::service::ServiceConfig; pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml"; @@ -35,10 +36,14 @@ lazy_static! { #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct RootConfig { pub name: String, + + #[serde(skip_serializing, skip_deserializing)] pub service: HashMap, pub protocols: HashMap, pub registries: HashMap, + pub provider: ProviderConfig, + #[serde(skip_serializing, skip_deserializing)] pub data: HashMap, } @@ -65,6 +70,7 @@ impl RootConfig { service: HashMap::new(), protocols: HashMap::new(), registries: HashMap::new(), + provider: ProviderConfig::new(), data: HashMap::new(), } } @@ -96,6 +102,10 @@ impl RootConfig { } pub fn test_config(&mut self) { + let mut provider = ProviderConfig::new(); + provider.protocol_ids = vec!["triple".to_string()]; + provider.registry_ids = vec![]; + let service_config = ServiceConfig::default() .group("test".to_string()) .serializer("json".to_string()) @@ -127,6 +137,10 @@ impl RootConfig { .ip("0.0.0.0".to_string()) .port("8889".to_string()), ); + + provider.services = self.service.clone(); + self.provider = provider.clone(); + println!("provider config: {:?}", provider); // 通过环境变量读取某个文件。加在到内存中 self.data.insert( "dubbo.provider.url".to_string(), @@ -172,6 +186,12 @@ pub trait Config { mod tests { use super::*; + #[test] + fn test_config() { + let mut r = RootConfig::new(); + r.test_config(); + } + #[test] fn test_load() { // case 1: read config yaml from default path diff --git a/config/src/lib.rs b/config/src/lib.rs index 6237816a..70a33cd9 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -17,6 +17,7 @@ pub mod config; pub mod protocol; +pub mod provider; pub mod service; pub use config::*; diff --git a/config/src/provider.rs b/config/src/provider.rs new file mode 100644 index 00000000..b7bec785 --- /dev/null +++ b/config/src/provider.rs @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use super::service::ServiceConfig; + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct ProviderConfig { + pub registry_ids: Vec, + + pub protocol_ids: Vec, + + pub services: HashMap, +} + +impl ProviderConfig { + pub fn new() -> Self { + ProviderConfig { + registry_ids: vec![], + protocol_ids: vec![], + services: HashMap::new(), + } + } + + pub fn with_registry_ids(mut self, registry_ids: Vec) -> Self { + self.registry_ids = registry_ids; + self + } + + pub fn with_protocol_ids(mut self, protocol_ids: Vec) -> Self { + self.protocol_ids = protocol_ids; + self + } + + pub fn with_services(mut self, services: HashMap) -> Self { + self.services = services; + self + } +} diff --git a/docs/config.md b/docs/config.md new file mode 100644 index 00000000..342c2bf0 --- /dev/null +++ b/docs/config.md @@ -0,0 +1,15 @@ +## 关于配置的一些约定(暂时) + +所有的服务只能注册到一个或多个注册中心 +所有的服务只能使用Triple进行通信 +Triple只能对外暴露一个端口 + +## Config配置 + +每个组件的配置是独立的。 + +Provider、Consumer等使用独立组件的配置进行工作 + +Provider Config核心设计以及Url模型流转: + +Provider和Consumer使用组件的配置 \ No newline at end of file diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs index 8d941739..c05c2c77 100644 --- a/dubbo/src/common/consts.rs +++ b/dubbo/src/common/consts.rs @@ -15,4 +15,6 @@ * limitations under the License. */ -pub const REGISTRY_PROTOCOL: &str = "registry"; +pub const REGISTRY_PROTOCOL: &str = "registry_protocol"; +pub const PROTOCOL: &str = "protocol"; +pub const REGISTRY: &str = "registry"; diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs index 1bce0afe..4f79b300 100644 --- a/dubbo/src/common/url.rs +++ b/dubbo/src/common/url.rs @@ -96,7 +96,8 @@ mod tests { #[test] fn test_from_url() { - let u1 = Url::from_url(""); + let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter"); + println!("{:?}", u1.unwrap().get_service_name()) } #[test] diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 00e46d7b..5cff9e80 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -20,11 +20,10 @@ use std::pin::Pin; use futures::future; use futures::Future; -use futures::FutureExt; use crate::common::url::Url; -use crate::protocol::triple::triple_protocol::TripleProtocol; use crate::protocol::{BoxExporter, Protocol}; +use crate::registry::protocol::RegistryProtocol; use dubbo_config::{get_global_config, RootConfig}; // Invoker是否可以基于hyper写一个通用的 @@ -33,6 +32,7 @@ use dubbo_config::{get_global_config, RootConfig}; pub struct Dubbo { protocols: HashMap>, registries: HashMap, + service_registry: HashMap>, // registry: Urls config: Option, } @@ -42,6 +42,7 @@ impl Dubbo { Self { protocols: HashMap::new(), registries: HashMap::new(), + service_registry: HashMap::new(), config: None, } } @@ -64,7 +65,7 @@ impl Dubbo { .insert(name.to_string(), Url::from_url(url).unwrap()); } - for (_, c) in conf.service.iter() { + for (_, c) in conf.provider.services.iter() { let u = if c.protocol_configs.is_empty() { let protocol = match conf.protocols.get(&c.protocol) { Some(v) => v.to_owned(), @@ -92,6 +93,18 @@ impl Dubbo { } let u = u.unwrap(); + + let reg_url = self.registries.get(&c.registry).unwrap(); + if self.service_registry.get(&c.name).is_some() { + self.service_registry + .get_mut(&c.name) + .unwrap() + .push(reg_url.clone()); + } else { + self.service_registry + .insert(c.name.clone(), vec![reg_url.clone()]); + } + if self.protocols.get(&c.protocol).is_some() { self.protocols.get_mut(&c.protocol).unwrap().push(u); } else { @@ -105,19 +118,14 @@ impl Dubbo { // TODO: server registry + let mem_reg = + Box::new(RegistryProtocol::new().with_services(self.service_registry.clone())); let mut async_vec: Vec + Send>>> = Vec::new(); - for (key, c) in self.protocols.iter() { - match key.as_str() { - "triple" => { - let pro = Box::new(TripleProtocol::new()); - for u in c.iter() { - let tri_fut = pro.clone().export(u.clone()).boxed(); - async_vec.push(tri_fut); - } - } - _ => { - tracing::error!("protocol {:?} not implemented", key); - } + for (name, items) in self.protocols.iter() { + for url in items.iter() { + tracing::info!("protocol: {:?}, service url: {:?}", name, url); + let exporter = mem_reg.clone().export(url.to_owned()); + async_vec.push(exporter) } } diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs index 7997611a..67df5c7a 100644 --- a/dubbo/src/registry/memory_registry.rs +++ b/dubbo/src/registry/memory_registry.rs @@ -17,6 +17,7 @@ #![allow(unused_variables, dead_code, missing_docs)] use std::collections::HashMap; +use std::sync::Arc; use std::sync::RwLock; use super::{NotifyListener, Registry}; @@ -27,15 +28,15 @@ use super::{NotifyListener, Registry}; pub const REGISTRY_GROUP_KEY: &str = "registry.group"; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct MemoryRegistry { - registries: RwLock>, + registries: Arc>>, } impl MemoryRegistry { pub fn new() -> MemoryRegistry { MemoryRegistry { - registries: RwLock::new(HashMap::new()), + registries: Arc::new(RwLock::new(HashMap::new())), } } } diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs index 5b0973a3..933975cb 100644 --- a/dubbo/src/registry/protocol.rs +++ b/dubbo/src/registry/protocol.rs @@ -16,14 +16,14 @@ */ use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use super::memory_registry::MemoryRegistry; use super::BoxRegistry; use crate::codegen::TripleInvoker; -use crate::common::consts; use crate::common::url::Url; use crate::protocol::triple::triple_exporter::TripleExporter; +use crate::protocol::triple::triple_protocol::TripleProtocol; use crate::protocol::BoxExporter; use crate::protocol::BoxInvoker; use crate::protocol::Protocol; @@ -31,24 +31,35 @@ use crate::protocol::Protocol; #[derive(Clone, Default)] pub struct RegistryProtocol { // registerAddr: Registry - registries: Arc>, + registries: Arc>>, // providerUrl: Exporter - exporters: Arc>, + exporters: Arc>>, + // serviceName: registryUrls + services: HashMap>, } impl RegistryProtocol { pub fn new() -> Self { RegistryProtocol { - registries: Arc::new(HashMap::new()), - exporters: Arc::new(HashMap::new()), + registries: Arc::new(RwLock::new(HashMap::new())), + exporters: Arc::new(RwLock::new(HashMap::new())), + services: HashMap::new(), } } - pub fn get_registry(&self, url: Url) -> BoxRegistry { - // self.registries.clone().insert(url.location.clone(), Box::new(MemoryRegistry::default())); + pub fn with_services(mut self, services: HashMap>) -> Self { + self.services.extend(services); + self + } + + pub fn get_registry(&mut self, url: Url) -> BoxRegistry { + let mem = MemoryRegistry::default(); + self.registries + .write() + .unwrap() + .insert(url.location, Box::new(mem.clone())); - // *(self.registries.get(&url.location).unwrap()) - Box::new(MemoryRegistry::default()) + Box::new(mem) } } @@ -60,14 +71,34 @@ impl Protocol for RegistryProtocol { todo!() } - async fn export(self, url: Url) -> BoxExporter { + async fn export(mut self, url: Url) -> BoxExporter { // getProviderUrl // getRegisterUrl // init Exporter based on provider_url // server registry based on register_url // start server health check - Box::new(TripleExporter::new()) + let registry_url = self.services.get(url.get_service_name().join(",").as_str()); + if let Some(urls) = registry_url { + for url in urls.clone().iter() { + if !url.protocol.is_empty() { + let mut reg = self.get_registry(url.clone()); + reg.register(url.clone()).unwrap(); + } + } + } + + match url.clone().protocol.as_str() { + "triple" => { + let pro = Box::new(TripleProtocol::new()); + return pro.export(url).await; + } + _ => { + tracing::error!("protocol {:?} not implemented", url.protocol); + Box::new(TripleExporter::new()) + } + } } + async fn refer(self, url: Url) -> Self::Invoker { // getRegisterUrl // get Registry from registry_url @@ -76,15 +107,3 @@ impl Protocol for RegistryProtocol { Box::new(TripleInvoker::new(url)) } } - -fn get_registry_url(mut url: Url) -> Url { - if url.protocol == consts::REGISTRY_PROTOCOL { - url.protocol = url.get_param("registry".to_string()).unwrap(); - } - - url -} - -fn get_provider_url(url: Url) -> Url { - url -}