Skip to content

Commit

Permalink
fix: ensure session for driver
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed Apr 15, 2024
1 parent 76e0e50 commit 1965055
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 23 deletions.
12 changes: 7 additions & 5 deletions packages/core/src/database.ts
Expand Up @@ -342,18 +342,20 @@ export class Database<S = {}, N = {}, C extends Context = Context> extends Servi
let driver = drivers.get(original)
if (!driver) {
let session: any
let _resolve: (value: any) => void
const sessionTask = new Promise((resolve) => _resolve = resolve)
driver = new Proxy(original, {
get: (target, p, receiver) => {
if (p === 'session') return session
if (p === '_ensureSession') return () => sessionTask
return Reflect.get(target, p, receiver)
},
set: (target, p, value, receiver) => {
if (p === 'session') session = value
return Reflect.set(target, p, value, receiver)
},
})
drivers.set(original, driver)
finalTasks.push(driver.withTransaction(() => initialTask))
finalTasks.push(driver.withTransaction((_session) => {
_resolve(session = _session)
return initialTask
}))
}
return driver
}
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/driver.ts
Expand Up @@ -59,7 +59,7 @@ export abstract class Driver<T = any, C extends Context = Context> {
abstract remove(sel: Selection.Mutable): Promise<Driver.WriteResult>
abstract create(sel: Selection.Mutable, data: any): Promise<any>
abstract upsert(sel: Selection.Mutable, data: any[], keys: string[]): Promise<Driver.WriteResult>
abstract withTransaction(callback: () => Promise<void>): Promise<void>
abstract withTransaction(callback: (session?: any) => Promise<void>): Promise<void>

public database: Database<any, any, C>
public logger: Logger
Expand Down Expand Up @@ -136,6 +136,8 @@ export abstract class Driver<T = any, C extends Context = Context> {
define<S, T>(converter: Driver.Transformer<S, T>) {
converter.types.forEach(type => this.types[type] = converter)
}

async _ensureSession() {}
}

export interface MigrationHooks {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/selection.ts
Expand Up @@ -125,6 +125,7 @@ class Executable<S = any, T = any> {

async execute(): Promise<T> {
await this.driver.database.prepared()
await this.driver._ensureSession()
return this.driver[this.type as any](this, ...this.args)
}
}
Expand Down
7 changes: 3 additions & 4 deletions packages/mongo/src/index.ts
Expand Up @@ -461,15 +461,14 @@ export class MongoDriver extends Driver<MongoDriver.Config> {
}
}

async withTransaction(callback: () => Promise<void>) {
async withTransaction(callback: (session: any) => Promise<void>) {
await this.client.withSession(async (session) => {
this.session = session
await session.withTransaction(() => callback()).catch(async (e) => {
await session.withTransaction(() => callback(session)).catch(async (e) => {
if (e instanceof MongoError && e.code === 20 && e.message.includes('Transaction numbers')) {
this.logger.warn(`MongoDB is currently running as standalone server, transaction is disabled.
Convert to replicaSet to enable the feature.
See https://www.mongodb.com/docs/manual/tutorial/convert-standalone-to-replica-set/`)
await callback()
await callback(session)
return
}
throw e
Expand Down
5 changes: 2 additions & 3 deletions packages/mysql/src/index.ts
Expand Up @@ -488,15 +488,14 @@ INSERT INTO mtt VALUES(json_extract(j, concat('$[', i, ']'))); SET i=i+1; END WH
return { inserted: records - result.changedRows, matched: result.changedRows, modified: result.affectedRows - records }
}

async withTransaction(callback: () => Promise<void>) {
async withTransaction(callback: (session: any) => Promise<void>) {
return new Promise<void>((resolve, reject) => {
this.pool.getConnection((err, conn) => {
this.session = conn
if (err) {
this.logger.warn('getConnection failed: ', err)
return
}
conn.beginTransaction(() => callback().then(
conn.beginTransaction(() => callback(conn).then(
() => conn.commit(() => resolve()),
(e) => conn.rollback(() => reject(e)),
).finally(() => conn.release()))
Expand Down
5 changes: 2 additions & 3 deletions packages/postgres/src/index.ts
Expand Up @@ -411,10 +411,9 @@ export class PostgresDriver extends Driver<PostgresDriver.Config> {
return { inserted: result.filter(({ rtime }) => +rtime !== mtime).length, matched: result.filter(({ rtime }) => +rtime === mtime).length }
}

async withTransaction(callback: () => Promise<void>) {
async withTransaction(callback: (session: any) => Promise<void>) {
return await this.postgres.begin(async (conn) => {
this.session = conn
await callback()
await callback(conn)
await conn.unsafe(`COMMIT`)
})
}
Expand Down
14 changes: 7 additions & 7 deletions packages/sqlite/src/index.ts
Expand Up @@ -228,7 +228,7 @@ export class SQLiteDriver extends Driver<SQLiteDriver.Config> {
}
}

#exec(sql: string, params: any, callback: (stmt: init.Statement) => any) {
_exec(sql: string, params: any, callback: (stmt: init.Statement) => any) {
try {
const stmt = this.db.prepare(sql)
const result = callback(stmt)
Expand All @@ -242,7 +242,7 @@ export class SQLiteDriver extends Driver<SQLiteDriver.Config> {
}

_all(sql: string, params: any = []) {
return this.#exec(sql, params, (stmt) => {
return this._exec(sql, params, (stmt) => {
stmt.bind(params)
const result: any[] = []
while (stmt.step()) {
Expand All @@ -254,7 +254,7 @@ export class SQLiteDriver extends Driver<SQLiteDriver.Config> {
}

_get(sql: string, params: any = []) {
return this.#exec(sql, params, stmt => stmt.getAsObject(params))
return this._exec(sql, params, stmt => stmt.getAsObject(params))
}

_export() {
Expand All @@ -263,7 +263,7 @@ export class SQLiteDriver extends Driver<SQLiteDriver.Config> {
}

_run(sql: string, params: any = [], callback?: () => any) {
this.#exec(sql, params, stmt => stmt.run(params))
this._exec(sql, params, stmt => stmt.run(params))
const result = callback?.()
return result
}
Expand Down Expand Up @@ -386,11 +386,11 @@ export class SQLiteDriver extends Driver<SQLiteDriver.Config> {
return result
}

async withTransaction(callback: (session: this) => Promise<void>) {
if (this._transactionTask) await this._transactionTask
async withTransaction(callback: () => Promise<void>) {
if (this._transactionTask) await this._transactionTask.catch(() => {})
return this._transactionTask = new Promise<void>((resolve, reject) => {
this._run('BEGIN TRANSACTION')
callback(this).then(
callback().then(
() => resolve(this._run('COMMIT')),
(e) => (this._run('ROLLBACK'), reject(e)),
)
Expand Down

0 comments on commit 1965055

Please sign in to comment.