Skip to content

Commit

Permalink
Compact empties command, but good this time (#179)
Browse files Browse the repository at this point in the history
* Fix bad subcommand attribute

* Return a lot more information from the feedback channel

* Remove empties channel from compaction loop

* Yield after inserting a set and use a deferred db transaction

* Does this exist

* Queue empties into a channel, but wait for capacity

* Make error output more useful

* Finish process with a log about how long it took to compact db
  • Loading branch information
spacekookie committed Mar 28, 2024
1 parent a5058c1 commit 238b650
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 25 deletions.
29 changes: 25 additions & 4 deletions crates/corro-admin/src/lib.rs
Expand Up @@ -205,15 +205,17 @@ async fn handle_conn(
info_log(&mut stream, "compacting empty versions...").await;

let (tx, mut rx) = mpsc::channel(4);
let (done_tx, done_rx) = oneshot::channel::<Option<()>>();
let (done_tx, done_rx) = oneshot::channel::<Option<String>>();

let started = std::time::Instant::now();

let bookie = bookie.clone();
let agent = agent.clone();
tokio::task::spawn(async move {
let pool = agent.pool();
match clear_overwritten_versions(&agent, &bookie, pool, Some(tx)).await {
Some(()) => done_tx.send(Some(())),
None => done_tx.send(None),
Ok(()) => done_tx.send(None),
Err(e) => done_tx.send(Some(e)),
}
});

Expand All @@ -224,7 +226,26 @@ async fn handle_conn(
// when this loop exits it means our writer has
// gone away / the task completed
match done_rx.await {
Ok(Some(())) => send_success(&mut stream).await,
Ok(None) => {
let elapsed = started.elapsed().as_secs_f64();
info_log(
&mut stream,
format!(
"Finished compacting empty versions! Took {} seconds ({} minutes)",
elapsed,
elapsed / 60.0
),
)
.await;
send_success(&mut stream).await
}
Ok(Some(err)) => {
send_error(
&mut stream,
format!("An error occured while compacting empties: {err}"),
)
.await
}
_ => {
send_error(
&mut stream,
Expand Down
82 changes: 62 additions & 20 deletions crates/corro-agent/src/agent/util.rs
Expand Up @@ -142,7 +142,7 @@ pub async fn clear_overwritten_versions_loop(agent: Agent, bookie: Bookie, sleep

if clear_overwritten_versions(&agent, &bookie, pool, None)
.await
.is_none()
.is_err()
{
continue;
}
Expand All @@ -155,7 +155,7 @@ pub async fn clear_overwritten_versions(
bookie: &Bookie,
pool: &SplitPool,
feedback: Option<Sender<String>>,
) -> Option<()> {
) -> Result<(), String> {
let start = Instant::now();

let bookie_clone = {
Expand All @@ -177,10 +177,17 @@ pub async fn clear_overwritten_versions(
"Compacting changes for {} actors",
bookie_clone.len()
))
.await;
.await
.map_err(|e| format!("{e}"))?;
}

for (actor_id, booked) in bookie_clone {
if let Some(ref tx) = feedback {
tx.send(format!("Starting change compaction for {actor_id}"))
.await
.map_err(|e| format!("{e}"))?;
}

// pull the current db version -> version map at the present time
// these are only updated _after_ a transaction has been committed, via a write lock
// so it should be representative of the current state.
Expand All @@ -197,18 +204,25 @@ pub async fn clear_overwritten_versions(
Ok(booked) => booked.current_versions(),
Err(_) => {
info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now");
return None;

if let Some(ref tx) = feedback {
tx.send("timed out acquiring read lock on bookkeeping".into())
.await
.map_err(|e| format!("{e}"))?;
}

return Err("Timed out acquiring read lock on bookkeeping".into());
}
}
};

if versions.is_empty() {
return None;
}

if let Some(ref tx) = feedback {
tx.send(format!("Starting change compaction for {actor_id}"))
.await;
if let Some(ref tx) = feedback {
tx.send("No versions to compact".into())
.await
.map_err(|e| format!("{e}"))?;
}
continue;
}

// we're using a read connection here, starting a read-only transaction
Expand All @@ -229,17 +243,36 @@ pub async fn clear_overwritten_versions(
cleared.len(),
start.elapsed()
);

if let Some(ref tx) = feedback {
tx.send(format!("Aggregated {} DB versions to clear", cleared.len()))
.await
.map_err(|e| format!("{e}"))?;
}

cleared
}
Err(e) => {
error!("could not get cleared versions: {e}");
return None;

if let Some(ref tx) = feedback {
tx.send(format!("failed to get cleared versions: {e}"))
.await
.map_err(|e| format!("{e}"))?;
}

return Err("failed to cleared versions".into());
}
}
}
Err(e) => {
error!("could not get read connection: {e}");
return None;
if let Some(ref tx) = feedback {
tx.send(format!("failed to get read connection: {e}"))
.await
.map_err(|e| format!("{e}"))?;
}
return Err("could not get read connection".into());
}
};

Expand Down Expand Up @@ -274,23 +307,32 @@ pub async fn clear_overwritten_versions(
.dedup()
{
// schedule for clearing in the background task
if let Err(e) = agent.tx_empty().try_send((actor_id, range.clone())) {
if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await {
error!("could not schedule version to be cleared: {e}");
if let Some(ref tx) = feedback {
tx.send(format!("failed to get queue compaction set: {e}"))
.await
.map_err(|e| format!("{e}"))?;
}
} else {
inserted += 1;
}

tokio::task::yield_now().await;
}

if let Some(ref tx) = feedback {
tx.send(format!("Queued {inserted} empty versions to compact"))
.await
.map_err(|e| format!("{e}"))?;
}
}
}

if let Some(ref tx) = feedback {
if tx
.send(format!("Finshed compacting changes for {actor_id}"))
tx.send(format!("Finshed compacting changes for {actor_id}"))
.await
.is_err()
{
error!("failed to send feedback payload to caller");
};
.map_err(|e| format!("{e}"))?;
}

tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -302,7 +344,7 @@ pub async fn clear_overwritten_versions(
start.elapsed()
);

Some(())
Ok(())
}

/// Load the existing known member state and addresses
Expand Down
1 change: 0 additions & 1 deletion crates/corrosion/src/main.rs
Expand Up @@ -624,7 +624,6 @@ enum Command {
Tls(TlsCommand),

/// Clear overwritten versions
#[command(subcommand)]
CompactEmpties,
}

Expand Down

0 comments on commit 238b650

Please sign in to comment.