pytest that reproduces near#11135
bowenwang1996 committed Apr 25, 2024
1 parent 6aa1aad commit ecd1847
Showing 5 changed files with 151 additions and 1 deletion.
6 changes: 6 additions & 0 deletions chain/chain/src/chain.rs
@@ -1860,6 +1860,12 @@ impl Chain {
                 true,
             )
         };
+        tracing::debug!(
+            target: "chain",
+            "Updating flat storage for shard {} need_flat_storage_update: {}",
+            shard_id,
+            need_flat_storage_update
+        );
 
         if need_flat_storage_update {
             let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
2 changes: 2 additions & 0 deletions core/store/src/flat/manager.rs
@@ -131,6 +131,8 @@ impl FlatStorageManager {
                     }
                 }
             });
+        } else {
+            tracing::warn!(target: "store", ?shard_uid, block_height=?block.header().height(), "No flat storage!!!");
         }
         Ok(())
     }
6 changes: 6 additions & 0 deletions core/store/src/flat/storage.rs
@@ -186,6 +186,7 @@ impl FlatStorageInner {
         let current_flat_head_height = self.flat_head.height;
 
         let mut new_head = block_hash;
+        tracing::debug!(target: "store", "get_new_flat_head, shard id {}, cur new head {}, old flat head {}, old flat height {}", self.shard_uid.shard_id(), block_hash, current_flat_head_hash, current_flat_head_height);
         let mut blocks_with_changes = 0;
         // Delays updating flat head, keeps this many blocks with non-empty flat
         // state changes between the requested flat head and the chosen head to
@@ -203,13 +204,15 @@
             new_head = match metadata.prev_block_with_changes {
                 None => {
                     // The block has flat state changes.
+                    tracing::debug!(target: "store", "get_new_flat_head, prev_block_with_changes is None");
                     blocks_with_changes += 1;
                     if blocks_with_changes == Self::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP {
                         break;
                     }
                     metadata.block.prev_hash
                 }
                 Some(BlockWithChangesInfo { hash, height, .. }) => {
+                    tracing::debug!(target: "store", "get_new_flat_head, prev_block_with_changes is Some, hash {}, height {}", hash, height);
                     // The block has no flat state changes.
                     if height <= current_flat_head_height {
                         return Ok(current_flat_head_hash);
@@ -371,12 +374,15 @@ impl FlatStorage {
         strict: bool,
     ) -> Result<(), FlatStorageError> {
         let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR);
+        tracing::debug!(target: "store", "update_flat_head, shard id {}, block hash {}", guard.shard_uid.shard_id(), block_hash);
         if !guard.move_head_enabled {
+            tracing::debug!(target: "store", "update_flat_head, shard id {}, move head disabled", guard.shard_uid.shard_id());
             return Ok(());
         }
 
         let new_head = guard.get_new_flat_head(*block_hash, strict)?;
         if new_head == guard.flat_head.hash {
+            tracing::debug!(target: "store", "update_flat_head, shard id {}, flat head already at block {}", guard.shard_uid.shard_id(), guard.flat_head.height);
             return Ok(());
         }
 
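The comment above get_new_flat_head describes a deliberate delay: the flat head trails the chain head by a fixed number of blocks that carry flat state changes. Below is a minimal Python model of that walk, assuming a simplified metadata map; `blocks`, `GAP` and the function shape are illustrative stand-ins, not nearcore APIs.

GAP = 2  # stand-in for Self::BLOCKS_WITH_CHANGES_FLAT_HEAD_GAP


def get_new_flat_head(blocks, head_hash, flat_head_hash, flat_head_height):
    # blocks: hash -> (prev_hash, prev_block_with_changes), where
    # prev_block_with_changes is None if the block itself has flat state
    # changes, else a (hash, height) pointer to the closest earlier block
    # that has them.
    new_head = head_hash
    blocks_with_changes = 0
    while new_head != flat_head_hash:
        prev_hash, prev_with_changes = blocks[new_head]
        if prev_with_changes is None:
            # The block has flat state changes; keep GAP such blocks between
            # the requested head and the chosen flat head.
            blocks_with_changes += 1
            if blocks_with_changes == GAP:
                break
            new_head = prev_hash
        else:
            # The block has no flat state changes; jump to the previous block
            # that does.
            prev_changes_hash, prev_changes_height = prev_with_changes
            if prev_changes_height <= flat_head_height:
                # Nothing newer than the current flat head to move to.
                return flat_head_hash
            new_head = prev_changes_hash
    return new_head


# Tiny usage example: chain A(changes) <- B(no changes) <- C(changes).
blocks = {
    'C': ('B', None),  # C itself has changes
    'B': ('A', ('A', 1)),  # B has none; closest earlier block with changes: A
    'A': (None, None),  # A has changes
}
assert get_new_flat_head(blocks, 'C', 'A', 1) == 'A'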
3 changes: 2 additions & 1 deletion pytest/lib/cluster.py
@@ -840,7 +840,8 @@ def apply_config_changes(node_dir, client_config_change):
         'save_trie_changes', 'split_storage',
         'state_sync', 'state_sync_enabled',
         'store.state_snapshot_enabled',
-        'tracked_shard_schedule', 'cold_store')
+        'tracked_shard_schedule', 'cold_store',
+        'store.load_mem_tries_for_tracked_shards')
 
     for k, v in client_config_change.items():
         if not (k in allowed_missing_configs or k in config_json):
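The added entry whitelists the nested `store.load_mem_tries_for_tracked_shards` key, which is absent from a freshly generated config.json; without the whitelist entry, the check below this hunk would reject the override. The new test sets the key on every node's config; a hedged standalone sketch of the same override (the node directory is a placeholder):

from cluster import apply_config_changes

# Illustrative node home directory; in the test below, the configs are passed
# to start_cluster, which applies them through this same helper.
node_dir = '~/.near/test0'
apply_config_changes(node_dir,
                     {'store.load_mem_tries_for_tracked_shards': True})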
135 changes: 135 additions & 0 deletions pytest/tests/sanity/single_shard_tracking.py
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
# Spins up four validating nodes and one non-validating state dump node.
# Shard assignment for chunk producers is shuffled every epoch, so the set of
# shards each validator tracks changes regularly.
# The validating nodes get stopped, and get restarted close to an epoch
# boundary, in a way that triggers state sync.
#
# After the state sync the nodes have to do a catchup.
#
# Note that the test must generate outgoing receipts for most shards almost
# every block in order to crash if creation of partial encoded chunks becomes
# non-deterministic.

import pathlib
import random
import sys
import time

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import start_cluster, spin_up_node, load_config, apply_config_changes
import account
import state_sync_lib
import transaction
import utils

from configured_logger import logger

EPOCH_LENGTH = 20


def random_u64():
    return bytes(random.randint(0, 255) for _ in range(8))


# Generates traffic for all possible shards.
# Assumes that `test0`, `test1`, `near` all belong to different shards.
def random_workload_until(target, nonce, keys, node0, node1, target_node):
    last_height = -1
    while True:
        nonce += 1

        last_block = target_node.get_latest_block()
        height = last_block.height
        if height > target:
            break
        if height != last_height:
            logger.info(
                f'@{height}, epoch_height: {state_sync_lib.approximate_epoch_height(height, EPOCH_LENGTH)}'
            )
            last_height = height

        last_block_hash = node0.get_latest_block().hash_bytes
        if random.random() < 0.5:
            # Make a transfer between shards.
            # The goal is to generate cross-shard receipts.
            key_from = random.choice([node0, node1]).signer_key
            account_to = random.choice([
                node0.signer_key.account_id, node1.signer_key.account_id,
                "near"
            ])
            payment_tx = transaction.sign_payment_tx(key_from, account_to, 1,
                                                     nonce, last_block_hash)
            node0.send_tx(payment_tx).get('result')
        elif (len(keys) > 100 and random.random() < 0.5) or len(keys) > 1000:
            # Do some flat storage reads, but only if we have enough keys
            # populated.
            key = keys[random.randint(0, len(keys) - 1)]
            for node in [node0, node1]:
                call_function('read', key, nonce, node.signer_key,
                              last_block_hash, node0)
                call_function('read', key, nonce, node.signer_key,
                              last_block_hash, node0)
        else:
            # Generate some data for flat storage reads
            key = random_u64()
            keys.append(key)
            for node in [node0, node1]:
                call_function('write', key, nonce, node.signer_key,
                              last_block_hash, node0)
    return nonce, keys


def call_function(op, key, nonce, signer_key, last_block_hash, node):
    if op == 'read':
        args = key
        fn = 'read_value'
    else:
        args = key + random_u64()
        fn = 'write_key_value'

    tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id,
                                           fn, args, 300 * account.TGAS, 0,
                                           nonce, last_block_hash)
    return node.send_tx(tx).get('result')


def main():
    node_config_dump, node_config_sync = state_sync_lib.get_state_sync_configs_pair()
    node_config_sync["tracked_shards"] = []
    node_config_sync["store.load_mem_tries_for_tracked_shards"] = True
    node_config_dump["store.load_mem_tries_for_tracked_shards"] = True
    configs = {x: node_config_sync for x in range(4)}
    configs[4] = node_config_dump

    nodes = start_cluster(
        4, 1, 4, None,
        [["epoch_length", EPOCH_LENGTH],
         ["shuffle_shard_assignment_for_chunk_producers", True],
         ["block_producer_kickout_threshold", 20],
         ["chunk_producer_kickout_threshold", 20]], configs)

    for node in nodes:
        node.stop_checking_store()

    logger.info("nodes started")
    contract = utils.load_test_contract()

    latest_block_hash = nodes[0].get_latest_block().hash_bytes
    deploy_contract_tx = transaction.sign_deploy_contract_tx(
        nodes[0].signer_key, contract, 1, latest_block_hash)
    result = nodes[0].send_tx_and_wait(deploy_contract_tx, 10)
    assert 'result' in result and 'error' not in result, (
        'Expected "result" and no "error" in response, got: {}'.format(result))

    nonce = 2
    keys = []
    nonce, keys = random_workload_until(EPOCH_LENGTH * 2, nonce, keys, nodes[0],
                                        nodes[1], nodes[4])
    for i in range(2, 6):
        logger.info(f"iteration {i} starts")
        stop_height = random.randint(1, EPOCH_LENGTH)
        nonce, keys = random_workload_until(EPOCH_LENGTH * i + stop_height,
                                            nonce, keys, nodes[i // 5],
                                            nodes[(i + 1) // 5], nodes[4])
        # Stop and restart all validating nodes to trigger state sync on
        # restart.
        for j in range(4):
            nodes[j].kill()
        time.sleep(2)
        for j in range(4):
            nodes[j].start(boot_node=nodes[4])


if __name__ == "__main__":
    main()
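The test exits right after the final restart. When iterating on it locally, one might additionally wait for the cluster to advance past the restarts, sketched below with the wait_for_blocks helper from pytest/lib/utils.py; the target height is illustrative and this is not part of the commit.

# Sketch: confirm the restarted validators resume making progress.
utils.wait_for_blocks(nodes[4], target=EPOCH_LENGTH * 7)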
