Skip to content

Commit

Permalink
feat (client): flag to disable FKs on incoming TXs in SQLite (#1281)
Browse files Browse the repository at this point in the history
This PR introduces an additional flag that can be enabled when
electrifying a SQLite database. When the flag is enabled, the Satellite
process disables FK checks on incoming transactions. This flag is useful
for performance reasons as skipping FK checks greatly speeds up initial
data sync when there is a lot of data.

Note that disabling FK checks can't be done transactionally so we have
to disable FKs, apply the TX, and then re-enable FKs. Therefore, it is
important that no transactions are ran concurrently (this is ensured by
the mutex that our drivers hold).
We assume that Electric exlusively owns the DB connection and thus that
nobody executes a concurrent TX directly on the DB connection.
  • Loading branch information
kevin-dp committed Jun 4, 2024
1 parent b61258f commit f4f020d
Show file tree
Hide file tree
Showing 16 changed files with 554 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changeset/clean-singers-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Flag to disable FKs checks on incoming TXs in SQLite.
8 changes: 8 additions & 0 deletions clients/typescript/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ export interface ElectricConfig {
* Optional backoff options for connecting with Electric
*/
connectionBackOffOptions?: ConnectionBackOffOptions
/**
* Whether to disable FK checks when applying downstream (i.e. incoming) transactions to the local SQLite database.
* When using Postgres, this is the default behavior and can't be changed.
*/
disableForeignKeysDownstream?: boolean
}

export type ElectricConfigWithDialect = ElectricConfig & {
Expand All @@ -61,6 +66,7 @@ export type HydratedConfig = {
debug: boolean
connectionBackOffOptions: ConnectionBackOffOptions
namespace: string
disableFKs: boolean | undefined
}

export type InternalElectricConfig = {
Expand All @@ -73,6 +79,7 @@ export type InternalElectricConfig = {
}
debug?: boolean
connectionBackOffOptions?: ConnectionBackOffOptions
disableFKs?: boolean
}

export const hydrateConfig = (
Expand Down Expand Up @@ -126,5 +133,6 @@ export const hydrateConfig = (
debug,
connectionBackOffOptions,
namespace: defaultNamespace,
disableFKs: config.disableForeignKeysDownstream,
}
}
67 changes: 63 additions & 4 deletions clients/typescript/src/drivers/better-sqlite3/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { Mutex } from 'async-mutex'
import {
DatabaseAdapter as DatabaseAdapterInterface,
RunResult,
TableNameImpl,
Transaction as Tx,
UncoordinatedDatabaseAdapter,
} from '../../electric/adapter'

import {
Expand All @@ -21,12 +23,21 @@ export class DatabaseAdapter
db: Database
readonly defaultNamespace = 'main'

/*
* Even though this driver is synchronous we need to coordinate the calls through a mutex
* because of the `group` method which takes a function: `f: (adapter: UncoordinatedDatabaseAdapter) => Promise<void> | void`
* that function may call `await` which would open the possibility for another query/transaction
* to be interleaved with the execution of that function
*/
protected txMutex: Mutex

constructor(db: Database) {
super()
this.db = db
this.txMutex = new Mutex()
}

async runInTransaction(...statements: DbStatement[]): Promise<RunResult> {
async _runInTransaction(...statements: DbStatement[]): Promise<RunResult> {
const txn = this.db.transaction((stmts: DbStatement[]) => {
let rowsAffected = 0
for (const stmt of stmts) {
Expand All @@ -41,7 +52,7 @@ export class DatabaseAdapter
return txn(statements)
}

async transaction<T>(
async _transaction<T>(
f: (_tx: Tx, setResult: (res: T) => void) => void
): Promise<T> {
let result: T
Expand All @@ -51,7 +62,7 @@ export class DatabaseAdapter
}

// Promise interface, but impl not actually async
async run({ sql, args }: DbStatement): Promise<RunResult> {
async _run({ sql, args }: DbStatement): Promise<RunResult> {
const prep = this.db.prepare(sql)
const res = prep.run(...wrapBindParams(args))
return {
Expand All @@ -60,10 +71,58 @@ export class DatabaseAdapter
}

// This `query` function does not enforce that the query is read-only
async query({ sql, args }: DbStatement): Promise<Row[]> {
async _query({ sql, args }: DbStatement): Promise<Row[]> {
const stmt = this.db.prepare(sql)
return stmt.all(...wrapBindParams(args))
}

async _runExclusively<T>(
f: (adapter: UncoordinatedDatabaseAdapter) => Promise<T> | T
): Promise<T> {
// We create an adapter that does not go through the mutex
// when used by the function`f`, since we already take the mutex here
const adapter = {
run: this._run.bind(this),
query: this._query.bind(this),
transaction: this._transaction.bind(this),
runInTransaction: this._runInTransaction.bind(this),
}
return f(adapter)
}

async runInTransaction(...statements: DbStatement[]): Promise<RunResult> {
return this.txMutex.runExclusive(() => {
return this._runInTransaction(...statements)
})
}

async transaction<T>(
f: (_tx: Tx, setResult: (res: T) => void) => void
): Promise<T> {
return this.txMutex.runExclusive(() => {
return this._transaction(f)
})
}

async run(stmt: Statement): Promise<RunResult> {
return this.txMutex.runExclusive(() => {
return this._run(stmt)
})
}

async query(stmt: Statement): Promise<Row[]> {
return this.txMutex.runExclusive(() => {
return this._query(stmt)
})
}

async runExclusively<T>(
f: (adapter: UncoordinatedDatabaseAdapter) => Promise<T> | T
): Promise<T> {
return this.txMutex.runExclusive(() => {
return this._runExclusively(f)
})
}
}

function wrapBindParams(x: BindParams | undefined): StatementBindParams {
Expand Down
79 changes: 49 additions & 30 deletions clients/typescript/src/drivers/generic/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
RunResult,
TableNameImpl,
Transaction as Tx,
UncoordinatedDatabaseAdapter,
} from '../../electric/adapter'
import { Row, Statement } from '../../util'
import { Mutex } from 'async-mutex'
Expand Down Expand Up @@ -42,19 +43,12 @@ abstract class DatabaseAdapter
/**
* @param statements A list of SQL statements to execute against the DB.
*/
abstract runInTransaction(...statements: Statement[]): Promise<RunResult>
abstract _runInTransaction(...statements: Statement[]): Promise<RunResult>

async transaction<T>(
async _transaction<T>(
f: (_tx: Tx, setResult: (res: T) => void) => void
): Promise<T> {
const release = await this.txMutex.acquire()

try {
await this._run({ sql: 'BEGIN' })
} catch (e) {
release()
throw e
}
await this._run({ sql: 'BEGIN' })

return new Promise<T>((resolve, reject) => {
const tx = new Transaction(this, reject)
Expand All @@ -64,22 +58,26 @@ abstract class DatabaseAdapter
// This assumes that the user does not execute any more queries after setting the result.
this._run({ sql: 'COMMIT' })
.then(() => {
// Release early if commit succeeded
release()
resolve(res)
})
// Failed to commit
.catch(reject)
})
})
.catch((err) => {
// something went wrong
// let's roll back and rethrow
return this._run({ sql: 'ROLLBACK' }).then(() => {
throw err
})
}).catch((err) => {
// something went wrong
// let's roll back and rethrow
return this._run({ sql: 'ROLLBACK' }).then(() => {
throw err
})
.finally(release)
})
}

async transaction<T>(
f: (_tx: Tx, setResult: (res: T) => void) => void
): Promise<T> {
return this.txMutex.runExclusive(() => {
return this._transaction(f)
})
}

run(stmt: Statement): Promise<RunResult> {
Expand All @@ -98,6 +96,34 @@ abstract class DatabaseAdapter
})
}

runInTransaction(...statements: Statement[]): Promise<RunResult> {
return this.txMutex.runExclusive(() => {
return this._runInTransaction(...statements)
})
}

async _runExclusively<T>(
f: (adapter: UncoordinatedDatabaseAdapter) => Promise<T> | T
): Promise<T> {
// We create an adapter that does not go through the mutex
// when used by the function`f`, since we already take the mutex here
const adapter = {
run: this._run.bind(this),
query: this._query.bind(this),
transaction: this._transaction.bind(this),
runInTransaction: this._runInTransaction.bind(this),
}
return await f(adapter)
}

runExclusively<T>(
f: (adapter: UncoordinatedDatabaseAdapter) => Promise<T> | T
): Promise<T> {
return this.txMutex.runExclusive(() => {
return this._runExclusively(f)
})
}

get isLocked(): boolean {
return this.txMutex.isLocked()
}
Expand All @@ -118,11 +144,8 @@ export abstract class BatchDatabaseAdapter
*/
abstract execBatch(statements: Statement[]): Promise<RunResult>

async runInTransaction(...statements: Statement[]): Promise<RunResult> {
// Uses a mutex to ensure that transactions are not interleaved.
return this.txMutex.runExclusive(() => {
return this.execBatch(statements)
})
async _runInTransaction(...statements: Statement[]): Promise<RunResult> {
return this.execBatch(statements)
}
}

Expand All @@ -135,9 +158,7 @@ export abstract class SerialDatabaseAdapter
implements DatabaseAdapterInterface
{
abstract readonly defaultNamespace: 'main' | 'public'
async runInTransaction(...statements: Statement[]): Promise<RunResult> {
// Uses a mutex to ensure that transactions are not interleaved.
const release = await this.txMutex.acquire()
async _runInTransaction(...statements: Statement[]): Promise<RunResult> {
let transactionBegan = false
let rowsAffected = 0
try {
Expand All @@ -156,8 +177,6 @@ export abstract class SerialDatabaseAdapter
await this._run({ sql: 'ROLLBACK' })
}
throw error // rejects the promise with the reason for the rollback
} finally {
release()
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions clients/typescript/src/electric/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { QualifiedTablename } from '../util/tablename'
import { Row, Statement } from '../util/types'
import { parseTableNames } from '../util'

export type UncoordinatedDatabaseAdapter = Pick<
DatabaseAdapter,
'run' | 'query' | 'runInTransaction' | 'transaction'
>

// A `DatabaseAdapter` adapts a database client to provide the
// normalised interface defined here.
export interface DatabaseAdapter {
Expand All @@ -13,6 +18,24 @@ export interface DatabaseAdapter {
// Runs the provided sql as a transaction
runInTransaction(...statements: Statement[]): Promise<RunResult>

/**
* This method is useful to execute several queries in isolation from any other queries/transactions executed through this adapter.
* Useful to execute queries that cannot be executed inside a transaction (e.g. SQLite does not allow the `foreign_keys` PRAGMA to be modified in a transaction).
* In that case we can use this `group` method:
* ```
* await adapter.runExclusively(async (adapter) => {
* await adapter.run({ sql: 'PRAGMA foreign_keys = OFF;' })
* ...
* await adapter.run({ sql: 'PRAGMA foreign_keys = ON;' })
* })
* ```
* This snippet above ensures that no other query/transaction will be interleaved when the foreign keys are disabled.
* @param f Function that is guaranteed to be executed in isolation from other queries/transactions executed by this adapter.
*/
runExclusively<T>(
f: (adapter: UncoordinatedDatabaseAdapter) => Promise<T> | T
): Promise<T>

// Query the database.
query(statement: Statement): Promise<Row[]>

Expand Down
15 changes: 11 additions & 4 deletions clients/typescript/src/migrators/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { QualifiedTablename, SatelliteError, SatelliteErrorCode } from '../util'
import { _electric_migrations } from '../satellite/config'
import { pgBuilder, QueryBuilder, sqliteBuilder } from './query-builder'
import { dedent } from 'ts-dedent'
import { runInTransaction } from '../util/transactions'

export const SCHEMA_VSN_ERROR_MSG = `Local schema doesn't match server's. Clear local state through developer tools and retry connection manually. If error persists, re-generate the client. Check documentation (https://electric-sql.com/docs/reference/roadmap) to learn more.`

Expand Down Expand Up @@ -130,14 +131,17 @@ export abstract class BundleMigratorBase implements Migrator {
return migrations.slice(existingPrefix.length)
}

async apply({ statements, version }: StmtMigration): Promise<void> {
async apply(
{ statements, version }: StmtMigration,
disableFKs?: boolean
): Promise<void> {
if (!VALID_VERSION_EXP.test(version)) {
throw new Error(
`Invalid migration version, must match ${VALID_VERSION_EXP}`
)
}

await this.adapter.runInTransaction(...statements, {
await runInTransaction(this.adapter, disableFKs, ...statements, {
sql: dedent`
INSERT INTO ${this.migrationsTable} (version, applied_at)
VALUES (${this.queryBuilder.makePositionalParam(
Expand All @@ -154,7 +158,10 @@ export abstract class BundleMigratorBase implements Migrator {
* @returns A promise that resolves to a boolean
* that indicates if the migration was applied.
*/
async applyIfNotAlready(migration: StmtMigration): Promise<boolean> {
async applyIfNotAlready(
migration: StmtMigration,
disableFKs: boolean | undefined
): Promise<boolean> {
const rows = await this.adapter.query({
sql: dedent`
SELECT 1 FROM ${this.migrationsTable}
Expand All @@ -168,7 +175,7 @@ export abstract class BundleMigratorBase implements Migrator {
if (shouldApply) {
// This is a new migration because its version number
// is not in our migrations table.
await this.apply(migration)
await this.apply(migration, disableFKs)
}

return shouldApply
Expand Down
7 changes: 5 additions & 2 deletions clients/typescript/src/migrators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ export function makeStmtMigration(migration: Migration): StmtMigration {

export interface Migrator {
up(): Promise<number>
apply(migration: StmtMigration): Promise<void>
applyIfNotAlready(migration: StmtMigration): Promise<boolean>
apply(migration: StmtMigration, disableFKs?: boolean): Promise<void>
applyIfNotAlready(
migration: StmtMigration,
disableFKs: boolean | undefined
): Promise<boolean>
querySchemaVersion(): Promise<string | undefined>
queryBuilder: QueryBuilder
}
Loading

0 comments on commit f4f020d

Please sign in to comment.