
[test on forge] Add state sync catching up stress tests
igor-aptos committed Sep 17, 2022
1 parent 38e4b77 commit ecc351a
Showing 4 changed files with 136 additions and 38 deletions.
48 changes: 44 additions & 4 deletions testsuite/forge-cli/src/main.rs
@@ -185,6 +185,14 @@ fn main() -> Result<()> {
     let duration = Duration::from_secs(args.duration_secs as u64);
     let suite_name: &str = args.suite.as_ref();
 
+    let suite_name = if suite_name == "compat" {
+        "failures_catching_up"
+    } else {
+        "slow_processing_catching_up"
+    };
+
+    let duration = Duration::from_secs(1800);
+
     let runtime = Runtime::new()?;
     match args.cli_cmd {
         // cmd input for test
@@ -653,6 +661,36 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
             },
             false,
         ),
+        "slow_processing_catching_up" => changing_working_quorum_test(
+            10,
+            300,
+            3000,
+            2500,
+            &ChangingWorkingQuorumTest {
+                min_tps: 1500,
+                always_healthy_nodes: 2,
+                max_down_nodes: 0,
+                num_large_validators: 2,
+                add_execution_delay: true,
+                check_period_s: 57,
+            },
+            false,
+        ),
+        "failures_catching_up" => changing_working_quorum_test(
+            10,
+            300,
+            3000,
+            2500,
+            &ChangingWorkingQuorumTest {
+                min_tps: 1500,
+                always_healthy_nodes: 2,
+                max_down_nodes: 1,
+                num_large_validators: 2,
+                add_execution_delay: false,
+                check_period_s: 27,
+            },
+            false,
+        ),
         _ => return Err(format_err!("Invalid --suite given: {:?}", test_name)),
     };
     Ok(single_test_suite)
@@ -809,8 +847,10 @@ fn changing_working_quorum_test(
         }))
         .with_node_helm_config_fn(Arc::new(move |helm_values| {
             helm_values["validator"]["config"]["api"]["failpoints_enabled"] = true.into();
-            helm_values["validator"]["config"]["consensus"]["max_block_txns"] =
-                (target_tps / 4).into();
+            if !max_load {
+                helm_values["validator"]["config"]["consensus"]["max_block_txns"] =
+                    (target_tps / 4).into();
+            }
             helm_values["validator"]["config"]["consensus"]["round_initial_timeout_ms"] =
                 500.into();
             helm_values["validator"]["config"]["consensus"]
@@ -821,7 +861,7 @@ fn changing_working_quorum_test(
         EmitJobRequest::default()
             .mode(if max_load {
                 EmitJobMode::MaxLoad {
-                    mempool_backlog: 20000,
+                    mempool_backlog: 30000,
                 }
             } else {
                 EmitJobMode::ConstTps { tps: target_tps }
@@ -836,7 +876,7 @@ fn changing_working_quorum_test(
         min_avg_tps,
         10000,
         true,
-        Some(Duration::from_secs(30)),
+        Some(Duration::from_secs(if max_load { 60 } else { 30 })),
         None,
         Some(StateProgressThreshold {
             max_no_progress_secs: 20.0,
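Note on the [test on forge] overrides in main.rs above: for this stress run the suite choice and duration are hard-coded, shadowing the CLI arguments ("compat" maps to failures_catching_up, anything else to slow_processing_catching_up, always for 1800s).

The positional arguments of changing_working_quorum_test line up with the knobs used elsewhere in this diff. A minimal sketch of its signature, reconstructed from the call sites above; the parameter names are assumptions for orientation, not the actual forge-cli definitions:

// Hypothetical reconstruction, not the committed code.
fn changing_working_quorum_test(
    num_validators: usize,            // 10 in both new suites
    epoch_duration_secs: usize,       // 300
    target_tps: usize,                // 3000; also sets consensus max_block_txns = target_tps / 4 when !max_load
    min_avg_tps: usize,               // 2500; average-TPS success threshold for the whole run
    test: &ChangingWorkingQuorumTest, // per-suite liveness knobs (min_tps, check_period_s, ...)
    max_load: bool,                   // false here, so load is EmitJobMode::ConstTps { tps: target_tps }
) -> ForgeConfig<'static> {
    // wires up the helm overrides and success criteria shown in the hunks above
    unimplemented!()
}

With these knobs, the per-cycle liveness bar enforced in consensus_reliability_tests.rs below works out to min_tps * check_period_s = 1500 * 57 = 85,500 transactions per check window for slow_processing_catching_up, and 1500 * 27 = 40,500 for failures_catching_up.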
2 changes: 2 additions & 0 deletions testsuite/forge.py
@@ -1277,6 +1277,8 @@ def test(
     forge_cli_args: Optional[List[str]],
     test_args: Optional[List[str]],
 ) -> None:
+    forge_enable_failpoints = "true"
+
     """Run a forge test"""
     shell = LocalShell(verbose == "true")
     git = Git(shell)
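The forge.py change forces failpoints on for this run, presumably so the deployed node images honor the set_failpoint calls these suites depend on; it pairs with the failpoints_enabled helm override in main.rs above.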
3 changes: 2 additions & 1 deletion testsuite/forge/src/interface/swarm.rs
@@ -323,8 +323,9 @@ pub async fn wait_for_all_nodes_to_catchup_to_version(

     if start_time.elapsed() > timeout {
         return Err(anyhow!(
-            "Waiting for nodes to catch up to version {} timed out, current status: {:?}",
+            "Waiting for nodes to catch up to version {} timed out after {}s, current status: {:?}",
             version,
+            start_time.elapsed().as_secs(),
             versions.unwrap_or_default()
         ));
     }
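For context, the message above is emitted from a polling loop. A minimal, self-contained sketch of that loop's shape (the fetch_versions closure stands in for forge's per-node REST queries and is an assumption for illustration, not the real helper):

use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};

// Poll node versions until every node reaches `version` or `timeout` elapses.
async fn wait_for_version<F, Fut>(mut fetch_versions: F, version: u64, timeout: Duration) -> Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Vec<u64>>,
{
    let start_time = Instant::now();
    loop {
        let versions = fetch_versions().await;
        if versions.iter().all(|v| *v >= version) {
            return Ok(());
        }
        if start_time.elapsed() > timeout {
            // Reporting the elapsed seconds (the change above) makes timeout
            // failures easier to triage from CI logs.
            return Err(anyhow!(
                "Waiting for nodes to catch up to version {} timed out after {}s, current status: {:?}",
                version,
                start_time.elapsed().as_secs(),
                versions
            ));
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}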
121 changes: 88 additions & 33 deletions testsuite/testcases/src/consensus_reliability_tests.rs
@@ -89,11 +89,17 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest {
"Always healthy {} nodes, every cycle having {} nodes out of {} down, rotating {} each cycle, expecting first {} validators to have 10x larger stake",
num_always_healthy, max_fail_in_test, num_validators, cycle_offset, self.num_large_validators);

if self.add_execution_delay {
let slow_allowed_lagging = if self.add_execution_delay {
runtime.block_on(async {
let mut rng = rand::thread_rng();
for (name, validator) in &validators[num_always_healthy..num_validators] {
let mut slow_allowed_lagging = HashSet::new();
for (index, (name, validator)) in
validators.iter().enumerate().skip(num_always_healthy)
{
let sleep_time = rng.gen_range(20, 500);
if sleep_time > 100 {
slow_allowed_lagging.insert(index);
}
let name = name.clone();
validator
.set_failpoint(
@@ -103,9 +109,11 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest {
                         .await
                         .with_context(|| name)?;
                 }
-                Ok::<(), RestError>(())
-            })?;
-        }
+                Ok::<HashSet<usize>, RestError>(slow_allowed_lagging)
+            })?
+        } else {
+            HashSet::new()
+        };
 
         let min_tps = self.min_tps;
         let check_period_s = self.check_period_s;
@@ -136,56 +144,87 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest {
                     (vec![], false)
                 }
             }))),
-            Box::new(move |cycle, _, _, _, cur, previous| {
+            Box::new(move |cycle, _, _, _, cycle_end, cycle_start| {
                 let down_indices = down_indices_f(cycle);
-                let prev_down_indices = if cycle > 0 { down_indices_f(cycle - 1) } else { HashSet::new() };
-                fn split(all: Vec<NodeState>, down_indices: &HashSet<usize>) -> (Vec<(usize, NodeState)>, Vec<NodeState>) {
-                    let (down, active): (Vec<_>, Vec<_>) = all.into_iter().enumerate().partition(|(idx, _state)| down_indices.contains(idx));
-                    (down, active.into_iter().map(|(_idx, state)| state).collect())
+                let recently_down_indices = if cycle > 0 { down_indices_f(cycle - 1) } else { HashSet::new() };
+                fn split(all: Vec<NodeState>, down_indices: &HashSet<usize>, allowed_lagging_indices: &HashSet<usize>) -> (Vec<(usize, NodeState)>, Vec<(usize, NodeState)>, Vec<NodeState>) {
+                    let (down, not_down): (Vec<_>, Vec<_>) = all.into_iter().enumerate().partition(|(idx, _state)| down_indices.contains(idx));
+                    let (allowed_lagging, active) = not_down.into_iter().partition(|(idx, _state)| allowed_lagging_indices.contains(idx));
+                    (down, allowed_lagging, active.into_iter().map(|(_idx, state)| state).collect())
                 }
 
-                let (cur_down, cur_active) = split(cur, &down_indices);
-                let (prev_down, prev_active) = split(previous, &down_indices);
+                let allowed_lagging = recently_down_indices.union(&slow_allowed_lagging).cloned().collect::<HashSet<_>>();
+                let (cycle_end_down, cycle_end_allowed_lagging, cycle_end_active) = split(cycle_end, &down_indices, &allowed_lagging);
+                let (cycle_start_down, cycle_start_allowed_lagging, cycle_start_active) = split(cycle_start, &down_indices, &allowed_lagging);
 
-                // Make sure that every active node is making progress, so we compare min(cur) vs max(previous)
-                let (cur_min_epoch, cur_min_round) = cur_active.iter().map(|s| (s.epoch, s.round)).min().unwrap();
-                let (prev_max_epoch, prev_max_round) = prev_active.iter().map(|s| (s.epoch, s.round)).max().unwrap();
+                // Make sure that every active node is making progress, so we compare min(cycle_end) vs max(cycle_start)
+                let (cycle_end_min_epoch, cycle_end_min_round) = cycle_end_active.iter().map(|s| (s.epoch, s.round)).min().unwrap();
+                let (cycle_start_max_epoch, cycle_start_max_round) = cycle_start_active.iter().map(|s| (s.epoch, s.round)).max().unwrap();
 
-                let epochs_progress = cur_min_epoch as i64 - prev_max_epoch as i64;
-                let round_progress = cur_min_round as i64 - prev_max_round as i64;
+                let epochs_progress = cycle_end_min_epoch as i64 - cycle_start_max_epoch as i64;
+                let round_progress = cycle_end_min_round as i64 - cycle_start_max_round as i64;
 
-                let transaction_progress = cur_active.iter().map(|s| s.version).min().unwrap() as i64
-                    - prev_active.iter().map(|s| s.version).max().unwrap() as i64;
+                let transaction_progress = cycle_end_active.iter().map(|s| s.version).min().unwrap() as i64
+                    - cycle_start_active.iter().map(|s| s.version).max().unwrap() as i64;
 
                 if transaction_progress < (min_tps * check_period_s) as i64 {
                     bail!(
-                        "no progress with active consensus, only {} transactions, expected >= {} ({} TPS). Down indices {:?}, Prev active: {:?}. Cur active: {:?}",
+                        "not enough progress with active consensus, only {} transactions, expected >= {} ({} TPS). Down indices {:?}, cycle start active: {:?}. cycle end active: {:?}",
                         transaction_progress,
-                        (min_tps * check_period_s),
+                        min_tps * check_period_s,
                         min_tps,
                         down_indices,
-                        prev_active,
-                        cur_active,
+                        cycle_start_active,
+                        cycle_end_active,
                     );
                 }
                 if epochs_progress < 0 || (epochs_progress == 0 && round_progress < (check_period_s / 2) as i64) {
-                    bail!("no progress with active consensus, only {} epochs and {} rounds, expectd >= {}",
-                        epochs_progress,
-                        round_progress,
-                        (check_period_s / 2),
+                    bail!(
+                        "not enough progress with active consensus, only {} epochs and {} rounds, expected >= {}",
+                        epochs_progress,
+                        round_progress,
+                        check_period_s / 2,
                     );
                 }

+                // Make sure that allowed_lagging nodes are making progress
+                for ((node_idx, cycle_end_state), (node_idx_p, cycle_start_state)) in cycle_end_allowed_lagging.iter().zip(cycle_start_allowed_lagging.iter()) {
+                    assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cycle_end_allowed_lagging, cycle_start_allowed_lagging);
+                    let transaction_progress = cycle_end_state.version as i64 - cycle_start_state.version as i64;
+                    if transaction_progress < (min_tps * check_period_s) as i64 {
+                        bail!(
+                            "not enough individual progress on allowed lagging node ({}), only {} transactions, expected >= {} ({} TPS)",
+                            node_idx,
+                            transaction_progress,
+                            min_tps * check_period_s,
+                            min_tps,
+                        );
+                    }
+                    let epochs_progress = cycle_end_state.epoch as i64 - cycle_start_state.epoch as i64;
+                    let round_progress = cycle_end_state.round as i64 - cycle_start_state.round as i64;
+
+                    // state sync might not update rounds
+                    // if epochs_progress < 0 || (epochs_progress == 0 && round_progress < (check_period_s / 2) as i64) {
+                    //     bail!(
+                    //         "not enough individual progress on allowed lagging node ({}), only {} epochs and {} rounds, expected >= {}",
+                    //         node_idx,
+                    //         epochs_progress,
+                    //         round_progress,
+                    //         check_period_s / 2,
+                    //     );
+                    // }
+                }

                 // Make sure down nodes don't make progress:
-                for ((node_idx, cur_state), (node_idx_p, prev_state)) in cur_down.iter().zip(prev_down.iter()) {
-                    assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cur_down, prev_down);
-                    if cur_state.round > prev_state.round + 3 {
+                for ((node_idx, cycle_end_state), (node_idx_p, cycle_start_state)) in cycle_end_down.iter().zip(cycle_start_down.iter()) {
+                    assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cycle_end_down, cycle_start_down);
+                    if cycle_end_state.round > cycle_start_state.round + 3 {
                         // if we just failed the node, some progress can happen due to pipeline in consensus,
                         // or buffer of received messages in state sync
-                        if prev_down_indices.contains(node_idx) {
-                            bail!("progress on down node {} from ({}, {}) to ({}, {})", node_idx, prev_state.epoch, prev_state.round, cur_state.epoch, cur_state.round);
+                        if recently_down_indices.contains(node_idx) {
+                            bail!("progress on down node {} from ({}, {}) to ({}, {})", node_idx, cycle_start_state.epoch, cycle_start_state.round, cycle_end_state.epoch, cycle_end_state.round);
                         } else {
-                            warn!("progress on down node {} immediatelly after turning off from ({}, {}) to ({}, {})", node_idx, prev_state.epoch, prev_state.round, cur_state.epoch, cur_state.round)
+                            warn!("progress on down node {} immediately after turning off from ({}, {}) to ({}, {})", node_idx, cycle_start_state.epoch, cycle_start_state.round, cycle_end_state.epoch, cycle_end_state.round)
                         }
                     }
                 }
@@ -196,6 +235,22 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest {
             true,
         ))?;
 
+        // undo slowing down.
+        if self.add_execution_delay {
+            runtime.block_on(async {
+                for (name, validator) in validators.iter().skip(num_always_healthy) {
+                    let name = name.clone();
+                    validator
+                        .set_failpoint(
+                            "aptos_vm::execution::block_metadata".to_string(),
+                            "".to_string(),
+                        )
+                        .await
+                        .with_context(|| name)?;
+                }
+                Ok::<(), RestError>(())
+            })?;
+        }
         Ok(())
     }
 }
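The cleanup block above relies on failpoint semantics: setting an empty action string clears a previously injected failpoint. The injection itself sits in the collapsed hunk near the top of this file; based on the surrounding sleep_time logic it most likely looks like the sketch below (the "sleep(...)" action string is an inference, since the diff collapses the exact arguments):

// Assumed shape of the injection in the setup path: delay block-metadata
// execution on this validator by the randomly drawn sleep_time (20-500,
// presumably ms). Nodes slowed by more than 100 are recorded in
// slow_allowed_lagging and get the relaxed individual progress check
// instead of the strict active-node check.
validator
    .set_failpoint(
        "aptos_vm::execution::block_metadata".to_string(),
        format!("sleep({})", sleep_time), // assumption: fail-rs style sleep action
    )
    .await
    .with_context(|| name)?;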
