Skip to content

Commit

Permalink
Merge d58a0b8 into 3947602
Browse files Browse the repository at this point in the history
  • Loading branch information
B4nan committed Jul 13, 2019
2 parents 3947602 + d58a0b8 commit f4c9478
Show file tree
Hide file tree
Showing 34 changed files with 1,158 additions and 1,087 deletions.
48 changes: 24 additions & 24 deletions lib/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
import { Connection } from './connections';
import { Connection, Transaction } from './connections';

export class EntityManager {

Expand All @@ -16,6 +16,8 @@ export class EntityManager {
private readonly unitOfWork = new UnitOfWork(this);
private readonly entityFactory = new EntityFactory(this.unitOfWork, this.driver, this.config);

transactionContext: Transaction;

constructor(readonly config: Configuration,
private readonly driver: IDatabaseDriver<Connection>) { }

Expand Down Expand Up @@ -45,7 +47,7 @@ export class EntityManager {

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, alias);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand All @@ -55,7 +57,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where, entityName);
this.validator.validateParams(where);
const options = Utils.isObject<FindOptions>(populate) ? populate : { populate, orderBy, limit, offset };
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset);
const results = await this.driver.find(entityName, where, options.populate || [], options.orderBy || {}, options.limit, options.offset, this.transactionContext);

if (results.length === 0) {
return [];
Expand Down Expand Up @@ -89,7 +91,7 @@ export class EntityManager {
}

this.validator.validateParams(where);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode);
const data = await this.driver.findOne(entityName, where, options.populate, options.orderBy, options.fields, options.lockMode, this.transactionContext);

if (!data) {
return null;
Expand All @@ -101,26 +103,15 @@ export class EntityManager {
return entity;
}

async beginTransaction(): Promise<void> {
await this.driver.beginTransaction();
}

async commit(): Promise<void> {
await this.driver.commit();
}

async rollback(): Promise<void> {
await this.driver.rollback();
}

async transactional(cb: (em: EntityManager) => Promise<any>): Promise<any> {
async transactional(cb: (em: EntityManager) => Promise<any>, ctx = this.transactionContext): Promise<any> {
const em = this.fork(false);
await em.getDriver().transactional(async () => {
await em.getConnection().transactional(async trx => {
em.transactionContext = trx;
const ret = await cb(em);
await em.flush();

return ret;
});
}, ctx);
}

async lock(entity: IEntity, lockMode: LockMode, lockVersion?: number | Date): Promise<void> {
Expand All @@ -131,7 +122,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
data = SmartQueryHelper.processParams(data);
this.validator.validateParams(data, 'insert data');
const res = await this.driver.nativeInsert(entityName, data);
const res = await this.driver.nativeInsert(entityName, data, this.transactionContext);

return res.insertId;
}
Expand All @@ -142,7 +133,7 @@ export class EntityManager {
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(data, 'update data');
this.validator.validateParams(where, 'update condition');
const res = await this.driver.nativeUpdate(entityName, where, data);
const res = await this.driver.nativeUpdate(entityName, where, data, this.transactionContext);

return res.affectedRows;
}
Expand All @@ -151,7 +142,7 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where, 'delete condition');
const res = await this.driver.nativeDelete(entityName, where);
const res = await this.driver.nativeDelete(entityName, where, this.transactionContext);

return res.affectedRows;
}
Expand Down Expand Up @@ -218,7 +209,8 @@ export class EntityManager {
entityName = Utils.className(entityName);
where = SmartQueryHelper.processWhere(where as FilterQuery<T>, entityName);
this.validator.validateParams(where);
return this.driver.count(entityName, where);

return this.driver.count(entityName, where, this.transactionContext);
}

async persist(entity: IEntity | IEntity[], flush = this.config.get('autoFlush')): Promise<void> {
Expand Down Expand Up @@ -326,6 +318,14 @@ export class EntityManager {
return em.entityFactory;
}

isInTransaction(): boolean {
return !!this.transactionContext;
}

getTransactionContext<T extends Transaction = Transaction>(): T {
return this.transactionContext as T;
}

private checkLockRequirements(mode: LockMode | undefined, meta: EntityMetadata): void {
if (!mode) {
return;
Expand All @@ -335,7 +335,7 @@ export class EntityManager {
throw ValidationError.notVersioned(meta);
}

if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.getDriver().isInTransaction()) {
if ([LockMode.PESSIMISTIC_READ, LockMode.PESSIMISTIC_WRITE].includes(mode) && !this.isInTransaction()) {
throw ValidationError.transactionRequired();
}
}
Expand Down
95 changes: 95 additions & 0 deletions lib/connections/AbstractSqlConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import * as Knex from 'knex';
import { Config, QueryBuilder, Raw, Transaction } from 'knex';
import { readFile } from 'fs-extra';

import { Connection, QueryResult } from './Connection';
import { Utils } from '../utils';
import { EntityData, IEntity } from '../decorators';

export abstract class AbstractSqlConnection extends Connection {

protected client: Knex;

getKnex(): Knex {
return this.client;
}

async close(force?: boolean): Promise<void> {
await this.client.destroy();
}

async isConnected(): Promise<boolean> {
try {
await this.client.raw('select 1');
return true;
} catch {
return false;
}
}

async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
await (ctx || this.client).transaction(async trx => {
try {
const ret = await cb(trx);
await trx.commit();

return ret;
} catch (e) {
await trx.rollback(e);
throw e;
}
});
}

async execute(queryOrKnex: string | QueryBuilder | Raw, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<QueryResult | any | any[]> {
if (Utils.isObject<QueryBuilder | Raw>(queryOrKnex)) {
return await this.executeKnex(queryOrKnex, method);
}

const res = await this.executeQuery<any>(queryOrKnex, params, () => this.client.raw(queryOrKnex, params));
return this.transformRawResult(res, method);
}

async loadFile(path: string): Promise<void> {
const buf = await readFile(path);
await this.client.raw(buf.toString());
}

protected createKnexClient(type: string): Knex {
return Knex(this.getKnexOptions(type))
.on('query', data => {
if (!data.__knexQueryUid) {
this.logQuery(data.sql.toLowerCase().replace(/;$/, ''));
}
});
}

protected getKnexOptions(type: string): Config {
return {
client: type,
connection: this.getConnectionOptions(),
pool: this.config.get('pool'),
};
}

protected async executeKnex(qb: QueryBuilder | Raw, method: 'all' | 'get' | 'run'): Promise<QueryResult | any | any[]> {
const q = qb.toSQL();
const query = q.toNative ? q.toNative() : q;
let res = await this.executeQuery(query.sql, query.bindings, () => qb);

if (method === 'run') {
const affectedRows = typeof res === 'number' ? res : 0;
const insertId = typeof res[0] === 'number' ? res[0] : 0;
res = { insertId, affectedRows, row: res[0] };
}

if (method === 'get') {
return res[0];
}

return res;
}

protected abstract transformRawResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | EntityData<IEntity> | EntityData<IEntity>[];

}
46 changes: 11 additions & 35 deletions lib/connections/Connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { URL } from 'url';
import { Configuration } from '../utils';
import { Transaction as KnexTransaction } from 'knex';
import { Configuration, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {
Expand Down Expand Up @@ -30,24 +31,7 @@ export abstract class Connection {
*/
abstract getDefaultClientUrl(): string;

/**
* Begins a transaction (if supported)
*/
async beginTransaction(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Commits statements in a transaction
*/
async commit(savepoint?: string): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}

/**
* Rollback changes in a transaction
*/
async rollback(savepoint?: string): Promise<void> {
async transactional(cb: (trx: Transaction) => Promise<any>, ctx?: Transaction): Promise<any> {
throw new Error(`Transactions are not supported by current driver`);
}

Expand All @@ -73,25 +57,15 @@ export abstract class Connection {
}

protected async executeQuery<T>(query: string, params: any[], cb: () => Promise<T>): Promise<T> {
try {
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);
const now = Date.now();
const res = await cb();
this.logQuery(query, Date.now() - now);

return res;
} catch (e) {
e.message += `\n in query: ${query}`;

if (params && params.length) {
e.message += `\n with params: ${JSON.stringify(params)}`;
}

throw e;
}
return res;
}

protected logQuery(query: string, took: number): void {
this.logger.debug(`[query-logger] ${query} [took ${took} ms]`);
protected logQuery(query: string, took?: number): void {
this.logger.debug(`[query-logger] ${query}` + (Utils.isDefined(took) ? ` [took ${took} ms]` : ''));
}

}
Expand All @@ -109,3 +83,5 @@ export interface ConnectionConfig {
password?: string;
database?: string;
}

export type Transaction = KnexTransaction;
71 changes: 20 additions & 51 deletions lib/connections/MySqlConnection.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,20 @@
import { Connection as MySql2Connection, ConnectionOptions, createConnection } from 'mysql2/promise';
import { readFile } from 'fs-extra';
import { Connection, QueryResult } from './Connection';
import { MySqlConnectionConfig } from 'knex';
import { AbstractSqlConnection } from './AbstractSqlConnection';
import { QueryResult } from './Connection';
import { EntityData, IEntity } from '../decorators';

export class MySqlConnection extends Connection {

protected client: MySql2Connection;
export class MySqlConnection extends AbstractSqlConnection {

async connect(): Promise<void> {
this.client = await createConnection(this.getConnectionOptions());
}

async close(force?: boolean): Promise<void> {
await this.client.end({ force });
}

async isConnected(): Promise<boolean> {
try {
await this.client.query('SELECT 1');
return true;
} catch {
return false;
}
}

async beginTransaction(): Promise<void> {
await this.query('START TRANSACTION');
}

async commit(): Promise<void> {
await this.query('COMMIT');
}

async rollback(): Promise<void> {
await this.query('ROLLBACK');
this.client = this.createKnexClient('mysql2');
}

getDefaultClientUrl(): string {
return 'mysql://root@127.0.0.1:3306';
}

async execute(query: string, params: any[] = [], method: 'all' | 'get' | 'run' = 'all'): Promise<QueryResult | any | any[]> {
const res = await this.executeQuery(query, params, () => this.client.execute(query, params));

if (method === 'get') {
return (res as QueryResult[][])[0][0];
}

return res[0];
}

getConnectionOptions(): ConnectionOptions {
const ret: ConnectionOptions = super.getConnectionOptions();
getConnectionOptions(): MySqlConnectionConfig {
const ret: MySqlConnectionConfig = super.getConnectionOptions();

if (this.config.get('multipleStatements')) {
ret.multipleStatements = this.config.get('multipleStatements');
Expand All @@ -59,14 +23,19 @@ export class MySqlConnection extends Connection {
return ret;
}

async loadFile(path: string): Promise<void> {
await this.client.query((await readFile(path)).toString());
}
protected transformRawResult(res: any, method: 'all' | 'get' | 'run'): QueryResult | EntityData<IEntity> | EntityData<IEntity>[] {
if (method === 'run' && res[0].constructor.name === 'ResultSetHeader') {
return {
insertId: res[0].insertId,
affectedRows: res[0].affectedRows,
};
}

if (method === 'get') {
return res[0][0];
}

private async query(sql: string): Promise<void> {
const now = Date.now();
await this.client.query(sql);
this.logQuery(sql, Date.now() - now);
return res[0];
}

}

0 comments on commit f4c9478

Please sign in to comment.