Skip to content

Commit ff0d063

Browse files
Respect liveliness subscriber history when processing token declarations (#2160)
* Consider liveliness sub history when processing token declarations * Add regression test * Remove unnecessary Option wrappers * Address review comments --------- Co-authored-by: OlivierHecart <olivier.hecart@adlinktech.com>
1 parent b7f90a7 commit ff0d063

File tree

4 files changed

+119
-3
lines changed

4 files changed

+119
-3
lines changed

zenoh/src/api/admin.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ impl Handler {
221221
..Put::default()
222222
}))
223223
},
224+
false,
224225
#[cfg(feature = "unstable")]
225226
Reliability::Reliable,
226227
);
@@ -280,6 +281,7 @@ impl PeerHandler {
280281
&self.expr.clone().with_suffix(suffix),
281282
Default::default(),
282283
msg,
284+
false,
283285
#[cfg(feature = "unstable")]
284286
Reliability::Reliable,
285287
);

zenoh/src/api/session.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ impl SessionState {
338338
key_expr: key_expr.clone().into_owned(),
339339
origin,
340340
callback,
341+
history: false,
341342
};
342343

343344
let declared_sub = origin != Locality::SessionLocal;
@@ -1709,6 +1710,7 @@ impl SessionInner {
17091710
key_expr: key_expr.clone().into_owned(),
17101711
origin,
17111712
callback: callback.clone(),
1713+
history,
17121714
};
17131715

17141716
let sub_state = Arc::new(sub_state);
@@ -1975,6 +1977,7 @@ impl SessionInner {
19751977
wire_expr: &WireExpr,
19761978
qos: push::ext::QoSType,
19771979
msg: impl FnOnce() -> &'a mut PushBody,
1980+
historical: bool,
19781981
#[cfg(feature = "unstable")] reliability: Reliability,
19791982
) {
19801983
let mut callbacks = SingleOrVec::default();
@@ -1985,9 +1988,10 @@ impl SessionInner {
19851988
if wire_expr.suffix.is_empty() {
19861989
match state.get_res(&wire_expr.scope, wire_expr.mapping, local) {
19871990
Some(Resource::Node(res)) => {
1988-
for sub in res.subscribers(kind) {
1989-
if sub.origin == Locality::Any
1990-
|| (local == (sub.origin == Locality::SessionLocal))
1991+
for sub in res.subscribers(kind).iter() {
1992+
if (sub.origin == Locality::Any
1993+
|| (local == (sub.origin == Locality::SessionLocal)))
1994+
&& (sub.history || (!historical))
19911995
{
19921996
callbacks.push((sub.callback.clone(), res.key_expr.clone().into()));
19931997
}
@@ -2011,6 +2015,7 @@ impl SessionInner {
20112015
for sub in state.subscribers(kind).values() {
20122016
if (sub.origin == Locality::Any
20132017
|| (local == (sub.origin == Locality::SessionLocal)))
2018+
&& (sub.history || (!historical))
20142019
&& key_expr.intersects(&sub.key_expr)
20152020
{
20162021
callbacks.push((sub.callback.clone(), key_expr.clone().into_owned()));
@@ -2115,6 +2120,7 @@ impl SessionInner {
21152120
}
21162121
&mut push.payload
21172122
},
2123+
false,
21182124
#[cfg(feature = "unstable")]
21192125
reliability,
21202126
);
@@ -2550,6 +2556,9 @@ impl Primitives for WeakSession {
25502556
&m.wire_expr,
25512557
Default::default(),
25522558
|| body.insert(Put::default().into()),
2559+
// interest_id is set if the Token is an Interest::Current.
2560+
// This is used to decide if subs with history=false should be called or not
2561+
msg.interest_id.is_some(),
25532562
#[cfg(feature = "unstable")]
25542563
Reliability::Reliable,
25552564
);
@@ -2567,6 +2576,10 @@ impl Primitives for WeakSession {
25672576
if state.primitives.is_none() {
25682577
return; // Session closing or closed
25692578
}
2579+
// interest_id is set if the Token is an Interest::Current.
2580+
// This is used to decide if liveliness subs with history=false should be called or not
2581+
// NOTE: an UndeclareToken is most likely not an Interest::Current
2582+
let interest_current = msg.interest_id.is_some();
25702583
if let Some(key_expr) = state.remote_tokens.remove(&m.id) {
25712584
drop(state);
25722585
let mut body = None;
@@ -2576,6 +2589,7 @@ impl Primitives for WeakSession {
25762589
&key_expr.to_wire(self),
25772590
Default::default(),
25782591
|| body.insert(Del::default().into()),
2592+
interest_current,
25792593
#[cfg(feature = "unstable")]
25802594
Reliability::Reliable,
25812595
);
@@ -2593,6 +2607,7 @@ impl Primitives for WeakSession {
25932607
&key_expr.to_wire(self),
25942608
Default::default(),
25952609
|| body.insert(Del::default().into()),
2610+
interest_current,
25962611
#[cfg(feature = "unstable")]
25972612
Reliability::Reliable,
25982613
);
@@ -2625,6 +2640,7 @@ impl Primitives for WeakSession {
26252640
&msg.wire_expr,
26262641
msg.ext_qos,
26272642
|| &mut msg.payload,
2643+
false,
26282644
#[cfg(feature = "unstable")]
26292645
_reliability,
26302646
);

zenoh/src/api/subscriber.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub(crate) struct SubscriberState {
3737
pub(crate) key_expr: KeyExpr<'static>,
3838
pub(crate) origin: Locality,
3939
pub(crate) callback: Callback<Sample>,
40+
pub(crate) history: bool,
4041
}
4142

4243
impl fmt::Debug for SubscriberState {

zenoh/tests/liveliness.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4746,3 +4746,100 @@ async fn test_liveliness_double_undeclare_clique() {
47464746
peer1.close().await.unwrap();
47474747
peer2.close().await.unwrap();
47484748
}
4749+
4750+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
4751+
async fn test_liveliness_sub_history_conflict() {
4752+
// https://github.com/eclipse-zenoh/zenoh/issues/2071
4753+
use std::time::Duration;
4754+
4755+
use zenoh::sample::SampleKind;
4756+
use zenoh_config::WhatAmI;
4757+
use zenoh_link::EndPoint;
4758+
4759+
const TIMEOUT: Duration = Duration::from_secs(60);
4760+
const SLEEP: Duration = Duration::from_secs(1);
4761+
const ROUTER_ENDPOINT: &str = "tcp/localhost:30516";
4762+
const LIVELINESS_KEYEXPR: &str = "test/liveliness/history/conflict";
4763+
4764+
zenoh_util::init_log_from_env_or("error");
4765+
4766+
let router = {
4767+
let mut c = zenoh::Config::default();
4768+
c.listen
4769+
.endpoints
4770+
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
4771+
.unwrap();
4772+
c.scouting.multicast.set_enabled(Some(false)).unwrap();
4773+
let _ = c.set_mode(Some(WhatAmI::Router));
4774+
let s = ztimeout!(zenoh::open(c)).unwrap();
4775+
tracing::info!("Router ZID: {}", s.zid());
4776+
s
4777+
};
4778+
4779+
let client_tok = {
4780+
let mut c = zenoh::Config::default();
4781+
c.connect
4782+
.endpoints
4783+
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
4784+
.unwrap();
4785+
c.scouting.multicast.set_enabled(Some(false)).unwrap();
4786+
let _ = c.set_mode(Some(WhatAmI::Peer));
4787+
let s = ztimeout!(zenoh::open(c)).unwrap();
4788+
tracing::info!("Client (token) ZID: {}", s.zid());
4789+
s
4790+
};
4791+
4792+
let token = ztimeout!(client_tok.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
4793+
tokio::time::sleep(SLEEP).await;
4794+
4795+
let client_subs = {
4796+
let mut c = zenoh::Config::default();
4797+
c.connect
4798+
.endpoints
4799+
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
4800+
.unwrap();
4801+
c.scouting.multicast.set_enabled(Some(false)).unwrap();
4802+
let _ = c.set_mode(Some(WhatAmI::Client));
4803+
let s = ztimeout!(zenoh::open(c)).unwrap();
4804+
tracing::info!("Client (subs) ZID: {}", s.zid());
4805+
s
4806+
};
4807+
4808+
let sub_no_history = ztimeout!(client_subs
4809+
.liveliness()
4810+
.declare_subscriber(LIVELINESS_KEYEXPR)
4811+
.history(false))
4812+
.unwrap();
4813+
tokio::time::sleep(SLEEP).await;
4814+
4815+
assert!(sub_no_history.try_recv().unwrap().is_none());
4816+
4817+
let sub_history = ztimeout!(client_subs
4818+
.liveliness()
4819+
.declare_subscriber(LIVELINESS_KEYEXPR)
4820+
.history(true))
4821+
.unwrap();
4822+
tokio::time::sleep(SLEEP).await;
4823+
4824+
let sample = ztimeout!(sub_history.recv_async()).unwrap();
4825+
assert!(sample.kind() == SampleKind::Put);
4826+
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);
4827+
assert!(sub_history.try_recv().unwrap().is_none());
4828+
4829+
assert!(sub_no_history.try_recv().unwrap().is_none());
4830+
4831+
token.undeclare().await.unwrap();
4832+
tokio::time::sleep(SLEEP).await;
4833+
4834+
let sample = ztimeout!(sub_history.recv_async()).unwrap();
4835+
assert!(sample.kind() == SampleKind::Delete);
4836+
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);
4837+
4838+
let sample = ztimeout!(sub_no_history.recv_async()).unwrap();
4839+
assert!(sample.kind() == SampleKind::Delete);
4840+
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);
4841+
4842+
client_tok.close().await.unwrap();
4843+
client_subs.close().await.unwrap();
4844+
router.close().await.unwrap();
4845+
}

0 commit comments

Comments
 (0)