Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use AsyncLocalStorage to refactor transaction, to make it more safe #108

Merged
merged 2 commits into from
Jun 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const db = new RDSClient({
// before returning an error from getConnection.
// If set to 0, there is no limit to the number of queued connection requests. (Default: 0)
// queueLimit: 0,
// Set asyncLocalStorage manually for transaction
// connectionStorage: new AsyncLocalStorage(),
// If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances
// connectionStorageKey: 'datasource',
});
```

Expand Down Expand Up @@ -309,28 +313,17 @@ const result = await db.beginTransactionScope(async conn => {
// if error throw on scope, will auto rollback
```

#### Transaction on koa

API: `async beginTransactionScope(scope, ctx)`

Use koa's context to make sure only one active transaction on one ctx.
In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions.

```js
async function foo(ctx, data1) {
return await db.beginTransactionScope(async conn => {
await conn.insert(table1, data1);
return { success: true };
}, ctx);
}

async function bar(ctx, data2) {
return await db.beginTransactionScope(async conn => {
// execute foo with the same transaction scope
await foo(ctx, { foo: 'bar' });
await conn.insert(table2, data2);
return { success: true };
}, ctx);
}
const result = await Promise.all([
db.beginTransactionScope(async conn => {
// commit and success
}),
db.beginTransactionScope(async conn => {
// throw err and rollback
}),
])
```

### Raw Queries
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"eslint": "^8.29.0",
"eslint-config-egg": "^12.1.0",
"git-contributor": "^2.0.0",
"mm": "^3.3.0",
"typescript": "^4.9.5"
},
"homepage": "https://github.com/ali-sdk/ali-rds",
Expand Down
153 changes: 113 additions & 40 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { promisify } from 'node:util';
import mysql from 'mysql';
import type { PoolConfig, Pool } from 'mysql';
import type { PoolConnectionPromisify } from './types';
import type { Pool } from 'mysql';
import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types';
import { Operator } from './operator';
import { RDSConnection } from './connection';
import { RDSTransaction } from './transaction';
Expand All @@ -24,17 +25,26 @@
static get format() { return mysql.format; }
static get raw() { return mysql.raw; }

static #DEFAULT_STORAGE_KEY = Symbol.for('RDSClient#storage#default');
static #TRANSACTION_NEST_COUNT = Symbol.for('RDSClient#transaction#nestCount');
gxkl marked this conversation as resolved.
Show resolved Hide resolved

#pool: PoolPromisify;
constructor(options: PoolConfig) {
#connectionStorage: AsyncLocalStorage<TransactionContext>;
#connectionStorageKey: string | symbol;

constructor(options: RDSClientOptions) {
super();
this.#pool = mysql.createPool(options) as unknown as PoolPromisify;
const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options;
this.#pool = mysql.createPool(mysqlOptions) as unknown as PoolPromisify;
[
'query',
'getConnection',
'end',
].forEach(method => {
this.#pool[method] = promisify(this.#pool[method]);
});
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
}

// impl Operator._query
Expand Down Expand Up @@ -92,6 +102,7 @@
throw err;
}
const tran = new RDSTransaction(conn);
tran[RDSClient.#TRANSACTION_NEST_COUNT] = 1;
if (this.beforeQueryHandlers.length > 0) {
for (const handler of this.beforeQueryHandlers) {
tran.beforeQuery(handler);
Expand All @@ -109,62 +120,106 @@
* Auto commit or rollback on a transaction scope
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param {Object} [ctx] - transaction context
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
// Create only one conn if concurrent call `beginTransactionScope`
ctx._transactionConnection = this.beginTransaction();
}
const tran = await ctx._transactionConnection;

if (!ctx._transactionScopeCount) {
ctx._transactionScopeCount = 1;
async #beginTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
let shouldRelease = false;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
shouldRelease = true;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;
Dismissed Show dismissed Hide dismissed
}

let result: any;
let scopeError: any;
let internalError: any;
try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
await tran.commit();
result = await scope(tran);
} catch (err: any) {
scopeError = err;
}
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;
Dismissed Show dismissed Hide dismissed

// null connection means the nested scope has been rollback, we can do nothing here
if (tran.conn) {
try {
// execution error, should rollback
if (scopeError) {
await tran.rollback();
} else if (tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
// nestedCount smaller than 1 means all the nested scopes have executed successfully
await tran.commit();
}
} catch (err) {
internalError = err;
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
await tran.rollback();
}

// remove transaction in ctx
if (shouldRelease && tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
ctx[this.#connectionStorageKey] = null;
}

if (internalError) {
if (scopeError) {
internalError.cause = scopeError;
}
throw err;
throw internalError;
}
if (scopeError) {
throw scopeError;
}
return result;
}

/**
* Auto commit or rollback on a transaction scope
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: TransactionScope) {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginTransactionScope(scope, ctx);
}
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginTransactionScope(scope, ctx!);
});
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param scope - scope with code
* @param ctx - transaction context
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
ctx._transactionConnection = await this.beginTransaction();
ctx._transactionScopeCount = 1;
async #beginDoomedTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;

Check warning on line 216 in src/client.ts

View check run for this annotation

Codecov / codecov/patch

src/client.ts#L214-L216

Added lines #L214 - L216 were not covered by tests
Dismissed Show dismissed Hide dismissed
}
const tran = ctx._transactionConnection;

try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
if (tran[RDSClient.#TRANSACTION_NEST_COUNT] === 0) {
ctx._transactionConnection = null;
gxkl marked this conversation as resolved.
Show resolved Hide resolved
}
return result;
Expand All @@ -178,6 +233,24 @@
}
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: TransactionScope): Promise<any> {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginDoomedTransactionScope(scope, ctx);
}

Check warning on line 247 in src/client.ts

View check run for this annotation

Codecov / codecov/patch

src/client.ts#L246-L247

Added lines #L246 - L247 were not covered by tests
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginDoomedTransactionScope(scope, ctx!);
});
}

async end() {
await this.#pool.end();
}
Expand Down
12 changes: 11 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import type { PoolConnection } from 'mysql';
import { AsyncLocalStorage } from 'async_hooks';
import type { PoolConnection, PoolConfig } from 'mysql';
import { RDSTransaction } from './transaction';

export interface RDSClientOptions extends PoolConfig {
connectionStorageKey?: string;
connectionStorage?: AsyncLocalStorage<Record<PropertyKey, RDSTransaction>>;
}

export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {
query(sql: string): Promise<any>;
Expand Down Expand Up @@ -53,3 +60,6 @@ export type LockTableOption = {

export type BeforeQueryHandler = (sql: string) => string | undefined | void;
export type AfterQueryHandler = (sql: string, result: any, execDuration: number, err?: Error) => void;

export type TransactionContext = Record<PropertyKey, RDSTransaction | null>;
export type TransactionScope = (transaction: RDSTransaction) => Promise<any>;
Loading