Troubleshooting tools and buffered changes fixes #164

Merged · 17 commits · Mar 2, 2024
162 changes: 34 additions & 128 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -48,7 +48,7 @@ quinn-proto = "0.10.5"
quinn-plaintext = { version = "0.2.0" }
quoted-string = "0.6.1"
rand = { version = "0.8.5", features = ["small_rng"] }
rangemap = { version = "1.4.0" }
rangemap = { version = "1.4.0", features = ["serde1"] }
rcgen = { version = "0.11.1", features = ["x509-parser"] }
rhai = { version = "1.15.1", features = ["sync"] }
rusqlite = { version = "0.30.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono"] }
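The serde1 feature gives rangemap's collection types Serialize/Deserialize impls, which the derives added to CurrentVersion and PartialVersion at the end of this PR depend on (PartialVersion embeds a RangeInclusiveSet). A minimal sketch of what the feature enables, not taken from the PR:

use rangemap::RangeInclusiveSet;

fn main() -> serde_json::Result<()> {
    // With rangemap's "serde1" feature, RangeInclusiveSet implements
    // Serialize/Deserialize, so structs embedding it can derive them too.
    let mut seqs: RangeInclusiveSet<u64> = RangeInclusiveSet::new();
    seqs.insert(0..=3);
    seqs.insert(10..=12);

    // Round-trips as a sequence of inclusive ranges.
    let json = serde_json::to_string(&seqs)?;
    let back: RangeInclusiveSet<u64> = serde_json::from_str(&json)?;
    assert_eq!(seqs, back);
    Ok(())
}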
47 changes: 45 additions & 2 deletions crates/corro-admin/src/lib.rs
@@ -2,8 +2,9 @@ use std::{fmt::Display, time::Duration};

use camino::Utf8PathBuf;
use corro_types::{
actor::ClusterId,
agent::{Agent, Bookie, LockKind, LockMeta, LockState},
actor::{ActorId, ClusterId},
agent::{Agent, Bookie, KnownVersion, LockKind, LockMeta, LockState},
base::Version,
broadcast::{FocaCmd, FocaInput},
sqlite::SqlitePoolError,
sync::generate_sync,
@@ -89,6 +90,7 @@ pub enum Command {
Sync(SyncCommand),
Locks { top: usize },
Cluster(ClusterCommand),
Actor(ActorCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -102,6 +104,11 @@ pub enum ClusterCommand {
SetId(ClusterId),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ActorCommand {
Version { actor_id: ActorId, version: Version },
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogLevel {
@@ -274,6 +281,42 @@ async fn handle_conn(
continue;
}

send_success(&mut stream).await;
}
Command::Actor(ActorCommand::Version { actor_id, version }) => {
let json = {
let bookie = bookie.read("admin actor version").await;
let booked = match bookie.get(&actor_id) {
Some(booked) => booked,
None => {
send_error(&mut stream, format!("unknown actor id: {actor_id}"))
.await;
continue;
}
};
let booked_read = booked.read("admin actor version booked").await;
match booked_read.get(&version) {
Some(known) => match known {
KnownVersion::Cleared => {
Ok(serde_json::Value::String("cleared".into()))
}
KnownVersion::Current(known) => serde_json::to_value(known)
.map(|v| serde_json::json!({"current": v})),
KnownVersion::Partial(known) => serde_json::to_value(known)
.map(|v| serde_json::json!({"partial": v})),
},
None => Ok(serde_json::Value::Null),
}
};

match json {
Ok(j) => _ = send(&mut stream, Response::Json(j)).await,
Err(e) => {
_ = send_error(&mut stream, e).await;
continue;
}
}

send_success(&mut stream).await;
}
},
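For reference, the new ActorCommand::Version handler answers with one of four JSON shapes, mirroring the match arms above; a sketch with illustrative inner values (the nested objects are whatever serde_json::to_value produces for CurrentVersion and PartialVersion):

use serde_json::{json, Value};

fn possible_payloads() -> [Value; 4] {
    [
        json!("cleared"),                           // KnownVersion::Cleared
        json!({ "current": { "db_version": 42 } }), // KnownVersion::Current, inner fields illustrative
        json!({ "partial": { "seqs": [] } }),       // KnownVersion::Partial, inner fields illustrative
        Value::Null,                                // version not tracked for this actor
    ]
}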
81 changes: 55 additions & 26 deletions crates/corro-agent/src/agent/handlers.rs
@@ -332,25 +332,25 @@ pub async fn handle_notifications(
/// multiple gigabytes and needs periodic truncation. We don't want
/// to schedule this task too often since it locks the whole DB.
// TODO: can we get around the lock somehow?
async fn db_cleanup(pool: &SplitPool) -> eyre::Result<()> {
fn db_cleanup(conn: &rusqlite::Connection) -> eyre::Result<()> {
debug!("handling db_cleanup (WAL truncation)");
let conn = pool.write_low().await?;
block_in_place(move || {
let start = Instant::now();

let busy: bool =
conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| row.get(0))?;
if busy {
warn!("could not truncate sqlite WAL, database busy");
counter!("corro.db.wal.truncate.busy").increment(1);
} else {
debug!("successfully truncated sqlite WAL!");
histogram!("corro.db.wal.truncate.seconds").record(start.elapsed().as_secs_f64());
}
Ok::<_, eyre::Report>(())
})?;
debug!("done handling db_cleanup");
Ok(())
let start = Instant::now();

let orig: u64 = conn.pragma_query_value(None, "busy_timeout", |row| row.get(0))?;
conn.pragma_update(None, "busy_timeout", 60000)?;

let busy: bool = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| row.get(0))?;
if busy {
warn!("could not truncate sqlite WAL, database busy");
counter!("corro.db.wal.truncate.busy").increment(1);
} else {
debug!("successfully truncated sqlite WAL!");
histogram!("corro.db.wal.truncate.seconds").record(start.elapsed().as_secs_f64());
}

_ = conn.pragma_update(None, "busy_timeout", orig);

Ok::<_, eyre::Report>(())
}

/// See `db_cleanup`
@@ -360,8 +360,15 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) {
loop {
db_cleanup_interval.tick().await;

if let Err(e) = db_cleanup(&pool).await {
error!("could not truncate db: {e}");
match pool.write_low().await {
Ok(conn) => {
if let Err(e) = block_in_place(|| db_cleanup(&conn)) {
error!("could not truncate db: {e}");
}
}
Err(e) => {
error!("could not acquire low priority conn to truncate wal: {e}")
}
}
}
});
@@ -379,12 +386,12 @@ pub async fn handle_changes(
mut rx_changes: CorroReceiver<(ChangeV1, ChangeSource)>,
mut tripwire: Tripwire,
) {
const MIN_CHANGES_CHUNK: usize = 2000;
const MAX_CHANGES_CHUNK: usize = 800;
let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new();
let mut buf = vec![];
let mut count = 0;

const MAX_CONCURRENT: usize = 3;
const MAX_CONCURRENT: usize = 5;
let mut join_set = JoinSet::new();

let mut max_wait = tokio::time::interval(Duration::from_millis(500));
@@ -396,15 +403,15 @@
// complicated loop to process changes efficiently w/ a max concurrency
// and a minimum chunk size for bigger and faster SQLite transactions
loop {
while count >= MIN_CHANGES_CHUNK && join_set.len() < MAX_CONCURRENT {
while count >= MAX_CHANGES_CHUNK && join_set.len() < MAX_CONCURRENT {
// we already have at least a full chunk of changes queued, so
// accumulate batches of up to MAX_CHANGES_CHUNK and process them
// concurrently based on MAX_CONCURRENT
let mut tmp_count = 0;
while let Some((change, src, queued_at)) = queue.pop_front() {
tmp_count += change.len();
buf.push((change, src, queued_at));
if tmp_count >= MIN_CHANGES_CHUNK {
if tmp_count >= MAX_CHANGES_CHUNK {
break;
}
}
@@ -526,7 +533,7 @@
gauge!("corro.agent.changesets.in_queue").set(queue.len() as f64);
gauge!("corro.agent.changes.processing.jobs").set(join_set.len() as f64);

if count < MIN_CHANGES_CHUNK && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
if count < MAX_CHANGES_CHUNK && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
// we can process this right away
debug!(%count, "spawning processing multiple changes from max wait interval");
join_set.spawn(util::process_multiple_changes(
@@ -561,7 +568,7 @@
counter!("corro.agent.changes.recv").increment(changes_count as u64);
count += changes_count;
queue.push_back((change, src, Instant::now()));
if count >= MIN_CHANGES_CHUNK {
if count >= MAX_CHANGES_CHUNK {
// drain and process current changes!
if let Err(e) = util::process_multiple_changes(
agent.clone(),
@@ -673,3 +680,25 @@ pub async fn handle_sync(
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn ensure_truncate_works() -> eyre::Result<()> {
let tmpdir = tempfile::tempdir()?;

let conn = rusqlite::Connection::open(tmpdir.path().join("db.sqlite"))?;
let pragma_value = 12345u64;
conn.pragma_update(None, "busy_timeout", pragma_value)?;

db_cleanup(&conn)?;
assert_eq!(
conn.pragma_query_value(None, "busy_timeout", |row| row.get::<_, u64>(0))?,
pragma_value
);

Ok(())
}
}
8 changes: 4 additions & 4 deletions crates/corro-agent/src/agent/run_root.rs
@@ -203,10 +203,10 @@ async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<Bookie> {
tripwire.clone(),
));

tokio::spawn(util::clear_overwritten_versions(
agent.clone(),
bookie.clone(),
));
// tokio::spawn(util::clear_overwritten_versions(
// agent.clone(),
// bookie.clone(),
// ));

Ok(bookie)
}
36 changes: 22 additions & 14 deletions crates/corro-agent/src/agent/util.rs
@@ -572,10 +572,12 @@ pub async fn clear_buffered_meta_loop(

// TODO: delete buffered changes from deleted sequences only (maybe, it's kind of hard and may not be necessary)

// sub query required due to DELETE and LIMIT interaction
let seq_count = tx
.prepare_cached("DELETE FROM __corro_seq_bookkeeping WHERE (site_id, version, start_seq) IN (SELECT site_id, version, start_seq FROM __corro_seq_bookkeeping WHERE site_id = ? AND version >= ? AND version <= ? LIMIT ?)")?
.execute(params![actor_id, versions.start(), versions.end(), TO_CLEAR_COUNT])?;

// sub query required due to DELETE and LIMIT interaction
let buf_count = tx
.prepare_cached("DELETE FROM __corro_buffered_changes WHERE (site_id, db_version, version, seq) IN (SELECT site_id, db_version, version, seq FROM __corro_buffered_changes WHERE site_id = ? AND version >= ? AND version <= ? LIMIT ?)")?
.execute(params![actor_id, versions.start(), versions.end(), TO_CLEAR_COUNT])?;
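The subqueries are needed because stock SQLite builds reject LIMIT directly on a DELETE (that syntax requires the SQLITE_ENABLE_UPDATE_DELETE_LIMIT compile-time option), so the delete is bounded by matching keys selected with a LIMIT instead. A toy illustration of the pattern against an assumed schema:

use rusqlite::{params, Connection};

fn delete_batch(conn: &Connection, limit: i64) -> rusqlite::Result<usize> {
    // "DELETE FROM jobs WHERE done = 1 LIMIT ?" is a syntax error on stock
    // builds, so bound the delete through a keyed subquery instead.
    conn.execute(
        "DELETE FROM jobs WHERE id IN
           (SELECT id FROM jobs WHERE done = 1 LIMIT ?)",
        params![limit],
    )
}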
@@ -608,6 +610,8 @@
}
}

const MAX_EMPTIES_BATCH_SIZE: u64 = 40;

/// Clear empty versions from the database in chunks to avoid locking
/// the database for too long.
///
@@ -625,12 +629,17 @@ pub async fn write_empties_loop(
let next_empties_check = tokio::time::sleep(CHECK_EMPTIES_TO_INSERT_AFTER);
tokio::pin!(next_empties_check);

let mut count = 0;

loop {
tokio::select! {
maybe_empty = rx_empty.recv() => match maybe_empty {
Some((actor_id, versions)) => {
empties.entry(actor_id).or_default().insert(versions);
continue;
count += 1;
if count < MAX_EMPTIES_BATCH_SIZE {
continue;
}
},
None => {
debug!("empties queue is done");
Expand All @@ -653,6 +662,8 @@ pub async fn write_empties_loop(
process_completed_empties(agent.clone(), empties_to_process)
.inspect_err(|e| error!("could not process empties: {e}")),
);

count = 0;
}
info!("Draining empty versions to process...");
// drain empties channel
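The loop now flushes on whichever comes first: the periodic empties check or a backlog of MAX_EMPTIES_BATCH_SIZE received versions. A stripped-down sketch of that count-or-timer batching shape (channel payload and flush body assumed):

use std::time::Duration;
use tokio::sync::mpsc;

const BATCH: usize = 40;

async fn batcher(mut rx: mpsc::Receiver<u64>) {
    let mut buf = Vec::new();
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tokio::select! {
            maybe = rx.recv() => match maybe {
                Some(v) => {
                    buf.push(v);
                    // keep accumulating until a full batch is queued
                    if buf.len() < BATCH {
                        continue;
                    }
                }
                None => break, // sender gone; drain below
            },
            _ = tick.tick() => {} // periodic flush even when under-filled
        }
        flush(&mut buf);
    }
    flush(&mut buf); // final drain on shutdown
}

fn flush(buf: &mut Vec<u64>) {
    if buf.is_empty() {
        return;
    }
    println!("flushing {} items", buf.len()); // stand-in for process_completed_empties
    buf.clear();
}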
@@ -714,10 +725,12 @@ pub async fn process_completed_empties(
}
}

debug!(
"upserted {inserted} empty version ranges in {:?}",
start.elapsed()
);
let elapsed = start.elapsed();

debug!("upserted {inserted} empty version ranges in {elapsed:?}");

counter!("corro.agent.empties.committed").increment(inserted as u64);
histogram!("corro.agent.empties.commit.second").record(elapsed);

Ok(())
}
@@ -1312,22 +1325,20 @@ pub fn process_incomplete_version(
"
DELETE FROM __corro_seq_bookkeeping
WHERE site_id = :actor_id AND version = :version AND
(
-- start_seq is between start and end of range AND no end_seq
( start_seq BETWEEN :start AND :end AND end_seq IS NULL ) OR

(
-- start_seq and end_seq are within the range
( start_seq >= :start AND end_seq <= :end ) OR

-- range being inserted is partially contained within another
( start_seq <= :end AND end_seq >= :end ) OR

-- start_seq = end + 1 (to collapse ranges)
( start_seq = :end + 1 AND end_seq IS NOT NULL ) OR
( start_seq = :end + 1) OR

-- end_seq = start - 1 (to collapse ranges)
( end_seq = :start - 1 )
)
RETURNING start_seq, end_seq
",
)?
.query_map(
@@ -1337,10 +1348,7 @@
":start": seqs.start(),
":end": seqs.end(),
],
|row| {
let start = row.get(0)?;
Ok(start..=row.get::<_, Option<CrsqlSeq>>(1)?.unwrap_or(start))
},
|row| Ok(row.get(0)?..=row.get(1)?),
)
.and_then(|rows| rows.collect::<rusqlite::Result<Vec<_>>>())?;

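The start_seq = :end + 1 and end_seq = :start - 1 arms above delete bookkeeping rows that merely touch the incoming range, so a single merged row can replace the fragments. The in-memory analogue of that collapsing, with assumed values:

use rangemap::RangeInclusiveSet;

fn main() {
    let mut seqs: RangeInclusiveSet<u64> = RangeInclusiveSet::new();
    seqs.insert(0..=4);
    seqs.insert(10..=12);
    // 5..=9 bridges the gap: 0..=4 ends at start - 1 and 10..=12 begins at
    // end + 1, so all three collapse into a single range.
    seqs.insert(5..=9);
    assert_eq!(seqs.iter().cloned().collect::<Vec<_>>(), vec![0..=12]);
}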
4 changes: 2 additions & 2 deletions crates/corro-agent/src/broadcast/mod.rs
@@ -523,13 +523,13 @@ pub fn runtime_loop(
let (member_count, max_transmissions) = {
let config = config.read();
let members = agent.members().read();
let count = members.states.len();
let ring0_count = members.ring0(agent.cluster_id()).count();
let max_transmissions = config.max_transmissions.get();
(
std::cmp::max(
config.num_indirect_probes.get(),
(cluster_size.load(Ordering::Acquire) as usize - ring0_count)
/ (max_transmissions as usize * 10),
(count - ring0_count) / (max_transmissions as usize * 10),
),
max_transmissions,
)
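The fan-out computation now counts locally known member states instead of reading the cluster_size gauge. Worked numbers (assumed) for the expression above:

fn fanout(num_indirect_probes: usize, states: usize, ring0: usize, max_transmissions: u8) -> usize {
    std::cmp::max(
        num_indirect_probes,
        (states - ring0) / (max_transmissions as usize * 10),
    )
}

fn main() {
    // A large cluster raises the floor: (2000 - 50) / (10 * 10) = 19.
    assert_eq!(fanout(9, 2000, 50, 10), 19);
    // A small one falls back to num_indirect_probes: (100 - 5) / 100 = 0.
    assert_eq!(fanout(9, 100, 5, 10), 9);
}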
4 changes: 2 additions & 2 deletions crates/corro-types/src/agent.rs
@@ -961,7 +961,7 @@ impl Drop for LockTracker {
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct CurrentVersion {
// cr-sqlite db version
pub db_version: CrsqlDbVersion,
@@ -971,7 +971,7 @@ pub struct PartialVersion {
pub ts: Timestamp,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartialVersion {
// range of sequences recorded
pub seqs: RangeInclusiveSet<CrsqlSeq>,