Skip to content

Commit

Permalink
feat(core): use knex to generate sql + enable connection pooling [BC]
Browse files Browse the repository at this point in the history
QueryBuilder now internally uses knex to run all queries. As knex already supports connection pooling, thbis feature comes without any effort. New configuration for pooling is now availableAs knex already supports connection pooling, this feature comes without any effort.

BREAKING CHANGES:
Transactions now require using EM.transactional() method, previous helpers `beginTransaction/commit/rollback` are now removed. All transaction management has been removed from IDatabaseDriver interface, now EM handles this, passing the transaction context (carried by EM, and created by Connection) to all driver methods. New methods on EM exists: `isInTransaction` and `getTransactionContext`.
Because of knex being used as a query runner, there are some minor differences in results of plain sql calls (made by calling connection.execute() with string sql parameter instead of using QueryBuilder). Another difference is in postgre driver, that used to require passing parameters as indexed dollar sign ($1, $2, ...), while now knex requires the placeholder to be simple question mark (?), like in other dialects, so this is now unified with other drivers.

Closes #64
  • Loading branch information
Martin Adamek committed Jul 15, 2019
1 parent fa24c8e commit 32522c4
Show file tree
Hide file tree
Showing 43 changed files with 1,387 additions and 1,668 deletions.
48 changes: 24 additions & 24 deletions lib/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
import { Connection } from './connections';
import { Connection, Transaction } from './connections';

export class EntityManager {

Expand All @@ -16,6 +16,8 @@ export class EntityManager {
private readonly unitOfWork = new UnitOfWork(this);
private readonly entityFactory = new EntityFactory(this.unitOfWork, this.driver, this.config);

transactionContext: Transaction;

constructor(readonly config: Configuration,
private readonly driver: IDatabaseDriver<Connection>) { }

Expand Down Expand Up @@ -45,7 +47,7 @@ export class EntityManager {

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, alias);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand All @@ -55,7 +57,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where, entityName);
this.validator.validateParams(where);
const options = Utils.isObject<FindOptions>(populate) ? populate : { populate, orderBy, limit, offset };
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset);
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset, this.transactionContext);

if (results.length === 0) {
return [];
Expand Down Expand Up @@ -89,7 +91,7 @@ export class EntityManager {
}

this.validator.validateParams(where);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode, this.transactionContext);

if (!data) {
return null;
Expand All @@ -101,26 +103,15 @@ export class EntityManager {
return entity;
}

async beginTransaction(): Promise<void> {
await this.driver.beginTransaction();
}

async commit(): Promise<void> {
await this.driver.commit();
}

async rollback(): Promise<void> {
await this.driver.rollback();
}

async transactional(cb: (em: EntityManager) => Promise<any>): Promise<any> {
async transactional(cb: (em: EntityManager) => Promise<any>, ctx = this.transactionContext): Promise<any> {
const em = this.fork(false);
await em.getDriver().transactional(async () => {
await em.getConnection().transactional(async trx => {
em.transactionContext = trx;
const ret = await cb(em);
await em.flush();

return ret;
});
}, ctx);
}

async lock(entity: IEntity, lockMode: LockMode, lockVersion?: number | Date): Promise<void> {
Expand All @@ -131,7 +122,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
data = SmartQueryHelper.processParams(data);
this.validator.validateParams(data, 'insert data');
const res = await this.driver.nativeInsert(entityName, data);
const res = await this.driver.nativeInsert(entityName, data, this.transactionContext);

return res.insertId;
}
Expand All @@ -142,7 +133,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(data, 'update data');
this.validator.validateParams(where, 'update condition');
const res = await this.driver.nativeUpdate(entityName, where, data);
const res = await this.driver.nativeUpdate(entityName, where, data, this.transactionContext);

return res.affectedRows;
}
Expand All @@ -151,7 +142,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where, 'delete condition');
const res = await this.driver.nativeDelete(entityName, where);
const res = await this.driver.nativeDelete(entityName, where, this.transactionContext);

return res.affectedRows;
}
Expand Down Expand Up @@ -218,7 +209,8 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where);
return this.driver.count(entityName, where);

return this.driver.count(entityName, where, this.transactionContext);
}

async persist(entity: IEntity | IEntity[], flush = this.config.get('autoFlush')): Promise<void> {
Expand Down Expand Up @@ -326,6 +318,14 @@ export class EntityManager {
return em.entityFactory;
}

isInTransaction(): boolean {
return !!this.transactionContext;
}

getTransactionContext<T extends Transaction = Transaction>(): T {
return this.transactionContext as T;
}

private checkLockRequirements(mode: LockMode | undefined, meta: EntityMetadata): void {
if (!mode) {
return;
Expand All @@ -335,7 +335,7 @@ export class EntityManager {
throw ValidationError.notVersioned(meta);
}

if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.getDriver().isInTransaction()) {
if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.isInTransaction()) {
throw ValidationError.transactionRequired();
}
}
Expand Down
100 changes: 100 additions & 0 deletions lib/connections/AbstractSqlConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as Knex from 'knex';
import { Config, QueryBuilder, Raw, Transaction } from 'knex';
import { readFile } from 'fs-extra';

import { Connection, QueryResult } from './Connection';
import { Utils } from '../utils';
import { EntityData, IEntity } from '../decorators';

export abstract class AbstractSqlConnection extends Connection {

protected client: Knex;

getKnex(): Knex {
return this.client;
}

async close(force?: boolean): Promise<void> {
await this.client.destroy();
}

async isConnected(): Promise<boolean> {
try {
await this.client.raw('select 1');
return true;
} catch {
return false;
}
}

async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
await (ctx || this.client).transaction(async trx => {
try {
const ret = await cb(trx);
await trx.commit();

return ret;
} catch (e) {
await trx.rollback(e);
throw e;
}
});
}

async execute(queryOrKnex: string | QueryBuilder | Raw, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<QueryResult | any | any[]> {
if (Utils.isObject<QueryBuilder | Raw>(queryOrKnex)) {
return await this.executeKnex(queryOrKnex, method);
}

const res = await this.executeQuery<any>(queryOrKnex, params, () => this.client.raw(queryOrKnex, params));
return this.transformRawResult(res, method);
}

async loadFile(path: string): Promise<void> {
const buf = await readFile(path);
await this.client.raw(buf.toString());
}

protected createKnexClient(type: string): Knex {
return Knex(this.getKnexOptions(type))
.on('query', data => {
if (!data.__knexQueryUid) {
this.logQuery(data.sql.toLowerCase().replace(/;$/, ''));
}
});
}

protected getKnexOptions(type: string): Config {
return {
client: type,
connection: this.getConnectionOptions(),
pool: this.config.get('pool'),
};
}

protected async executeKnex(qb: QueryBuilder | Raw, method: 'all' | 'get' | 'run'): Promise<QueryResult | any | any[]> {
const q = qb.toSQL();
const query = q.toNative ? q.toNative() : q;
const res = await this.executeQuery(query.sql, query.bindings, () => qb);

return this.transformKnexResult(res, method);
}

protected transformKnexResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | any | any[] {
if (method === 'all') {
return res;
}

if (method === 'get') {
return res[0];
}

const affectedRows = typeof res === 'number' ? res : 0;
const insertId = typeof res[0] === 'number' ? res[0] : 0;

return { insertId, affectedRows, row: res[0] };
}

protected abstract transformRawResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | EntityData<IEntity> | EntityData<IEntity>[];

}
46 changes: 11 additions & 35 deletions lib/connections/Connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { URL } from 'url';
import { Configuration } from '../utils';
import { Transaction as KnexTransaction } from 'knex';
import { Configuration, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {
Expand Down Expand Up @@ -30,24 +31,7 @@ export abstract class Connection {
*/
abstract getDefaultClientUrl(): string;

/**
* Begins a transaction (if supported)
*/
async beginTransaction(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Commits statements in a transaction
*/
async commit(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Rollback changes in a transaction
*/
async rollback(savepoint?: string): Promise<void> {
async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
throw new Error(`Transactions are not supported by current driver`);
}

Expand All @@ -73,25 +57,15 @@ export abstract class Connection {
}

protected async executeQuery<T>(query: string, params: any[], cb: () => Promise<T>): Promise<T> {
try {
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);

return res;
} catch (e) {
e.message += `\n in query: ${query}`;

if (params && params.length) {
e.message += `\n with params: ${JSON.stringify(params)}`;
}

throw e;
}
return res;
}

protected logQuery(query: string, took: number): void {
this.logger.debug(`[query-logger] ${query} [took ${took} ms]`);
protected logQuery(query: string, took?: number): void {
this.logger.debug(`[query-logger] ${query}` + (Utils.isDefined(took) ? ` [took ${took} ms]` : ''));
}

}
Expand All @@ -109,3 +83,5 @@ export interface ConnectionConfig {
password?: string;
database?: string;
}

export type Transaction = KnexTransaction;

0 comments on commit 32522c4

Please sign in to comment.