Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#0.2.4

* Added more fine grained control to `.pause` and `.resume`
* You can now pause resume between chunks

# 0.2.3

* Add new `createWriteStream` for creating a streaming csv writer
Expand Down
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
DOC_COMMAND=coddoc -f multi-html -d ./lib --dir ./docs
MD_COMMAND=coddoc -f markdown -d ./lib > README.md

test:
export NODE_PATH=$NODE_PATH:lib && ./node_modules/it/bin/it -r dotmatrix

docs: docclean
$(DOC_COMMAND) && $(MD_COMMAND)
$(DOC_COMMAND)

docclean :
rm -rf docs/*
Expand Down
10 changes: 5 additions & 5 deletions benchmark/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function camelize(str) {

function benchmarkFastCsv(done) {
var count = 0;
fastCsv
var stream = fastCsv
.fromPath(TEST_FILE, {headers: true})
.transform(function (data) {
var ret = {};
Expand All @@ -23,12 +23,12 @@ function benchmarkFastCsv(done) {
ret.address = data.address;
return ret;
})
.on("record", function () {
count++;
.on("record", function (data, i) {
count = i + 1;
})
.on("end", function () {
if (count !== COUNT) {
done(new Error("Error expected %d got %d", COUNT, count));
done(new Error("Error expected " + COUNT + " got " + count));
} else {
done();
}
Expand All @@ -51,7 +51,7 @@ function benchmarkCsv(done) {
ret.address = data[3];
return ret;
})
.on('record', function () {
.on('record', function (data) {
count++;
})
.on('end', function () {
Expand Down
7 changes: 7 additions & 0 deletions docs/History.html
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@



<h1>0.2.4</h1>
<ul>
<li>Added more fine grained control to <code>.pause</code> and <code>.resume</code><ul>
<li>You can now pause resume between chunks</li>
</ul>
</li>
</ul>
<h1>0.2.3</h1>
<ul>
<li>Add new <code>createWriteStream</code> for creating a streaming csv writer</li>
Expand Down
43 changes: 33 additions & 10 deletions lib/parser_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ function ParserStream(options) {
stream.Transform.call(this, options);
this.lines = "";
this._parsedHeaders = false;
this._rowCount = 0;
this._rowCount = -1;
this._emitData = false;
options = options || {};
var delimiter;
Expand All @@ -31,6 +31,7 @@ function ParserStream(options) {
this.parser = createParser(options);
this._headers = options.headers;
this._ignoreEmpty = options.ignoreEmpty;
this.__buffered = [];
return this;
}

Expand Down Expand Up @@ -61,7 +62,7 @@ extended(ParserStream).extend({

__handleLine: function __parseLineData(line, index, ignore) {
var ignoreEmpty = this._ignoreEmpty;
if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) {
if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) {
return null;
}
if (!ignore) {
Expand All @@ -77,9 +78,7 @@ extended(ParserStream).extend({
},

_parse: function _parseLine(data, hasMoreData) {
var row,
emitData = this._emitData,
count = 0, ret, rows, self = this;
var row, count, ret, rows, self = this;
try {
data = this.parser(data, hasMoreData);
ret = data.line;
Expand Down Expand Up @@ -109,12 +108,15 @@ extended(ParserStream).extend({
for (var i = 0, l = rows.length; i < l; i++) {
row = rows[i];
if (row) {
var dataRow = this.__handleLine(row, count);
var dataRow = this.__handleLine(row, (count = ++this._rowCount));
if (dataRow) {
this.emit("record", dataRow, (count = this._rowCount++));
if (emitData) {
this.push(JSON.stringify(dataRow));
if (!this.paused) {
this.__emitRecord(dataRow, count);
} else {
this.__buffered.push([dataRow, count]);
}
} else {
count = --this._rowCount;
}
}
}
Expand All @@ -124,6 +126,13 @@ extended(ParserStream).extend({
return ret;
},

__emitRecord: function (dataRow, count) {
this.emit("record", dataRow, count);
if (this._emitData) {
this.push(JSON.stringify(dataRow));
}
},

_transform: function (data, encoding, done) {
var lines = this.lines;
var lineData = (lines + data);
Expand All @@ -142,7 +151,8 @@ extended(ParserStream).extend({
if (this.lines) {
this._parse(this.lines, false);
}
this.emit("end", this._rowCount);
//increment row count so we aren't 0 based
this.emit("end", ++this._rowCount);
callback();
},

Expand All @@ -163,6 +173,19 @@ extended(ParserStream).extend({
resume: function () {
if (this.paused) {
this.paused = false;
var buffered = this.__buffered, l = buffered.length;
if (l) {
var i = -1, entry;
while (++i < buffered.length) {
entry = buffered.shift();
this.__emitRecord(entry[0], entry[1]);
//handle case where paused is called while emitting data
if (this.paused) {
return;
}
}
buffered.length = 0;
}
if (this.__pausedDone) {
var done = this.__pausedDone;
this.__pausedDone = null;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "fast-csv",
"version": "0.2.3",
"version": "0.2.4",
"description": "CSV parser and writer",
"main": "index.js",
"scripts": {
Expand Down
31 changes: 28 additions & 3 deletions test/fast-csv.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,33 @@ it.describe("fast-csv", function (it) {
});
});

it.describe("pause/resume", function () {

it.should("support pausing a stream", function (next) {
var actual = [], paused = false;
var stream = csv
.fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true})
.on("record", function (data, index) {
assert.isFalse(paused);
actual[index] = data;
paused = true;
stream.pause();
setTimeout(function () {
assert.isTrue(paused);
paused = false;
stream.resume();
}, 100);
})
.on("error", next)
.on("end", function (count) {
assert.deepEqual(actual, expected4);
assert.equal(count, actual.length);
next();
});
});

});


it.should("throw an error if an invalid path or stream is passed in", function () {
assert.throws(function () {
Expand Down Expand Up @@ -708,6 +735,4 @@ it.describe("fast-csv", function (it) {
stream.write(null);
});
});
});

it.run().both(process.exit);
});