From 5deb9eca671659d0d11bae94768ddd681989f891 Mon Sep 17 00:00:00 2001 From: Jan Plhak Date: Fri, 21 Jan 2022 12:35:24 +0100 Subject: [PATCH] Enable transactions for whole endpoint execution. --- cli/tests/lit/transaction.lit | 81 ++++++++++++++++++ docusaurus/docs/data-access.md | 14 +++- docusaurus/docs/known_issues.md | 5 +- server/src/auth.rs | 2 +- server/src/db.rs | 38 ++++----- server/src/deno.rs | 34 +++++--- server/src/query/engine.rs | 141 +++++++++++++++++++++----------- server/src/rpc.rs | 6 +- server/src/runtime.rs | 3 +- server/src/server.rs | 2 +- server/src/types.rs | 7 +- 11 files changed, 245 insertions(+), 88 deletions(-) create mode 100644 cli/tests/lit/transaction.lit diff --git a/cli/tests/lit/transaction.lit b/cli/tests/lit/transaction.lit new file mode 100644 index 000000000..0248d104d --- /dev/null +++ b/cli/tests/lit/transaction.lit @@ -0,0 +1,81 @@ +# SPDX-FileCopyrightText: © 2021 ChiselStrike + +# RUN: sh -e @file + +cat << EOF > "$TEMPDIR/models/types.ts" +export class Person extends Chisel.ChiselEntity { + name: string = ""; +} +EOF + +cat << EOF > "$TEMPDIR/endpoints/store_person.js" +import { Person } from "../types.ts"; + +export default async function chisel(req: Request) { + const req_json = await req.json(); + + let p = new Person(); + p.name = req_json.name; + await p.save(); + + if (req_json.command == "die with honor") { + throw new InternalError("Let's see if transaction gets cancelled"); + } + + return new Response('Mission acomplished'); +} +EOF + +cat << EOF > "$TEMPDIR/endpoints/retrieve_all.ts" +import { Person } from "../models/types.ts"; + +export default async function chisel(req: Request) { + let resp = "["; + for await (let p of Person.cursor()) { + resp += p.name + " "; + } + resp += "]"; + return new Response(resp); +} +EOF + +cat << EOF > "$TEMPDIR/endpoints/write_and_read.ts" +import { Person } from "../models/types.ts"; + +export default async function chisel(req: Request) { + let p = new Person(); + p.name = "ThisIsTheBestName"; + await p.save(); + + let resp = "["; + for await (let p of Person.cursor()) { + resp += p.name + " "; + } + resp += "]"; + + return new Response(resp); +} +EOF + +cd "$TEMPDIR" +$CHISEL apply + +$CURL -X POST --data '{ "name": "Adalbrecht" }' $CHISELD_HOST/dev/store_person +# CHECK: Mission acomplished + +$CURL $CHISELD_HOST/dev/retrieve_all +# CHECK: HTTP/1.1 200 OK +# CHECK: [Adalbrecht ] + +$CURL -X POST --data '{ + "name": "Ruprt", + "command": "die with honor" +}' $CHISELD_HOST/dev/store_person + +$CURL $CHISELD_HOST/dev/retrieve_all +# CHECK: HTTP/1.1 200 OK +# CHECK: [Adalbrecht ] + +$CURL -X POST $CHISELD_HOST/dev/write_and_read +# CHECK: HTTP/1.1 200 OK +# CHECK: [Adalbrecht, ThisIsTheBestName ] diff --git a/docusaurus/docs/data-access.md b/docusaurus/docs/data-access.md index d55223be5..77ef3c4f6 100644 --- a/docusaurus/docs/data-access.md +++ b/docusaurus/docs/data-access.md @@ -212,7 +212,7 @@ The `findMany()` method is convenient, but also problematic if you have a lot of entities stored because loading them can take a lot of time and memory. In future releases of ChiselStrike, the runtime will enforce a maximum number of entities `findMany()` can return and also enforce timeouts at the data store level. The -runtime will also provide optional pagination for the `findMany()` method. +runtime will also provide optional pagination for the `findMany()` method. ::: ## Cursors @@ -270,3 +270,15 @@ The methods provided by `ChiselCursor` are outlined in the following table. The `ChiselCursor` interface is still work-in-progress. For example, methods such as `skip()`, `map()`, and `reduce()` are planned for future releases. Also, the current implementation of `filter()` takes a _restriction object_, but future ChiselStrike runtimes will allow you to write filter functions using TypeScript, which are automatically converted to efficient database queries in many cases. ::: + +## Transacitons + +We currently support implicit transactional evaluation. The transaction is created before ChiselStrike +starts evaluating your endpoint and is automatically committed after your endpoint ends and we generate +the HTTP response. In case your endpoint returns a stream, any database-related operation done within +stream generation code will happen outside of the transaction and can result in a crash. + +If your code crashes or explicitly throws exception that is not caught, ChiselStrike rollbacks the +transaction automatically. + +Explicit user-controlled transactions are coming soon. diff --git a/docusaurus/docs/known_issues.md b/docusaurus/docs/known_issues.md index 5eaddfc6d..048b3bc0e 100644 --- a/docusaurus/docs/known_issues.md +++ b/docusaurus/docs/known_issues.md @@ -20,9 +20,8 @@ While we do plan to provide you with a better experience in the future, for now want to use external modules, browser-style should work. * **Transactions:** ChiselStrike aims to fully support transactions at the endpoint boundary and, -at a later date, with user-defined granularity (for advanced users). There is a known bug with -that, though: endpoints should be serialized so RMW is write, but transactions won't be rolled -back if there is an exception in the endpoint and it doesn't complete. +at a later date, with user-defined granularity (for advanced users). We currently wrap the +whole endpoint in one big transaction. * **Nested models:** (Also known as relationships.) With the exception of the special `OAuthUser` model, it is not possible to embed a model inside another yet. The code to persist nested models diff --git a/server/src/auth.rs b/server/src/auth.rs index fc28a5792..378cf17e8 100644 --- a/server/src/auth.rs +++ b/server/src/auth.rs @@ -64,7 +64,7 @@ async fn insert_user_into_db(username: &str) -> anyhow::Result { } user.insert("username".into(), json!(username)); query_engine - .add_row(&oauth_user_type, &user) + .add_row(&oauth_user_type, &user, None) .await? .get("id") .ok_or_else(|| anyhow!("Didn't get user ID from storing a user."))? diff --git a/server/src/db.rs b/server/src/db.rs index db777eba8..c9265dba0 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -8,6 +8,7 @@ use crate::query::engine::new_query_results; use crate::query::engine::JsonObject; use crate::query::engine::RawSqlStream; use crate::query::engine::SqlStream; +use crate::query::engine::TransactionStatic; use crate::runtime; use crate::types::{Field, ObjectType, Type, TypeSystem, TypeSystemError, OAUTHUSER_TYPE_NAME}; use anyhow::{anyhow, Result}; @@ -20,7 +21,6 @@ use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; use serde_json::value::Value; -use sqlx::AnyPool; use std::collections::HashMap; use std::collections::HashSet; use std::pin::Pin; @@ -276,7 +276,7 @@ fn sql_where_for_match_login( } fn sql_backing_store( - pool: &AnyPool, + tr: TransactionStatic, columns: Vec<(String, Type)>, limit_str: &str, ty: Arc, @@ -303,7 +303,7 @@ fn sql_backing_store( if policies.transforms.is_empty() { return Query::Sql(query); } - let stream = new_query_results(query, pool); + let stream = new_query_results(query, tr); let pstream = Box::pin(PolicyApplyingStream { inner: stream, policies, @@ -401,14 +401,14 @@ fn join_stream(columns: &[(String, Type)], left: SqlStream, right: SqlStream) -> } fn sql_filter( - pool: &AnyPool, + tr: TransactionStatic, columns: Vec<(String, Type)>, limit_str: &str, alias_count: &mut u32, inner: Relation, restrictions: Vec, ) -> Query { - let inner_sql = sql_impl(pool, inner, alias_count); + let inner_sql = sql_impl(tr, inner, alias_count); let inner_sql = match inner_sql { Query::Sql(s) => s, Query::Stream(s) => return filter_stream(s, columns, restrictions), @@ -441,8 +441,8 @@ fn sql_filter( )) } -fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStream { - let inner = new_query_results(s, pool); +fn to_stream(tr: TransactionStatic, s: String, columns: Vec<(String, Type)>) -> SqlStream { + let inner = new_query_results(s, tr); Box::pin(PolicyApplyingStream { inner, policies: FieldPolicies::default(), @@ -451,7 +451,7 @@ fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStre } fn sql_join( - pool: &AnyPool, + tr: TransactionStatic, columns: &[(String, Type)], limit_str: &str, alias_count: &mut u32, @@ -478,15 +478,15 @@ fn sql_join( // FIXME: Optimize the case of table.left or table.right being just // a BackingStore with all fields. The database probably doesn't // care, but will make the logs cleaner. - let lsql = sql_impl(pool, left, alias_count); - let rsql = sql_impl(pool, right, alias_count); + let lsql = sql_impl(tr.clone(), left, alias_count); + let rsql = sql_impl(tr.clone(), right, alias_count); let (lsql, rsql) = match (lsql, rsql) { (Query::Sql(l), Query::Sql(r)) => (l, r), (Query::Stream(l), Query::Sql(r)) => { - return join_stream(columns, l, to_stream(pool, r, right_columns)) + return join_stream(columns, l, to_stream(tr, r, right_columns)) } (Query::Sql(l), Query::Stream(r)) => { - return join_stream(columns, to_stream(pool, l, left_columns), r) + return join_stream(columns, to_stream(tr, l, left_columns), r) } (Query::Stream(l), Query::Stream(r)) => return join_stream(columns, l, r), }; @@ -508,17 +508,17 @@ fn sql_join( )) } -fn sql_impl(pool: &AnyPool, rel: Relation, alias_count: &mut u32) -> Query { +fn sql_impl(tr: TransactionStatic, rel: Relation, alias_count: &mut u32) -> Query { let limit_str = rel .limit .map(|x| format!("LIMIT {}", x)) .unwrap_or_else(String::new); match rel.inner { Inner::BackingStore(ty, policies) => { - sql_backing_store(pool, rel.columns, &limit_str, ty, policies) + sql_backing_store(tr, rel.columns, &limit_str, ty, policies) } Inner::Filter(inner, restrictions) => sql_filter( - pool, + tr, rel.columns, &limit_str, alias_count, @@ -526,16 +526,16 @@ fn sql_impl(pool: &AnyPool, rel: Relation, alias_count: &mut u32) -> Query { restrictions, ), Inner::Join(left, right) => { - sql_join(pool, &rel.columns, &limit_str, alias_count, *left, *right) + sql_join(tr, &rel.columns, &limit_str, alias_count, *left, *right) } } } -pub(crate) fn sql(pool: &AnyPool, rel: Relation) -> SqlStream { +pub(crate) fn sql(tr: TransactionStatic, rel: Relation) -> SqlStream { let mut v = 0; let columns = rel.columns.clone(); - match sql_impl(pool, rel, &mut v) { - Query::Sql(s) => to_stream(pool, s, columns), + match sql_impl(tr.clone(), rel, &mut v) { + Query::Sql(s) => to_stream(tr, s, columns), Query::Stream(s) => s, } } diff --git a/server/src/deno.rs b/server/src/deno.rs index 783c5d63a..22d041110 100644 --- a/server/src/deno.rs +++ b/server/src/deno.rs @@ -45,6 +45,7 @@ use std::convert::TryInto; use std::future::Future; use std::io::Write; use std::net::SocketAddr; +use std::ops::DerefMut; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::rc::Rc; @@ -287,7 +288,13 @@ async fn op_chisel_store( // Await point below, RcMut can't be held. drop(runtime); - Ok(serde_json::json!(query_engine.add_row(&ty, value).await?)) + let transaction = current_transaction()?; + let mut transaction = transaction.lock().await; + Ok(serde_json::json!( + query_engine + .add_row(&ty, value, Some(transaction.deref_mut())) + .await? + )) } type DbStream = RefCell; @@ -359,7 +366,9 @@ fn op_chisel_relational_query_create( let relation = convert(&relation)?; let mut runtime = runtime::get(); let query_engine = &mut runtime.query_engine; - let stream = Box::pin(query_engine.query_relation(relation)); + + let transaction = current_transaction()?; + let stream = Box::pin(query_engine.query_relation(relation, transaction)); let resource = QueryStreamResource { stream: RefCell::new(stream), }; @@ -602,6 +611,15 @@ fn current_method() -> Method { CURRENT_CONTEXT.with(|path| path.borrow().method.clone()) } +fn current_transaction() -> Result { + CURRENT_CONTEXT.with(|path| { + path.borrow() + .transaction + .clone() + .ok_or_else(|| anyhow!("no active transaction")) + }) +} + struct RequestFuture { context: RequestContext, inner: F, @@ -727,7 +745,6 @@ fn get_result<'a>( } pub(crate) async fn run_js(path: String, mut req: Request) -> Result> { - let qe = { let runtime = runtime::get(); let qe = runtime.query_engine.clone(); @@ -776,14 +793,7 @@ pub(crate) async fn run_js(path: String, mut req: Request) -> Resul // endpoints that don't do any data access. For now, because we always create it above, // it should be safe to unwrap. let transaction = clear_current_context().unwrap(); - - // Only do this after we cleared the current context so we don't hold the transaction. - // But as a result of the transaction being committed after we ?, we won't commit it - // if there was an error in the endpoint. Jan FIXME: maybe we should explicitly rollback. - // Or alternatively, because there are many other `?` stuff in this function, maybe commit the - // transaction should be the very last thing we do let result = result?; - crate::query::QueryEngine::commit_transaction_static(transaction).await?; let stream = get_read_stream(runtime, result.clone())?; let scope = &mut runtime.handle_scope(); @@ -821,6 +831,10 @@ pub(crate) async fn run_js(path: String, mut req: Request) -> Resul } let body = builder.body(Body::Stream(Box::pin(stream)))?; + // Defer committing of the transaction to the last possible moment. It would be better + // to commit the transaction after the response stream is closed, but it would be a lot + // of work and this will do for now. + crate::query::QueryEngine::commit_transaction_static(transaction).await?; Ok(body) } diff --git a/server/src/query/engine.rs b/server/src/query/engine.rs index 344fb476f..901c7e61e 100644 --- a/server/src/query/engine.rs +++ b/server/src/query/engine.rs @@ -5,21 +5,21 @@ use crate::query::{DbConnection, Kind, QueryError}; use crate::types::{Field, ObjectDelta, ObjectType, Type, OAUTHUSER_TYPE_NAME}; use anyhow::{anyhow, Context as AnyhowContext}; use async_mutex::Mutex; +use async_mutex::MutexGuard; use futures::stream::BoxStream; use futures::stream::Stream; +use futures::Future; use futures::StreamExt; use itertools::{zip, Itertools}; use sea_query::{Alias, ColumnDef, Table}; use serde_json::json; use sqlx::any::{Any, AnyArguments, AnyPool, AnyRow}; -use sqlx::Column; -use sqlx::Transaction; -use sqlx::{Executor, Row}; +use sqlx::{Column, Executor, Row, Transaction}; use std::cell::RefCell; use std::collections::HashMap; use std::marker::PhantomPinned; use std::pin::Pin; -use std::rc::Rc; +use std::sync::Arc; use std::task::{Context, Poll}; use uuid::Uuid; @@ -30,34 +30,67 @@ pub(crate) type RawSqlStream = BoxStream<'static, anyhow::Result>; pub(crate) type JsonObject = serde_json::Map; pub(crate) type SqlStream = BoxStream<'static, anyhow::Result>; -pub(crate) type TransactionStatic = Rc>>; +pub(crate) type TransactionStatic = Arc>>; -struct QueryResults { +struct QueryResults { + fut: RefCell>, + lock: RefCell>>>, + tr: TransactionStatic, raw_query: String, // The streams we use in here only depend on the lifetime of raw_query. stream: RefCell>, _marker: PhantomPinned, // QueryStream cannot be moved } -pub(crate) fn new_query_results(raw_query: String, pool: &AnyPool) -> RawSqlStream { +pub(crate) fn new_query_results(raw_query: String, tr: TransactionStatic) -> RawSqlStream { let ret = Box::pin(QueryResults { + fut: RefCell::new(None), + lock: RefCell::new(None), + tr, raw_query, stream: RefCell::new(None), _marker: PhantomPinned, }); - let ptr: *const String = &ret.raw_query; - let query = sqlx::query::(unsafe { &*ptr }); - let stream = query.fetch(pool).map(|i| i.map_err(anyhow::Error::new)); - ret.stream.replace(Some(Box::pin(stream))); + let ptr: *const Mutex> = &*ret.tr; + // This is safe as ret.fut and ret.tr live together in a Pinned struct. + let fut = unsafe { &*ptr }.lock(); + ret.fut.replace(Some(fut)); ret } -impl Stream for QueryResults { +impl>>> Stream + for QueryResults +{ type Item = anyhow::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut borrow = self.stream.borrow_mut(); - borrow.as_mut().unwrap().as_mut().poll_next(cx) + let mut lock_borrow = self.lock.borrow_mut(); + let mut stream_borrow = self.stream.borrow_mut(); + if lock_borrow.is_none() { + // Why is the future necessary - The mutex guarding the transaction is async + // and to unlock it within the stream polling code, we need to manually + // implement asynchronous unlocking and wait for it to be ready. + let mut fut = self.fut.borrow_mut(); + let fut: &mut T = fut.as_mut().unwrap(); + // We need pinned poll to access its Pin<&mut Self> methods, creating unchecked pin + // is safe as should be safe as the future comes from self which is pinned as well. + let poll = unsafe { Pin::new_unchecked(fut) }.poll(cx); + let lock = futures::ready!(poll); + lock_borrow.replace(lock); + + let raw_query: *const String = &self.raw_query; + // Raw query lives in pinned structure which will outlive the query. + let query = sqlx::query::(unsafe { &*raw_query }); + + let tr: &mut Transaction<'static, Any> = &mut *lock_borrow.as_mut().unwrap(); + let tr: *mut Transaction<'static, Any> = tr; + // Same argumentation applies as for the raw_query case. + let tr = unsafe { &mut *tr }; + + let stream = query.fetch(tr).map(|i| i.map_err(anyhow::Error::new)); + stream_borrow.replace(Box::pin(stream)); + } + stream_borrow.as_mut().unwrap().as_mut().poll_next(cx) } } @@ -158,28 +191,10 @@ impl QueryEngine { Ok(Self::new(local.kind, local.pool)) } - pub(crate) async fn drop_table( - &self, - transaction: &mut Transaction<'_, Any>, - ty: &ObjectType, - ) -> anyhow::Result<()> { - let drop_table = Table::drop() - .table(Alias::new(ty.backing_table())) - .to_owned(); - let drop_table = drop_table.build_any(DbConnection::get_query_builder(&self.kind)); - let drop_table = sqlx::query(&drop_table); - - transaction - .execute(drop_table) - .await - .map_err(QueryError::ExecuteFailed)?; - Ok(()) - } - pub(crate) async fn start_transaction_static( - self: Rc, + self: Arc, ) -> anyhow::Result { - Ok(Rc::new(Mutex::new( + Ok(Arc::new(Mutex::new( self.pool .begin() .await @@ -208,7 +223,8 @@ impl QueryEngine { pub(crate) async fn commit_transaction_static( transaction: TransactionStatic, ) -> anyhow::Result<()> { - let transaction = Rc::try_unwrap(transaction).map_err(|_| anyhow!("Transaction still have references held!"))?; + let transaction = Arc::try_unwrap(transaction) + .map_err(|_| anyhow!("Transaction still have references held!"))?; let transaction = transaction.into_inner(); transaction @@ -218,6 +234,23 @@ impl QueryEngine { Ok(()) } + pub(crate) async fn drop_table( + &self, + transaction: &mut Transaction<'_, Any>, + ty: &ObjectType, + ) -> anyhow::Result<()> { + let drop_table = Table::drop() + .table(Alias::new(ty.backing_table())) + .to_owned(); + let drop_table = drop_table.build_any(DbConnection::get_query_builder(&self.kind)); + let drop_table = sqlx::query(&drop_table); + + transaction + .execute(drop_table) + .await + .map_err(QueryError::ExecuteFailed)?; + Ok(()) + } pub(crate) async fn create_table( &self, @@ -308,8 +341,8 @@ impl QueryEngine { Ok(()) } - pub(crate) fn query_relation(&self, rel: Relation) -> SqlStream { - sql(&self.pool, rel) + pub(crate) fn query_relation(&self, rel: Relation, tr: TransactionStatic) -> SqlStream { + sql(tr, rel) } /// Inserts object of type `ty` and value `ty_value` into the database. @@ -325,9 +358,10 @@ impl QueryEngine { &self, ty: &ObjectType, ty_value: &JsonObject, + transaction: Option<&mut Transaction<'_, Any>>, ) -> anyhow::Result { let (inserts, id_tree) = self.prepare_insertion(ty, ty_value)?; - self.run_sql_queries(&inserts).await?; + self.run_sql_queries(&inserts, transaction).await?; Ok(id_tree.to_json()) } @@ -337,7 +371,7 @@ impl QueryEngine { ty_value: &JsonObject, ) -> anyhow::Result<()> { let query = self.prepare_insertion_shallow(ty, ty_value)?; - self.run_sql_queries(&[query]).await?; + self.run_sql_queries(&[query], None).await?; Ok(()) } @@ -345,15 +379,28 @@ impl QueryEngine { Ok(q.get_sqlx().fetch_one(&self.pool).await?) } - async fn run_sql_queries(&self, queries: &[SqlWithArguments]) -> anyhow::Result<()> { - let mut transaction = self.start_transaction().await?; - for q in queries { - transaction - .fetch_one(q.get_sqlx()) - .await - .map_err(QueryError::ExecuteFailed)?; + async fn run_sql_queries( + &self, + queries: &[SqlWithArguments], + transaction: Option<&mut Transaction<'_, Any>>, + ) -> anyhow::Result<()> { + if let Some(transaction) = transaction { + for q in queries { + transaction + .fetch_one(q.get_sqlx()) + .await + .map_err(QueryError::ExecuteFailed)?; + } + } else { + let mut transaction = self.start_transaction().await?; + for q in queries { + transaction + .fetch_one(q.get_sqlx()) + .await + .map_err(QueryError::ExecuteFailed)?; + } + QueryEngine::commit_transaction(transaction).await?; } - QueryEngine::commit_transaction(transaction).await?; Ok(()) } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index d2745db76..31164b910 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -36,7 +36,7 @@ use tonic::{transport::Server, Request, Response, Status}; pub(crate) struct GlobalRpcState { type_system: TypeSystem, meta: MetaService, - query_engine: QueryEngine, + query_engine: Arc, routes: PrefixMap, // For globally keeping track of routes commands: Vec, policies: Policies, @@ -55,7 +55,7 @@ impl GlobalRpcState { Ok(Self { type_system, meta, - query_engine, + query_engine: Arc::new(query_engine), commands, routes, policies, @@ -175,7 +175,7 @@ impl RpcService { state .type_system - .populate_types(&state.query_engine, &to, &from) + .populate_types(state.query_engine.clone(), &to, &from) .await?; let response = chisel::PopulateResponse { diff --git a/server/src/runtime.rs b/server/src/runtime.rs index f09ec0bf7..30e326295 100644 --- a/server/src/runtime.rs +++ b/server/src/runtime.rs @@ -9,11 +9,12 @@ use derive_new::new; use once_cell::sync::OnceCell; use std::cell::RefCell; use std::rc::Rc; +use std::sync::Arc; #[derive(new)] pub(crate) struct Runtime { pub(crate) api: Rc, - pub(crate) query_engine: Rc, + pub(crate) query_engine: Arc, pub(crate) meta: Rc, pub(crate) type_system: TypeSystem, pub(crate) policies: Policies, diff --git a/server/src/server.rs b/server/src/server.rs index b2b9c8642..72b4fce01 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -165,7 +165,7 @@ async fn run(state: SharedState, mut cmd: ExecutorChannel) -> Result<()> { Ok(Type::Object(t)) => t, _ => anyhow::bail!("Internal error: type {} not found", OAUTHUSER_TYPE_NAME), }; - let query_engine = Rc::new(QueryEngine::local_connection(&state.data_db).await?); + let query_engine = Arc::new(QueryEngine::local_connection(&state.data_db).await?); let mut transaction = query_engine.start_transaction().await?; query_engine .create_table(&mut transaction, &oauth_user_type) diff --git a/server/src/types.rs b/server/src/types.rs index c8646d43e..32f5b43ac 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -277,7 +277,7 @@ impl TypeSystem { pub(crate) async fn populate_types, F: AsRef>( &self, - engine: &QueryEngine, + engine: Arc, api_version_to: T, api_version_from: F, ) -> anyhow::Result<()> { @@ -310,7 +310,8 @@ impl TypeSystem { })?; let ty_obj_rel = backing_store_from_type(self, ty_obj).await?; - let mut row_streams = engine.query_relation(ty_obj_rel); + let tr = engine.clone().start_transaction_static().await?; + let mut row_streams = engine.query_relation(ty_obj_rel, tr.clone()); while let Some(row) = row_streams.next().await { // FIXME: basic rate limit? @@ -318,6 +319,8 @@ impl TypeSystem { .with_context(|| format!("population can't proceed as reading from the underlying database for type {} failed", ty_obj_to.name))?; engine.add_row_shallow(ty_obj_to, &row).await?; } + drop(row_streams); + QueryEngine::commit_transaction_static(tr).await?; } } Ok(())