Skip to content

Commit

Permalink
migrating to typescript
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyUss committed Sep 29, 2018
1 parent 2b89ad8 commit 7650646
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 94 deletions.
49 changes: 24 additions & 25 deletions src/DBAccess.ts
Expand Up @@ -21,7 +21,7 @@
import * as mysql from 'mysql';
import { MysqlError, Pool as MySQLPool, PoolConnection } from 'mysql';
import { Pool as PgPool, PoolClient, QueryResult } from 'pg';
import generateError from './ErrorGenerator';
import { generateError } from './FsOps';
import Conversion from './Conversion';
import generateReport from './ReportGenerator';
import DBVendors from './DBVendors';
Expand All @@ -43,14 +43,14 @@ export default class DBAccess {
/**
* Ensures MySQL connection pool existence.
*/
private _getMysqlConnection(): void {
private async _getMysqlConnection(): Promise<void> {
if (!this._conversion._mysql) {
this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize;
this._conversion._sourceConString.multipleStatements = true;
const pool: MySQLPool = mysql.createPool(this._conversion._sourceConString);

if (!pool) {
generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...');
await generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...');
process.exit();
}

Expand All @@ -61,21 +61,21 @@ export default class DBAccess {
/**
* Ensures PostgreSQL connection pool existence.
*/
private _getPgConnection(): void {
private async _getPgConnection(): Promise<void> {
if (!this._conversion._pg) {
this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize;
const pool: PgPool = new PgPool(this._conversion._targetConString);

if (!pool) {
generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...');
await generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...');
process.exit();
}

this._conversion._pg = pool;

this._conversion._pg.on('error', (error: Error) => {
this._conversion._pg.on('error', async (error: Error) => {
const message: string = `Cannot connect to PostgreSQL server...\n' ${ error.message }\n${ error.stack }`;
generateError(this._conversion, message);
await generateError(this._conversion, message);
generateReport(this._conversion, message);
});
}
Expand All @@ -85,9 +85,8 @@ export default class DBAccess {
* Obtains PoolConnection instance.
*/
public getMysqlClient(): Promise<PoolConnection> {
this._getMysqlConnection();

return new Promise<PoolConnection>((resolve, reject) => {
return new Promise<PoolConnection>(async (resolve, reject) => {
await this._getMysqlConnection();
(<MySQLPool>this._conversion._mysql).getConnection((err: MysqlError | null, connection: PoolConnection) => {
return err ? reject(err) : resolve(connection);
});
Expand All @@ -97,39 +96,39 @@ export default class DBAccess {
/**
* Obtains PoolClient instance.
*/
public getPgClient(): Promise<PoolClient> {
this._getPgConnection();
public async getPgClient(): Promise<PoolClient> {
await this._getPgConnection();
return (<PgPool>this._conversion._pg).connect();
}

/**
* Runs a query on the first available idle client and returns its result.
* Note, the pool does the acquiring and releasing of the client internally.
*/
public runPgPoolQuery(sql: string): Promise<QueryResult> {
this._getPgConnection();
public async runPgPoolQuery(sql: string): Promise<QueryResult> {
await this._getPgConnection();
return (<PgPool>this._conversion._pg).query(sql);
}

/**
* Releases MySQL or PostgreSQL connection back to appropriate pool.
*/
public releaseDbClient(dbClient?: PoolConnection | PoolClient): void {
public async releaseDbClient(dbClient?: PoolConnection | PoolClient): Promise<void> {
try {
(<PoolConnection | PoolClient>dbClient).release();
dbClient = undefined;
} catch (error) {
generateError(this._conversion, `\t--[DBAccess::releaseDbClient] ${ error }`);
await generateError(this._conversion, `\t--[DBAccess::releaseDbClient] ${ error }`);
}
}

/**
* Checks if there are no more queries to be sent using current client.
* In such case the client should be released.
*/
private _releaseDbClientIfNecessary(client: PoolConnection | PoolClient, shouldHoldClient: boolean): void {
private async _releaseDbClientIfNecessary(client: PoolConnection | PoolClient, shouldHoldClient: boolean): Promise<void> {
if (!shouldHoldClient) {
this.releaseDbClient(client);
await this.releaseDbClient(client);
}
}

Expand All @@ -154,7 +153,7 @@ export default class DBAccess {
client = vendor === DBVendors.PG ? await this.getPgClient() : await this.getMysqlClient();
} catch (error) {
// An error occurred when tried to obtain a client from one of pools.
generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
return processExitOnError ? process.exit() : { client: client, data: undefined, error: error };
}
}
Expand All @@ -180,11 +179,11 @@ export default class DBAccess {
sql = (<PoolConnection>client).format(sql, bindings);
}

(<PoolConnection>client).query(sql, (error: MysqlError | null, data: any) => {
this._releaseDbClientIfNecessary((<PoolConnection>client), shouldReturnClient);
(<PoolConnection>client).query(sql, async (error: MysqlError | null, data: any) => {
await this._releaseDbClientIfNecessary((<PoolConnection>client), shouldReturnClient);

if (error) {
generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
return processExitOnError ? process.exit() : reject({ client: client, data: undefined, error: error });
}

Expand All @@ -206,11 +205,11 @@ export default class DBAccess {
): Promise<DBAccessQueryResult> {
try {
const data: any = Array.isArray(bindings) ? await (<PoolClient>client).query(sql, bindings) : await (<PoolClient>client).query(sql);
this._releaseDbClientIfNecessary((<PoolClient>client), shouldReturnClient); // Sets the client undefined.
await this._releaseDbClientIfNecessary((<PoolClient>client), shouldReturnClient); // Sets the client undefined.
return { client: client, data: data, error: undefined };
} catch (error) {
this._releaseDbClientIfNecessary((<PoolClient>client), shouldReturnClient); // Sets the client undefined.
generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
await this._releaseDbClientIfNecessary((<PoolClient>client), shouldReturnClient); // Sets the client undefined.
await generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
return processExitOnError ? process.exit() : { client: client, data: undefined, error: error };
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/DataLoader.ts
Expand Up @@ -19,8 +19,7 @@
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import * as csvStringify from './CsvStringifyModified';
import { log } from './FsOps';
import generateError from './ErrorGenerator';
import { log, generateError } from './FsOps';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
Expand Down Expand Up @@ -72,7 +71,7 @@ async function deleteChunk(conv: Conversion, dataPoolId: number, client: PoolCli
try {
await client.query(sql);
} catch (error) {
generateError(conv, `\t--[DataLoader::deleteChunk] ${ error }`, sql);
await generateError(conv, `\t--[DataLoader::deleteChunk] ${ error }`, sql);
} finally {
dbAccess.releaseDbClient(client);
}
Expand All @@ -88,7 +87,7 @@ function buildChunkQuery(tableName: string, selectFieldList: string, offset: num
/**
* Processes data-loading error.
*/
function processDataError(
async function processDataError(
conv: Conversion,
streamError: string,
sql: string,
Expand All @@ -97,7 +96,7 @@ function processDataError(
dataPoolId: number,
client: PoolClient
): Promise<void> {
generateError(conv, `\t--[populateTableWorker] ${ streamError }`, sqlCopy);
await generateError(conv, `\t--[populateTableWorker] ${ streamError }`, sqlCopy);
const rejectedData: string = `\t--[populateTableWorker] Error loading table data:\n${ sql }\n`;
log(conv, rejectedData, path.join(conv._logsDirPath, `${ tableName }.log`));
return deleteChunk(conv, dataPoolId, client);
Expand Down Expand Up @@ -131,7 +130,7 @@ async function populateTableWorker(

csvStringify(result.data, async (csvError: any, csvString: string) => {
if (csvError) {
generateError(conv, `\t--[${ logTitle }] ${ csvError }`);
await generateError(conv, `\t--[${ logTitle }] ${ csvError }`);
return resolvePopulateTableWorker();
}

Expand Down
11 changes: 5 additions & 6 deletions src/DataPipeManager.ts
Expand Up @@ -20,21 +20,20 @@
*/
import { ChildProcess, fork } from 'child_process';
import * as path from 'path';
import { log } from './FsOps';
import { log, generateError } from './FsOps';
import Conversion from './Conversion';
import generateError from './ErrorGenerator';
import MessageToDataLoader from './MessageToDataLoader';
import processConstraints from './ConstraintsProcessor';
import decodeBinaryData from './BinaryDataDecoder';

/**
* Kills a process specified by the pid.
*/
function killProcess(pid: number, conversion: Conversion): void {
async function killProcess(pid: number, conversion: Conversion): Promise<void> {
try {
process.kill(pid);
} catch (killError) {
generateError(conversion, `\t--[killProcess] ${ killError }`);
await generateError(conversion, `\t--[killProcess] ${ killError }`);
}
}

Expand Down Expand Up @@ -119,7 +118,7 @@ async function pipeData(conversion: Conversion, dataLoaderPath: string, options:
const bandwidth: number[] = fillBandwidth(conversion);
const chunksToLoad: any[] = bandwidth.map((index: number) => conversion._dataPool[index]);

loaderProcess.on('message', (signal: any) => {
loaderProcess.on('message', async (signal: any) => {
if (typeof signal === 'object') {
conversion._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted;
const msg: string = `\t--[pipeData] For now inserted: ${ conversion._dicTables[signal.tableName].totalRowsInserted } rows,
Expand All @@ -129,7 +128,7 @@ async function pipeData(conversion: Conversion, dataLoaderPath: string, options:
return;
}

killProcess(loaderProcess.pid, conversion);
await killProcess(loaderProcess.pid, conversion);
conversion._processedChunks += chunksToLoad.length;
return pipeData(conversion, dataLoaderPath, options);
});
Expand Down
42 changes: 0 additions & 42 deletions src/ErrorGenerator.ts

This file was deleted.

10 changes: 6 additions & 4 deletions src/FsOps.ts
Expand Up @@ -32,11 +32,13 @@ export function generateError(conversion: Conversion, message: string, sql: stri
log(conversion, message, undefined, true);

fs.open(conversion._errorLogsPath, 'a', conversion._0777, (error: Error, fd: number) => {
if (!error) {
fs.write(fd, buffer, 0, buffer.length, null, () => {
fs.close(fd, () => resolve());
});
if (error) {
return resolve();
}

fs.write(fd, buffer, 0, buffer.length, null, () => {
fs.close(fd, () => resolve());
});
});
});
}
Expand Down
5 changes: 2 additions & 3 deletions src/TableProcessor.ts
Expand Up @@ -18,8 +18,7 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import { log } from './FsOps';
import generateError from './ErrorGenerator';
import { log, generateError } from './FsOps';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
Expand Down Expand Up @@ -97,7 +96,7 @@ export async function createTable(conversion: Conversion, tableName: string): Pr
const result: DBAccessQueryResult = await dbAccess.query(logTitle, sqlAddDataChunkIdColumn, DBVendors.PG, false, false);

if (result.error) {
generateError(conversion, `\t--[${ logTitle }] ${ result.error }`, sqlAddDataChunkIdColumn);
await generateError(conversion, `\t--[${ logTitle }] ${ result.error }`, sqlAddDataChunkIdColumn);
}

return;
Expand Down
2 changes: 1 addition & 1 deletion test/TestModules/ColumnTypesTest.ts
Expand Up @@ -45,7 +45,7 @@ async function getColumnTypes(testSchemaProcessor: TestSchemaProcessor): Promise
);

if (result.error) {
testSchemaProcessor.processFatalError(result.error);
await testSchemaProcessor.processFatalError(result.error);
}

return result.data.rows;
Expand Down
2 changes: 1 addition & 1 deletion test/TestModules/DataContentTest.ts
Expand Up @@ -42,7 +42,7 @@ async function retrieveData(testSchemaProcessor: TestSchemaProcessor): Promise<a
);

if (result.error) {
testSchemaProcessor.processFatalError(result.error);
await testSchemaProcessor.processFatalError(result.error);
}

return result.data.rows[0];
Expand Down
2 changes: 1 addition & 1 deletion test/TestModules/SchemaProcessorTest.ts
Expand Up @@ -42,7 +42,7 @@ async function hasSchemaCreated(testSchemaProcessor: TestSchemaProcessor): Promi
);

if (result.error) {
testSchemaProcessor.processFatalError(result.error);
await testSchemaProcessor.processFatalError(result.error);
}

return !!result.data.rows[0].exists;
Expand Down
15 changes: 10 additions & 5 deletions test/TestModules/TestSchemaProcessor.ts
Expand Up @@ -30,10 +30,15 @@ import loadStructureToMigrate from '../../src/StructureLoader';
import pipeData from '../../src/DataPipeManager';
import { createStateLogsTable } from '../../src/MigrationStateManager';
import { createDataPoolTable, readDataPool } from '../../src/DataPoolManager';
import generateError from '../../src/ErrorGenerator';
import { log } from '../../src/FsOps';
import { readConfig, readExtraConfig, createLogsDirectory, readDataTypesMap } from '../../src/FsOps';
import { checkConnection, getLogo } from '../../src/BootProcessor';
import {
readConfig,
readExtraConfig,
createLogsDirectory,
readDataTypesMap,
log,
generateError
} from '../../src/FsOps';

export default class TestSchemaProcessor {
/**
Expand All @@ -57,9 +62,9 @@ export default class TestSchemaProcessor {
/**
* Stops the process in case of fatal error.
*/
public processFatalError(error: string): void {
public async processFatalError(error: string): Promise<void> {
console.log(error);
generateError(<Conversion>this.conversion, error);
await generateError(<Conversion>this.conversion, error);
process.exit();
}

Expand Down

0 comments on commit 7650646

Please sign in to comment.