Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};

use super::protocol::ProtocolConfig;
use super::provider::ProviderConfig;
use super::service::ServiceConfig;

pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
Expand All @@ -35,10 +36,14 @@ lazy_static! {
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RootConfig {
pub name: String,

#[serde(skip_serializing, skip_deserializing)]
pub service: HashMap<String, ServiceConfig>,
pub protocols: HashMap<String, ProtocolConfig>,
pub registries: HashMap<String, String>,

pub provider: ProviderConfig,

#[serde(skip_serializing, skip_deserializing)]
pub data: HashMap<String, String>,
}
Expand All @@ -65,6 +70,7 @@ impl RootConfig {
service: HashMap::new(),
protocols: HashMap::new(),
registries: HashMap::new(),
provider: ProviderConfig::new(),
data: HashMap::new(),
}
}
Expand Down Expand Up @@ -96,6 +102,10 @@ impl RootConfig {
}

pub fn test_config(&mut self) {
let mut provider = ProviderConfig::new();
provider.protocol_ids = vec!["triple".to_string()];
provider.registry_ids = vec![];

let service_config = ServiceConfig::default()
.group("test".to_string())
.serializer("json".to_string())
Expand Down Expand Up @@ -127,6 +137,10 @@ impl RootConfig {
.ip("0.0.0.0".to_string())
.port("8889".to_string()),
);

provider.services = self.service.clone();
self.provider = provider.clone();
println!("provider config: {:?}", provider);
// 通过环境变量读取某个文件。加在到内存中
self.data.insert(
"dubbo.provider.url".to_string(),
Expand Down Expand Up @@ -172,6 +186,12 @@ pub trait Config {
mod tests {
use super::*;

#[test]
fn test_config() {
let mut r = RootConfig::new();
r.test_config();
}

#[test]
fn test_load() {
// case 1: read config yaml from default path
Expand Down
1 change: 1 addition & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub mod config;
pub mod protocol;
pub mod provider;
pub mod service;

pub use config::*;
Expand Down
56 changes: 56 additions & 0 deletions config/src/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use super::service::ServiceConfig;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct ProviderConfig {
pub registry_ids: Vec<String>,

pub protocol_ids: Vec<String>,

pub services: HashMap<String, ServiceConfig>,
}

impl ProviderConfig {
pub fn new() -> Self {
ProviderConfig {
registry_ids: vec![],
protocol_ids: vec![],
services: HashMap::new(),
}
}

pub fn with_registry_ids(mut self, registry_ids: Vec<String>) -> Self {
self.registry_ids = registry_ids;
self
}

pub fn with_protocol_ids(mut self, protocol_ids: Vec<String>) -> Self {
self.protocol_ids = protocol_ids;
self
}

pub fn with_services(mut self, services: HashMap<String, ServiceConfig>) -> Self {
self.services = services;
self
}
}
15 changes: 15 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## 关于配置的一些约定(暂时)

所有的服务只能注册到一个或多个注册中心
所有的服务只能使用Triple进行通信
Triple只能对外暴露一个端口

## Config配置

每个组件的配置是独立的。

Provider、Consumer等使用独立组件的配置进行工作

Provider Config核心设计以及Url模型流转:

Provider和Consumer使用组件的配置
4 changes: 3 additions & 1 deletion dubbo/src/common/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
* limitations under the License.
*/

pub const REGISTRY_PROTOCOL: &str = "registry";
pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
pub const PROTOCOL: &str = "protocol";
pub const REGISTRY: &str = "registry";
3 changes: 2 additions & 1 deletion dubbo/src/common/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ mod tests {

#[test]
fn test_from_url() {
let u1 = Url::from_url("");
let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
println!("{:?}", u1.unwrap().get_service_name())
}

#[test]
Expand Down
38 changes: 23 additions & 15 deletions dubbo/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use std::pin::Pin;

use futures::future;
use futures::Future;
use futures::FutureExt;

use crate::common::url::Url;
use crate::protocol::triple::triple_protocol::TripleProtocol;
use crate::protocol::{BoxExporter, Protocol};
use crate::registry::protocol::RegistryProtocol;
use dubbo_config::{get_global_config, RootConfig};

// Invoker是否可以基于hyper写一个通用的
Expand All @@ -33,6 +32,7 @@ use dubbo_config::{get_global_config, RootConfig};
pub struct Dubbo {
protocols: HashMap<String, Vec<Url>>,
registries: HashMap<String, Url>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<RootConfig>,
}

Expand All @@ -42,6 +42,7 @@ impl Dubbo {
Self {
protocols: HashMap::new(),
registries: HashMap::new(),
service_registry: HashMap::new(),
config: None,
}
}
Expand All @@ -64,7 +65,7 @@ impl Dubbo {
.insert(name.to_string(), Url::from_url(url).unwrap());
}

for (_, c) in conf.service.iter() {
for (_, c) in conf.provider.services.iter() {
let u = if c.protocol_configs.is_empty() {
let protocol = match conf.protocols.get(&c.protocol) {
Some(v) => v.to_owned(),
Expand Down Expand Up @@ -92,6 +93,18 @@ impl Dubbo {
}

let u = u.unwrap();

let reg_url = self.registries.get(&c.registry).unwrap();
if self.service_registry.get(&c.name).is_some() {
self.service_registry
.get_mut(&c.name)
.unwrap()
.push(reg_url.clone());
} else {
self.service_registry
.insert(c.name.clone(), vec![reg_url.clone()]);
}

if self.protocols.get(&c.protocol).is_some() {
self.protocols.get_mut(&c.protocol).unwrap().push(u);
} else {
Expand All @@ -105,19 +118,14 @@ impl Dubbo {

// TODO: server registry

let mem_reg =
Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (key, c) in self.protocols.iter() {
match key.as_str() {
"triple" => {
let pro = Box::new(TripleProtocol::new());
for u in c.iter() {
let tri_fut = pro.clone().export(u.clone()).boxed();
async_vec.push(tri_fut);
}
}
_ => {
tracing::error!("protocol {:?} not implemented", key);
}
for (name, items) in self.protocols.iter() {
for url in items.iter() {
tracing::info!("protocol: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter)
}
}

Expand Down
7 changes: 4 additions & 3 deletions dubbo/src/registry/memory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#![allow(unused_variables, dead_code, missing_docs)]
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use super::{NotifyListener, Registry};
Expand All @@ -27,15 +28,15 @@ use super::{NotifyListener, Registry};

pub const REGISTRY_GROUP_KEY: &str = "registry.group";

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct MemoryRegistry {
registries: RwLock<HashMap<String, String>>,
registries: Arc<RwLock<HashMap<String, String>>>,
}

impl MemoryRegistry {
pub fn new() -> MemoryRegistry {
MemoryRegistry {
registries: RwLock::new(HashMap::new()),
registries: Arc::new(RwLock::new(HashMap::new())),
}
}
}
Expand Down
67 changes: 43 additions & 24 deletions dubbo/src/registry/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,50 @@
*/

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use super::memory_registry::MemoryRegistry;
use super::BoxRegistry;
use crate::codegen::TripleInvoker;
use crate::common::consts;
use crate::common::url::Url;
use crate::protocol::triple::triple_exporter::TripleExporter;
use crate::protocol::triple::triple_protocol::TripleProtocol;
use crate::protocol::BoxExporter;
use crate::protocol::BoxInvoker;
use crate::protocol::Protocol;

#[derive(Clone, Default)]
pub struct RegistryProtocol {
// registerAddr: Registry
registries: Arc<HashMap<String, BoxRegistry>>,
registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
// providerUrl: Exporter
exporters: Arc<HashMap<String, BoxExporter>>,
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
services: HashMap<String, Vec<Url>>,
}

impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
registries: Arc::new(HashMap::new()),
exporters: Arc::new(HashMap::new()),
registries: Arc::new(RwLock::new(HashMap::new())),
exporters: Arc::new(RwLock::new(HashMap::new())),
services: HashMap::new(),
}
}

pub fn get_registry(&self, url: Url) -> BoxRegistry {
// self.registries.clone().insert(url.location.clone(), Box::new(MemoryRegistry::default()));
pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> Self {
self.services.extend(services);
self
}

pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
let mem = MemoryRegistry::default();
self.registries
.write()
.unwrap()
.insert(url.location, Box::new(mem.clone()));

// *(self.registries.get(&url.location).unwrap())
Box::new(MemoryRegistry::default())
Box::new(mem)
}
}

Expand All @@ -60,14 +71,34 @@ impl Protocol for RegistryProtocol {
todo!()
}

async fn export(self, url: Url) -> BoxExporter {
async fn export(mut self, url: Url) -> BoxExporter {
// getProviderUrl
// getRegisterUrl
// init Exporter based on provider_url
// server registry based on register_url
// start server health check
Box::new(TripleExporter::new())
let registry_url = self.services.get(url.get_service_name().join(",").as_str());
if let Some(urls) = registry_url {
for url in urls.clone().iter() {
if !url.protocol.is_empty() {
let mut reg = self.get_registry(url.clone());
reg.register(url.clone()).unwrap();
}
}
}

match url.clone().protocol.as_str() {
"triple" => {
let pro = Box::new(TripleProtocol::new());
return pro.export(url).await;
}
_ => {
tracing::error!("protocol {:?} not implemented", url.protocol);
Box::new(TripleExporter::new())
}
}
}

async fn refer(self, url: Url) -> Self::Invoker {
// getRegisterUrl
// get Registry from registry_url
Expand All @@ -76,15 +107,3 @@ impl Protocol for RegistryProtocol {
Box::new(TripleInvoker::new(url))
}
}

fn get_registry_url(mut url: Url) -> Url {
if url.protocol == consts::REGISTRY_PROTOCOL {
url.protocol = url.get_param("registry".to_string()).unwrap();
}

url
}

fn get_provider_url(url: Url) -> Url {
url
}