Merge pull request #1145 from holochain/fix-async-db
Make all db transactions async
freesig committed Dec 6, 2021
2 parents b25e566 + c349484 commit 3b5acf5
Showing 14 changed files with 255 additions and 109 deletions.
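
The recurring pattern across these files: p2p-store reads that used to grab a pooled SQLite connection synchronously on the tokio runtime thread become `async`, first awaiting a connection permit and then running the blocking query inside `tokio::task::spawn_blocking`. A minimal, self-contained sketch of that shape using plain `tokio` and `rusqlite` (the `Db` type and the `Semaphore` are illustrative stand-ins, not the holochain_sqlite API):

```rust
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Semaphore;

/// Illustrative permit-gated database handle; only the shape matters here.
#[derive(Clone)]
struct Db {
    path: Arc<PathBuf>,
    permits: Arc<Semaphore>, // caps how many blocking connections are live at once
}

impl Db {
    /// Await a permit on the async runtime, then do the blocking SQLite work
    /// on tokio's blocking pool so runtime threads stay free.
    async fn with_conn<T, F>(&self, f: F) -> anyhow::Result<T>
    where
        T: Send + 'static,
        F: FnOnce(&rusqlite::Connection) -> rusqlite::Result<T> + Send + 'static,
    {
        let permit = self.permits.clone().acquire_owned().await?;
        let path = self.path.clone();
        let out = tokio::task::spawn_blocking(move || {
            let _permit = permit; // held for the duration of the blocking work
            let conn = rusqlite::Connection::open(path.as_path())?;
            f(&conn)
        })
        .await??; // outer ?: JoinError, inner ?: rusqlite::Error
        Ok(out)
    }
}
```

A caller then writes `db.with_conn(|conn| conn.query_row("SELECT 1", [], |r| r.get(0))).await?` instead of holding a connection on a runtime thread, which is the same shape the new `run_query` helper in `p2p_agent_store.rs` below takes.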
3 changes: 3 additions & 0 deletions crates/holochain/src/conductor/error.rs
@@ -109,6 +109,9 @@ pub enum ConductorError {
#[error(transparent)]
StateMutationError(#[from] holochain_state::mutations::StateMutationError),

+ #[error(transparent)]
+ JoinError(#[from] tokio::task::JoinError),
+
#[error(transparent)]
RusqliteError(#[from] rusqlite::Error),

33 changes: 23 additions & 10 deletions crates/holochain/src/conductor/handle.rs
@@ -602,7 +602,8 @@ impl<DS: DnaStore + 'static> ConductorHandleT for ConductorHandleImpl<DS> {
..
} => {
let env = { self.p2p_env(space) };
- let res = get_agent_info_signed(env, kitsune_space, kitsune_agent)
+ let res = get_agent_info_signed(env.into(), kitsune_space, kitsune_agent)
+ .await
.map_err(holochain_p2p::HolochainP2pError::other);
respond.respond(Ok(async move { res }.boxed().into()));
}
@@ -613,7 +614,8 @@ impl<DS: DnaStore + 'static> ConductorHandleT for ConductorHandleImpl<DS> {
..
} => {
let env = { self.p2p_env(space) };
- let res = list_all_agent_info(env, kitsune_space)
+ let res = list_all_agent_info(env.into(), kitsune_space)
+ .await
.map(|infos| match agents {
Some(agents) => infos
.into_iter()
@@ -633,10 +635,15 @@ impl<DS: DnaStore + 'static> ConductorHandleT for ConductorHandleImpl<DS> {
} => {
use holochain_sqlite::db::AsP2pAgentStoreConExt;
let env = { self.p2p_env(space) };
- let res = env
- .conn()?
- .p2p_gossip_query_agents(since_ms, until_ms, (*arc_set).clone())
- .map_err(holochain_p2p::HolochainP2pError::other);
+ let permit = env.conn_permit().await;
+ let res = tokio::task::spawn_blocking(move || {
+ let mut conn = env.from_permit(permit)?;
+ conn.p2p_gossip_query_agents(since_ms, until_ms, (*arc_set).clone())
+ })
+ .await;
+ let res = res
+ .map_err(holochain_p2p::HolochainP2pError::other)
+ .and_then(|r| r.map_err(holochain_p2p::HolochainP2pError::other));
respond.respond(Ok(async move { res }.boxed().into()));
}
QueryAgentInfoSignedNearBasis {
@@ -647,9 +654,14 @@ impl<DS: DnaStore + 'static> ConductorHandleT for ConductorHandleImpl<DS> {
..
} => {
let env = { self.p2p_env(space) };
- let res =
- list_all_agent_info_signed_near_basis(env, kitsune_space, basis_loc, limit)
- .map_err(holochain_p2p::HolochainP2pError::other);
+ let res = list_all_agent_info_signed_near_basis(
+ env.into(),
+ kitsune_space,
+ basis_loc,
+ limit,
+ )
+ .await
+ .map_err(holochain_p2p::HolochainP2pError::other);
respond.respond(Ok(async move { res }.boxed().into()));
}
QueryPeerDensity {
@@ -659,7 +671,8 @@ impl<DS: DnaStore + 'static> ConductorHandleT for ConductorHandleImpl<DS> {
..
} => {
let env = { self.p2p_env(space) };
- let res = query_peer_density(env, kitsune_space, dht_arc)
+ let res = query_peer_density(env.into(), kitsune_space, dht_arc)
+ .await
.map_err(holochain_p2p::HolochainP2pError::other);
respond.respond(Ok(async move { res }.boxed().into()));
}
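`spawn_blocking(..).await` yields a nested result: `tokio::task::JoinError` on the outside, the query's own error on the inside. The hunk above flattens both layers into `HolochainP2pError` with `map_err` plus `and_then`; the `JoinError` variant added to `ConductorError` in the first file lets other call sites flatten with a plain `??` instead. A self-contained sketch of that second style, assuming a `thiserror`-derived error with `#[from]` conversions for both layers:

```rust
use tokio::task::{spawn_blocking, JoinError};

#[derive(Debug, thiserror::Error)]
enum StoreError {
    #[error(transparent)]
    Join(#[from] JoinError),
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
}

/// The first `?` handles the JoinError layer, the second the rusqlite::Error.
async fn count_tables(db_path: std::path::PathBuf) -> Result<i64, StoreError> {
    let n = spawn_blocking(move || -> rusqlite::Result<i64> {
        let conn = rusqlite::Connection::open(db_path)?;
        conn.query_row("SELECT count(*) FROM sqlite_master", [], |row| row.get(0))
    })
    .await??;
    Ok(n)
}
```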
4 changes: 3 additions & 1 deletion crates/holochain/src/conductor/interface/websocket.rs
@@ -728,7 +728,9 @@ pub mod test {
{
let mut count = 0;
for env in p2p.lock().values() {
- count += env.conn().unwrap().p2p_list_agents().unwrap().len();
+ let mut conn = env.conn().unwrap();
+ let txn = conn.transaction().unwrap();
+ count += txn.p2p_list_agents().unwrap().len();
}
count
},
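The test read above now opens an explicit transaction and queries through it (the `AsP2pStateTxExt` methods) instead of calling the helper directly on the connection; the `sharded_gossip.rs` test further down makes the same switch. The bare-`rusqlite` shape of that change, as an illustrative sketch rather than the crate's API:

```rust
use rusqlite::Connection;

fn read_user_version(conn: &mut Connection) -> rusqlite::Result<i64> {
    // Run reads against a transaction handle rather than the bare connection.
    let txn = conn.transaction()?;
    let version = txn.query_row("PRAGMA user_version", [], |row| row.get(0))?;
    // Read-only work: committing and simply dropping the transaction are equivalent.
    txn.commit()?;
    Ok(version)
}
```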
53 changes: 39 additions & 14 deletions crates/holochain/src/conductor/p2p_agent_store.rs
@@ -138,42 +138,62 @@ pub async fn exchange_peer_info(envs: Vec<DbWrite<DbKindP2pAgentStore>>) {
}
}

+ async fn run_query<F, R>(db: DbRead<DbKindP2pAgentStore>, f: F) -> ConductorResult<R>
+ where
+ R: Send + 'static,
+ F: FnOnce(PConnGuard) -> ConductorResult<R> + Send + 'static,
+ {
+ let permit = db.conn_permit().await;
+ let r = tokio::task::spawn_blocking(move || {
+ let conn = db.from_permit(permit)?;
+ f(conn)
+ })
+ .await??;
+ Ok(r)
+ }
+
/// Get agent info for a single agent
- pub fn get_agent_info_signed(
- environ: DbWrite<DbKindP2pAgentStore>,
+ pub async fn get_agent_info_signed(
+ environ: DbRead<DbKindP2pAgentStore>,
_kitsune_space: Arc<kitsune_p2p::KitsuneSpace>,
kitsune_agent: Arc<kitsune_p2p::KitsuneAgent>,
) -> ConductorResult<Option<AgentInfoSigned>> {
- Ok(environ.conn()?.p2p_get_agent(&kitsune_agent)?)
+ run_query(environ, move |mut conn| {
+ Ok(conn.p2p_get_agent(&kitsune_agent)?)
+ })
+ .await
}

/// Get all agent info for a single space
- pub fn list_all_agent_info(
- environ: DbWrite<DbKindP2pAgentStore>,
+ pub async fn list_all_agent_info(
+ environ: DbRead<DbKindP2pAgentStore>,
_kitsune_space: Arc<kitsune_p2p::KitsuneSpace>,
) -> ConductorResult<Vec<AgentInfoSigned>> {
- Ok(environ.conn()?.p2p_list_agents()?)
+ run_query(environ, move |mut conn| Ok(conn.p2p_list_agents()?)).await
}

/// Get all agent info for a single space near a basis loc
- pub fn list_all_agent_info_signed_near_basis(
- environ: DbWrite<DbKindP2pAgentStore>,
+ pub async fn list_all_agent_info_signed_near_basis(
+ environ: DbRead<DbKindP2pAgentStore>,
_kitsune_space: Arc<kitsune_p2p::KitsuneSpace>,
basis_loc: u32,
limit: u32,
) -> ConductorResult<Vec<AgentInfoSigned>> {
- Ok(environ.conn()?.p2p_query_near_basis(basis_loc, limit)?)
+ run_query(environ, move |mut conn| {
+ Ok(conn.p2p_query_near_basis(basis_loc, limit)?)
+ })
+ .await
}

/// Get the peer density an agent is currently seeing within
/// a given [`DhtArc`]
- pub fn query_peer_density(
- env: DbWrite<DbKindP2pAgentStore>,
+ pub async fn query_peer_density(
+ env: DbRead<DbKindP2pAgentStore>,
kitsune_space: Arc<kitsune_p2p::KitsuneSpace>,
dht_arc: DhtArc,
) -> ConductorResult<PeerDensity> {
let now = now();
- let arcs = env.conn()?.p2p_list_agents()?;
+ let arcs = run_query(env, move |mut conn| Ok(conn.p2p_list_agents()?)).await?;
let arcs = arcs
.into_iter()
.filter_map(|v| {
@@ -296,7 +316,7 @@ mod tests {
p2p_put(&env, &agent_info_signed).await.unwrap();

let ret = env
- .conn()
+ .from_permit(env.conn_permit().await)
.unwrap()
.p2p_get_agent(&agent_info_signed.agent)
.unwrap();
@@ -311,7 +331,12 @@ mod tests {
let env = t_env.env();

// - Check no data in the store to start
- let count = env.conn().unwrap().p2p_list_agents().unwrap().len();
+ let count = env
+ .from_permit(env.conn_permit().await)
+ .unwrap()
+ .p2p_list_agents()
+ .unwrap()
+ .len();

assert_eq!(count, 0);

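The read functions in this file now funnel through the new `run_query` helper above: await a permit, move the guarded connection into `spawn_blocking`, run the closure, and let `.await??` flatten the join and query errors. A hypothetical extra query written against the same helper would follow the shape below; `count_agents` is illustrative and not part of this commit, though `p2p_list_agents` is the method the diff already uses:

```rust
/// Hypothetical read-only query following the `run_query` pattern above.
pub async fn count_agents(
    environ: DbRead<DbKindP2pAgentStore>,
) -> ConductorResult<usize> {
    run_query(environ, move |mut conn| Ok(conn.p2p_list_agents()?.len())).await
}
```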
10 changes: 8 additions & 2 deletions crates/holochain/src/core/workflow/app_validation_workflow.rs
@@ -81,6 +81,7 @@ async fn app_validation_workflow_inner(
) -> WorkflowResult<WorkComplete> {
let env = workspace.dht_env.clone().into();
let sorted_ops = validation_query::get_ops_to_app_validate(&env).await?;
+ tracing::debug!("app validating {} ops", sorted_ops.len());

// Validate all the ops
let iter = sorted_ops.into_iter().map(|so| {
@@ -103,10 +104,12 @@ async fn app_validation_workflow_inner(
.buffer_unordered(NUM_CONCURRENT_OPS)
.ready_chunks(NUM_CONCURRENT_OPS);

+ let mut total = 0;
while let Some(chunk) = iter.next().await {
- workspace
+ let t = workspace
.dht_env
.async_commit(move |mut txn| {
+ let mut total = 0;
for outcome in chunk {
let (op_hash, _, op_light, outcome) = outcome;
// Get the outcome or return the error
@@ -120,6 +123,7 @@ async fn app_validation_workflow_inner(
}
match outcome {
Outcome::Accepted => {
+ total += 1;
put_integration_limbo(&mut txn, op_hash, ValidationStatus::Valid)?;
}
Outcome::AwaitingDeps(deps) => {
@@ -132,10 +136,12 @@ async fn app_validation_workflow_inner(
}
}
}
- WorkflowResult::Ok(())
+ WorkflowResult::Ok(total)
})
.await?;
+ total += t;
}
+ tracing::debug!("accepted {} ops", total);
Ok(WorkComplete::Complete)
}

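This workflow, and the sys validation workflow further down, now returns a per-chunk count from the `async_commit` closure (`WorkflowResult::Ok(total)` instead of `Ok(())`) and sums the counts outside the loop for the new debug log. The enabling piece is a commit helper that resolves to whatever the closure returns; a reduced, self-contained stand-in (not holochain's actual `async_commit` signature) could look like this:

```rust
use rusqlite::Connection;

/// Stand-in for an `async_commit`-style helper: run the closure inside a
/// transaction on the blocking pool and resolve to whatever it returns.
async fn commit_and_return<T, F>(mut conn: Connection, f: F) -> anyhow::Result<T>
where
    T: Send + 'static,
    F: FnOnce(&rusqlite::Transaction<'_>) -> anyhow::Result<T> + Send + 'static,
{
    tokio::task::spawn_blocking(move || {
        let txn = conn.transaction()?;
        let out = f(&txn)?;
        txn.commit()?;
        Ok(out)
    })
    .await?
}
```

A caller sums the values returned per chunk, mirroring the `total += t;` lines in the hunks above.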
@@ -217,7 +217,6 @@ async fn run_test(
.get_dht_env(&alice_cell_id.dna_hash())
.unwrap();
wait_for_integration(&alice_env, expected_count, num_attempts, delay_per_attempt).await;
- holochain_state::prelude::dump_tmp(&alice_env);

let alice_env = conductors[0]
.get_dht_env(&alice_cell_id.dna_hash())
@@ -96,6 +96,7 @@ fn batch_check_end(batch: &IncomingOpsBatch) -> Option<Vec<InOpBatchEntry>> {
})
}

+ #[instrument(skip(txn, ops))]
fn batch_process_entry(
txn: &mut rusqlite::Transaction<'_>,
request_validation_receipt: bool,
@@ -115,6 +116,7 @@ fn batch_process_entry(
}
}

+ tracing::debug!("Inserting {} ops", to_pending.len());
add_to_pending(txn, to_pending, request_validation_receipt)?;

Ok(())
10 changes: 8 additions & 2 deletions crates/holochain/src/core/workflow/sys_validation_workflow.rs
@@ -84,6 +84,7 @@ async fn sys_validation_workflow_inner(
) -> WorkflowResult<WorkComplete> {
let env = workspace.dht_env.clone();
let sorted_ops = validation_query::get_ops_to_sys_validate(&env).await?;
+ tracing::debug!("sys validating {} ops", sorted_ops.len());

// Process each op
let iter = sorted_ops.into_iter().map(|so| {
@@ -112,14 +113,17 @@ async fn sys_validation_workflow_inner(
.buffer_unordered(NUM_CONCURRENT_OPS)
.ready_chunks(NUM_CONCURRENT_OPS);

+ let mut total = 0;
while let Some(chunk) = iter.next().await {
- space
+ let t = space
.dht_env
.async_commit(move |mut txn| {
+ let mut total = 0;
for outcome in chunk {
let (op_hash, outcome) = outcome?;
match outcome {
Outcome::Accepted => {
+ total += 1;
put_validation_limbo(
&mut txn,
op_hash,
@@ -155,10 +159,12 @@ async fn sys_validation_workflow_inner(
}
}
}
- WorkflowResult::Ok(())
+ WorkflowResult::Ok(total)
})
.await?;
+ total += t;
}
+ tracing::debug!("accepted {} ops", total);
Ok(WorkComplete::Complete)
}

14 changes: 7 additions & 7 deletions crates/holochain/tests/sharded_gossip.rs
@@ -172,7 +172,7 @@ async fn mock_network_sharded_gossip() {
},
};
use holochain_p2p::{dht_arc::DhtLocation, AgentPubKeyExt, DnaHashExt};
- use holochain_sqlite::db::AsP2pAgentStoreConExt;
+ use holochain_sqlite::db::AsP2pStateTxExt;
use kitsune_p2p::TransportConfig;
use kitsune_p2p_types::tx2::tx2_adapter::AdapterFactory;

@@ -582,13 +582,13 @@ async fn mock_network_sharded_gossip() {
let alice_info = alice_info.clone();
async move {
loop {
- let info = alice_p2p_env
- .conn()
- .unwrap()
- .p2p_get_agent(&alice_kit)
- .unwrap();
{
- *alice_info.lock() = info;
+ let mut conn = alice_p2p_env.conn().unwrap();
+ let txn = conn.transaction().unwrap();
+ let info = txn.p2p_get_agent(&alice_kit).unwrap();
+ {
+ *alice_info.lock() = info;
+ }
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
1 change: 1 addition & 0 deletions crates/holochain_cascade/CHANGELOG.md
@@ -3,6 +3,7 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## \[Unreleased\]
+ - Fixes database queries that were running on the runtime thread instead of the background thread. Makes the connections wait for a permit before taking a database connection from the pool.

## 0.0.18
