Skip to content

Commit

Permalink
Fix routing bugs (#1296)
Browse files Browse the repository at this point in the history
* Fix bug leading to dupplicates

* Fix bug leading to duplicate queries

* Avoid sending subscribers back to it's source for failover brokering

* Fix failover brokering bug reacting to linkstate changes for queries and tokens
  • Loading branch information
OlivierHecart authored Aug 6, 2024
1 parent b7d42ef commit 2d5ab7c
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 261 deletions.
70 changes: 36 additions & 34 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,47 +356,49 @@ impl HatPubSubTrait for HatCode {
}
};

for face in tables
.faces
.values()
.filter(|f| f.whatami != WhatAmI::Client)
{
if face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
if source_type == WhatAmI::Client {
for face in tables
.faces
.values()
.filter(|f| f.whatami != WhatAmI::Client)
{
if face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
}

Expand Down
16 changes: 9 additions & 7 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,15 @@ impl HatQueriesTrait for HatCode {
}
};

if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: f64::MAX,
});
if source_type == WhatAmI::Client {
if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: f64::MAX,
});
}
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
Expand Down
96 changes: 49 additions & 47 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,62 +604,64 @@ impl HatPubSubTrait for HatCode {
}
};

for face in tables
.faces
.values()
.filter(|f| f.whatami == WhatAmI::Router)
{
if face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
if source_type == WhatAmI::Client {
for face in tables
.faces
.values()
.filter(|f| f.whatami == WhatAmI::Router)
{
if face.local_interests.values().any(|interest| {
interest.finalized
&& interest.options.subscribers()
&& interest
.res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.unwrap_or(true)
}) {
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
}

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(face.clone(), key_expr.to_owned(), NodeId::default())
});
for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(face.clone(), key_expr.to_owned(), NodeId::default())
});
}
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
Expand Down
48 changes: 25 additions & 23 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,30 +589,32 @@ impl HatQueriesTrait for HatCode {
}
};

// TODO: BNestMatching: What if there is a local compete ?
if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: f64::MAX,
});
}
if source_type == WhatAmI::Client {
// TODO: BNestMatching: What if there is a local compete ?
if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: f64::MAX,
});
}

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: 0.5,
});
for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: 0.5,
});
}
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
Expand Down
6 changes: 4 additions & 2 deletions zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ pub(super) fn pubsub_linkstate_change(
&& !client_subs
&& !res.session_ctxs.values().any(|ctx| {
ctx.face.whatami == WhatAmI::Peer
&& src_face.zid != ctx.face.zid
&& src_face.id != ctx.face.id
&& HatTables::failover_brokering_to(links, ctx.face.zid)
})
})
Expand Down Expand Up @@ -884,7 +884,9 @@ pub(super) fn pubsub_linkstate_change(
}

for dst_face in tables.faces.values_mut() {
if HatTables::failover_brokering_to(links, dst_face.zid) {
if src_face.id != dst_face.id
&& HatTables::failover_brokering_to(links, dst_face.zid)
{
for res in face_hat!(src_face).remote_subs.values() {
if !face_hat!(dst_face).local_subs.contains_key(res) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
Expand Down
Loading

0 comments on commit 2d5ab7c

Please sign in to comment.