From 3de1e4f66ab59979bf08f0daed1a81e752d45a20 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Wed, 27 Mar 2024 14:49:17 +0100 Subject: [PATCH 1/8] Fix bad subcommand attribute --- crates/corrosion/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index c9601c0b..39ff59fe 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -624,7 +624,6 @@ enum Command { Tls(TlsCommand), /// Clear overwritten versions - #[command(subcommand)] CompactEmpties, } From 7296f881678a4bda2f51540597e78836d8d11a97 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Wed, 27 Mar 2024 15:12:34 +0100 Subject: [PATCH 2/8] Return a lot more information from the feedback channel --- crates/corro-agent/src/agent/util.rs | 75 +++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index c1d46fdb..3415d381 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -177,10 +177,21 @@ pub async fn clear_overwritten_versions( "Compacting changes for {} actors", bookie_clone.len() )) - .await; + .await + .unwrap(); } for (actor_id, booked) in bookie_clone { + if let Some(ref tx) = feedback { + if tx + .send(format!("Starting change compaction for {actor_id}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } + // pull the current db version -> version map at the present time // these are only updated _after_ a transaction has been committed, via a write lock // so it should be representative of the current state. @@ -197,18 +208,30 @@ pub async fn clear_overwritten_versions( Ok(booked) => booked.current_versions(), Err(_) => { info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); + + if let Some(ref tx) = feedback { + if tx + .send("timed out acquiring read lock on bookkeeping".into()) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } + return None; } } }; if versions.is_empty() { - return None; - } + if let Some(ref tx) = feedback { + if tx.send("No versions to compact".into()).await.is_err() { + error!("failed to send feedback payload to caller"); + }; + } - if let Some(ref tx) = feedback { - tx.send(format!("Starting change compaction for {actor_id}")) - .await; + return None; } // we're using a read connection here, starting a read-only transaction @@ -229,16 +252,47 @@ pub async fn clear_overwritten_versions( cleared.len(), start.elapsed() ); + + if let Some(ref tx) = feedback { + if tx + .send(format!("Aggregated {} DB versions to clear", cleared.len())) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } + cleared } Err(e) => { error!("could not get cleared versions: {e}"); + + if let Some(ref tx) = feedback { + if tx + .send(format!("failed to get cleared versions: {e}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } + return None; } } } Err(e) => { error!("could not get read connection: {e}"); + if let Some(ref tx) = feedback { + if tx + .send(format!("failed to get read connection: {e}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } return None; } }; @@ -276,6 +330,15 @@ pub async fn clear_overwritten_versions( // schedule for clearing in the background task if let Err(e) = agent.tx_empty().try_send((actor_id, range.clone())) { error!("could not schedule version to be cleared: {e}"); + if let Some(ref tx) = feedback { + if tx + .send(format!("failed to schedule version to be cleared: {e}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + }; + } } else { inserted += 1; } From d5ce4e38603c7311d21d2d3ed41547c2ad15f8f8 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 12:14:08 +0100 Subject: [PATCH 3/8] Remove empties channel from compaction loop --- crates/corro-agent/src/agent/util.rs | 49 +++++++++++++++------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 3415d381..10db866a 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -189,7 +189,7 @@ pub async fn clear_overwritten_versions( .is_err() { error!("failed to send feedback payload to caller"); - }; + } } // pull the current db version -> version map at the present time @@ -216,7 +216,7 @@ pub async fn clear_overwritten_versions( .is_err() { error!("failed to send feedback payload to caller"); - }; + } } return None; @@ -228,7 +228,7 @@ pub async fn clear_overwritten_versions( if let Some(ref tx) = feedback { if tx.send("No versions to compact".into()).await.is_err() { error!("failed to send feedback payload to caller"); - }; + } } return None; @@ -260,7 +260,7 @@ pub async fn clear_overwritten_versions( .is_err() { error!("failed to send feedback payload to caller"); - }; + } } cleared @@ -275,7 +275,7 @@ pub async fn clear_overwritten_versions( .is_err() { error!("failed to send feedback payload to caller"); - }; + } } return None; @@ -291,7 +291,7 @@ pub async fn clear_overwritten_versions( .is_err() { error!("failed to send feedback payload to caller"); - }; + } } return None; } @@ -321,27 +321,30 @@ pub async fn clear_overwritten_versions( } } - // find any affected cleared ranges - for range in to_clear + let mut empties_map: BTreeMap> = + BTreeMap::new(); + to_clear .iter() - .filter_map(|(_, v)| bookedw.cleared.get(v)) + .filter_map(|(_, v)| bookedw.cleared.get(v).cloned()) .dedup() - { - // schedule for clearing in the background task - if let Err(e) = agent.tx_empty().try_send((actor_id, range.clone())) { - error!("could not schedule version to be cleared: {e}"); - if let Some(ref tx) = feedback { - if tx - .send(format!("failed to schedule version to be cleared: {e}")) - .await - .is_err() - { - error!("failed to send feedback payload to caller"); - }; + .for_each(|versions| { + empties_map.entry(actor_id).or_default().insert(versions); + }); + let empties_len = empties_map.len(); + + if let Err(e) = process_completed_empties(agent.clone(), empties_map).await { + error!("could not schedule version to be cleared: {e}"); + if let Some(ref tx) = feedback { + if tx + .send(format!("failed to schedule version to be cleared: {e}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); } - } else { - inserted += 1; } + } else { + inserted += empties_len; } } } From 1e99fb14189428f45f46504dc532c994b80e3cda Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 12:44:23 +0100 Subject: [PATCH 4/8] Yield after inserting a set and use a deferred db transaction --- crates/corro-agent/src/agent/util.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 10db866a..abfc573f 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -346,6 +346,9 @@ pub async fn clear_overwritten_versions( } else { inserted += empties_len; } + + // take a lil nap + tokio::task::yield_now().await; } } @@ -846,7 +849,7 @@ pub async fn process_completed_empties( for ranges in v.chunks(25) { let mut conn = agent.pool().write_low().await?; block_in_place(|| { - let mut tx = conn.immediate_transaction()?; + let mut tx = conn.transaction()?; for range in ranges { let mut sp = tx.savepoint()?; @@ -870,6 +873,8 @@ pub async fn process_completed_empties( Ok::<_, eyre::Report>(()) })?; + + tokio::task::yield_now().await; } } From 2714403fb550dbaec544e478696b4793e16b9039 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 14:59:38 +0100 Subject: [PATCH 5/8] Does this exist From 82cd0dab3007effc24b9f34d1e45bc34dec468e4 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 15:56:27 +0100 Subject: [PATCH 6/8] Queue empties into a channel, but wait for capacity --- crates/corro-agent/src/agent/util.rs | 47 +++++++++++++++------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index abfc573f..d8b557c8 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -178,7 +178,7 @@ pub async fn clear_overwritten_versions( bookie_clone.len() )) .await - .unwrap(); + .map_err(|e| format!("{e}"))?; } for (actor_id, booked) in bookie_clone { @@ -230,8 +230,7 @@ pub async fn clear_overwritten_versions( error!("failed to send feedback payload to caller"); } } - - return None; + continue; } // we're using a read connection here, starting a read-only transaction @@ -321,34 +320,40 @@ pub async fn clear_overwritten_versions( } } - let mut empties_map: BTreeMap> = - BTreeMap::new(); - to_clear + // find any affected cleared ranges + for range in to_clear .iter() - .filter_map(|(_, v)| bookedw.cleared.get(v).cloned()) + .filter_map(|(_, v)| bookedw.cleared.get(v)) .dedup() - .for_each(|versions| { - empties_map.entry(actor_id).or_default().insert(versions); - }); - let empties_len = empties_map.len(); + { + // schedule for clearing in the background task + if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await { + error!("could not schedule version to be cleared: {e}"); + if let Some(ref tx) = feedback { + if tx + .send(format!("failed to get queue compaction set: {e}")) + .await + .is_err() + { + error!("failed to send feedback payload to caller"); + } + } + } else { + inserted += 1; + } - if let Err(e) = process_completed_empties(agent.clone(), empties_map).await { - error!("could not schedule version to be cleared: {e}"); if let Some(ref tx) = feedback { if tx - .send(format!("failed to schedule version to be cleared: {e}")) + .send(format!("Queued {inserted} empty versions to compact")) .await .is_err() { error!("failed to send feedback payload to caller"); } } - } else { - inserted += empties_len; - } - // take a lil nap - tokio::task::yield_now().await; + tokio::task::yield_now().await; + } } } @@ -849,7 +854,7 @@ pub async fn process_completed_empties( for ranges in v.chunks(25) { let mut conn = agent.pool().write_low().await?; block_in_place(|| { - let mut tx = conn.transaction()?; + let mut tx = conn.immediate_transaction()?; for range in ranges { let mut sp = tx.savepoint()?; @@ -873,8 +878,6 @@ pub async fn process_completed_empties( Ok::<_, eyre::Report>(()) })?; - - tokio::task::yield_now().await; } } From 26d1c899766319f81c0a97df194b9755a8f97a0b Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 16:31:09 +0100 Subject: [PATCH 7/8] Make error output more useful --- crates/corro-admin/src/lib.rs | 15 +++-- crates/corro-agent/src/agent/util.rs | 90 +++++++++------------------- 2 files changed, 40 insertions(+), 65 deletions(-) diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index ac834f69..84917f69 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -205,15 +205,15 @@ async fn handle_conn( info_log(&mut stream, "compacting empty versions...").await; let (tx, mut rx) = mpsc::channel(4); - let (done_tx, done_rx) = oneshot::channel::>(); + let (done_tx, done_rx) = oneshot::channel::>(); let bookie = bookie.clone(); let agent = agent.clone(); tokio::task::spawn(async move { let pool = agent.pool(); match clear_overwritten_versions(&agent, &bookie, pool, Some(tx)).await { - Some(()) => done_tx.send(Some(())), - None => done_tx.send(None), + Ok(()) => done_tx.send(None), + Err(e) => done_tx.send(Some(e)), } }); @@ -224,7 +224,14 @@ async fn handle_conn( // when this loop exists it means our writer has // gone away/ the task completed match done_rx.await { - Ok(Some(())) => send_success(&mut stream).await, + Ok(None) => send_success(&mut stream).await, + Ok(Some(err)) => { + send_error( + &mut stream, + format!("An error occured while compacting empties: {err}"), + ) + .await + } _ => { send_error( &mut stream, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index d8b557c8..6282d647 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -142,7 +142,7 @@ pub async fn clear_overwritten_versions_loop(agent: Agent, bookie: Bookie, sleep if clear_overwritten_versions(&agent, &bookie, pool, None) .await - .is_none() + .is_err() { continue; } @@ -155,7 +155,7 @@ pub async fn clear_overwritten_versions( bookie: &Bookie, pool: &SplitPool, feedback: Option>, -) -> Option<()> { +) -> Result<(), String> { let start = Instant::now(); let bookie_clone = { @@ -183,13 +183,9 @@ pub async fn clear_overwritten_versions( for (actor_id, booked) in bookie_clone { if let Some(ref tx) = feedback { - if tx - .send(format!("Starting change compaction for {actor_id}")) + tx.send(format!("Starting change compaction for {actor_id}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } // pull the current db version -> version map at the present time @@ -210,25 +206,21 @@ pub async fn clear_overwritten_versions( info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); if let Some(ref tx) = feedback { - if tx - .send("timed out acquiring read lock on bookkeeping".into()) + tx.send("timed out acquiring read lock on bookkeeping".into()) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } - return None; + return Err("Timed out acquiring read lock on bookkeeping".into()); } } }; if versions.is_empty() { if let Some(ref tx) = feedback { - if tx.send("No versions to compact".into()).await.is_err() { - error!("failed to send feedback payload to caller"); - } + tx.send("No versions to compact".into()) + .await + .map_err(|e| format!("{e}"))?; } continue; } @@ -253,13 +245,9 @@ pub async fn clear_overwritten_versions( ); if let Some(ref tx) = feedback { - if tx - .send(format!("Aggregated {} DB versions to clear", cleared.len())) + tx.send(format!("Aggregated {} DB versions to clear", cleared.len())) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } cleared @@ -268,31 +256,23 @@ pub async fn clear_overwritten_versions( error!("could not get cleared versions: {e}"); if let Some(ref tx) = feedback { - if tx - .send(format!("failed to get cleared versions: {e}")) + tx.send(format!("failed to get cleared versions: {e}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } - return None; + return Err("failed to cleared versions".into()); } } } Err(e) => { error!("could not get read connection: {e}"); if let Some(ref tx) = feedback { - if tx - .send(format!("failed to get read connection: {e}")) + tx.send(format!("failed to get read connection: {e}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } - return None; + return Err("could not get read connection".into()); } }; @@ -330,41 +310,29 @@ pub async fn clear_overwritten_versions( if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await { error!("could not schedule version to be cleared: {e}"); if let Some(ref tx) = feedback { - if tx - .send(format!("failed to get queue compaction set: {e}")) + tx.send(format!("failed to get queue compaction set: {e}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } + .map_err(|e| format!("{e}"))?; } } else { inserted += 1; } - if let Some(ref tx) = feedback { - if tx - .send(format!("Queued {inserted} empty versions to compact")) - .await - .is_err() - { - error!("failed to send feedback payload to caller"); - } - } - tokio::task::yield_now().await; } + + if let Some(ref tx) = feedback { + tx.send(format!("Queued {inserted} empty versions to compact")) + .await + .map_err(|e| format!("{e}"))?; + } } } if let Some(ref tx) = feedback { - if tx - .send(format!("Finshed compacting changes for {actor_id}")) + tx.send(format!("Finshed compacting changes for {actor_id}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - }; + .map_err(|e| format!("{e}"))?; } tokio::time::sleep(Duration::from_secs(1)).await; @@ -376,7 +344,7 @@ pub async fn clear_overwritten_versions( start.elapsed() ); - Some(()) + Ok(()) } /// Load the existing known member state and addresses From 04c221d51b4294b3153ce0a57fc831ecceecd379 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 28 Mar 2024 17:26:17 +0100 Subject: [PATCH 8/8] Finish process with a log about how long it took to compact db --- crates/corro-admin/src/lib.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index 84917f69..dcf530a0 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -207,6 +207,8 @@ async fn handle_conn( let (tx, mut rx) = mpsc::channel(4); let (done_tx, done_rx) = oneshot::channel::>(); + let started = std::time::Instant::now(); + let bookie = bookie.clone(); let agent = agent.clone(); tokio::task::spawn(async move { @@ -224,7 +226,19 @@ async fn handle_conn( // when this loop exists it means our writer has // gone away/ the task completed match done_rx.await { - Ok(None) => send_success(&mut stream).await, + Ok(None) => { + let elapsed = started.elapsed().as_secs_f64(); + info_log( + &mut stream, + format!( + "Finished compacting empty versions! Took {} seconds ({} minutes)", + elapsed, + elapsed / 60.0 + ), + ) + .await; + send_success(&mut stream).await + } Ok(Some(err)) => { send_error( &mut stream,