From 77a2e964a220459850339efe0915970cb4daf596 Mon Sep 17 00:00:00 2001 From: onewe Date: Mon, 20 Feb 2023 18:28:33 +0800 Subject: [PATCH] =?UTF-8?q?Imp:=20remove=20GAT=20in=20the=20Registry=20tra?= =?UTF-8?q?it=E2=80=98s=20definition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close #115 --- dubbo/src/cluster/directory.rs | 4 +-- dubbo/src/registry/memory_registry.rs | 8 ++---- dubbo/src/registry/mod.rs | 22 +++++++++------ dubbo/src/registry/types.rs | 10 +++---- registry-nacos/src/nacos_registry.rs | 12 ++++---- registry-zookeeper/src/zookeeper_registry.rs | 29 ++++++++------------ 6 files changed, 42 insertions(+), 43 deletions(-) diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index 02946a93..2879de4d 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -136,9 +136,9 @@ impl Directory for RegistryDirectory { .expect("msg") .subscribe( url, - MemoryNotifyListener { + Arc::new(MemoryNotifyListener { service_instances: Arc::clone(&self.service_instances), - }, + }), ) .expect("subscribe"); diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs index 61d3e413..b41a0e0a 100644 --- a/dubbo/src/registry/memory_registry.rs +++ b/dubbo/src/registry/memory_registry.rs @@ -25,7 +25,7 @@ use tracing::debug; use crate::common::url::Url; -use super::{NotifyListener, Registry}; +use super::{NotifyListener, Registry, RegistryNotifyListener}; // 从url中获取服务注册的元数据 /// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) @@ -47,8 +47,6 @@ impl MemoryRegistry { } impl Registry for MemoryRegistry { - type NotifyListener = MemoryNotifyListener; - fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> { // define provider label: ${registry.group}/${service_name}/provider let registry_group = match url.get_param(REGISTRY_GROUP_KEY) { @@ -91,7 +89,7 @@ impl Registry for MemoryRegistry { fn subscribe( &self, url: crate::common::url::Url, - listener: Self::NotifyListener, + listener: RegistryNotifyListener, ) -> Result<(), crate::StdError> { todo!() } @@ -99,7 +97,7 @@ impl Registry for MemoryRegistry { fn unsubscribe( &self, url: crate::common::url::Url, - listener: Self::NotifyListener, + listener: RegistryNotifyListener, ) -> Result<(), crate::StdError> { todo!() } diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index 6352cf1f..44835882 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -21,18 +21,24 @@ pub mod memory_registry; pub mod protocol; pub mod types; -use std::fmt::{Debug, Formatter}; +use std::{ + fmt::{Debug, Formatter}, + sync::Arc, +}; -use crate::{common::url::Url, registry::memory_registry::MemoryNotifyListener}; +use crate::common::url::Url; +pub type RegistryNotifyListener = Arc; pub trait Registry { - type NotifyListener; - fn register(&mut self, url: Url) -> Result<(), crate::StdError>; fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>; - fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), crate::StdError>; - fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), crate::StdError>; + fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>; + fn unsubscribe( + &self, + url: Url, + listener: RegistryNotifyListener, + ) -> Result<(), crate::StdError>; } pub trait NotifyListener { @@ -47,7 +53,7 @@ pub struct ServiceEvent { pub service: Vec, } -pub type BoxRegistry = Box + Send + Sync>; +pub type BoxRegistry = Box; impl Debug for BoxRegistry { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -57,7 +63,7 @@ impl Debug for BoxRegistry { #[derive(Default)] pub struct RegistryWrapper { - pub registry: Option>>, + pub registry: Option>, } impl Clone for RegistryWrapper { diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs index 77d253ab..8b978645 100644 --- a/dubbo/src/registry/types.rs +++ b/dubbo/src/registry/types.rs @@ -25,10 +25,12 @@ use tracing::info; use crate::{ common::url::Url, - registry::{memory_registry::MemoryNotifyListener, BoxRegistry, Registry}, + registry::{BoxRegistry, Registry}, StdError, }; +use super::RegistryNotifyListener; + pub type SafeRegistry = Arc>; pub type Registries = Arc>>; @@ -66,8 +68,6 @@ impl RegistriesOperation for Registries { } impl Registry for SafeRegistry { - type NotifyListener = MemoryNotifyListener; - fn register(&mut self, url: Url) -> Result<(), StdError> { info!("register {}.", url); self.lock().unwrap().register(url).expect("registry err."); @@ -79,12 +79,12 @@ impl Registry for SafeRegistry { Ok(()) } - fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> { self.lock().unwrap().register(url).expect("registry err."); Ok(()) } - fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> { self.lock().unwrap().register(url).expect("registry err."); Ok(()) } diff --git a/registry-nacos/src/nacos_registry.rs b/registry-nacos/src/nacos_registry.rs index 042d6f2f..888382f0 100644 --- a/registry-nacos/src/nacos_registry.rs +++ b/registry-nacos/src/nacos_registry.rs @@ -23,7 +23,7 @@ use std::{ use anyhow::anyhow; use dubbo::{ common::url::Url, - registry::{NotifyListener, Registry, ServiceEvent}, + registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent}, }; use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance}; use tracing::{error, info, warn}; @@ -139,8 +139,6 @@ impl NacosRegistry { } impl Registry for NacosRegistry { - type NotifyListener = Arc; - fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> { let side = url.get_param(SIDE_KEY).unwrap_or_default(); let register_consumer = url @@ -205,7 +203,7 @@ impl Registry for NacosRegistry { Ok(()) } - fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), dubbo::StdError> { + fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> { let service_name = NacosServiceName::new(&url); let url_str = url.to_url(); @@ -252,7 +250,11 @@ impl Registry for NacosRegistry { Ok(()) } - fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), dubbo::StdError> { + fn unsubscribe( + &self, + url: Url, + listener: RegistryNotifyListener, + ) -> Result<(), dubbo::StdError> { let service_name = NacosServiceName::new(&url); let url_str = url.to_url(); info!("unsubscribe: {}", &url_str); diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs index a9fc9ecc..b31d25c1 100644 --- a/registry-zookeeper/src/zookeeper_registry.rs +++ b/registry-zookeeper/src/zookeeper_registry.rs @@ -37,9 +37,8 @@ use dubbo::{ url::Url, }, registry::{ - integration::ClusterRegistryIntegration, - memory_registry::{MemoryNotifyListener, MemoryRegistry}, - NotifyListener, Registry, ServiceEvent, + integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener, + Registry, RegistryNotifyListener, ServiceEvent, }, StdError, }; @@ -61,7 +60,7 @@ impl Watcher for LoggingWatcher { pub struct ZookeeperRegistry { root_path: String, zk_client: Arc, - listeners: RwLock::NotifyListener>>>, + listeners: RwLock>, memory_registry: Arc>, } @@ -103,7 +102,7 @@ impl ZookeeperRegistry { &self, path: String, service_name: String, - listener: Arc<::NotifyListener>, + listener: RegistryNotifyListener, ) -> ServiceInstancesChangedListener { let mut service_names = HashSet::new(); service_names.insert(service_name.clone()); @@ -240,8 +239,6 @@ impl Default for ZookeeperRegistry { } impl Registry for ZookeeperRegistry { - type NotifyListener = MemoryNotifyListener; - fn register(&mut self, url: Url) -> Result<(), StdError> { println!("register url: {}", url); let zk_path = format!( @@ -268,7 +265,7 @@ impl Registry for ZookeeperRegistry { } // for consumer to find the changes of providers - fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> { let service_name = url.get_service_name(); let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY); if self @@ -281,17 +278,13 @@ impl Registry for ZookeeperRegistry { return Ok(()); } - let arc_listener = Arc::new(listener); self.listeners .write() .unwrap() - .insert(service_name.to_string(), Arc::clone(&arc_listener)); + .insert(service_name.to_string(), listener.clone()); - let zk_listener = self.create_listener( - zk_path.clone(), - service_name.to_string(), - Arc::clone(&arc_listener), - ); + let zk_listener = + self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone()); let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener); let result = match zk_changed_paths { @@ -312,7 +305,7 @@ impl Registry for ZookeeperRegistry { .collect(), }; info!("notifying {}->{:?}", service_name, result); - arc_listener.notify(ServiceEvent { + listener.notify(ServiceEvent { key: service_name, action: String::from("ADD"), service: result, @@ -320,7 +313,7 @@ impl Registry for ZookeeperRegistry { Ok(()) } - fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> { + fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> { todo!() } } @@ -329,7 +322,7 @@ pub struct ServiceInstancesChangedListener { zk_client: Arc, path: String, service_name: String, - listener: Arc, + listener: RegistryNotifyListener, } impl Watcher for ServiceInstancesChangedListener {