Specify columns order for pg copy (#123)
* specified columns order for pg copy

* updated version to 6.1.0
AnatolyUss committed Feb 18, 2024
1 parent 3271897 commit eef826c
Showing 5 changed files with 12 additions and 8 deletions.
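Why the column list matters: when COPY ... FROM STDIN is given no explicit column list, PostgreSQL maps incoming CSV fields to the table's columns in their declared order. The CSV stream NMIG pipes in is produced by a SELECT whose field order is not guaranteed to match that declaration, so values could land in the wrong columns or be rejected. This commit pins the order on both ends by naming the columns explicitly, e.g. COPY "public"."users" ("id","name") FROM STDIN ... (schema, table, and column names in that example are illustrative, not taken from this diff).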
2 changes: 1 addition & 1 deletion README.md
@@ -109,7 +109,7 @@ Or, if you have moved <code>config</code> folder out from Nmig's directory:<br />
 <br /><b>Note:</b> "logs_directory" will be created during script execution.</p>
 
 <h3>VERSION</h3>
-<p>Current version is 6.0.0</p>
+<p>Current version is 6.1.0</p>
 
 <h3>LICENSE</h3>
 <p>NMIG is available under "GNU GENERAL PUBLIC LICENSE" (v. 3) <br />
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
 {
   "name": "nmig",
-  "version": "6.0.0",
+  "version": "6.1.0",
   "description": "The database migration app",
   "author": "Anatoly Khaytovich<anatolyuss@gmail.com>",
   "license": "GPL-3.0",
8 changes: 6 additions & 2 deletions src/DataChunksProcessor.ts
@@ -44,8 +44,9 @@ export default async (
     true,
   );
   const logTitle = 'DataChunksProcessor::default';
+  const arrTableColumns = (conversion._dicTables.get(tableName) as Table).arrTableColumns;
   const selectFieldList: string = arrangeColumnsData(
-    (conversion._dicTables.get(tableName) as Table).arrTableColumns,
+    arrTableColumns,
     +conversion._mysqlVersion.split('.').slice(0, 2).join('.'),
     conversion._encoding,
   );
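A reading note on the hunk above: the leading "+" in +conversion._mysqlVersion... is a unary plus in the code itself, not a diff marker; it coerces the "major.minor" prefix of the MySQL version string to a number. A minimal sketch (the sample version string is hypothetical):

// e.g. conversion._mysqlVersion === '5.7.44'
const majorMinor: string = '5.7.44'.split('.').slice(0, 2).join('.'); // '5.7'
const versionAsNumber: number = +majorMinor; // 5.7 (number)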
@@ -72,8 +73,11 @@ export default async (

   const metadata: string = JSON.stringify({
     _tableName: tableName,
-    _selectFieldList: selectFieldList,
     _rowsCnt: rowsCnt,
+    _selectFieldList: selectFieldList,
+    _copyColumnNamesList: arrTableColumns
+      .map((column: any): string => `"${column.Field}"`)
+      .join(','),
   });
 
   params.sql = `INSERT INTO ${getDataPoolTableName(conversion)}("metadata") VALUES ($1);`;
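For orientation, here is a hypothetical metadata record this code would store in the data pool (table name, columns, and row count are invented for the example; the real _selectFieldList may carry extra type-dependent wrapping applied by arrangeColumnsData):

// Sketch of the metadata payload for a MySQL table `users` (id, name):
const metadata: string = JSON.stringify({
  _tableName: 'users',
  _rowsCnt: 42,
  _selectFieldList: '`id`,`name`',
  _copyColumnNamesList: '"id","name"', // double-quoted PG identifiers, comma-separated
});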
2 changes: 1 addition & 1 deletion src/DataPipeManager.ts
@@ -187,7 +187,7 @@ export default class DataPipeManager {
       DataPipeManager.getNumberOfReaderProcesses(conversion);
 
     // !!!Note, invoke the "DataPipeManager.runDataReaderProcess" method sequentially.
-    // DO NOT use ".map(async _ => await DataPipeManager.runDataReaderProcess(..." to avoid race condition.
+    // DO NOT use ".map(async _ => await DataPipeManager.runDataReaderProcess(...))" to avoid race condition.
     for (let i = 0; i < numberOfReaderProcesses; ++i) {
       await DataPipeManager.runDataReaderProcess(conversion);
     }
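The warning kept in that comment comes down to how Array.prototype.map behaves with an async callback: it returns an array of already-started promises without awaiting between iterations, so every reader process would be launched at once instead of one after another. The plain for loop with await guarantees strictly sequential start-up:

// Avoided: fires all readers concurrently, map does not await between calls.
// ids.map(async _ => await DataPipeManager.runDataReaderProcess(conversion));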
6 changes: 3 additions & 3 deletions src/DataReader.ts
@@ -74,14 +74,14 @@ process.on('message', async (signal: MessageToDataReader): Promise<void> => {
 const populateTable = async (conv: Conversion, chunk: any): Promise<void> => {
   const tableName: string = chunk._tableName;
   const strSelectFieldList: string = chunk._selectFieldList;
+  const copyColumnNamesList: string = chunk._copyColumnNamesList;
   const rowsCnt: number = chunk._rowsCnt;
   const dataPoolId: number = chunk._id;
   const originalTableName: string = extraConfigProcessor.getTableName(conv, tableName, true);
   const sql = `SELECT ${strSelectFieldList} FROM \`${originalTableName}\`;`;
   const mysqlClient: PoolConnection = await DBAccess.getMysqlClient(conv);
-  const sqlCopy = `COPY "${conv._schema}"."${tableName}" FROM STDIN
-    WITH(FORMAT csv, DELIMITER '${conv._delimiter}',
-    ENCODING '${conv._targetConString.charset}');`;
+  const sqlCopy = `COPY "${conv._schema}"."${tableName}" (${copyColumnNamesList}) FROM STDIN
+    WITH (FORMAT csv, DELIMITER '${conv._delimiter}', ENCODING '${conv._targetConString.charset}');`;
 
   const client: PoolClient = await DBAccess.getPgClient(conv);
   let originalSessionReplicationRole: string | null = null;
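Tying the two halves of the commit together: _copyColumnNamesList is built in DataChunksProcessor, stored in the data pool metadata, and consumed here. With the hypothetical users table from the earlier sketch, sqlCopy would render roughly as:

// COPY "public"."users" ("id","name") FROM STDIN
//     WITH (FORMAT csv, DELIMITER ',', ENCODING 'UTF8');
// (schema, delimiter, and encoding values are illustrative)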
