Skip to content

Commit

Permalink
repl respect oids/oids_exclude and lazy_static removed
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Apr 29, 2024
1 parent fc7bcdf commit 21933e5
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 30 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion svc/repl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ tokio = { features = ["full"] , workspace = true }
async-trait = { workspace = true }
serde = { features = ["derive", "rc"] , workspace = true }
log = { workspace = true }
lazy_static = { workspace = true }
async-channel = { workspace = true }
jemallocator = { workspace = true }
psrpc = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions svc/repl/src/aaa.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use eva_common::acl::Acl;
use eva_sdk::prelude::*;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

lazy_static::lazy_static! {
pub static ref KEYS: Mutex<HashMap<String, String>> = <_>::default();
pub static ref ENC_OPTS: Mutex<HashMap<String, psrpc::options::Options>> = <_>::default();
pub static ref ACLS: Mutex<HashMap<String, Arc<Acl>>> = <_>::default();
}
pub static KEYS: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(<_>::default);
pub static ENC_OPTS: Lazy<Mutex<HashMap<String, psrpc::options::Options>>> =
Lazy::new(<_>::default);
pub static ACLS: Lazy<Mutex<HashMap<String, Arc<Acl>>>> = Lazy::new(<_>::default);

pub async fn get_acl(rpc: &RpcClient, key_id: &str) -> EResult<Arc<Acl>> {
#[derive(Serialize)]
Expand Down
39 changes: 25 additions & 14 deletions svc/repl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ const DEFAULT_PING_INTERVAL: Duration = Duration::from_secs(10);
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

lazy_static::lazy_static! {
static ref BULK_SEND_CONFIG: OnceCell<BulkSendConfig> = <_>::default();
static ref BULK_STATE_TOPIC: OnceCell<String> = <_>::default();
static ref PUBSUB_RPC: OnceCell<Arc<psrpc::RpcClient>> = <_>::default();
static ref RPC: OnceCell<Arc<RpcClient>> = <_>::default();
static ref KEY_SVC: OnceCell<String> = <_>::default();
static ref SYSTEM_NAME: OnceCell<String> = <_>::default();
static ref TIMEOUT: OnceCell<Duration> = <_>::default();
static ref DEFAULT_KEY_ID: OnceCell<String> = <_>::default();
static ref REG: OnceCell<Registry> = <_>::default();
static ref HTTP_CLIENT: OnceCell<eva_sdk::http::Client> = <_>::default();
static ref PULL_DATA: OnceCell<nodes::PullData> = <_>::default();
static ref BULK_SECURE_TOPICS: OnceCell<HashSet<String>> = <_>::default();
static BULK_SEND_CONFIG: OnceCell<BulkSendConfig> = OnceCell::new();
static BULK_STATE_TOPIC: OnceCell<String> = OnceCell::new();
static PUBSUB_RPC: OnceCell<Arc<psrpc::RpcClient>> = OnceCell::new();
static RPC: OnceCell<Arc<RpcClient>> = OnceCell::new();
static KEY_SVC: OnceCell<String> = OnceCell::new();
static SYSTEM_NAME: OnceCell<String> = OnceCell::new();
static TIMEOUT: OnceCell<Duration> = OnceCell::new();
static DEFAULT_KEY_ID: OnceCell<String> = OnceCell::new();
static REG: OnceCell<Registry> = OnceCell::new();
static HTTP_CLIENT: OnceCell<eva_sdk::http::Client> = OnceCell::new();
static PULL_DATA: OnceCell<nodes::PullData> = OnceCell::new();
static BULK_SECURE_TOPICS: OnceCell<HashSet<String>> = OnceCell::new();

}
static OIDS: OnceCell<OIDMaskList> = OnceCell::new();
static OIDS_EXCLUDE: OnceCell<Vec<String>> = OnceCell::new();

static DISCOVERY_ENABLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
static SUBSCRIBE_EACH: atomic::AtomicBool = atomic::AtomicBool::new(false);
Expand Down Expand Up @@ -248,6 +248,17 @@ async fn main(mut initial: Initial) -> EResult<()> {
config.subscribe == SubscribeKind::Each,
atomic::Ordering::SeqCst,
);
OIDS.set(config.oids.clone())
.map_err(|_| Error::core("unable to set OIDS"))?;
let oids_exclude_s: Vec<String> = config
.oids_exclude
.oid_masks()
.iter()
.map(ToString::to_string)
.collect();
OIDS_EXCLUDE
.set(oids_exclude_s)
.map_err(|_| Error::core("unable to set OIDS_EXCLUDE"))?;
let qos = config.pubsub.qos;
let (sender_tx, sender_rx) = async_channel::bounded(config.pubsub.queue_size);
let mut info = ServiceInfo::new(AUTHOR, VERSION, DESCRIPTION);
Expand Down
9 changes: 4 additions & 5 deletions svc/repl/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use eva_common::events::{
};
use eva_common::prelude::*;
use eva_sdk::prelude::*;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic;
Expand All @@ -22,11 +23,9 @@ pub struct PullData {
pub items: Option<Vec<ReplicationInventoryItem>>,
}

lazy_static::lazy_static! {
pub static ref NODES: RwLock<HashMap<String, Node>> = <_>::default();
pub static ref RELOAD_TRIGGERS: RwLock<HashMap<String,
async_channel::Sender<bool>>> = <_>::default();
}
pub static NODES: Lazy<RwLock<HashMap<String, Node>>> = Lazy::new(<_>::default);
pub static RELOAD_TRIGGERS: Lazy<RwLock<HashMap<String, async_channel::Sender<bool>>>> =
Lazy::new(<_>::default);

#[allow(clippy::too_many_lines)]
async fn reload_node(
Expand Down
17 changes: 13 additions & 4 deletions svc/repl/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use eva_common::acl::OIDMaskList;
use eva_common::common_payloads::ParamsIdOwned;
use eva_common::events::{
FullItemStateAndInfoOwned, ReplicationStateEvent, REPLICATION_STATE_TOPIC,
Expand Down Expand Up @@ -75,25 +76,33 @@ impl psrpc::RpcHandlers for PubSubHandlers {
} else {
#[derive(Serialize)]
struct ListPayload<'a> {
i: &'a str,
i: &'a OIDMaskList,
#[serde(skip_serializing_if = "Option::is_none")]
node: Option<&'a str>,
include: Vec<String>,
exclude: Vec<String>,
exclude: Vec<&'a str>,
}
let rpc = crate::RPC.get().unwrap();
let acl = aaa::get_acl(rpc, key_id).await?;
let mut data = crate::PULL_DATA.get().unwrap().clone();
let (allow, deny) = acl.get_items_allow_deny_reading();
let mut exclude = deny.iter().map(String::as_str).collect::<Vec<_>>();
exclude.extend(
crate::OIDS_EXCLUDE
.get()
.unwrap()
.iter()
.map(String::as_str),
);
let payload = ListPayload {
i: "#",
i: crate::OIDS.get().unwrap(),
node: if self.replicate_remote {
None
} else {
Some(".local")
},
include: allow,
exclude: deny,
exclude,
};
let items: Vec<FullItemStateAndInfoOwned> = unpack(
safe_rpc_call(
Expand Down

0 comments on commit 21933e5

Please sign in to comment.