Skip to content

Commit

Permalink
feat(core): use knex to generate sql + enable connection pooling [BC]
Browse files Browse the repository at this point in the history
QueryBuilder now internally uses knex to run all queries. As knex already supports connection pooling, thbis feature comes without any effort. New configuration for pooling is now availableAs knex already supports connection pooling, this feature comes without any effort.

BREAKING CHANGES:
Transactions now require using EM.transactional() method, previous helpers `beginTransaction/commit/rollback` are now removed. All transaction management has been removed from IDatabaseDriver interface, now EM handles this, passing the transaction context (carried by EM, and created by Connection) to all driver methods. New methods on EM exists: `isInTransaction` and `getTransactionContext`.
Because of knex being used as a query runner, there are some minor differences in results of plain sql calls (made by calling connection.execute() with string sql parameter instead of using QueryBuilder). Another difference is in postgre driver, that used to require passing parameters as indexed dollar sign ($1, $2, ...), while now knex requires the placeholder to be simple question mark (?), like in other dialects, so this is now unified with other drivers.

Closes #64
  • Loading branch information
Martin Adamek committed Jul 25, 2019
1 parent 273945a commit d8d4981
Show file tree
Hide file tree
Showing 43 changed files with 1,385 additions and 1,672 deletions.
47 changes: 23 additions & 24 deletions lib/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
import { Connection } from './connections';
import { Connection, Transaction } from './connections';

export class EntityManager {

Expand All @@ -15,6 +15,7 @@ export class EntityManager {
private readonly metadata = MetadataStorage.getMetadata();
private readonly unitOfWork = new UnitOfWork(this);
private readonly entityFactory = new EntityFactory(this.unitOfWork, this.driver, this.config);
private transactionContext: Transaction;

constructor(readonly config: Configuration,
private readonly driver: IDatabaseDriver<Connection>) { }
Expand Down Expand Up @@ -45,7 +46,7 @@ export class EntityManager {

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, alias);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand All @@ -55,7 +56,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where, entityName);
this.validator.validateParams(where);
const options = Utils.isObject<FindOptions>(populate) ? populate : { populate, orderBy, limit, offset };
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset);
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset, this.transactionContext);

if (results.length === 0) {
return [];
Expand Down Expand Up @@ -89,7 +90,7 @@ export class EntityManager {
}

this.validator.validateParams(where);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode, this.transactionContext);

if (!data) {
return null;
Expand All @@ -101,26 +102,15 @@ export class EntityManager {
return entity;
}

async beginTransaction(): Promise<void> {
await this.driver.beginTransaction();
}

async commit(): Promise<void> {
await this.driver.commit();
}

async rollback(): Promise<void> {
await this.driver.rollback();
}

async transactional(cb: (em: EntityManager) => Promise<any>): Promise<any> {
async transactional(cb: (em: EntityManager) => Promise<any>, ctx = this.transactionContext): Promise<any> {
const em = this.fork(false);
await em.getDriver().transactional(async () => {
await em.getConnection().transactional(async trx => {
em.transactionContext = trx;
const ret = await cb(em);
await em.flush();

return ret;
});
}, ctx);
}

async lock(entity: IEntity, lockMode: LockMode, lockVersion?: number | Date): Promise<void> {
Expand All @@ -131,7 +121,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
data = SmartQueryHelper.processParams(data);
this.validator.validateParams(data, 'insert data');
const res = await this.driver.nativeInsert(entityName, data);
const res = await this.driver.nativeInsert(entityName, data, this.transactionContext);

return res.insertId;
}
Expand All @@ -142,7 +132,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(data, 'update data');
this.validator.validateParams(where, 'update condition');
const res = await this.driver.nativeUpdate(entityName, where, data);
const res = await this.driver.nativeUpdate(entityName, where, data, this.transactionContext);

return res.affectedRows;
}
Expand All @@ -151,7 +141,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where, 'delete condition');
const res = await this.driver.nativeDelete(entityName, where);
const res = await this.driver.nativeDelete(entityName, where, this.transactionContext);

return res.affectedRows;
}
Expand Down Expand Up @@ -218,7 +208,8 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where);
return this.driver.count(entityName, where);

return this.driver.count(entityName, where, this.transactionContext);
}

persist(entity: IEntity | IEntity[], flush = this.config.get('autoFlush')): void | Promise<void> {
Expand Down Expand Up @@ -326,6 +317,14 @@ export class EntityManager {
return em.entityFactory;
}

isInTransaction(): boolean {
return !!this.transactionContext;
}

getTransactionContext<T extends Transaction = Transaction>(): T {
return this.transactionContext as T;
}

private checkLockRequirements(mode: LockMode | undefined, meta: EntityMetadata): void {
if (!mode) {
return;
Expand All @@ -335,7 +334,7 @@ export class EntityManager {
throw ValidationError.notVersioned(meta);
}

if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.getDriver().isInTransaction()) {
if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.isInTransaction()) {
throw ValidationError.transactionRequired();
}
}
Expand Down
100 changes: 100 additions & 0 deletions lib/connections/AbstractSqlConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as Knex from 'knex';
import { Config, QueryBuilder, Raw, Transaction } from 'knex';
import { readFile } from 'fs-extra';

import { Connection, QueryResult } from './Connection';
import { Utils } from '../utils';
import { EntityData, IEntity } from '../decorators';

export abstract class AbstractSqlConnection extends Connection {

protected client: Knex;

getKnex(): Knex {
return this.client;
}

async close(force?: boolean): Promise<void> {
await this.client.destroy();
}

async isConnected(): Promise<boolean> {
try {
await this.client.raw('select 1');
return true;
} catch {
return false;
}
}

async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
await (ctx || this.client).transaction(async trx => {
try {
const ret = await cb(trx);
await trx.commit();

return ret;
} catch (e) {
await trx.rollback(e);
throw e;
}
});
}

async execute<T = QueryResult | EntityData<IEntity> | EntityData<IEntity>[]>(queryOrKnex: string | QueryBuilder | Raw, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<T> {
if (Utils.isObject<QueryBuilder | Raw>(queryOrKnex)) {
return await this.executeKnex(queryOrKnex, method);
}

const res = await this.executeQuery<any>(queryOrKnex, params, () => this.client.raw(queryOrKnex, params));
return this.transformRawResult<T>(res, method);
}

async loadFile(path: string): Promise<void> {
const buf = await readFile(path);
await this.client.raw(buf.toString());
}

protected createKnexClient(type: string): Knex {
return Knex(this.getKnexOptions(type))
.on('query', data => {
if (!data.__knexQueryUid) {
this.logQuery(data.sql.toLowerCase().replace(/;$/, ''));
}
});
}

protected getKnexOptions(type: string): Config {
return {
client: type,
connection: this.getConnectionOptions(),
pool: this.config.get('pool'),
};
}

protected async executeKnex(qb: QueryBuilder | Raw, method: 'all' | 'get' | 'run'): Promise<QueryResult | any | any[]> {
const q = qb.toSQL();
const query = q.toNative ? q.toNative() : q;
const res = await this.executeQuery(query.sql, query.bindings, () => qb);

return this.transformKnexResult(res, method);
}

protected transformKnexResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | any | any[] {
if (method === 'all') {
return res;
}

if (method === 'get') {
return res[0];
}

const affectedRows = typeof res === 'number' ? res : 0;
const insertId = typeof res[0] === 'number' ? res[0] : 0;

return { insertId, affectedRows, row: res[0] };
}

protected abstract transformRawResult<T>(res: any, method: 'all' | 'get' | 'run'): T;

}
46 changes: 11 additions & 35 deletions lib/connections/Connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { URL } from 'url';
import { Configuration } from '../utils';
import { Transaction as KnexTransaction } from 'knex';
import { Configuration, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {
Expand Down Expand Up @@ -30,24 +31,7 @@ export abstract class Connection {
*/
abstract getDefaultClientUrl(): string;

/**
* Begins a transaction (if supported)
*/
async beginTransaction(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Commits statements in a transaction
*/
async commit(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Rollback changes in a transaction
*/
async rollback(savepoint?: string): Promise<void> {
async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
throw new Error(`Transactions are not supported by current driver`);
}

Expand All @@ -73,25 +57,15 @@ export abstract class Connection {
}

protected async executeQuery<T>(query: string, params: any[], cb: () => Promise<T>): Promise<T> {
try {
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);

return res;
} catch (e) {
e.message += `\n in query: ${query}`;

if (params && params.length) {
e.message += `\n with params: ${JSON.stringify(params)}`;
}

throw e;
}
return res;
}

protected logQuery(query: string, took: number): void {
this.logger.debug(`[query-logger] ${query} [took ${took} ms]`);
protected logQuery(query: string, took?: number): void {
this.logger.debug(`[query-logger] ${query}` + (Utils.isDefined(took) ? ` [took ${took} ms]` : ''));
}

}
Expand All @@ -109,3 +83,5 @@ export interface ConnectionConfig {
password?: string;
database?: string;
}

export type Transaction = KnexTransaction;
69 changes: 18 additions & 51 deletions lib/connections/MySqlConnection.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,18 @@
import { Connection as MySql2Connection, ConnectionOptions, createConnection } from 'mysql2/promise';
import { readFile } from 'fs-extra';
import { Connection, QueryResult } from './Connection';
import { MySqlConnectionConfig } from 'knex';
import { AbstractSqlConnection } from './AbstractSqlConnection';

export class MySqlConnection extends Connection {

protected client: MySql2Connection;
export class MySqlConnection extends AbstractSqlConnection {

async connect(): Promise<void> {
this.client = await createConnection(this.getConnectionOptions());
}

async close(force?: boolean): Promise<void> {
await this.client.end({ force });
}

async isConnected(): Promise<boolean> {
try {
await this.client.query('SELECT 1');
return true;
} catch {
return false;
}
}

async beginTransaction(): Promise<void> {
await this.query('START TRANSACTION');
}

async commit(): Promise<void> {
await this.query('COMMIT');
}

async rollback(): Promise<void> {
await this.query('ROLLBACK');
this.client = this.createKnexClient('mysql2');
}

getDefaultClientUrl(): string {
return 'mysql://root@127.0.0.1:3306';
}

async execute(query: string, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<QueryResult | any | any[]> {
const res = await this.executeQuery(query, params, () => this.client.execute(query, params));

if (method === 'get') {
return (res as QueryResult[][])[0][0];
}

return res[0];
}

getConnectionOptions(): ConnectionOptions {
const ret: ConnectionOptions = super.getConnectionOptions();
getConnectionOptions(): MySqlConnectionConfig {
const ret: MySqlConnectionConfig = super.getConnectionOptions();

if (this.config.get('multipleStatements')) {
ret.multipleStatements = this.config.get('multipleStatements');
Expand All @@ -59,14 +21,19 @@ export class MySqlConnection extends Connection {
return ret;
}

async loadFile(path: string): Promise<void> {
await this.client.query((await readFile(path)).toString());
}
protected transformRawResult<T>(res: any, method: 'all' | 'get' | 'run'): T {
if (method === 'run' && res[0].constructor.name === 'ResultSetHeader') {
return {
insertId: res[0].insertId,
affectedRows: res[0].affectedRows,
} as unknown as T;
}

if (method === 'get') {
return res[0][0];
}

private async query(sql: string): Promise<void> {
const now = Date.now();
await this.client.query(sql);
this.logQuery(sql, Date.now() - now);
return res[0];
}

}

0 comments on commit d8d4981

Please sign in to comment.