Skip to content

Commit

Permalink
Unify block executor output processing and delta resolution, optimize…
Browse files Browse the repository at this point in the history
… aggregator key detection
  • Loading branch information
gelash committed Nov 26, 2022
1 parent 7a25b0d commit b423013
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 129 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 14 additions & 76 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ use aptos_aggregator::{delta_change_set::DeltaOp, transaction::TransactionOutput
use aptos_block_executor::{
errors::Error,
executor::{BlockExecutor, RAYON_EXEC_POOL},
output_delta_resolver::OutputDeltaResolver,
task::{
Transaction as BlockExecutorTransaction,
TransactionOutput as BlockExecutorTransactionOutput,
},
view::ResolvedData,
};
use aptos_logger::debug;
use aptos_state_view::StateView;
use aptos_types::{
state_store::state_key::StateKey,
Expand All @@ -29,7 +26,6 @@ use aptos_types::{
};
use move_core_types::vm_status::VMStatus;
use rayon::prelude::*;
use std::collections::HashMap;

impl BlockExecutorTransaction for PreprocessedTransaction {
type Key = StateKey;
Expand All @@ -47,10 +43,6 @@ impl AptosTransactionOutput {
pub fn into(self) -> TransactionOutputExt {
self.0
}

pub fn as_ref(&self) -> &TransactionOutputExt {
&self.0
}
}

impl BlockExecutorTransactionOutput for AptosTransactionOutput {
Expand Down Expand Up @@ -87,48 +79,6 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
pub struct BlockAptosVM();

impl BlockAptosVM {
fn process_parallel_block_output<S: StateView>(
results: Vec<AptosTransactionOutput>,
delta_resolver: OutputDeltaResolver<StateKey, WriteOp>,
state_view: &S,
) -> Vec<TransactionOutput> {
// TODO: MVHashmap, and then delta resolver should track aggregator base values.
let mut aggregator_base_values: HashMap<StateKey, anyhow::Result<ResolvedData>> =
HashMap::new();
for res in results.iter() {
for (key, _) in res.as_ref().delta_change_set().iter() {
if !aggregator_base_values.contains_key(key) {
aggregator_base_values.insert(key.clone(), state_view.get_state_value(key));
}
}
}

let materialized_deltas =
delta_resolver.resolve(aggregator_base_values.into_iter().collect(), results.len());

results
.into_iter()
.zip(materialized_deltas.into_iter())
.map(|(res, delta_writes)| {
res.into()
.output_with_delta_writes(WriteSetMut::new(delta_writes))
})
.collect()
}

fn process_sequential_block_output(
results: Vec<AptosTransactionOutput>,
) -> Vec<TransactionOutput> {
results
.into_iter()
.map(|res| {
let (deltas, output) = res.into().into();
debug_assert!(deltas.is_empty(), "[Execution] Deltas must be materialized");
output
})
.collect()
}

pub fn execute_block<S: StateView>(
transactions: Vec<Transaction>,
state_view: &S,
Expand All @@ -149,33 +99,21 @@ impl BlockAptosVM {
BlockExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new(concurrency_level);
let data_view = AptosDataView(state_view);

let mut ret = if concurrency_level > 1 {
executor
.execute_transactions_parallel(state_view, &signature_verified_block, &data_view)
.map(|(results, delta_resolver)| {
Self::process_parallel_block_output(results, delta_resolver, state_view)
let ret = executor
.execute_block(state_view, signature_verified_block, &data_view)
.map(|results| {
// Process the outputs in parallel, combining delta writes with other writes.
RAYON_EXEC_POOL.install(|| {
results
.into_par_iter()
.map(|(output, delta_writes)| {
output // AptosTransactionOutput
.into() // TransactionOutputExt
.output_with_delta_writes(WriteSetMut::new(delta_writes))
})
.collect()
})
} else {
executor
.execute_transactions_sequential(state_view, &signature_verified_block, &data_view)
.map(Self::process_sequential_block_output)
};

if ret == Err(Error::ModulePathReadWrite) {
debug!("[Execution]: Module read & written, sequential fallback");

ret = executor
.execute_transactions_sequential(state_view, &signature_verified_block, &data_view)
.map(Self::process_sequential_block_output);
}

// Explicit async drop. Happens here because we can't currently move to
// BlockExecutor due to the Module publishing fallback. TODO: fix after
// module publishing fallback is removed.
RAYON_EXEC_POOL.spawn(move || {
// Explicit async drops.
drop(signature_verified_block);
});
});

match ret {
Ok(outputs) => Ok(outputs),
Expand Down
1 change: 1 addition & 0 deletions aptos-move/block-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-aggregator = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-state-view = { workspace = true }
aptos-types = { workspace = true }
Expand Down
66 changes: 58 additions & 8 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::{
txn_last_input_output::TxnLastInputOutput,
view::{MVHashMapView, VersionedView},
};
use aptos_logger::debug;
use aptos_types::write_set::WriteOp;
use mvhashmap::{MVHashMap, MVHashMapError, MVHashMapOutput};
use num_cpus;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -237,15 +239,15 @@ where
}
}

pub fn execute_transactions_parallel(
pub(crate) fn execute_transactions_parallel(
&self,
executor_initial_arguments: E::Argument,
signature_verified_block: &Vec<T>,
base_view: &dyn DataView<T = T>,
) -> Result<
(
Vec<E::Output>,
OutputDeltaResolver<<T as Transaction>::Key, <T as Transaction>::Value>,
Vec<Vec<(<T as Transaction>::Key, WriteOp)>>, // Resolved delta writes per txn
),
E::Error,
> {
Expand All @@ -254,7 +256,7 @@ where
let versioned_data_cache = MVHashMap::new();

if signature_verified_block.is_empty() {
return Ok((vec![], OutputDeltaResolver::new(versioned_data_cache)));
return Ok((vec![], vec![]));
}

let num_txns = signature_verified_block.len();
Expand Down Expand Up @@ -311,15 +313,13 @@ where
Some(err) => Err(err),
None => {
final_results.resize_with(num_txns, E::Output::skip_output);
Ok((
final_results,
OutputDeltaResolver::new(versioned_data_cache),
))
let delta_resolver = OutputDeltaResolver::new(versioned_data_cache);
Ok((final_results, delta_resolver.resolve(base_view, num_txns)))
}
}
}

pub fn execute_transactions_sequential(
pub(crate) fn execute_transactions_sequential(
&self,
executor_arguments: E::Argument,
signature_verified_block: &Vec<T>,
Expand Down Expand Up @@ -365,4 +365,54 @@ where
ret.resize_with(num_txns, E::Output::skip_output);
Ok(ret)
}

pub fn execute_block(
&self,
executor_arguments: E::Argument,
signature_verified_block: Vec<T>,
base_view: &dyn DataView<T = T>,
) -> Result<Vec<(E::Output, Vec<(<T as Transaction>::Key, WriteOp)>)>, E::Error> {
let num_txns = signature_verified_block.len();

let mut ret = if self.concurrency_level > 1 {
self.execute_transactions_parallel(
executor_arguments,
&signature_verified_block,
base_view,
)
} else {
self.execute_transactions_sequential(
executor_arguments,
&signature_verified_block,
base_view,
)
.map(|results| (results, vec![Vec::new(); num_txns]))
};

// TODO: change to is_err_and(|e| {}) once stable.
if ret.is_err() && *ret.as_ref().err().unwrap() == Error::ModulePathReadWrite {
debug!("[Execution]: Module read & written, sequential fallback");

ret = self
.execute_transactions_sequential(
executor_arguments,
&signature_verified_block,
base_view,
)
.map(|results| (results, vec![Vec::new(); num_txns]))
}

RAYON_EXEC_POOL.spawn(move || {
// Explicit async drops.
drop(signature_verified_block);
});

// TODO: parallelize when necessary.
ret.map(|(results, resolved_deltas)| {
results
.into_iter()
.zip(resolved_deltas.into_iter())
.collect()
})
}
}
38 changes: 23 additions & 15 deletions aptos-move/block-executor/src/output_delta_resolver.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::view::ResolvedData;
use crate::{
executor::RAYON_EXEC_POOL,
task::{DataView, Transaction},
};
use aptos_aggregator::delta_change_set::{deserialize, serialize};
use aptos_types::write_set::{TransactionWrite, WriteOp};
use mvhashmap::{EntryCell, MVHashMap};
use std::{hash::Hash, thread::spawn};

pub struct OutputDeltaResolver<K, V> {
versioned_outputs: MVHashMap<K, V>,
pub(crate) struct OutputDeltaResolver<T: Transaction> {
versioned_outputs: MVHashMap<<T as Transaction>::Key, <T as Transaction>::Value>,
}

impl<K: Hash + Clone + Eq + Send + 'static, V: TransactionWrite + Send + Sync + 'static>
OutputDeltaResolver<K, V>
{
pub fn new(versioned_outputs: MVHashMap<K, V>) -> Self {
impl<T: Transaction> OutputDeltaResolver<T> {
// When inherent associated types become available do:
// type K = <T as Transaction>::Key;
// type V = <T as Transaction>::Value;

pub(crate) fn new(
versioned_outputs: MVHashMap<<T as Transaction>::Key, <T as Transaction>::Value>,
) -> Self {
Self { versioned_outputs }
}

/// Takes Self, vector of all involved aggregator keys (each with at least one
/// delta to resolve in the output), resolved values from storage for each key,
/// and blocksize, and returns a Vec of materialized deltas per transaction index.
pub fn resolve(
pub(crate) fn resolve(
self,
aggregator_keys: Vec<(K, anyhow::Result<ResolvedData>)>,
base_view: &dyn DataView<T = T>,
block_size: usize,
) -> Vec<Vec<(K, WriteOp)>> {
let mut ret: Vec<Vec<(K, WriteOp)>> = (0..block_size).map(|_| Vec::new()).collect();
) -> Vec<Vec<(<T as Transaction>::Key, WriteOp)>> {
let mut ret: Vec<Vec<(<T as Transaction>::Key, WriteOp)>> =
(0..block_size).map(|_| Vec::new()).collect();

// TODO: with more deltas, re-use executor threads and process in parallel.
for (key, storage_val) in aggregator_keys.into_iter() {
let mut latest_value: Option<u128> = match storage_val
for key in self.versioned_outputs.aggregator_keys() {
let mut latest_value: Option<u128> = match base_view
.get_state_value(&key)
.ok() // Was anything found in storage
.map(|value| value.map(|bytes| deserialize(&bytes)))
{
Expand Down Expand Up @@ -66,7 +74,7 @@ impl<K: Hash + Clone + Eq + Send + 'static, V: TransactionWrite + Send + Sync +
}
}

spawn(move || drop(self));
RAYON_EXEC_POOL.spawn(move || drop(self));

ret
}
Expand Down
20 changes: 4 additions & 16 deletions aptos-move/block-executor/src/proptest_types/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use crate::{
executor::BlockExecutor,
proptest_types::types::{
DeltaDataView, EmptyDataView, ExpectedOutput, KeyType, Task, Transaction, TransactionGen,
TransactionGenParams, ValueType, STORAGE_AGGREGATOR_VALUE,
TransactionGenParams, ValueType,
},
};
use aptos_aggregator::delta_change_set::serialize;
use claims::assert_ok;
use num_cpus;
use proptest::{
Expand Down Expand Up @@ -166,9 +165,10 @@ fn deltas_writes_mixed() {
.expect("creating a new value should succeed")
.current();

// Do not allow deletions as resolver can't apply delta to a deleted aggregator.
let transactions: Vec<_> = transaction_gen
.into_iter()
.map(|txn_gen| txn_gen.materialize_with_deltas(&universe, 15, true))
.map(|txn_gen| txn_gen.materialize_with_deltas(&universe, 15, false))
.collect();

let data_view = DeltaDataView::<KeyType<[u8; 32]>, ValueType<[u8; 32]>> {
Expand Down Expand Up @@ -224,19 +224,7 @@ fn deltas_resolver() {
>::new(num_cpus::get())
.execute_transactions_parallel((), &transactions, &data_view);

let (output, delta_resolver) = output.unwrap();
let resolved = delta_resolver.resolve(
(15..50)
.map(|i| {
(
KeyType(universe[i], false),
Ok(Some(serialize(&STORAGE_AGGREGATOR_VALUE))),
)
})
.collect(),
num_txns,
);

let (output, resolved) = output.unwrap();
let baseline = ExpectedOutput::generate_baseline(&transactions, Some(resolved));
baseline.assert_output(&Ok(output));
}
Expand Down
4 changes: 2 additions & 2 deletions aptos-move/block-executor/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub trait ExecutorTask: Sync {
type Output: TransactionOutput<T = Self::T> + 'static;

/// Type of error when the executor failed to process a transaction and needs to abort.
type Error: Clone + Send + Sync + 'static;
type Error: Clone + Send + Sync + Eq + 'static;

/// Type to intialize the single thread transaction executor. Copy and Sync are required because
/// we will create an instance of executor on each individual thread.
Expand All @@ -79,7 +79,7 @@ pub trait ExecutorTask: Sync {
) -> ExecutionStatus<Self::Output, Self::Error>;
}

/// Trait for execution result of a transaction.
/// Trait for execution result of a single transaction.
pub trait TransactionOutput: Send + Sync {
/// Type of transaction and its associated key and value.
type T: Transaction;
Expand Down

0 comments on commit b423013

Please sign in to comment.