Skip to content

Commit 409ce35

Browse files
Improve scalability (#2174)
* Avoid matching initerests when possible * Ignore aggregate flag in token interests * Avoid computing local qabl info when possible * Avoid computing local qabl info when possible (b) * Propagate all tokens to local faces (peers) * Address review comments
1 parent ff0d063 commit 409ce35

File tree

12 files changed

+381
-614
lines changed

12 files changed

+381
-614
lines changed

zenoh/src/net/routing/hat/client/interests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,13 @@ impl HatInterestTrait for HatCode {
8787
send_declare: &mut SendDeclare,
8888
) {
8989
if options.tokens() {
90+
// Note: aggregation is forbidden for tokens. The flag is ignored.
9091
declare_token_interest(
9192
tables,
9293
face,
9394
id,
9495
res.as_deref().cloned().as_mut(),
9596
mode,
96-
options.aggregate(),
9797
send_declare,
9898
)
9999
}

zenoh/src/net/routing/hat/client/queries.rs

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ fn merge_qabl_infos(mut this: QueryableInfoType, info: &QueryableInfoType) -> Qu
5050
this
5151
}
5252

53+
#[inline]
5354
fn local_qabl_info(
5455
_tables: &Tables,
5556
res: &Arc<Resource>,
@@ -74,6 +75,37 @@ fn local_qabl_info(
7475
.unwrap_or(QueryableInfoType::DEFAULT)
7576
}
7677

78+
#[inline]
79+
fn send_declare_queryable(
80+
dst_face: &mut Arc<FaceState>,
81+
res: &Arc<Resource>,
82+
id: u32,
83+
info: QueryableInfoType,
84+
send_declare: &mut SendDeclare,
85+
) {
86+
face_hat_mut!(dst_face)
87+
.local_qabls
88+
.insert(res.clone(), (id, info));
89+
let key_expr = Resource::decl_key(res, dst_face, true);
90+
send_declare(
91+
&dst_face.primitives,
92+
RoutingContext::with_expr(
93+
Declare {
94+
interest_id: None,
95+
ext_qos: ext::QoSType::DECLARE,
96+
ext_tstamp: None,
97+
ext_nodeid: ext::NodeIdType::DEFAULT,
98+
body: DeclareBody::DeclareQueryable(DeclareQueryable {
99+
id,
100+
wire_expr: key_expr,
101+
ext_info: info,
102+
}),
103+
},
104+
res.expr().to_string(),
105+
),
106+
);
107+
}
108+
77109
fn propagate_simple_queryable(
78110
tables: &mut Tables,
79111
res: &Arc<Resource>,
@@ -82,44 +114,25 @@ fn propagate_simple_queryable(
82114
) {
83115
let faces = tables.faces.values().cloned();
84116
for mut dst_face in faces {
85-
let info = local_qabl_info(tables, res, &dst_face);
86-
let current = face_hat!(dst_face).local_qabls.get(res);
87117
if src_face
88118
.as_ref()
89-
.map(|src_face| dst_face.id != src_face.id)
119+
.map(|src_face| {
120+
dst_face.id != src_face.id
121+
&& (src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client)
122+
})
90123
.unwrap_or(true)
91-
&& (current.is_none() || current.unwrap().1 != info)
92-
&& src_face
93-
.as_ref()
94-
.map(|src_face| {
95-
src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client
96-
})
97-
.unwrap_or(true)
98124
{
99-
let id = current
100-
.map(|c| c.0)
101-
.unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst));
102-
face_hat_mut!(&mut dst_face)
103-
.local_qabls
104-
.insert(res.clone(), (id, info));
105-
let key_expr = Resource::decl_key(res, &mut dst_face, true);
106-
send_declare(
107-
&dst_face.primitives,
108-
RoutingContext::with_expr(
109-
Declare {
110-
interest_id: None,
111-
ext_qos: ext::QoSType::DECLARE,
112-
ext_tstamp: None,
113-
ext_nodeid: ext::NodeIdType::DEFAULT,
114-
body: DeclareBody::DeclareQueryable(DeclareQueryable {
115-
id,
116-
wire_expr: key_expr,
117-
ext_info: info,
118-
}),
119-
},
120-
res.expr().to_string(),
121-
),
122-
);
125+
if let Some(&(current_id, current_info)) = face_hat!(dst_face).local_qabls.get(res) {
126+
let info = local_qabl_info(tables, res, &dst_face);
127+
if current_info != info {
128+
let id = current_id;
129+
send_declare_queryable(&mut dst_face, res, id, info, send_declare);
130+
}
131+
} else {
132+
let info = local_qabl_info(tables, res, &dst_face);
133+
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
134+
send_declare_queryable(&mut dst_face, res, id, info, send_declare);
135+
}
123136
}
124137
}
125138
}

zenoh/src/net/routing/hat/client/token.rs

Lines changed: 24 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -322,64 +322,35 @@ pub(crate) fn declare_token_interest(
322322
id: InterestId,
323323
res: Option<&mut Arc<Resource>>,
324324
mode: InterestMode,
325-
aggregate: bool,
326325
send_declare: &mut SendDeclare,
327326
) {
328327
if mode.current() {
329328
let interest_id = (!mode.future()).then_some(id);
330329
if let Some(res) = res.as_ref() {
331-
if aggregate {
332-
if tables.faces.values().any(|src_face| {
333-
face_hat!(src_face)
334-
.remote_tokens
335-
.values()
336-
.any(|token| token.context.is_some() && token.matches(res))
337-
}) {
338-
let id = make_token_id(res, face, mode);
339-
let wire_expr = Resource::decl_key(res, face, true);
340-
send_declare(
341-
&face.primitives,
342-
RoutingContext::with_expr(
343-
Declare {
344-
interest_id,
345-
ext_qos: ext::QoSType::DECLARE,
346-
ext_tstamp: None,
347-
ext_nodeid: ext::NodeIdType::DEFAULT,
348-
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
349-
},
350-
res.expr().to_string(),
351-
),
352-
);
353-
}
354-
} else {
355-
for src_face in tables
356-
.faces
357-
.values()
358-
.filter(|f| f.whatami == WhatAmI::Client)
359-
.cloned()
360-
.collect::<Vec<Arc<FaceState>>>()
361-
{
362-
for token in face_hat!(src_face).remote_tokens.values() {
363-
if token.context.is_some() && token.matches(res) {
364-
let id = make_token_id(token, face, mode);
365-
let wire_expr = Resource::decl_key(token, face, true);
366-
send_declare(
367-
&face.primitives,
368-
RoutingContext::with_expr(
369-
Declare {
370-
interest_id,
371-
ext_qos: ext::QoSType::default(),
372-
ext_tstamp: None,
373-
ext_nodeid: ext::NodeIdType::default(),
374-
body: DeclareBody::DeclareToken(DeclareToken {
375-
id,
376-
wire_expr,
377-
}),
378-
},
379-
res.expr().to_string(),
380-
),
381-
)
382-
}
330+
for src_face in tables
331+
.faces
332+
.values()
333+
.filter(|f| f.whatami == WhatAmI::Client)
334+
.cloned()
335+
.collect::<Vec<Arc<FaceState>>>()
336+
{
337+
for token in face_hat!(src_face).remote_tokens.values() {
338+
if token.context.is_some() && token.matches(res) {
339+
let id = make_token_id(token, face, mode);
340+
let wire_expr = Resource::decl_key(token, face, true);
341+
send_declare(
342+
&face.primitives,
343+
RoutingContext::with_expr(
344+
Declare {
345+
interest_id,
346+
ext_qos: ext::QoSType::default(),
347+
ext_tstamp: None,
348+
ext_nodeid: ext::NodeIdType::default(),
349+
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
350+
},
351+
res.expr().to_string(),
352+
),
353+
)
383354
}
384355
}
385356
}

zenoh/src/net/routing/hat/linkstate_peer/interests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ impl HatInterestTrait for HatCode {
7070
)
7171
}
7272
if options.tokens() {
73+
// Note: aggregation is forbidden for tokens. The flag is ignored.
7374
declare_token_interest(
7475
tables,
7576
face,
7677
id,
7778
res.as_ref().map(|r| (*r).clone()).as_mut(),
7879
mode,
79-
options.aggregate(),
8080
send_declare,
8181
)
8282
}

zenoh/src/net/routing/hat/linkstate_peer/queries.rs

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ fn merge_qabl_infos(mut this: QueryableInfoType, info: &QueryableInfoType) -> Qu
6161
this
6262
}
6363

64+
#[inline]
6465
fn local_peer_qabl_info(_tables: &Tables, res: &Arc<Resource>) -> QueryableInfoType {
6566
res.session_ctxs
6667
.values()
@@ -77,6 +78,7 @@ fn local_peer_qabl_info(_tables: &Tables, res: &Arc<Resource>) -> QueryableInfoT
7778
.unwrap_or(QueryableInfoType::DEFAULT)
7879
}
7980

81+
#[inline]
8082
fn local_qabl_info(
8183
tables: &Tables,
8284
res: &Arc<Resource>,
@@ -166,6 +168,37 @@ fn send_sourced_queryable_to_net_children(
166168
}
167169
}
168170

171+
#[inline]
172+
fn send_declare_queryable(
173+
dst_face: &mut Arc<FaceState>,
174+
res: &Arc<Resource>,
175+
id: u32,
176+
info: QueryableInfoType,
177+
send_declare: &mut SendDeclare,
178+
) {
179+
face_hat_mut!(dst_face)
180+
.local_qabls
181+
.insert(res.clone(), (id, info));
182+
let key_expr = Resource::decl_key(res, dst_face, super::push_declaration_profile(dst_face));
183+
send_declare(
184+
&dst_face.primitives,
185+
RoutingContext::with_expr(
186+
Declare {
187+
interest_id: None,
188+
ext_qos: ext::QoSType::DECLARE,
189+
ext_tstamp: None,
190+
ext_nodeid: ext::NodeIdType::DEFAULT,
191+
body: DeclareBody::DeclareQueryable(DeclareQueryable {
192+
id,
193+
wire_expr: key_expr,
194+
ext_info: info,
195+
}),
196+
},
197+
res.expr().to_string(),
198+
),
199+
);
200+
}
201+
169202
fn propagate_simple_queryable(
170203
tables: &mut Tables,
171204
res: &Arc<Resource>,
@@ -174,44 +207,32 @@ fn propagate_simple_queryable(
174207
) {
175208
let faces = tables.faces.values().cloned();
176209
for mut dst_face in faces {
177-
let info = local_qabl_info(tables, res, &dst_face);
178-
let current = face_hat!(dst_face).local_qabls.get(res);
179210
if src_face
180211
.as_ref()
181212
.map(|src_face| dst_face.id != src_face.id)
182213
.unwrap_or(true)
183-
&& (current.is_none() || current.unwrap().1 != info)
184214
&& dst_face.whatami == WhatAmI::Client
185-
&& face_hat!(dst_face)
215+
{
216+
if let Some(&(current_id, current_info)) = face_hat!(dst_face).local_qabls.get(res) {
217+
let info = local_qabl_info(tables, res, &dst_face);
218+
if current_info != info
219+
&& face_hat!(dst_face)
220+
.remote_interests
221+
.values()
222+
.any(|i| i.options.queryables() && i.matches(res))
223+
{
224+
let id = current_id;
225+
send_declare_queryable(&mut dst_face, res, id, info, send_declare);
226+
}
227+
} else if face_hat!(dst_face)
186228
.remote_interests
187229
.values()
188230
.any(|i| i.options.queryables() && i.matches(res))
189-
{
190-
let id = current
191-
.map(|c| c.0)
192-
.unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst));
193-
face_hat_mut!(&mut dst_face)
194-
.local_qabls
195-
.insert(res.clone(), (id, info));
196-
let push_declaration = push_declaration_profile(&dst_face);
197-
let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration);
198-
send_declare(
199-
&dst_face.primitives,
200-
RoutingContext::with_expr(
201-
Declare {
202-
interest_id: None,
203-
ext_qos: ext::QoSType::DECLARE,
204-
ext_tstamp: None,
205-
ext_nodeid: ext::NodeIdType::DEFAULT,
206-
body: DeclareBody::DeclareQueryable(DeclareQueryable {
207-
id,
208-
wire_expr: key_expr,
209-
ext_info: info,
210-
}),
211-
},
212-
res.expr().to_string(),
213-
),
214-
);
231+
{
232+
let info = local_qabl_info(tables, res, &dst_face);
233+
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
234+
send_declare_queryable(&mut dst_face, res, id, info, send_declare);
235+
}
215236
}
216237
}
217238
}

0 commit comments

Comments
 (0)