Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: loop_select conditions (PRO-587) #4061

Merged
merged 4 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions engine/src/witness/common/chain_source/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use tokio::sync::oneshot;
use utilities::{
loop_select, spmc,
task_scope::{Scope, OR_CANCEL},
UnendingStream,
};

use crate::witness::common::ExternalChainSource;
Expand Down Expand Up @@ -54,8 +53,8 @@ where
if let Some(response_sender) = request_receiver.next() => {
let receiver = sender.receiver();
let _result = response_sender.send((receiver, inner_client.clone()));
},
let item = inner_stream.next_or_pending() => {
} else disable,
if let Some(item) = inner_stream.next() => { // This branch failing causes `sender` to be dropped, this causes the proxy/duplicate streams to also end.
let _result = sender.send(item).await;
},
let _ = sender.closed() => { break },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ use std::sync::Arc;
use cf_chains::ChainState;
use frame_support::CloneNoBound;
use futures::FutureExt;
use futures_core::FusedStream;
use futures_util::{stream, StreamExt};
use pallet_cf_ingress_egress::DepositChannelDetails;
use state_chain_runtime::PalletInstanceAlias;
use tokio::sync::watch;
use utilities::{
loop_select,
task_scope::{Scope, OR_CANCEL},
UnendingStream,
};

use crate::{
Expand Down Expand Up @@ -206,10 +204,9 @@ where
|(mut chain_stream, mut state)| async move {
loop_select!(
if !state.ready_headers.is_empty() => break Some((state.ready_headers.pop().unwrap(), (chain_stream, state))),
if chain_stream.is_terminated() && state.pending_headers.is_empty() => break None,
let header = chain_stream.next_or_pending() => {
if let Some(header) = chain_stream.next() => {
state.add_headers(std::iter::once(header));
},
} else disable then if state.pending_headers.is_empty() => break None,
let _ = state.receiver.changed().map(|result| result.expect(OR_CANCEL)) => {
let pending_headers = std::mem::take(&mut state.pending_headers);
state.add_headers(pending_headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::sync::Arc;
use crate::witness::common::chain_source::{ChainClient, ChainStream};
use cf_chains::{Chain, ChainCrypto};
use frame_support::CloneNoBound;
use futures_core::FusedStream;
use futures_util::{stream, StreamExt};
use state_chain_runtime::PalletInstanceAlias;
use utilities::{loop_select, task_scope::Scope, UnendingStream};
use utilities::{loop_select, task_scope::Scope};

use crate::{
state_chain_observer::client::{storage_api::StorageApi, StateChainStreamApi},
Expand Down Expand Up @@ -138,15 +137,14 @@ where
(chain_stream.fuse(), self.receiver.clone()),
|(mut chain_stream, receiver)| async move {
loop_select!(
if chain_stream.is_terminated() => break None,
let header = chain_stream.next_or_pending() => {
if let Some(header) = chain_stream.next() => {
// Always get the latest tx out ids.
// NB: There is a race condition here. If we're not watching for a particular egress id (because our state chain is slow for some reason) at the time
// it arrives on external chain, we won't witness it. This is pretty unlikely since the time between the egress id being set on the SC and the tx
// being confirmed on the external chain is quite large. We should fix this eventually though. PRO-689
let tx_out_ids = receiver.borrow().clone();
break Some((header.map_data(|header| (header.data, tx_out_ids)), (chain_stream, receiver)))
},
} else break None,
)
},
)
Expand Down
179 changes: 165 additions & 14 deletions utilities/src/with_std/loop_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@ pub use futures::future::ready as internal_ready;
#[doc(hidden)]
pub use tokio::select as internal_tokio_select;

#[doc(hidden)]
pub fn is_bit_set(mask: u64, bit: u64) -> bool {
mask & (1u64 << bit) == (1u64 << bit)
}

#[macro_export]
macro_rules! inner_loop_select {
({ $($processed:tt)* } let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = $expression => {
Expand All @@ -17,8 +24,10 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)* } if let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = $expression => {
Expand All @@ -30,8 +39,10 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)* } if let $pattern:pat = $expression:expr => $body:block else break $extra:expr, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if let $pattern:pat = $expression:expr => $body:block else break $extra:expr, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = $expression => {
Expand All @@ -43,23 +54,49 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)* } if $enable_expression:expr => let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if let $pattern:pat = $expression:expr => $body:block else disable $(then if $disable_break_expression:expr => break $($extra:expr)?)?, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = async { $expression.await } /* async await block ensures $expression is evaluated after condition */, if !$crate::loop_select::is_bit_set($disabled_mask, $count) => {
if let $pattern = x {
$body
} else {
$disabled_mask |= 1u64 << $count;
}
},
$(
_ = $crate::loop_select::internal_ready(()), if $crate::loop_select::is_bit_set($disabled_mask, $count) && $disable_break_expression => {
break $($extra)?
},
)?
}
$($unprocessed)*
)
};
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if $enable_expression:expr => let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = async { $expression.await }, if $enable_expression => {
x = async { $expression.await } /* async await block ensures $expression is evaluated after condition */, if $enable_expression => {
let $pattern = x;
$body
},
}
$($unprocessed)*
)
};
({ $($processed:tt)* } if $enable_expression:expr => if let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if $enable_expression:expr => if let $pattern:pat = $expression:expr => $body:block, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = async { $expression.await }, if $enable_expression => {
x = async { $expression.await } /* async await block ensures $expression is evaluated after condition */, if $enable_expression => {
if let $pattern = x {
$body
} else { break }
Expand All @@ -68,11 +105,13 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)* } if $enable_expression:expr => if let $pattern:pat = $expression:expr => $body:block else break $extra:expr, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if $enable_expression:expr => if let $pattern:pat = $expression:expr => $body:block else break $extra:expr, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
x = async { $expression.await }, if $enable_expression => {
x = async { $expression.await } /* async await block ensures $expression is evaluated after condition */, if $enable_expression => {
if let $pattern = x {
$body
} else { break $extra }
Expand All @@ -81,8 +120,10 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)* } if $expression:expr => break $($extra:expr)?, $($unprocessed:tt)*) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)* } if $expression:expr => break $($extra:expr)?, $($unprocessed:tt)*) => {
$crate::inner_loop_select!(
$disabled_mask,
$count + 1u64,
{
$($processed)*
_ = $crate::loop_select::internal_ready(()), if $expression => {
Expand All @@ -92,7 +133,7 @@ macro_rules! inner_loop_select {
$($unprocessed)*
)
};
({ $($processed:tt)+ }) => {
($disabled_mask:ident, $count:expr, { $($processed:tt)+ }) => {
loop {
$crate::loop_select::internal_tokio_select!(
$($processed)+
Expand All @@ -103,9 +144,11 @@ macro_rules! inner_loop_select {

#[macro_export]
macro_rules! loop_select {
($($cases:tt)+) => {
$crate::inner_loop_select!({} $($cases)+)
}
($($cases:tt)+) => {{
#[allow(unused, unused_mut)]
let mut disabled_mask = 0u64;
$crate::inner_loop_select!(disabled_mask, 0u64, {} $($cases)+)
}}
}

#[cfg(test)]
Expand Down Expand Up @@ -438,4 +481,112 @@ mod test_loop_select {
}
);
}

#[allow(clippy::unit_cmp)]
#[tokio::test]
async fn disabled_branches() {
// Break condition works

assert_eq!(
'c',
loop_select!(
if let 'a' = futures::future::ready('b') => { panic!() } else disable then if true => break 'c',
)
);
assert_eq!(
(),
loop_select!(
if let 'a' = futures::future::ready('b') => { panic!() } else disable then if true => break,
)
);

// Disabled conditions don't run

{
let mut condition_run = false;

assert_eq!(
(),
loop_select!(
if let 2 = futures::future::ready({
if !condition_run {
condition_run = true;
1
} else {
panic!()
}
}) => {
panic!()
} else disable,
if condition_run => break,
)
);
}
{
let mut condition_run = false;

assert_eq!(
(),
loop_select!(
if let 2 = futures::future::ready({
if !condition_run {
condition_run = true;
1
} else {
panic!()
}
}) => {
panic!()
} else disable then if condition_run => break,
)
);
}

// Disabled branches don't run

{
let mut condition_run = false;

assert_eq!(
(),
loop_select!(
if let false = futures::future::ready(condition_run) => {
if condition_run {
panic!()
} else {
condition_run = true;
}
} else disable then if condition_run => break,
)
);
}

// Branches run until disabled

{
let mut i = 0;
assert_eq!(
(),
loop_select!(
if let 0..=10 = futures::future::ready(i) => {
i += 1;
} else disable then if true => break,
)
);
assert_eq!(i, 11);
}
{
let mut i = 0;
assert_eq!(
(),
loop_select!(
if let 0..=10 = futures::future::ready(i) => {
i += 1;
} else disable,
if i == 11 => break,
)
);
assert_eq!(i, 11);
}
}
}
Loading