diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index ac834f69..dcf530a0 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -205,15 +205,17 @@ async fn handle_conn( info_log(&mut stream, "compacting empty versions...").await; let (tx, mut rx) = mpsc::channel(4); - let (done_tx, done_rx) = oneshot::channel::>(); + let (done_tx, done_rx) = oneshot::channel::>(); + + let started = std::time::Instant::now(); let bookie = bookie.clone(); let agent = agent.clone(); tokio::task::spawn(async move { let pool = agent.pool(); match clear_overwritten_versions(&agent, &bookie, pool, Some(tx)).await { - Some(()) => done_tx.send(Some(())), - None => done_tx.send(None), + Ok(()) => done_tx.send(None), + Err(e) => done_tx.send(Some(e)), } }); @@ -224,7 +226,26 @@ async fn handle_conn( // when this loop exists it means our writer has // gone away/ the task completed match done_rx.await { - Ok(Some(())) => send_success(&mut stream).await, + Ok(None) => { + let elapsed = started.elapsed().as_secs_f64(); + info_log( + &mut stream, + format!( + "Finished compacting empty versions! Took {} seconds ({} minutes)", + elapsed, + elapsed / 60.0 + ), + ) + .await; + send_success(&mut stream).await + } + Ok(Some(err)) => { + send_error( + &mut stream, + format!("An error occured while compacting empties: {err}"), + ) + .await + } _ => { send_error( &mut stream, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index c1d46fdb..6282d647 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -142,7 +142,7 @@ pub async fn clear_overwritten_versions_loop(agent: Agent, bookie: Bookie, sleep if clear_overwritten_versions(&agent, &bookie, pool, None) .await - .is_none() + .is_err() { continue; } @@ -155,7 +155,7 @@ pub async fn clear_overwritten_versions( bookie: &Bookie, pool: &SplitPool, feedback: Option>, -) -> Option<()> { +) -> Result<(), String> { let start = Instant::now(); let bookie_clone = { @@ -177,10 +177,17 @@ pub async fn clear_overwritten_versions( "Compacting changes for {} actors", bookie_clone.len() )) - .await; + .await + .map_err(|e| format!("{e}"))?; } for (actor_id, booked) in bookie_clone { + if let Some(ref tx) = feedback { + tx.send(format!("Starting change compaction for {actor_id}")) + .await + .map_err(|e| format!("{e}"))?; + } + // pull the current db version -> version map at the present time // these are only updated _after_ a transaction has been committed, via a write lock // so it should be representative of the current state. @@ -197,18 +204,25 @@ pub async fn clear_overwritten_versions( Ok(booked) => booked.current_versions(), Err(_) => { info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); - return None; + + if let Some(ref tx) = feedback { + tx.send("timed out acquiring read lock on bookkeeping".into()) + .await + .map_err(|e| format!("{e}"))?; + } + + return Err("Timed out acquiring read lock on bookkeeping".into()); } } }; if versions.is_empty() { - return None; - } - - if let Some(ref tx) = feedback { - tx.send(format!("Starting change compaction for {actor_id}")) - .await; + if let Some(ref tx) = feedback { + tx.send("No versions to compact".into()) + .await + .map_err(|e| format!("{e}"))?; + } + continue; } // we're using a read connection here, starting a read-only transaction @@ -229,17 +243,36 @@ pub async fn clear_overwritten_versions( cleared.len(), start.elapsed() ); + + if let Some(ref tx) = feedback { + tx.send(format!("Aggregated {} DB versions to clear", cleared.len())) + .await + .map_err(|e| format!("{e}"))?; + } + cleared } Err(e) => { error!("could not get cleared versions: {e}"); - return None; + + if let Some(ref tx) = feedback { + tx.send(format!("failed to get cleared versions: {e}")) + .await + .map_err(|e| format!("{e}"))?; + } + + return Err("failed to cleared versions".into()); } } } Err(e) => { error!("could not get read connection: {e}"); - return None; + if let Some(ref tx) = feedback { + tx.send(format!("failed to get read connection: {e}")) + .await + .map_err(|e| format!("{e}"))?; + } + return Err("could not get read connection".into()); } }; @@ -274,23 +307,32 @@ pub async fn clear_overwritten_versions( .dedup() { // schedule for clearing in the background task - if let Err(e) = agent.tx_empty().try_send((actor_id, range.clone())) { + if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await { error!("could not schedule version to be cleared: {e}"); + if let Some(ref tx) = feedback { + tx.send(format!("failed to get queue compaction set: {e}")) + .await + .map_err(|e| format!("{e}"))?; + } } else { inserted += 1; } + + tokio::task::yield_now().await; + } + + if let Some(ref tx) = feedback { + tx.send(format!("Queued {inserted} empty versions to compact")) + .await + .map_err(|e| format!("{e}"))?; } } } if let Some(ref tx) = feedback { - if tx - .send(format!("Finshed compacting changes for {actor_id}")) + tx.send(format!("Finshed compacting changes for {actor_id}")) .await - .is_err() - { - error!("failed to send feedback payload to caller"); - }; + .map_err(|e| format!("{e}"))?; } tokio::time::sleep(Duration::from_secs(1)).await; @@ -302,7 +344,7 @@ pub async fn clear_overwritten_versions( start.elapsed() ); - Some(()) + Ok(()) } /// Load the existing known member state and addresses diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index c9601c0b..39ff59fe 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -624,7 +624,6 @@ enum Command { Tls(TlsCommand), /// Clear overwritten versions - #[command(subcommand)] CompactEmpties, }