Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use AsyncLocalStorage to refactor transaction, to make it more safe #108

Merged
merged 2 commits into from
Jun 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const db = new RDSClient({
// before returning an error from getConnection.
// If set to 0, there is no limit to the number of queued connection requests. (Default: 0)
// queueLimit: 0,
// Set asyncLocalStorage manually for transaction
// connectionStorage: new AsyncLocalStorage(),
// If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances
// connectionStorageKey: 'datasource',
});
```

Expand Down Expand Up @@ -309,28 +313,17 @@ const result = await db.beginTransactionScope(async conn => {
// if error throw on scope, will auto rollback
```

#### Transaction on koa

API: `async beginTransactionScope(scope, ctx)`

Use koa's context to make sure only one active transaction on one ctx.
In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions.

```js
async function foo(ctx, data1) {
return await db.beginTransactionScope(async conn => {
await conn.insert(table1, data1);
return { success: true };
}, ctx);
}

async function bar(ctx, data2) {
return await db.beginTransactionScope(async conn => {
// execute foo with the same transaction scope
await foo(ctx, { foo: 'bar' });
await conn.insert(table2, data2);
return { success: true };
}, ctx);
}
const result = await Promise.all([
db.beginTransactionScope(async conn => {
// commit and success
}),
db.beginTransactionScope(async conn => {
// throw err and rollback
}),
])
```

### Raw Queries
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"eslint": "^8.29.0",
"eslint-config-egg": "^12.1.0",
"git-contributor": "^2.0.0",
"mm": "^3.3.0",
"typescript": "^4.9.5"
},
"homepage": "https://github.com/ali-sdk/ali-rds",
Expand Down
163 changes: 118 additions & 45 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { promisify } from 'node:util';
import mysql from 'mysql';
import type { PoolConfig, Pool } from 'mysql';
import type { PoolConnectionPromisify } from './types';
import type { Pool } from 'mysql';
import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types';
import { Operator } from './operator';
import { RDSConnection } from './connection';
import { RDSTransaction } from './transaction';
Expand All @@ -24,17 +25,26 @@ export class RDSClient extends Operator {
static get format() { return mysql.format; }
static get raw() { return mysql.raw; }

static #DEFAULT_STORAGE_KEY = Symbol('RDSClient#storage#default');
static #TRANSACTION_NEST_COUNT = Symbol('RDSClient#transaction#nestCount');

#pool: PoolPromisify;
constructor(options: PoolConfig) {
#connectionStorage: AsyncLocalStorage<TransactionContext>;
#connectionStorageKey: string | symbol;

constructor(options: RDSClientOptions) {
super();
this.#pool = mysql.createPool(options) as unknown as PoolPromisify;
const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options;
this.#pool = mysql.createPool(mysqlOptions) as unknown as PoolPromisify;
[
'query',
'getConnection',
'end',
].forEach(method => {
this.#pool[method] = promisify(this.#pool[method]);
});
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
}

// impl Operator._query
Expand Down Expand Up @@ -92,6 +102,7 @@ export class RDSClient extends Operator {
throw err;
}
const tran = new RDSTransaction(conn);
tran[RDSClient.#TRANSACTION_NEST_COUNT] = 1;
if (this.beforeQueryHandlers.length > 0) {
for (const handler of this.beforeQueryHandlers) {
tran.beforeQuery(handler);
Expand All @@ -109,75 +120,137 @@ export class RDSClient extends Operator {
* Auto commit or rollback on a transaction scope
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param {Object} [ctx] - transaction context
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
// Create only one conn if concurrent call `beginTransactionScope`
ctx._transactionConnection = this.beginTransaction();
}
const tran = await ctx._transactionConnection;

if (!ctx._transactionScopeCount) {
ctx._transactionScopeCount = 1;
async #beginTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
let shouldRelease = false;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
shouldRelease = true;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;

Check warning

Code scanning / CodeQL

Prototype-polluting assignment

This assignment may alter Object.prototype if a malicious '__proto__' string is injected from [library input](1).
}

let result: any;
let scopeError: any;
let internalError: any;
try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
await tran.commit();
result = await scope(tran);
} catch (err: any) {
scopeError = err;
}
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;

Check warning

Code scanning / CodeQL

Prototype-polluting assignment

This assignment may alter Object.prototype if a malicious '__proto__' string is injected from [library input](1).

// null connection means the nested scope has been rollback, we can do nothing here
if (tran.conn) {
try {
// execution error, should rollback
if (scopeError) {
await tran.rollback();
} else if (tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
// nestedCount smaller than 1 means all the nested scopes have executed successfully
await tran.commit();
}
} catch (err) {
internalError = err;
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
await tran.rollback();
}

// remove transaction in ctx
if (shouldRelease && tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
ctx[this.#connectionStorageKey] = null;
}

if (internalError) {
if (scopeError) {
internalError.cause = scopeError;
}
throw err;
throw internalError;
}
if (scopeError) {
throw scopeError;
}
return result;
}

/**
* Auto commit or rollback on a transaction scope
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: TransactionScope) {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginTransactionScope(scope, ctx);
}
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginTransactionScope(scope, ctx!);
});
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param scope - scope with code
* @param ctx - transaction context
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
ctx._transactionConnection = await this.beginTransaction();
ctx._transactionScopeCount = 1;
async #beginDoomedTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;

Check warning

Code scanning / CodeQL

Prototype-polluting assignment

This assignment may alter Object.prototype if a malicious '__proto__' string is injected from [library input](1).
}
const tran = ctx._transactionConnection;

try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;
Fixed Show fixed Hide fixed

Check warning

Code scanning / CodeQL

Prototype-polluting assignment

This assignment may alter Object.prototype if a malicious '__proto__' string is injected from [library input](1).
if (tran[RDSClient.#TRANSACTION_NEST_COUNT] === 0) {
ctx[this.#connectionStorageKey] = null;
await tran.rollback();
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
if (ctx[this.#connectionStorageKey]) {
ctx[this.#connectionStorageKey] = null;
await tran.rollback();
}
throw err;
} finally {
await tran.rollback();
}
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: TransactionScope): Promise<any> {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginDoomedTransactionScope(scope, ctx);
}
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginDoomedTransactionScope(scope, ctx!);
});
}

async end() {
await this.#pool.end();
}
Expand Down
12 changes: 11 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import type { PoolConnection } from 'mysql';
import { AsyncLocalStorage } from 'async_hooks';
import type { PoolConnection, PoolConfig } from 'mysql';
import { RDSTransaction } from './transaction';

export interface RDSClientOptions extends PoolConfig {
connectionStorageKey?: string;
connectionStorage?: AsyncLocalStorage<Record<PropertyKey, RDSTransaction>>;
}

export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {
query(sql: string): Promise<any>;
Expand Down Expand Up @@ -53,3 +60,6 @@ export type LockTableOption = {

export type BeforeQueryHandler = (sql: string) => string | undefined | void;
export type AfterQueryHandler = (sql: string, result: any, execDuration: number, err?: Error) => void;

export type TransactionContext = Record<PropertyKey, RDSTransaction | null>;
export type TransactionScope = (transaction: RDSTransaction) => Promise<any>;
Loading