Skip to content

Commit

Permalink
merge meta and query dbs
Browse files Browse the repository at this point in the history
Users who were using both are automatically migrated. And in the
process, we move the new db to a dot-file, to keep users' workspaces
clean.

We migrate the files from the locations the users specified, so users
who specified a non-default location will have them migrated — but new
files will not be created at those old locations.

We'll follow a deprecation schedule and eventually remove these options.
  • Loading branch information
Glauber Costa authored and mergify[bot] committed May 18, 2022
1 parent 5182d0c commit f9bf50b
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 18 deletions.
7 changes: 3 additions & 4 deletions cli/tests/test-wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ if [ "x$TEST_DATABASE" == "xpostgres" ]; then

psql "$DATABASE_URL_PREFIX" -c "CREATE DATABASE $DATADB"

DATADB_URL="$DATABASE_URL_PREFIX/$DATADB"
DB_URL="$DATABASE_URL_PREFIX/$DATADB"
else
METADB_URL="sqlite://$TEMPDIR/chiseld.db?mode=rwc"
DATADB_URL="sqlite://$TEMPDIR/chiseld-data.db?mode=rwc"
DB_URL="sqlite://$TEMPDIR/chiseld.db?mode=rwc"
fi

$CHISELD --webui -m "$DATADB_URL" -d "$DATADB_URL" --api-listen-addr "$CHISELD_HOST" --internal-routes-listen-addr "$CHISELD_INTERNAL" --rpc-listen-addr $CHISELD_RPC_HOST &
$CHISELD --webui --db-uri "$DB_URL" --api-listen-addr "$CHISELD_HOST" --internal-routes-listen-addr "$CHISELD_INTERNAL" --rpc-listen-addr $CHISELD_RPC_HOST &
PID=$!

function cleanup() {
Expand Down
241 changes: 241 additions & 0 deletions server/src/datastore/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use crate::types::{
use anyhow::Context;
use sqlx::any::{Any, AnyPool};
use sqlx::{Execute, Executor, Row, Transaction};
use std::path::Path;
use std::sync::Arc;
use tokio::fs;

/// Meta service.
///
Expand Down Expand Up @@ -61,6 +63,18 @@ where
.with_context(|| format!("Executing query {}", qstr))
}

/// Returns whether `file` exists on disk.
///
/// A missing file is reported as `Ok(false)`. Any other metadata failure
/// (e.g. permission denied) is propagated as an error; we keep the
/// underlying `io::Error` in the chain instead of discarding it, so the
/// user can see *why* the file couldn't be read.
async fn file_exists(file: &Path) -> anyhow::Result<bool> {
    match fs::metadata(file).await {
        Ok(_) => Ok(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e).with_context(|| format!("Can't read {}", file.display())),
    }
}

async fn update_field_query(
transaction: &mut Transaction<'_, Any>,
delta: &FieldDelta,
Expand Down Expand Up @@ -232,6 +246,113 @@ impl MetaService {
}
}

/// Try to migrate an old-style (split meta + data) sqlite layout to
/// a single file.
///
/// For Postgres this is a lot more complex because it is not possible to
/// do cross-database transactions. But this handles the local dev case,
/// which is what is most relevant at the moment.
///
/// `sources` are the old split database files; `to` is the new combined
/// database (only used for the success message — the actual target is
/// this service's own connection). Succeeds silently if none of the old
/// files exist, and errors out if only some of them do.
pub(crate) async fn maybe_migrate_sqlite_database<P: AsRef<Path>, T: AsRef<Path>>(
    &self,
    sources: &[P],
    to: T,
) -> anyhow::Result<()> {
    let to = to.as_ref();
    match self.kind {
        Kind::Sqlite => {}
        _ => anyhow::bail!("Can't migrate postgres tables yet"),
    }

    // For the old files, either none of them exists (in which case
    // we're ok), or all of them exists. The case in which some of them
    // exists we're better off not touching, and erroring out.
    let mut not_found = vec![];
    for src in sources {
        let src = src.as_ref();
        if !file_exists(src).await? {
            not_found.push(src);
        }
    }

    if not_found.len() == sources.len() {
        return Ok(());
    }

    if !not_found.is_empty() {
        anyhow::bail!(
            "Some of the old sqlite files were not found: {:?}",
            not_found
        );
    }

    let mut transaction = self.start_transaction().await?;
    let migrated = self
        .maybe_migrate_sqlite_database_inner(&mut transaction, sources)
        .await?;
    Self::commit_transaction(transaction).await?;

    // Only tell the user the old files are safe to delete *after* the
    // transaction has committed. (Previously this was logged before the
    // commit, so a commit failure could leave the user instructed to
    // delete the only copy of their data.)
    if migrated {
        let mut full_str = format!(
            "Migrated your data to {}. You can now delete the following files:",
            to.display()
        );
        for src in sources {
            let s = src.as_ref().display();
            // Include sqlite's sidecar WAL/shared-memory files.
            full_str += &format!("\n\t{}\n\t{}-wal\n\t{}-shm", s, s, s);
        }

        info!("{}", &full_str);
    }
    Ok(())
}

async fn maybe_migrate_sqlite_database_inner<P: AsRef<Path>>(
&self,
transaction: &mut Transaction<'_, Any>,
sources: &[P],
) -> anyhow::Result<bool> {
// this sqlite instance already has data, nothing to migrate.
if self.count_tables(transaction).await? != 0 {
return Ok(false);
}

for (idx, src) in sources.iter().enumerate() {
let src = src.as_ref().to_str().unwrap();
let db = format!("db{}", idx);

let attach = format!("attach database '{}' as '{}'", src, db);
execute(transaction, sqlx::query::<sqlx::Any>(&attach)).await?;

let query = format!(
r#"
SELECT sql,name
FROM '{}'.sqlite_schema
WHERE type ='table' AND name NOT LIKE 'sqlite_%'"#,
db
);
// function takes an Executor, which is causing the compiler to move this.
// so &mut * it
let rows = fetch_all(&mut *transaction, sqlx::query::<sqlx::Any>(&query)).await?;
for row in rows {
let sql: &str = row.get("sql");
let sql = sql.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS");
execute(transaction, sqlx::query::<sqlx::Any>(&sql)).await?;
let name: &str = row.get("name");
let copy = format!(
r#"
INSERT INTO '{}' SELECT * from '{}'.{}"#,
name, db, name
);
execute(transaction, sqlx::query::<sqlx::Any>(&copy)).await?;
}
}
Ok(true)
}

async fn count_tables(&self, transaction: &mut Transaction<'_, Any>) -> anyhow::Result<usize> {
let query = match self.kind {
Kind::Sqlite => sqlx::query(
Expand Down Expand Up @@ -586,6 +707,7 @@ impl MetaService {
#[cfg(test)]
mod tests {
use super::*;
use crate::datastore::{query::tests::*, QueryEngine};
use anyhow::Result;
use tempdir::TempDir;

Expand Down Expand Up @@ -623,4 +745,123 @@ mod tests {

Ok(())
}

#[tokio::test]
// test that we can join a split meta and data db into one
async fn migrate_split_db() -> Result<()> {
    let tmp_dir = TempDir::new("migrate_split_db")?;

    // Recreate the legacy split layout from checked-in fixtures:
    // one meta db and one data db.
    let meta_path = tmp_dir.path().join("chisel-meta.db");
    fs::copy("./test_files/split_db/chiseld-old-meta.db", &meta_path)
        .await
        .unwrap();

    let data_path = tmp_dir.path().join("chisel-data.db");
    fs::copy("./test_files/split_db/chiseld-old-data.db", &data_path)
        .await
        .unwrap();

    // Fresh single-file target that the migration should populate.
    let new_path = tmp_dir.path().join("chisel.db");
    let conn_str = format!("sqlite://{}?mode=rwc", new_path.display());

    let conn = DbConnection::connect(&conn_str, 1).await?;
    let meta = MetaService::local_connection(&conn, 1).await.unwrap();
    meta.maybe_migrate_sqlite_database(&[&meta_path, &data_path], &new_path)
        .await
        .unwrap();

    let query = QueryEngine::local_connection(&conn, 1).await.unwrap();

    // Both the schema (type system) and the row data must have survived
    // the merge into the single database.
    let ts = meta.load_type_system().await.unwrap();
    let ty = ts.lookup_custom_type("BlogComment", "dev").unwrap();
    let rows = fetch_rows(&query, &ty).await;
    // The fixture data db contains exactly 10 BlogComment rows.
    assert_eq!(rows.len(), 10);
    Ok(())
}

#[tokio::test]
// Only the meta db exists; the data db path points at nothing. The
// migration must refuse to run (error) rather than migrate half a layout.
async fn migrate_split_db_missing_file() -> Result<()> {
    let tmp_dir = TempDir::new("migrate_split_db_missing_files")?;

    // data_path is never created — that's the point of this test.
    let data_path = tmp_dir.path().join("chisel-data.db");
    let meta_path = tmp_dir.path().join("chisel-meta.db");
    fs::copy("./test_files/split_db/chiseld-old-meta.db", &meta_path)
        .await
        .unwrap();

    let new_path = tmp_dir.path().join("chisel.db");
    let conn_str = format!("sqlite://{}?mode=rwc", new_path.display());

    let conn = DbConnection::connect(&conn_str, 1).await?;
    let meta = MetaService::local_connection(&conn, 1).await.unwrap();
    // Expect an error: only some of the old files are present.
    meta.maybe_migrate_sqlite_database(&[&meta_path, &data_path], &new_path)
        .await
        .unwrap_err();

    // still exists, wasn't deleted
    fs::metadata(meta_path).await.unwrap();
    Ok(())
}

#[tokio::test]
async fn migrate_split_db_bad_file() -> Result<()> {
    let tmp_dir = TempDir::new("migrate_split_db_bad_file")?;

    // duplicated entries should cause the migration to fail (because we're forcing those files
    // to be the same)
    // NOTE: both paths deliberately join the SAME file name, so the
    // migration sees the same database twice.
    let data_path = tmp_dir.path().join("chisel-meta.db");
    let meta_path = tmp_dir.path().join("chisel-meta.db");
    fs::copy("./test_files/split_db/chiseld-old-meta.db", &meta_path)
        .await
        .unwrap();

    let new_path = tmp_dir.path().join("chisel.db");
    let conn_str = format!("sqlite://{}?mode=rwc", new_path.display());

    let conn = DbConnection::connect(&conn_str, 1).await?;
    let meta = MetaService::local_connection(&conn, 1).await.unwrap();
    // Expect an error; the failed migration must not touch the sources.
    meta.maybe_migrate_sqlite_database(&[&meta_path, &data_path], &new_path)
        .await
        .unwrap_err();

    // originals still exist, weren't deleted
    fs::metadata(data_path).await.unwrap();
    fs::metadata(meta_path).await.unwrap();
    Ok(())
}

#[tokio::test]
// A database that already has tables must be left alone: migration is a
// no-op (not an error) on every boot after a successful migration.
async fn migrate_split_db_untouched() -> Result<()> {
    let tmp_dir = TempDir::new("migrate_split_db_untouched")?;

    let data_path = tmp_dir.path().join("chisel-data.db");
    fs::copy("./test_files/split_db/chiseld-old-data.db", &data_path)
        .await
        .unwrap();

    let meta_path = tmp_dir.path().join("chisel-meta.db");
    fs::copy("./test_files/split_db/chiseld-old-meta.db", &meta_path)
        .await
        .unwrap();

    // The "new" db is pre-populated so it already contains tables.
    let new_path = tmp_dir.path().join("chisel-new.db");
    fs::copy("./test_files/split_db/chiseld-old-meta.db", &new_path)
        .await
        .unwrap();

    // meta db has data, won't migrate. This shouldn't trigger an error because this is
    // the path we take on most boots after migration
    let conn_str = format!("sqlite://{}?mode=rwc", meta_path.display());

    let conn = DbConnection::connect(&conn_str, 1).await?;
    let meta = MetaService::local_connection(&conn, 1).await.unwrap();
    meta.maybe_migrate_sqlite_database(&[&meta_path, &data_path], &new_path)
        .await
        .unwrap();

    // originals still exist, weren't deleted
    fs::metadata(data_path).await.unwrap();
    fs::metadata(meta_path).await.unwrap();
    Ok(())
}
}
53 changes: 39 additions & 14 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::FutureExt;
use futures::StreamExt;
use std::net::SocketAddr;
use std::panic;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -41,12 +42,15 @@ pub struct Opt {
/// Internal routes (for k8s) listen address
#[structopt(short, long, default_value = "127.0.0.1:9090")]
internal_routes_listen_addr: SocketAddr,
/// Metadata database URI.
/// Metadata database URI. [deprecated: use --db-uri instead]
#[structopt(short, long, default_value = "sqlite://chiseld.db?mode=rwc")]
metadata_db_uri: String,
/// Data database URI.
_metadata_db_uri: String,
/// Data database URI. [deprecated: use --db-uri instead]
#[structopt(short, long, default_value = "sqlite://chiseld-data.db?mode=rwc")]
data_db_uri: String,
_data_db_uri: String,
/// Database URI.
#[structopt(long, default_value = "sqlite://.chiseld.db?mode=rwc")]
db_uri: String,
/// Should we wait for a debugger before executing any JS?
#[structopt(long)]
inspect_brk: bool,
Expand Down Expand Up @@ -85,8 +89,7 @@ pub struct SharedState {
api_listen_addr: String,
inspect_brk: bool,
executor_threads: usize,
data_db: DbConnection,
metadata_db: DbConnection,
db: DbConnection,
nr_connections: usize,
}

Expand Down Expand Up @@ -137,7 +140,7 @@ async fn run(state: SharedState, mut cmd: ExecutorChannel) -> Result<()> {
}
}

let meta = MetaService::local_connection(&state.metadata_db, state.nr_connections).await?;
let meta = MetaService::local_connection(&state.db, state.nr_connections).await?;
let ts = meta.load_type_system().await?;

let routes = meta.load_endpoints().await?;
Expand All @@ -149,7 +152,7 @@ async fn run(state: SharedState, mut cmd: ExecutorChannel) -> Result<()> {
crate::introspect::init(&api_service);

let query_engine =
Arc::new(QueryEngine::local_connection(&state.data_db, state.nr_connections).await?);
Arc::new(QueryEngine::local_connection(&state.db, state.nr_connections).await?);
ts.create_builtin_backing_tables(query_engine.as_ref())
.await?;
let api_service = Rc::new(api_service);
Expand Down Expand Up @@ -218,14 +221,37 @@ impl CoordinatorChannel {
}
}

/// Extract the sqlite file name from a connection URI.
///
/// Returns the characters between the first `sqlite://` occurrence and
/// the next `?` (or end of string), or `None` when the string contains
/// no `sqlite://` scheme or the file name is empty. Plain string parsing
/// replaces the previous regex, which was recompiled on every call.
fn extract(s: &str) -> Option<String> {
    // Like the regex it replaces, match `sqlite://` anywhere in the
    // string, not only at the start.
    let start = s.find("sqlite://")? + "sqlite://".len();
    // `split` always yields at least one item, so `next()` cannot fail.
    let fname = s[start..].split('?').next().unwrap_or("");
    if fname.is_empty() {
        // The regex required at least one non-`?` character; keep that.
        None
    } else {
        Some(fname.to_string())
    }
}

/// Collect the sqlite file paths named by the deprecated
/// `--metadata-db-uri` and `--data-db-uri` options, skipping any that
/// are not sqlite URIs.
fn find_legacy_sqlite_dbs(opt: &Opt) -> Vec<PathBuf> {
    [&opt._metadata_db_uri, &opt._data_db_uri]
        .into_iter()
        .filter_map(|uri| extract(uri))
        .map(PathBuf::from)
        .collect()
}

pub async fn run_shared_state(
opt: Opt,
) -> Result<(SharedTasks, SharedState, Vec<ExecutorChannel>)> {
let meta_conn = DbConnection::connect(&opt.metadata_db_uri, opt.nr_connections).await?;
let data_db = DbConnection::connect(&opt.data_db_uri, opt.nr_connections).await?;
let db_conn = DbConnection::connect(&opt.db_uri, opt.nr_connections).await?;
let meta = MetaService::local_connection(&db_conn, opt.nr_connections).await?;

let legacy_dbs = find_legacy_sqlite_dbs(&opt);
if extract(&opt.db_uri).is_some() && legacy_dbs.len() == 2 {
meta.maybe_migrate_sqlite_database(&legacy_dbs, &opt.db_uri)
.await?;
}

let meta = MetaService::local_connection(&meta_conn, opt.nr_connections).await?;
let query_engine = QueryEngine::local_connection(&data_db, opt.nr_connections).await?;
let query_engine = QueryEngine::local_connection(&db_conn, opt.nr_connections).await?;

meta.create_schema().await?;

Expand Down Expand Up @@ -359,8 +385,7 @@ pub async fn run_shared_state(
api_listen_addr: opt.api_listen_addr,
inspect_brk: opt.inspect_brk,
executor_threads: opt.executor_threads,
data_db,
metadata_db: meta_conn,
db: db_conn,
nr_connections: opt.nr_connections,
};

Expand Down
Binary file added server/test_files/split_db/chiseld-old-data.db
Binary file not shown.
Binary file added server/test_files/split_db/chiseld-old-meta.db
Binary file not shown.

0 comments on commit f9bf50b

Please sign in to comment.