Skip to content

Commit

Permalink
Merge a1ae86d into fa24c8e
Browse files Browse the repository at this point in the history
  • Loading branch information
B4nan committed Jul 15, 2019
2 parents fa24c8e + a1ae86d commit d6ba9f2
Show file tree
Hide file tree
Showing 43 changed files with 1,387 additions and 1,669 deletions.
47 changes: 23 additions & 24 deletions lib/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
import { Connection } from './connections';
import { Connection, Transaction } from './connections';

export class EntityManager {

Expand All @@ -15,6 +15,7 @@ export class EntityManager {
private readonly metadata = MetadataStorage.getMetadata();
private readonly unitOfWork = new UnitOfWork(this);
private readonly entityFactory = new EntityFactory(this.unitOfWork, this.driver, this.config);
private transactionContext: Transaction;

constructor(readonly config: Configuration,
private readonly driver: IDatabaseDriver<Connection>) { }
Expand Down Expand Up @@ -45,7 +46,7 @@ export class EntityManager {

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, alias);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand All @@ -55,7 +56,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where, entityName);
this.validator.validateParams(where);
const options = Utils.isObject<FindOptions>(populate) ? populate : { populate, orderBy, limit, offset };
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset);
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset, this.transactionContext);

if (results.length === 0) {
return [];
Expand Down Expand Up @@ -89,7 +90,7 @@ export class EntityManager {
}

this.validator.validateParams(where);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode, this.transactionContext);

if (!data) {
return null;
Expand All @@ -101,26 +102,15 @@ export class EntityManager {
return entity;
}

async beginTransaction(): Promise<void> {
await this.driver.beginTransaction();
}

async commit(): Promise<void> {
await this.driver.commit();
}

async rollback(): Promise<void> {
await this.driver.rollback();
}

async transactional(cb: (em: EntityManager) => Promise<any>): Promise<any> {
async transactional(cb: (em: EntityManager) => Promise<any>, ctx = this.transactionContext): Promise<any> {
const em = this.fork(false);
await em.getDriver().transactional(async () => {
await em.getConnection().transactional(async trx => {
em.transactionContext = trx;
const ret = await cb(em);
await em.flush();

return ret;
});
}, ctx);
}

async lock(entity: IEntity, lockMode: LockMode, lockVersion?: number | Date): Promise<void> {
Expand All @@ -131,7 +121,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
data = SmartQueryHelper.processParams(data);
this.validator.validateParams(data, 'insert data');
const res = await this.driver.nativeInsert(entityName, data);
const res = await this.driver.nativeInsert(entityName, data, this.transactionContext);

return res.insertId;
}
Expand All @@ -142,7 +132,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(data, 'update data');
this.validator.validateParams(where, 'update condition');
const res = await this.driver.nativeUpdate(entityName, where, data);
const res = await this.driver.nativeUpdate(entityName, where, data, this.transactionContext);

return res.affectedRows;
}
Expand All @@ -151,7 +141,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where, 'delete condition');
const res = await this.driver.nativeDelete(entityName, where);
const res = await this.driver.nativeDelete(entityName, where, this.transactionContext);

return res.affectedRows;
}
Expand Down Expand Up @@ -218,7 +208,8 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where);
return this.driver.count(entityName, where);

return this.driver.count(entityName, where, this.transactionContext);
}

async persist(entity: IEntity | IEntity[], flush = this.config.get('autoFlush')): Promise<void> {
Expand Down Expand Up @@ -326,6 +317,14 @@ export class EntityManager {
return em.entityFactory;
}

isInTransaction(): boolean {
return !!this.transactionContext;
}

getTransactionContext<T extends Transaction = Transaction>(): T {
return this.transactionContext as T;
}

private checkLockRequirements(mode: LockMode | undefined, meta: EntityMetadata): void {
if (!mode) {
return;
Expand All @@ -335,7 +334,7 @@ export class EntityManager {
throw ValidationError.notVersioned(meta);
}

if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.getDriver().isInTransaction()) {
if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.isInTransaction()) {
throw ValidationError.transactionRequired();
}
}
Expand Down
100 changes: 100 additions & 0 deletions lib/connections/AbstractSqlConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as Knex from 'knex';
import { Config, QueryBuilder, Raw, Transaction } from 'knex';
import { readFile } from 'fs-extra';

import { Connection, QueryResult } from './Connection';
import { Utils } from '../utils';
import { EntityData, IEntity } from '../decorators';

export abstract class AbstractSqlConnection extends Connection {

protected client: Knex;

getKnex(): Knex {
return this.client;
}

async close(force?: boolean): Promise<void> {
await this.client.destroy();
}

async isConnected(): Promise<boolean> {
try {
await this.client.raw('select 1');
return true;
} catch {
return false;
}
}

async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
await (ctx || this.client).transaction(async trx => {
try {
const ret = await cb(trx);
await trx.commit();

return ret;
} catch (e) {
await trx.rollback(e);
throw e;
}
});
}

async execute(queryOrKnex: string | QueryBuilder | Raw, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<QueryResult | any | any[]> {
if (Utils.isObject<QueryBuilder | Raw>(queryOrKnex)) {
return await this.executeKnex(queryOrKnex, method);
}

const res = await this.executeQuery<any>(queryOrKnex, params, () => this.client.raw(queryOrKnex, params));
return this.transformRawResult(res, method);
}

async loadFile(path: string): Promise<void> {
const buf = await readFile(path);
await this.client.raw(buf.toString());
}

protected createKnexClient(type: string): Knex {
return Knex(this.getKnexOptions(type))
.on('query', data => {
if (!data.__knexQueryUid) {
this.logQuery(data.sql.toLowerCase().replace(/;$/, ''));
}
});
}

protected getKnexOptions(type: string): Config {
return {
client: type,
connection: this.getConnectionOptions(),
pool: this.config.get('pool'),
};
}

protected async executeKnex(qb: QueryBuilder | Raw, method: 'all' | 'get' | 'run'): Promise<QueryResult | any | any[]> {
const q = qb.toSQL();
const query = q.toNative ? q.toNative() : q;
const res = await this.executeQuery(query.sql, query.bindings, () => qb);

return this.transformKnexResult(res, method);
}

protected transformKnexResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | any | any[] {
if (method === 'all') {
return res;
}

if (method === 'get') {
return res[0];
}

const affectedRows = typeof res === 'number' ? res : 0;
const insertId = typeof res[0] === 'number' ? res[0] : 0;

return { insertId, affectedRows, row: res[0] };
}

protected abstract transformRawResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | EntityData<IEntity> | EntityData<IEntity>[];

}
46 changes: 11 additions & 35 deletions lib/connections/Connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { URL } from 'url';
import { Configuration } from '../utils';
import { Transaction as KnexTransaction } from 'knex';
import { Configuration, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {
Expand Down Expand Up @@ -30,24 +31,7 @@ export abstract class Connection {
*/
abstract getDefaultClientUrl(): string;

/**
* Begins a transaction (if supported)
*/
async beginTransaction(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Commits statements in a transaction
*/
async commit(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Rollback changes in a transaction
*/
async rollback(savepoint?: string): Promise<void> {
async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
throw new Error(`Transactions are not supported by current driver`);
}

Expand All @@ -73,25 +57,15 @@ export abstract class Connection {
}

protected async executeQuery<T>(query: string, params: any[], cb: () => Promise<T>): Promise<T> {
try {
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);

return res;
} catch (e) {
e.message += `\n in query: ${query}`;

if (params && params.length) {
e.message += `\n with params: ${JSON.stringify(params)}`;
}

throw e;
}
return res;
}

protected logQuery(query: string, took: number): void {
this.logger.debug(`[query-logger] ${query} [took ${took} ms]`);
protected logQuery(query: string, took?: number): void {
this.logger.debug(`[query-logger] ${query}` + (Utils.isDefined(took) ? ` [took ${took} ms]` : ''));
}

}
Expand All @@ -109,3 +83,5 @@ export interface ConnectionConfig {
password?: string;
database?: string;
}

export type Transaction = KnexTransaction;

0 comments on commit d6ba9f2

Please sign in to comment.