Skip to content

Commit

Permalink
perf(MySQL): import performance improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabio286 committed Mar 5, 2022
1 parent b4af645 commit f444746
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 53 deletions.
49 changes: 22 additions & 27 deletions src/common/libs/sqlParser.js
@@ -1,13 +1,5 @@
import { Transform } from 'stream';

const chars = {
NEWLINE: 0x0A,
CARRIAGE_RETURN: 0x0D,
DOUBLE_QUOTE: 0x22,
QUOTE: 0x27,
BACKSLASH: 0x5C
};

export default class SqlParser extends Transform {
constructor (opts) {
opts = {
Expand All @@ -18,7 +10,9 @@ export default class SqlParser extends Transform {
...opts
};
super(opts);
this._buffer = Buffer.from([]);
this._buffer = '';
this._lastChar = '';
this._last9Chars = '';
this.encoding = opts.encoding;
this.delimiter = opts.delimiter;

Expand All @@ -28,45 +22,50 @@ export default class SqlParser extends Transform {
}

_transform (chunk, encoding, next) {
for (const char of chunk) {
for (const char of chunk.toString(this.encoding)) {
this.checkEscape();
this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]);
this._buffer += char;
this._lastChar = char;
this._last9Chars += char.trim().toLocaleLowerCase();

if (this._last9Chars.length > 9)
this._last9Chars = this._last9Chars.slice(-9);

this.checkNewDelimiter(char);
this.checkQuote(char);
const query = this.getQuery();

if (query)
this.push(query);
}

next();
}

checkEscape () {
if (this._buffer.length > 0) {
this.isEscape = this._buffer[this._buffer.length - 1] === chars.BACKSLASH
this.isEscape = this._lastChar === '\\'
? !this.isEscape
: false;
}
}

checkNewDelimiter (char) {
if (this._buffer.length === 9 && this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === null) {
if (this.currentQuote === null && this._last9Chars === 'delimiter') {
this.isDelimiter = true;
this._buffer = Buffer.from([]);
this._buffer = '';
}
else {
const isNewLine = [chars.NEWLINE, chars.CARRIAGE_RETURN].includes(char);
const isNewLine = char === '\n' || char === '\r';
if (isNewLine && this.isDelimiter) {
this.isDelimiter = false;
this.delimiter = this.parsedStr;
this._buffer = Buffer.from([]);
this.delimiter = this._buffer.trim();
this._buffer = '';
}
}
}

checkQuote (char) {
const isQuote = !this.isEscape && (chars.QUOTE === char || chars.DOUBLE_QUOTE === char);
const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char)
this.currentQuote = null;

Expand All @@ -81,18 +80,14 @@ export default class SqlParser extends Transform {
let query = false;
let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter;
demiliterFound = this._last9Chars.slice(-this.delimiter.length) === this.delimiter;

if (demiliterFound) {
const str = this.parsedStr;
query = str.slice(0, str.length - this.delimiter.length);
this._buffer = Buffer.from([]);
const parsedStr = this._buffer.trim();
query = parsedStr.slice(0, parsedStr.length - this.delimiter.length);
this._buffer = '';
}

return query;
}

get parsedStr () {
return this._buffer.toString(this.encoding).trim();
}
}
6 changes: 4 additions & 2 deletions src/main/ipc-handlers/schema.js
Expand Up @@ -217,8 +217,10 @@ export default connections => {
event.sender.send('export-progress', payload);
break;
case 'end':
exporter.kill();
exporter = null;
setTimeout(() => { // Ensures that writing process has finished
exporter.kill();
exporter = null;
}, 2000);
resolve({ status: 'success', response: payload });
break;
case 'cancel':
Expand Down
8 changes: 7 additions & 1 deletion src/main/libs/AntaresCore.js
@@ -1,4 +1,10 @@
'use strict';
const queryLogger = sql => {
// Remove comments, newlines and multiple spaces
const escapedSql = sql.replace(/(\/\*(.|[\r\n])*?\*\/)|(--(.*|[\r\n]))/gm, '').replace(/\s\s+/g, ' ');
console.log(escapedSql);
};

/**
* As Simple As Possible Query Builder Core
*
Expand All @@ -17,7 +23,7 @@ export class AntaresCore {
this._poolSize = args.poolSize || false;
this._connection = null;
this._ssh = null;
this._logger = args.logger || console.log;
this._logger = args.logger || queryLogger;

this._queryDefaults = {
schema: '',
Expand Down
9 changes: 0 additions & 9 deletions src/main/libs/ClientsFactory.js
Expand Up @@ -2,13 +2,6 @@
import { MySQLClient } from './clients/MySQLClient';
import { PostgreSQLClient } from './clients/PostgreSQLClient';
import { SQLiteClient } from './clients/SQLiteClient';

const queryLogger = sql => {
// Remove comments, newlines and multiple spaces
const escapedSql = sql.replace(/(\/\*(.|[\r\n])*?\*\/)|(--(.*|[\r\n]))/gm, '').replace(/\s\s+/g, ' ');
console.log(escapedSql);
};

export class ClientsFactory {
/**
* Returns a database connection based on received args.
Expand All @@ -30,8 +23,6 @@ export class ClientsFactory {
* @memberof ClientsFactory
*/
static getConnection (args) {
args.logger = queryLogger;

switch (args.client) {
case 'mysql':
case 'maria':
Expand Down
1 change: 0 additions & 1 deletion src/main/libs/exporters/BaseExporter.js
Expand Up @@ -68,7 +68,6 @@ export class BaseExporter extends EventEmitter {
const fileName = path.basename(this._options.outputFile);
this.emit('error', `The file ${fileName} is not accessible`);
}

this._outputStream.write(data);
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/libs/importers/BaseImporter.js
Expand Up @@ -7,7 +7,8 @@ export class BaseImporter extends EventEmitter {
this._options = options;
this._isCancelled = false;
this._fileHandler = fs.createReadStream(this._options.file, {
flags: 'r'
flags: 'r',
highWaterMark: 4 * 1024
});
this._state = {};

Expand Down
24 changes: 13 additions & 11 deletions src/main/libs/importers/sql/MysqlImporter.js
Expand Up @@ -6,9 +6,11 @@ export default class MysqlImporter extends BaseImporter {
constructor (client, options) {
super(options);
this._client = client;
this._queries = [];
}

async import () {
console.time('import');
try {
const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new SqlParser();
Expand All @@ -31,21 +33,21 @@ export default class MysqlImporter extends BaseImporter {
return new Promise((resolve, reject) => {
this._fileHandler.pipe(parser);

parser.on('error', (err) => {
console.log('err', err);
reject(err);
});
parser.on('error', reject);

parser.on('finish', () => {
console.log('TOTAL QUERIES', queryCount);
console.log('import end');
resolve();
Promise.all(this._queries)
.then(() => {
console.timeEnd('import');
console.log('TOTAL QUERIES', queryCount);
console.log('import end');
resolve();
})
.catch(reject);
});

parser.on('data', async (query) => {
parser.pause();
await this._client.raw(query).catch(_ => false);
parser.resume();
parser.on('data', async query => {
this._queries.push(this._client.raw(query, { split: false }));
queryCount++;
});

Expand Down
3 changes: 2 additions & 1 deletion src/main/workers/importer.js
Expand Up @@ -8,7 +8,8 @@ process.on('message', async ({ type, dbConfig, options }) => {
const connection = await ClientsFactory.getConnection({
client: options.type,
params: dbConfig,
poolSize: 1
poolSize: 1,
logger: () => null
});
await connection.connect();

Expand Down

0 comments on commit f444746

Please sign in to comment.