Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible race in PG endpoint's commit handling #176

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
219 changes: 45 additions & 174 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@ use compact_str::ToCompactString;
use corro_types::{
agent::{Agent, ChangeError, CurrentVersion, KnownDbVersion},
api::{
row_to_change, ColumnName, ExecResponse, ExecResult, QueryEvent, Statement,
TableStatRequest, TableStatResponse,
ColumnName, ExecResponse, ExecResult, QueryEvent, Statement, TableStatRequest,
TableStatResponse,
},
base::{CrsqlDbVersion, CrsqlSeq},
broadcast::{ChangeV1, Changeset, Timestamp},
change::{ChunkedChanges, SqliteValue, MAX_CHANGES_BYTE_SIZE},
change::{insert_local_changes, InsertChangesInfo, SqliteValue},
schema::{apply_schema, parse_sql},
sqlite::SqlitePoolError,
};
use hyper::StatusCode;
use itertools::Itertools;
use metrics::counter;
use rusqlite::{named_params, params_from_iter, ToSql, Transaction};
use rusqlite::{params_from_iter, ToSql, Transaction};
use spawn::spawn_counted;
use tokio::{
sync::{
Expand All @@ -32,7 +28,7 @@ use tokio::{
};
use tracing::{debug, error, info, trace};

use corro_types::broadcast::{BroadcastInput, BroadcastV1};
use corro_types::broadcast::broadcast_changes;

pub mod pubsub;

Expand Down Expand Up @@ -68,176 +64,45 @@ where
// Execute whatever might mutate state data
let ret = f(&tx)?;

let ts = Timestamp::from(agent.clock().new_timestamp());
let insert_info = insert_local_changes(agent, &tx, &mut book_writer)?;

let db_version: CrsqlDbVersion = tx
.prepare_cached("SELECT crsql_next_db_version()")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?
.query_row((), |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?;

let has_changes: bool = tx
.prepare_cached("SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE db_version = ?);")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?
.query_row([db_version], |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?;
tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: insert_info.as_ref().map(|info| info.version),
})?;

if !has_changes {
tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?;
return Ok((ret, start.elapsed()));
}
let elapsed = start.elapsed();

let last_version = book_writer.last().unwrap_or_default();
trace!("last_version: {last_version}");
let version = last_version + 1;
trace!("version: {version}");

let last_seq: CrsqlSeq = tx
.prepare_cached("SELECT MAX(seq) FROM crsql_changes WHERE db_version = ?")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?
.query_row([db_version], |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

let elapsed = {
tx.prepare_cached(
r#"
INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts)
VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);
"#,
)
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?
.execute(named_params! {
":actor_id": actor_id,
":start_version": version,
":db_version": db_version,
":last_seq": last_seq,
":ts": ts
})
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

debug!(%actor_id, %version, %db_version, "inserted local bookkeeping row!");

tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
start.elapsed()
};

trace!("committed tx, db_version: {db_version}, last_seq: {last_seq:?}");

book_writer.insert(
version,
KnownDbVersion::Current(CurrentVersion {
match insert_info {
None => Ok((ret, elapsed)),
Some(InsertChangesInfo {
version,
db_version,
last_seq,
ts,
}),
);
drop(book_writer);

let agent = agent.clone();

spawn_counted(async move {
let conn = agent.pool().read().await?;

block_in_place(|| {
// TODO: make this more generic so both sync and local changes can use it.
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE db_version = ?
ORDER BY seq ASC
"#,
)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let chunked =
ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE);
for changes_seqs in chunked {
match changes_seqs {
Ok((changes, seqs)) => {
for (table_name, count) in
changes.iter().counts_by(|change| &change.table)
{
counter!("corro.changes.committed", "table" => table_name.to_string(), "source" => "local").increment(count as u64);
}

trace!("broadcasting changes: {changes:?} for seq: {seqs:?}");

agent.subs_manager().match_changes(&changes, db_version);

let tx_bcast = agent.tx_bcast().clone();
tokio::spawn(async move {
if let Err(e) = tx_bcast
.send(BroadcastInput::AddBroadcast(BroadcastV1::Change(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version,
changes,
seqs,
last_seq,
ts,
},
},
)))
.await
{
error!("could not send change message for broadcast: {e}");
}
});
}
Err(e) => {
error!("could not process crsql change (db_version: {db_version}) for broadcast: {e}");
break;
}
}
}
Ok::<_, rusqlite::Error>(())
})?;

Ok::<_, eyre::Report>(())
});

Ok::<_, ChangeError>((ret, elapsed))
}) => {
trace!("committed tx, db_version: {db_version}, last_seq: {last_seq:?}");

book_writer.insert(
version,
KnownDbVersion::Current(CurrentVersion {
db_version,
last_seq,
ts,
}),
);
drop(book_writer);

let agent = agent.clone();

spawn_counted(async move {
broadcast_changes(agent, db_version, last_seq, version, ts).await
});

Ok::<_, ChangeError>((ret, elapsed))
}
}
})
}

Expand Down Expand Up @@ -727,7 +592,13 @@ pub async fn api_v1_table_stats(
#[cfg(test)]
mod tests {
use bytes::Bytes;
use corro_types::{api::RowId, base::Version, config::Config, schema::SqliteType};
use corro_types::{
api::RowId,
base::Version,
broadcast::{BroadcastInput, BroadcastV1, ChangeV1, Changeset},
config::Config,
schema::SqliteType,
};
use futures::Stream;
use http_body::{combinators::UnsyncBoxBody, Body};
use tokio::sync::mpsc::error::TryRecvError;
Expand Down