// loadFromCsvStream.js
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
const csv = require('csv');
/**
 * Loads CSV data from a stream into the database in a configurable manner.
 *
 * All work happens inside a single `conn.tx(...)` transaction: rows are parsed, mapped to
 * models, buffered, and bulk inserted in batches. If the source stream errors, the CSV parser
 * errors, or either callback throws, the promise returned from the transaction callback
 * rejects, so the transaction can be rolled back and no rows are kept.
 * @example
 * loadFromCsvStream(
 * 	stream,
 * 	row => new Reading(...),
 * 	(readings, tx) => Reading.insertAll(readings, tx),
 * 	conn
 * ).then(() => log('Inserted!'));
 * @param stream the raw readable stream of CSV text to load from; it is piped into the CSV parser
 * @param {function(Array.<*>, ...*): M} mapRowToModel A function that maps a CSV row (an array) to a model object
 * @param {function(Array.<M>, ITask): Promise<>} bulkInsertModels A function that bulk inserts an array of models using the supplied transaction
 * @param conn the database connection to use; must expose `tx(callback)`
 * @returns {Promise<*>} whatever `conn.tx` returns; the transaction callback resolves with the
 * 	result of `t.batch(...)` over every bulk insert, or rejects with the first error encountered
 * @template M
 */
function loadFromCsvStream(stream, mapRowToModel, bulkInsertModels, conn) {
	return conn.tx(t => new Promise(resolve => {
		// Rows are buffered until at least this many are queued, then inserted as one batch.
		const MIN_INSERT_BUFFER_SIZE = 1000;
		// True once the outcome (success or failure) has been handed to `resolve`.
		let settled = false;
		let modelsToInsert = [];
		const pendingInserts = [];
		const parser = csv.parse();

		// Fails the load with `err`. Only the first failure counts. The inserts already issued
		// are awaited through t.batch before rejecting; if the batch itself rejects, that
		// rejection is the one that propagates.
		function fail(err) {
			if (settled) {
				return;
			}
			settled = true;
			modelsToInsert = [];
			resolve(t.batch(pendingInserts).then(() => Promise.reject(err)));
		}

		// Starts a bulk insert for the buffered models and empties the buffer.
		function insertQueuedModels() {
			const models = modelsToInsert;
			modelsToInsert = [];
			try {
				pendingInserts.push(bulkInsertModels(models, t));
			} catch (err) {
				// A synchronous throw must fail the load rather than escape the event listener.
				fail(err);
			}
		}

		// Defines how the parser behaves when it has new data (models to be inserted)
		parser.on('readable', () => {
			let row;
			// We can only get the next row once so we check that it isn't null at the same time that we assign it.
			// The parser is always drained, even after a failure, so the piped stream can run to completion.
			while ((row = parser.read()) !== null) { // tslint:disable-line no-conditional-assignment
				if (!settled) {
					try {
						modelsToInsert.push(mapRowToModel(row));
					} catch (err) {
						fail(err);
					}
				}
			}
			if (!settled && modelsToInsert.length >= MIN_INSERT_BUFFER_SIZE) {
				insertQueuedModels();
			}
		});

		parser.on('error', fail);
		// pipe() does not forward source errors to the destination, so observe them here.
		stream.on('error', fail);

		// 'end' fires once every parsed row has been read, so the buffer is complete at this point.
		parser.on('end', () => {
			if (settled) {
				return;
			}
			// Insert any models left in the buffer
			if (modelsToInsert.length > 0) {
				insertQueuedModels();
			}
			// The final insert may itself have failed synchronously.
			if (settled) {
				return;
			}
			settled = true;
			// Resolve the promise, telling pg-promise to run the batch query and complete (or rollback) the
			// transaction.
			resolve(t.batch(pendingInserts));
		});

		stream.pipe(parser);
	}));
}
// Expose the loader function as this module's export.
module.exports = loadFromCsvStream;