Enable transactions for whole endpoint execution.
BearLemma committed Jan 27, 2022
1 parent c36974a commit 5deb9ec
Showing 11 changed files with 245 additions and 88 deletions.
81 changes: 81 additions & 0 deletions cli/tests/lit/transaction.lit
@@ -0,0 +1,81 @@
# SPDX-FileCopyrightText: © 2021 ChiselStrike <info@chiselstrike.com>

# RUN: sh -e @file

cat << EOF > "$TEMPDIR/models/types.ts"
export class Person extends Chisel.ChiselEntity {
name: string = "";
}
EOF

cat << EOF > "$TEMPDIR/endpoints/store_person.js"
import { Person } from "../types.ts";

export default async function chisel(req: Request) {
const req_json = await req.json();

let p = new Person();
p.name = req_json.name;
await p.save();

if (req_json.command == "die with honor") {
throw new InternalError("Let's see if transaction gets cancelled");
}

return new Response('Mission acomplished');
}
EOF

cat << EOF > "$TEMPDIR/endpoints/retrieve_all.ts"
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
let resp = "[";
for await (let p of Person.cursor()) {
resp += p.name + " ";
}
resp += "]";
return new Response(resp);
}
EOF

cat << EOF > "$TEMPDIR/endpoints/write_and_read.ts"
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
let p = new Person();
p.name = "ThisIsTheBestName";
await p.save();

let resp = "[";
for await (let p of Person.cursor()) {
resp += p.name + " ";
}
resp += "]";

return new Response(resp);
}
EOF

cd "$TEMPDIR"
$CHISEL apply

$CURL -X POST --data '{ "name": "Adalbrecht" }' $CHISELD_HOST/dev/store_person
# CHECK: Mission accomplished

$CURL $CHISELD_HOST/dev/retrieve_all
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ]

$CURL -X POST --data '{
    "name": "Ruprt",
    "command": "die with honor"
}' $CHISELD_HOST/dev/store_person

$CURL $CHISELD_HOST/dev/retrieve_all
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ]

$CURL -X POST $CHISELD_HOST/dev/write_and_read
# CHECK: HTTP/1.1 200 OK
# CHECK: [Adalbrecht ThisIsTheBestName ]
14 changes: 13 additions & 1 deletion docusaurus/docs/data-access.md
@@ -212,7 +212,7 @@ The `findMany()` method is convenient, but also problematic if you have a lot of
entities stored because loading them can take a lot of time and memory. In future
releases of ChiselStrike, the runtime will enforce a maximum number of entities
`findMany()` can return and also enforce timeouts at the data store level. The
runtime will also provide optional pagination for the `findMany()` method.
:::
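
As a rough sketch of the pattern this note warns about (assuming `findMany()` accepts a restriction object like `filter()` does, that an empty object means "no restriction", and that it resolves to an array of entities; the import path is illustrative):

```typescript
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
    // Loads every matching Person into memory at once. Convenient for small
    // data sets, but prefer cursors (next section) when the table can grow.
    const people = await Person.findMany({});
    return new Response(JSON.stringify(people.map((p) => p.name)));
}
```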

## Cursors
@@ -270,3 +270,15 @@ The methods provided by `ChiselCursor` are outlined in the following table.
The `ChiselCursor` interface is still work-in-progress. For example, methods such as `skip()`, `map()`, and `reduce()` are planned for future releases.
Also, the current implementation of `filter()` takes a _restriction object_, but future ChiselStrike runtimes will allow you to write filter functions using TypeScript, which are automatically converted to efficient database queries in many cases.
:::
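
For example, a minimal sketch of cursor iteration combined with a restriction-object `filter()` (assuming the `Person` entity from the earlier examples, that `filter()` returns a cursor which can itself be iterated with `for await`, and an illustrative filter value and import path):

```typescript
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
    // Entities are streamed lazily instead of being loaded all at once,
    // and the restriction object narrows the rows the database returns.
    let names = "";
    for await (let person of Person.cursor().filter({ name: "Alice" })) {
        names += person.name + " ";
    }
    return new Response("[" + names + "]");
}
```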

## Transactions

We currently support implicit transactional evaluation. A transaction is created before ChiselStrike
starts evaluating your endpoint and is committed automatically after your endpoint finishes and the
HTTP response is generated. If your endpoint returns a stream, any database operation performed while
the stream is being generated happens outside of the transaction and can result in a crash.

If your code crashes or throws an exception that is not caught, ChiselStrike rolls back the
transaction automatically.

Explicit user-controlled transactions are coming soon.
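
To make the rollback behavior concrete, here is a minimal endpoint sketch (assuming a `Person` entity with a `name` field, as in the earlier examples; the request fields and import path are illustrative):

```typescript
import { Person } from "../models/types.ts";

export default async function chisel(req: Request) {
    const payload = await req.json();

    const person = new Person();
    person.name = payload.name;
    await person.save();

    if (payload.fail) {
        // The uncaught error escapes the endpoint, so the implicit transaction
        // is rolled back and the save() above is never persisted.
        throw new Error("aborting endpoint");
    }

    // Returning normally lets ChiselStrike commit the transaction,
    // persisting the Person saved above.
    return new Response("saved " + person.name);
}
```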
5 changes: 2 additions & 3 deletions docusaurus/docs/known_issues.md
@@ -20,9 +20,8 @@ While we do plan to provide you with a better experience in the future, for now
want to use external modules, browser-style should work.

* **Transactions:** ChiselStrike aims to fully support transactions at the endpoint boundary and,
at a later date, with user-defined granularity (for advanced users). There is a known bug with
that, though: endpoints should be serialized so RMW is write, but transactions won't be rolled
back if there is an exception in the endpoint and it doesn't complete.
at a later date, with user-defined granularity (for advanced users). We currently wrap the
whole endpoint in one big transaction.

* **Nested models:** (Also known as relationships.) With the exception of the special `OAuthUser`
model, it is not possible to embed a model inside another yet. The code to persist nested models
2 changes: 1 addition & 1 deletion server/src/auth.rs
@@ -64,7 +64,7 @@ async fn insert_user_into_db(username: &str) -> anyhow::Result<String> {
}
user.insert("username".into(), json!(username));
query_engine
.add_row(&oauth_user_type, &user)
.add_row(&oauth_user_type, &user, None)
.await?
.get("id")
.ok_or_else(|| anyhow!("Didn't get user ID from storing a user."))?
38 changes: 19 additions & 19 deletions server/src/db.rs
@@ -8,6 +8,7 @@ use crate::query::engine::new_query_results;
use crate::query::engine::JsonObject;
use crate::query::engine::RawSqlStream;
use crate::query::engine::SqlStream;
use crate::query::engine::TransactionStatic;
use crate::runtime;
use crate::types::{Field, ObjectType, Type, TypeSystem, TypeSystemError, OAUTHUSER_TYPE_NAME};
use anyhow::{anyhow, Result};
@@ -20,7 +21,6 @@ use futures::StreamExt;
use futures::TryStreamExt;
use itertools::Itertools;
use serde_json::value::Value;
use sqlx::AnyPool;
use std::collections::HashMap;
use std::collections::HashSet;
use std::pin::Pin;
@@ -276,7 +276,7 @@ fn sql_where_for_match_login(
}

fn sql_backing_store(
pool: &AnyPool,
tr: TransactionStatic,
columns: Vec<(String, Type)>,
limit_str: &str,
ty: Arc<ObjectType>,
@@ -303,7 +303,7 @@
if policies.transforms.is_empty() {
return Query::Sql(query);
}
let stream = new_query_results(query, pool);
let stream = new_query_results(query, tr);
let pstream = Box::pin(PolicyApplyingStream {
inner: stream,
policies,
@@ -401,14 +401,14 @@ fn join_stream(columns: &[(String, Type)], left: SqlStream, right: SqlStream) ->
}

fn sql_filter(
pool: &AnyPool,
tr: TransactionStatic,
columns: Vec<(String, Type)>,
limit_str: &str,
alias_count: &mut u32,
inner: Relation,
restrictions: Vec<Restriction>,
) -> Query {
let inner_sql = sql_impl(pool, inner, alias_count);
let inner_sql = sql_impl(tr, inner, alias_count);
let inner_sql = match inner_sql {
Query::Sql(s) => s,
Query::Stream(s) => return filter_stream(s, columns, restrictions),
@@ -441,8 +441,8 @@ fn sql_filter(
))
}

fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStream {
let inner = new_query_results(s, pool);
fn to_stream(tr: TransactionStatic, s: String, columns: Vec<(String, Type)>) -> SqlStream {
let inner = new_query_results(s, tr);
Box::pin(PolicyApplyingStream {
inner,
policies: FieldPolicies::default(),
@@ -451,7 +451,7 @@ fn to_stream(pool: &AnyPool, s: String, columns: Vec<(String, Type)>) -> SqlStre
}

fn sql_join(
pool: &AnyPool,
tr: TransactionStatic,
columns: &[(String, Type)],
limit_str: &str,
alias_count: &mut u32,
@@ -478,15 +478,15 @@
// FIXME: Optimize the case of table.left or table.right being just
// a BackingStore with all fields. The database probably doesn't
// care, but will make the logs cleaner.
let lsql = sql_impl(pool, left, alias_count);
let rsql = sql_impl(pool, right, alias_count);
let lsql = sql_impl(tr.clone(), left, alias_count);
let rsql = sql_impl(tr.clone(), right, alias_count);
let (lsql, rsql) = match (lsql, rsql) {
(Query::Sql(l), Query::Sql(r)) => (l, r),
(Query::Stream(l), Query::Sql(r)) => {
return join_stream(columns, l, to_stream(pool, r, right_columns))
return join_stream(columns, l, to_stream(tr, r, right_columns))
}
(Query::Sql(l), Query::Stream(r)) => {
return join_stream(columns, to_stream(pool, l, left_columns), r)
return join_stream(columns, to_stream(tr, l, left_columns), r)
}
(Query::Stream(l), Query::Stream(r)) => return join_stream(columns, l, r),
};
@@ -508,34 +508,34 @@
))
}

fn sql_impl(pool: &AnyPool, rel: Relation, alias_count: &mut u32) -> Query {
fn sql_impl(tr: TransactionStatic, rel: Relation, alias_count: &mut u32) -> Query {
let limit_str = rel
.limit
.map(|x| format!("LIMIT {}", x))
.unwrap_or_else(String::new);
match rel.inner {
Inner::BackingStore(ty, policies) => {
sql_backing_store(pool, rel.columns, &limit_str, ty, policies)
sql_backing_store(tr, rel.columns, &limit_str, ty, policies)
}
Inner::Filter(inner, restrictions) => sql_filter(
pool,
tr,
rel.columns,
&limit_str,
alias_count,
*inner,
restrictions,
),
Inner::Join(left, right) => {
sql_join(pool, &rel.columns, &limit_str, alias_count, *left, *right)
sql_join(tr, &rel.columns, &limit_str, alias_count, *left, *right)
}
}
}

pub(crate) fn sql(pool: &AnyPool, rel: Relation) -> SqlStream {
pub(crate) fn sql(tr: TransactionStatic, rel: Relation) -> SqlStream {
let mut v = 0;
let columns = rel.columns.clone();
match sql_impl(pool, rel, &mut v) {
Query::Sql(s) => to_stream(pool, s, columns),
match sql_impl(tr.clone(), rel, &mut v) {
Query::Sql(s) => to_stream(tr, s, columns),
Query::Stream(s) => s,
}
}
34 changes: 24 additions & 10 deletions server/src/deno.rs
@@ -45,6 +45,7 @@ use std::convert::TryInto;
use std::future::Future;
use std::io::Write;
use std::net::SocketAddr;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::rc::Rc;
@@ -287,7 +288,13 @@ async fn op_chisel_store(
// Await point below, RcMut can't be held.
drop(runtime);

Ok(serde_json::json!(query_engine.add_row(&ty, value).await?))
let transaction = current_transaction()?;
let mut transaction = transaction.lock().await;
Ok(serde_json::json!(
query_engine
.add_row(&ty, value, Some(transaction.deref_mut()))
.await?
))
}

type DbStream = RefCell<SqlStream>;
@@ -359,7 +366,9 @@ fn op_chisel_relational_query_create(
let relation = convert(&relation)?;
let mut runtime = runtime::get();
let query_engine = &mut runtime.query_engine;
let stream = Box::pin(query_engine.query_relation(relation));

let transaction = current_transaction()?;
let stream = Box::pin(query_engine.query_relation(relation, transaction));
let resource = QueryStreamResource {
stream: RefCell::new(stream),
};
@@ -602,6 +611,15 @@ fn current_method() -> Method {
CURRENT_CONTEXT.with(|path| path.borrow().method.clone())
}

fn current_transaction() -> Result<TransactionStatic> {
CURRENT_CONTEXT.with(|path| {
path.borrow()
.transaction
.clone()
.ok_or_else(|| anyhow!("no active transaction"))
})
}

struct RequestFuture<F> {
context: RequestContext,
inner: F,
@@ -727,7 +745,6 @@ fn get_result<'a>(
}

pub(crate) async fn run_js(path: String, mut req: Request<hyper::Body>) -> Result<Response<Body>> {

let qe = {
let runtime = runtime::get();
let qe = runtime.query_engine.clone();
@@ -776,14 +793,7 @@ pub(crate) async fn run_js(path: String, mut req: Request<hyper::Body>) -> Resul
// endpoints that don't do any data access. For now, because we always create it above,
// it should be safe to unwrap.
let transaction = clear_current_context().unwrap();

// Only do this after we cleared the current context so we don't hold the transaction.
// But as a result of the transaction being committed after we ?, we won't commit it
// if there was an error in the endpoint. Jan FIXME: maybe we should explicitly rollback.
// Or alternatively, because there are many other `?` stuff in this function, maybe commit the
// transaction should be the very last thing we do
let result = result?;
crate::query::QueryEngine::commit_transaction_static(transaction).await?;

let stream = get_read_stream(runtime, result.clone())?;
let scope = &mut runtime.handle_scope();
@@ -821,6 +831,10 @@ pub(crate) async fn run_js(path: String, mut req: Request<hyper::Body>) -> Resul
}

let body = builder.body(Body::Stream(Box::pin(stream)))?;
// Defer committing of the transaction to the last possible moment. It would be better
// to commit the transaction after the response stream is closed, but it would be a lot
// of work and this will do for now.
crate::query::QueryEngine::commit_transaction_static(transaction).await?;
Ok(body)
}
