Skip to content

Commit babc2b1

Browse files
[ZEN-547] Add support for link weights (#1914)
* add support for link weights on router * clippy fixes * add tests * change zid type to ZenohId in config * clippy fixes * increase delay in tests * fix test * clippy fix * add method to update link weights * add tests for link weights update * clippy fix * clippy fix * clippy fix * take the average weight when have conflicting weights on different nodes * clippy fixes * support linkstate weights in peers network * fix interceptor cache, network weights test interaction * add test for linkstate peers weights * expose information about link weights to adminspace * clippy fixes * default config update * use greater of 2 weights if they are set by both source and destination * rename link_weights to transport weights in the config; update documentation; * move transport_weights under linkstate field in the config * Update conf doc * Rename field destination_zid to dst_zid * Code style * Update trace * Rename fields in LinkInfo * fix docs --------- Co-authored-by: OlivierHecart <olivier.hecart@adlinktech.com>
1 parent 042e0ad commit babc2b1

File tree

24 files changed

+1736
-1440
lines changed

24 files changed

+1736
-1440
lines changed

DEFAULT_CONFIG.json5

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,34 @@
217217
/// The failover brokering only works if gossip discovery is enabled
218218
/// and peers are configured with gossip target "router".
219219
peers_failover_brokering: true,
220+
/// Linkstate mode configuration.
221+
linkstate: {
222+
/// Weights of the outgoing transports in linkstate mode.
223+
/// If none of the two endpoint nodes of a transport specifies its weight, a weight of 100 is applied.
224+
/// If only one of the two endpoint nodes of a transport specifies its weight, the specified weight is applied.
225+
/// If both endpoint nodes of a transport specify its weight, the greater weight is applied.
226+
// transport_weights: [
227+
// { dst_zid: "1", weight: "10" },
228+
// { dst_zid: "2", weight: "200" },
229+
// ]
230+
}
220231
},
221232
/// The routing strategy to use in peers and it's configuration.
222233
peer: {
223234
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
235+
/// This option needs to be set to the same value in all peers and routers of the subsystem.
224236
mode: "peer_to_peer",
237+
/// Linkstate mode configuration (only taken into account if mode == "linkstate").
238+
linkstate: {
239+
/// Weights of the outgoing transports in linkstate mode.
240+
/// If none of the two endpoint nodes of a transport specifies its weight, a weight of 100 is applied.
241+
/// If only one of the two endpoint nodes of a transport specifies its weight, the specified weight is applied.
242+
/// If both endpoint nodes of a transport specify its weight, the greater weight is applied.
243+
// transport_weights: [
244+
// { dst_zid: "1", weight: "10" },
245+
// { dst_zid: "2", weight: "200" },
246+
// ]
247+
}
225248
},
226249
/// The interests-based routing configuration.
227250
/// This configuration applies regardless of the mode (router, peer or client).

commons/zenoh-config/src/lib.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ pub mod wrappers;
2828
use std::convert::TryFrom;
2929
// This is a false positive from the rust analyser
3030
use std::{
31-
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak,
31+
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, num::NonZeroU16, ops,
32+
path::Path, sync::Weak,
3233
};
3334

3435
use include::recursive_include;
@@ -82,6 +83,14 @@ impl Zeroize for SecretString {
8283

8384
pub type SecretValue = Secret<SecretString>;
8485

86+
#[derive(Debug, Deserialize, Serialize, Clone)]
87+
pub struct TransportWeight {
88+
/// A zid of destination node.
89+
pub dst_zid: ZenohId,
90+
/// A weight of link from this node to the destination.
91+
pub weight: NonZeroU16,
92+
}
93+
8594
#[derive(Debug, Deserialize, Serialize, Clone, Copy, Eq, PartialEq)]
8695
#[serde(rename_all = "lowercase")]
8796
pub enum InterceptorFlow {
@@ -458,12 +467,24 @@ validated_struct::validator! {
458467
/// connected to each other.
459468
/// The failover brokering only works if gossip discovery is enabled.
460469
peers_failover_brokering: Option<bool>,
470+
/// Linkstate mode configuration.
471+
pub linkstate: #[derive(Default)]
472+
LinkstateConf {
473+
/// Weights of the outgoing links in linkstate mode.
474+
/// If none of the two endpoint nodes of a transport specifies its weight, a weight of 100 is applied.
475+
/// If only one of the two endpoint nodes of a transport specifies its weight, the specified weight is applied.
476+
/// If both endpoint nodes of a transport specify its weight, the greater weight is applied.
477+
pub transport_weights: Vec<TransportWeight>,
478+
},
461479
},
462480
/// The routing strategy to use in peers and it's configuration.
463481
pub peer: #[derive(Default)]
464482
PeerRoutingConf {
465483
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
484+
/// This option needs to be set to the same value in all peers and routers of the subsystem.
466485
mode: Option<String>,
486+
/// Linkstate mode configuration (only taken into account if mode == "linkstate").
487+
pub linkstate: LinkstateConf,
467488
},
468489
/// The interests-based routing configuration.
469490
/// This configuration applies regardless of the mode (router, peer or client).
@@ -735,7 +756,7 @@ validated_struct::validator! {
735756
/// Configuration of the downsampling.
736757
downsampling: Vec<DownsamplingItemConf>,
737758

738-
///Configuration of the access control (ACL)
759+
/// Configuration of the access control (ACL)
739760
pub access_control: AclConfig {
740761
pub enabled: bool,
741762
pub default_permission: Permission,

zenoh/src/net/codec/linkstate.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ where
4949
if x.locators.is_some() {
5050
options |= linkstate::LOC;
5151
}
52+
if x.link_weights.is_some() {
53+
options |= linkstate::WGT;
54+
}
5255
codec.write(&mut *writer, options)?;
5356

5457
// Body
@@ -68,6 +71,12 @@ where
6871
for l in x.links.iter() {
6972
codec.write(&mut *writer, *l)?;
7073
}
74+
if let Some(link_weights) = x.link_weights.as_ref() {
75+
// do not write len since it is the same as that of links
76+
for w in link_weights.iter() {
77+
codec.write(&mut *writer, w)?;
78+
}
79+
}
7180

7281
Ok(())
7382
}
@@ -102,20 +111,33 @@ where
102111
} else {
103112
None
104113
};
105-
let len: usize = codec.read(&mut *reader)?;
106-
let mut links: Vec<u64> = Vec::with_capacity(len);
107-
for _ in 0..len {
114+
let links_len: usize = codec.read(&mut *reader)?;
115+
let mut links: Vec<u64> = Vec::with_capacity(links_len);
116+
for _ in 0..links_len {
108117
let l: u64 = codec.read(&mut *reader)?;
109118
links.push(l);
110119
}
111120

121+
let link_weights = if imsg::has_option(options, linkstate::WGT) {
122+
// number of weights is the same as number of links
123+
let mut weights: Vec<u16> = Vec::with_capacity(links_len);
124+
for _ in 0..links_len {
125+
let w: u16 = codec.read(&mut *reader)?;
126+
weights.push(w);
127+
}
128+
Some(weights)
129+
} else {
130+
None
131+
};
132+
112133
Ok(LinkState {
113134
psid,
114135
sn,
115136
zid,
116137
whatami,
117138
locators,
118139
links,
140+
link_weights,
119141
})
120142
}
121143
}

zenoh/src/net/protocol/linkstate.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
1313
//
14+
use std::{collections::HashMap, num::NonZeroU16};
15+
16+
use zenoh_config::TransportWeight;
1417
use zenoh_protocol::core::{Locator, WhatAmI, ZenohIdProto};
18+
use zenoh_result::ZResult;
1519

1620
pub const PID: u64 = 1; // 0x01
1721
pub const WAI: u64 = 1 << 1; // 0x02
1822
pub const LOC: u64 = 1 << 2; // 0x04
23+
pub const WGT: u64 = 1 << 3; // 0x08
1924

2025
// 7 6 5 4 3 2 1 0
2126
// +-+-+-+-+-+-+-+-+
22-
// ~X|X|X|X|X|L|W|P~
27+
// ~X|X|X|X|H|L|W|P~
2328
// +-+-+-+-+-+-+-+-+
2429
// ~ psid ~
2530
// +---------------+
@@ -33,6 +38,8 @@ pub const LOC: u64 = 1 << 2; // 0x04
3338
// +---------------+
3439
// ~ [links] ~
3540
// +---------------+
41+
// ~ [weights] ~ if H = 1
42+
// +---------------+
3643
#[derive(Debug, Clone, PartialEq, Eq)]
3744
pub(crate) struct LinkState {
3845
pub(crate) psid: u64,
@@ -41,6 +48,49 @@ pub(crate) struct LinkState {
4148
pub(crate) whatami: Option<WhatAmI>,
4249
pub(crate) locators: Option<Vec<Locator>>,
4350
pub(crate) links: Vec<u64>,
51+
pub(crate) link_weights: Option<Vec<u16>>,
52+
}
53+
54+
#[derive(Default, Copy, Debug, Clone, PartialEq, Eq)]
55+
pub(crate) struct LinkEdgeWeight(pub(crate) Option<NonZeroU16>);
56+
57+
impl LinkEdgeWeight {
58+
const DEFAULT_LINK_WEIGHT: u16 = 100;
59+
60+
pub(crate) fn new(val: NonZeroU16) -> Self {
61+
LinkEdgeWeight(Some(val))
62+
}
63+
64+
pub(crate) fn from_raw(val: u16) -> Self {
65+
LinkEdgeWeight(NonZeroU16::new(val))
66+
}
67+
68+
pub(crate) fn value(&self) -> u16 {
69+
match self.0 {
70+
Some(v) => v.get(),
71+
None => Self::DEFAULT_LINK_WEIGHT,
72+
}
73+
}
74+
75+
pub(crate) fn as_raw(&self) -> u16 {
76+
match self.0 {
77+
Some(v) => v.get(),
78+
None => 0,
79+
}
80+
}
81+
82+
pub(crate) fn is_set(&self) -> bool {
83+
self.0.is_some()
84+
}
85+
}
86+
87+
#[derive(Debug, Clone, PartialEq, Eq)]
88+
pub(crate) struct LocalLinkState {
89+
pub(crate) sn: u64,
90+
pub(crate) zid: ZenohIdProto,
91+
pub(crate) whatami: WhatAmI,
92+
pub(crate) locators: Option<Vec<Locator>>,
93+
pub(crate) links: HashMap<ZenohIdProto, LinkEdgeWeight>,
4494
}
4595

4696
impl LinkState {
@@ -115,3 +165,36 @@ impl LinkStateList {
115165
Self { link_states }
116166
}
117167
}
168+
169+
pub(crate) fn link_weights_from_config(
170+
link_weights: Vec<TransportWeight>,
171+
network_name: &str,
172+
) -> ZResult<HashMap<ZenohIdProto, LinkEdgeWeight>> {
173+
let mut link_weights_by_zid = HashMap::new();
174+
for lw in link_weights {
175+
if link_weights_by_zid
176+
.insert(lw.dst_zid.into(), LinkEdgeWeight::new(lw.weight))
177+
.is_some()
178+
{
179+
bail!(
180+
"{} config contains a duplicate zid value for transport weight: {}",
181+
network_name,
182+
lw.dst_zid
183+
);
184+
}
185+
}
186+
Ok(link_weights_by_zid)
187+
}
188+
189+
impl From<LinkEdgeWeight> for Option<u16> {
190+
fn from(value: LinkEdgeWeight) -> Self {
191+
value.is_set().then_some(value.value())
192+
}
193+
}
194+
195+
#[derive(PartialEq, Debug, serde::Serialize)]
196+
pub(crate) struct LinkInfo {
197+
pub(crate) src_weight: Option<u16>,
198+
pub(crate) dst_weight: Option<u16>,
199+
pub(crate) actual_weight: u16,
200+
}

zenoh/src/net/protocol/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@
1212
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
1313
//
1414
pub(crate) mod linkstate;
15+
pub(crate) mod network;
16+
17+
pub(crate) const ROUTERS_NET_NAME: &str = "[Routers Network]";
18+
pub(crate) const PEERS_NET_NAME: &str = "[Peers Network]";

0 commit comments

Comments
 (0)