Skip to content

Commit

Permalink
[Block executor] new traits for processing the output
Browse files Browse the repository at this point in the history
  • Loading branch information
gelash committed Aug 22, 2023
1 parent b7ca937 commit c6a1074
Show file tree
Hide file tree
Showing 26 changed files with 332 additions and 297 deletions.
60 changes: 30 additions & 30 deletions aptos-move/aptos-vm-types/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use aptos_types::{
use move_binary_format::errors::Location;
use move_core_types::vm_status::{err_msg, StatusCode, VMStatus};
use std::collections::{
btree_map::Entry::{Occupied, Vacant},
BTreeMap,
hash_map::Entry::{Occupied, Vacant},
HashMap,
};

/// A change set produced by the VM.
Expand All @@ -23,10 +23,10 @@ use std::collections::{
/// VM. For storage backends, use `ChangeSet`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct VMChangeSet {
resource_write_set: BTreeMap<StateKey, WriteOp>,
module_write_set: BTreeMap<StateKey, WriteOp>,
aggregator_write_set: BTreeMap<StateKey, WriteOp>,
aggregator_delta_set: BTreeMap<StateKey, DeltaOp>,
resource_write_set: HashMap<StateKey, WriteOp>,
module_write_set: HashMap<StateKey, WriteOp>,
aggregator_write_set: HashMap<StateKey, WriteOp>,
aggregator_delta_set: HashMap<StateKey, DeltaOp>,
events: Vec<ContractEvent>,
}

Expand All @@ -49,19 +49,19 @@ macro_rules! squash_writes_pair {
impl VMChangeSet {
pub fn empty() -> Self {
Self {
resource_write_set: BTreeMap::new(),
module_write_set: BTreeMap::new(),
aggregator_write_set: BTreeMap::new(),
aggregator_delta_set: BTreeMap::new(),
resource_write_set: HashMap::new(),
module_write_set: HashMap::new(),
aggregator_write_set: HashMap::new(),
aggregator_delta_set: HashMap::new(),
events: vec![],
}
}

pub fn new(
resource_write_set: BTreeMap<StateKey, WriteOp>,
module_write_set: BTreeMap<StateKey, WriteOp>,
aggregator_write_set: BTreeMap<StateKey, WriteOp>,
aggregator_delta_set: BTreeMap<StateKey, DeltaOp>,
resource_write_set: HashMap<StateKey, WriteOp>,
module_write_set: HashMap<StateKey, WriteOp>,
aggregator_write_set: HashMap<StateKey, WriteOp>,
aggregator_delta_set: HashMap<StateKey, DeltaOp>,
events: Vec<ContractEvent>,
checker: &dyn CheckChangeSet,
) -> anyhow::Result<Self, VMStatus> {
Expand Down Expand Up @@ -92,8 +92,8 @@ impl VMChangeSet {

// There should be no aggregator writes if we have a change set from
// storage.
let mut resource_write_set = BTreeMap::new();
let mut module_write_set = BTreeMap::new();
let mut resource_write_set = HashMap::new();
let mut module_write_set = HashMap::new();

for (state_key, write_op) in write_set {
if matches!(state_key.inner(), StateKeyInner::AccessPath(ap) if ap.is_code()) {
Expand All @@ -109,8 +109,8 @@ impl VMChangeSet {
let change_set = Self {
resource_write_set,
module_write_set,
aggregator_write_set: BTreeMap::new(),
aggregator_delta_set: BTreeMap::new(),
aggregator_write_set: HashMap::new(),
aggregator_delta_set: HashMap::new(),
events,
};
checker.check_change_set(&change_set)?;
Expand Down Expand Up @@ -166,11 +166,11 @@ impl VMChangeSet {
.chain(self.aggregator_write_set.iter_mut())
}

pub fn resource_write_set(&self) -> &BTreeMap<StateKey, WriteOp> {
pub fn resource_write_set(&self) -> &HashMap<StateKey, WriteOp> {
&self.resource_write_set
}

pub fn module_write_set(&self) -> &BTreeMap<StateKey, WriteOp> {
pub fn module_write_set(&self) -> &HashMap<StateKey, WriteOp> {
&self.module_write_set
}

Expand All @@ -183,11 +183,11 @@ impl VMChangeSet {
.extend(additional_aggregator_writes)
}

pub fn aggregator_write_set(&self) -> &BTreeMap<StateKey, WriteOp> {
pub fn aggregator_v1_write_set(&self) -> &HashMap<StateKey, WriteOp> {
&self.aggregator_write_set
}

pub fn aggregator_delta_set(&self) -> &BTreeMap<StateKey, DeltaOp> {
pub fn aggregator_v1_delta_set(&self) -> &HashMap<StateKey, DeltaOp> {
&self.aggregator_delta_set
}

Expand Down Expand Up @@ -216,23 +216,23 @@ impl VMChangeSet {
aggregator_delta_set
.into_iter()
.map(into_write)
.collect::<anyhow::Result<BTreeMap<StateKey, WriteOp>, VMStatus>>()?;
.collect::<anyhow::Result<HashMap<StateKey, WriteOp>, VMStatus>>()?;
aggregator_write_set.extend(materialized_aggregator_delta_set.into_iter());

Ok(Self {
resource_write_set,
module_write_set,
aggregator_write_set,
aggregator_delta_set: BTreeMap::new(),
aggregator_delta_set: HashMap::new(),
events,
})
}

fn squash_additional_aggregator_changes(
aggregator_write_set: &mut BTreeMap<StateKey, WriteOp>,
aggregator_delta_set: &mut BTreeMap<StateKey, DeltaOp>,
additional_aggregator_write_set: BTreeMap<StateKey, WriteOp>,
additional_aggregator_delta_set: BTreeMap<StateKey, DeltaOp>,
aggregator_write_set: &mut HashMap<StateKey, WriteOp>,
aggregator_delta_set: &mut HashMap<StateKey, DeltaOp>,
additional_aggregator_write_set: HashMap<StateKey, WriteOp>,
additional_aggregator_delta_set: HashMap<StateKey, DeltaOp>,
) -> anyhow::Result<(), VMStatus> {
use WriteOp::*;

Expand Down Expand Up @@ -312,8 +312,8 @@ impl VMChangeSet {
}

fn squash_additional_writes(
write_set: &mut BTreeMap<StateKey, WriteOp>,
additional_write_set: BTreeMap<StateKey, WriteOp>,
write_set: &mut HashMap<StateKey, WriteOp>,
additional_write_set: HashMap<StateKey, WriteOp>,
) -> anyhow::Result<(), VMStatus> {
for (key, additional_write_op) in additional_write_set.into_iter() {
match write_set.entry(key) {
Expand Down
8 changes: 4 additions & 4 deletions aptos-move/aptos-vm-types/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl VMOutput {
// First, check if output of transaction should be discarded or delta
// change set is empty. In both cases, we do not need to apply any
// deltas and can return immediately.
if self.status().is_discarded() || self.change_set().aggregator_delta_set().is_empty() {
if self.status().is_discarded() || self.change_set().aggregator_v1_delta_set().is_empty() {
return Ok(self);
}

Expand All @@ -96,7 +96,7 @@ impl VMOutput {
debug_assert!(
materialized_output
.change_set()
.aggregator_delta_set()
.aggregator_v1_delta_set()
.is_empty(),
"Aggregator deltas must be empty after materialization."
);
Expand All @@ -114,12 +114,12 @@ impl VMOutput {
// We should have a materialized delta for every delta in the output.
assert_eq!(
materialized_deltas.len(),
self.change_set().aggregator_delta_set().len()
self.change_set().aggregator_v1_delta_set().len()
);
debug_assert!(
materialized_deltas
.iter()
.all(|(k, _)| self.change_set().aggregator_delta_set().contains_key(k)),
.all(|(k, _)| self.change_set().aggregator_v1_delta_set().contains_key(k)),
"Materialized aggregator writes contain a key which does not exist in delta set."
);
self.change_set
Expand Down
12 changes: 6 additions & 6 deletions aptos-move/aptos-vm-types/src/tests/test_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use move_core_types::{
language_storage::{ModuleId, StructTag},
vm_status::{StatusCode, VMStatus},
};
use std::collections::BTreeMap;
use std::collections::HashMap;

/// Testcases:
/// ```text
Expand Down Expand Up @@ -89,7 +89,7 @@ macro_rules! write_set_2 {

macro_rules! expected_write_set {
($d:ident) => {
BTreeMap::from([
HashMap::from([
mock_create(format!("0{}", $d), 0),
mock_modify(format!("1{}", $d), 1),
mock_delete(format!("2{}", $d)),
Expand Down Expand Up @@ -164,23 +164,23 @@ fn test_successful_squash() {
&expected_write_set!(descriptor)
);

let expected_aggregator_write_set = BTreeMap::from([
let expected_aggregator_write_set = HashMap::from([
mock_create("18a", 136),
mock_modify("19a", 138),
mock_modify("22a", 122),
mock_delete("23a"),
]);
let expected_aggregator_delta_set = BTreeMap::from([
let expected_aggregator_delta_set = HashMap::from([
mock_add("15a", 15),
mock_add("16a", 116),
mock_add("17a", 134),
]);
assert_eq!(
change_set.aggregator_write_set(),
change_set.aggregator_v1_write_set(),
&expected_aggregator_write_set
);
assert_eq!(
change_set.aggregator_delta_set(),
change_set.aggregator_v1_delta_set(),
&expected_aggregator_delta_set
);
}
Expand Down
11 changes: 6 additions & 5 deletions aptos-move/aptos-vm-types/src/tests/test_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use aptos_types::{
};
use claims::{assert_err, assert_matches, assert_ok};
use move_core_types::vm_status::{AbortLocation, VMStatus};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

fn assert_eq_outputs(vm_output: &VMOutput, txn_output: TransactionOutput) {
let vm_output_writes = &vm_output
Expand Down Expand Up @@ -77,8 +77,7 @@ fn test_ok_output_equality_with_deltas() {
.clone()
.into_transaction_output_with_materialized_deltas(vec![mock_modify("3", 400)]);

let expected_aggregator_write_set =
BTreeMap::from([mock_modify("2", 2), mock_modify("3", 400)]);
let expected_aggregator_write_set = HashMap::from([mock_modify("2", 2), mock_modify("3", 400)]);
assert_eq!(
materialized_vm_output.change_set().resource_write_set(),
vm_output.change_set().resource_write_set()
Expand All @@ -88,12 +87,14 @@ fn test_ok_output_equality_with_deltas() {
vm_output.change_set().module_write_set()
);
assert_eq!(
materialized_vm_output.change_set().aggregator_write_set(),
materialized_vm_output
.change_set()
.aggregator_v1_write_set(),
&expected_aggregator_write_set
);
assert!(materialized_vm_output
.change_set()
.aggregator_delta_set()
.aggregator_v1_delta_set()
.is_empty());
assert_eq!(
vm_output.fee_statement(),
Expand Down
10 changes: 5 additions & 5 deletions aptos-move/aptos-vm-types/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_types::{
write_set::WriteOp,
};
use move_core_types::vm_status::VMStatus;
use std::collections::BTreeMap;
use std::collections::HashMap;

pub(crate) struct MockChangeSetChecker;

Expand Down Expand Up @@ -57,10 +57,10 @@ pub(crate) fn build_change_set(
aggregator_delta_set: impl IntoIterator<Item = (StateKey, DeltaOp)>,
) -> VMChangeSet {
VMChangeSet::new(
BTreeMap::from_iter(resource_write_set),
BTreeMap::from_iter(module_write_set),
BTreeMap::from_iter(aggregator_write_set),
BTreeMap::from_iter(aggregator_delta_set),
HashMap::from_iter(resource_write_set),
HashMap::from_iter(module_write_set),
HashMap::from_iter(aggregator_write_set),
HashMap::from_iter(aggregator_delta_set),
vec![],
&MockChangeSetChecker,
)
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ impl AptosVM {
change_set: &VMChangeSet,
) -> Result<(), VMStatus> {
assert!(
change_set.aggregator_write_set().is_empty(),
change_set.aggregator_v1_write_set().is_empty(),
"Waypoint change set should not have any aggregator writes."
);

Expand Down
43 changes: 33 additions & 10 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use aptos_vm_types::output::VMOutput;
use move_core_types::vm_status::VMStatus;
use once_cell::sync::OnceCell;
use rayon::{prelude::*, ThreadPool};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

impl BlockExecutorTransaction for PreprocessedTransaction {
type Event = ContractEvent;
Expand Down Expand Up @@ -86,31 +86,54 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
Self::new(VMOutput::empty_with_status(TransactionStatus::Retry))
}

// TODO: get rid of the cloning data-structures in the following APIs.

/// Should never be called after incorporate_delta_writes, as it
/// will consume vm_output to prepare an output with deltas.
fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
fn resource_write_set(&self) -> HashMap<StateKey, WriteOp> {
self.vm_output
.lock()
.as_ref()
.expect("Output to be set to get writes")
.change_set()
.write_set_iter()
.map(|(key, op)| (key.clone(), op.clone()))
.collect()
.resource_write_set()
.clone()
}

/// Should never be called after incorporate_delta_writes, as it
/// will consume vm_output to prepare an output with deltas.
fn module_write_set(&self) -> HashMap<StateKey, WriteOp> {
self.vm_output
.lock()
.as_ref()
.expect("Output to be set to get writes")
.change_set()
.module_write_set()
.clone()
}

/// Should never be called after incorporate_delta_writes, as it
/// will consume vm_output to prepare an output with deltas.
fn get_deltas(&self) -> Vec<(StateKey, DeltaOp)> {
fn aggregator_v1_write_set(&self) -> HashMap<StateKey, WriteOp> {
self.vm_output
.lock()
.as_ref()
.expect("Output to be set to get writes")
.change_set()
.aggregator_v1_write_set()
.clone()
}

/// Should never be called after incorporate_delta_writes, as it
/// will consume vm_output to prepare an output with deltas.
fn aggregator_v1_delta_set(&self) -> HashMap<StateKey, DeltaOp> {
self.vm_output
.lock()
.as_ref()
.expect("Output to be set to get deltas")
.change_set()
.aggregator_delta_set()
.iter()
.map(|(key, op)| (key.clone(), *op))
.collect()
.aggregator_v1_delta_set()
.clone()
}

/// Should never be called after incorporate_delta_writes, as it
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<'a, S: 'a + StateView + Sync> ExecutorTask for AptosExecutorTask<'a, S> {
{
Ok((vm_status, mut vm_output, sender)) => {
if materialize_deltas {
// TODO: Integrate delta application failure.
// TODO: Integrate aggregator v2.
vm_output = vm_output
.try_materialize(view)
.expect("Delta materialization failed");
Expand Down

0 comments on commit c6a1074

Please sign in to comment.