objectMode
when parsing a csv which will cause data
events to have an object emitted.createWriteStream
pause
and resume
This is a library that provides CSV parsing and formatting.
NOTE As of v0.2.0 fast-csv
supports multi-line values.
All methods accept the following options
objectMode=true
: Ensure that data
events have an object emitted rather than the stringified version; set to false to have a stringified buffer.headers=false
: Set to true if you expect the first line of your CSV
to contain headers; alternatively you can specify an array of headers to use.ignoreEmpty=false
: If you wish to ignore empty rows.delimiter=','
: If your data uses an alternate delimiter such as ;
or \t
.events
-parse-error
: Emitted if there was an error parsing a row.
-record
: Emitted when a record is parsed.
+
record
: Emitted when a record is parsed.
data-invalid
: Emitted if an invalid row was encountered; only emitted if the validate
function is used.
data
: Emitted with the stringified
version of a record.
([options])
@@ -224,7 +223,7 @@`.fromPath(path[, options])
+.fromPath(path[, options])
This method parses a file from the specified path.
var csv = require("fast-csv");
@@ -236,7 +235,7 @@ Parsing
.on("end", function(){
console.log("done");
});
-`.fromString(string[, options])
+.fromString(string[, options])
This method parses a string
var csv = require("fast-csv");
@@ -252,7 +251,7 @@ Parsing
.on("end", function(){
console.log("done");
});
-`.fromStream(stream[, options])
+.fromStream(stream[, options])
This accepts a readable stream to parse data from.
var stream = fs.createReadStream("my.csv");
@@ -342,7 +341,7 @@ Formatting
createWriteStream(options)
This is the lowest level of the write methods, it creates a stream that can be used to create a csv of unknown size and pipe to an output csv.
var csvStream = csv.createWriteStream({headers: true}),
- writableStream = fs.createWritableStream("my.csv");
+ writableStream = fs.createWriteStream("my.csv");
writableStream.on("finish", function(){
console.log("DONE!");
@@ -417,6 +416,26 @@ Formatting
{a: "a1", b: "b1"},
{a: "a2", b: "b2"}
], {headers: true}); //"a,b\na1,b1\na2,b2\n"
+Piping from Parser to Writer
+You can use fast-csv
to pipe the output from a parsed CSV to a transformed CSV by setting the parser to objectMode
and using createWriteStream
.
+csv
+ .fromPath("in.csv", {headers: true})
+ .pipe(csv.createWriteStream({headers: true}))
+ .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"}));
+When piping from a parser to a formatter, the transforms are also maintained.
+csv
+ .fromPath("in.csv", {headers: true})
+ .transform(function(obj){
+ return {
+ name: obj.Name,
+ address: obj.Address,
+ emailAddress: obj.Email_Address,
+ verified: obj.Verified
+ };
+ })
+ .pipe(csv.createWriteStream({headers: true}))
+ .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"}));
+The output will contain the formatted result from the transform function.
Benchmarks
Parsing 20000 records AVG over 3 runs
fast-csv: 198.67ms
@@ -438,8 +457,6 @@ Meta
Website: http://c2fo.com
Twitter: http://twitter.com/c2fo - 877.465.4045
-Namespaces
-Classes
diff --git a/lib/formatter.js b/lib/formatter.js
index 15ed5aaa..a91662ed 100644
--- a/lib/formatter.js
+++ b/lib/formatter.js
@@ -1,8 +1,10 @@
var fs = require("fs"),
+ util = require("util"),
extended = require("./extended"),
isUndefinedOrNull = extended.isUndefinedOrNull,
hash = extended.hash,
stream = require("stream"),
+ Transform = stream.Transform,
LINE_BREAK = extended.LINE_BREAK;
function createFormatter(options) {
@@ -73,7 +75,7 @@ function wrapWriter(writer, options) {
hasHeaders = extended.has(options, "headers") ? options.headers : true,
parsedHeaders = hasHeaders ? false : true,
headersLength = 0, i = -1,
- writerWrite = writer.push, headers,
+ writerWrite = writer.write, headers,
buffer = [],
totalCount = 0,
MAX_BUFFER_SIZE = options.maxBuffer || 100000;
@@ -82,7 +84,7 @@ function wrapWriter(writer, options) {
if (item) {
var isHash = !extended.isArray(item), vals;
if (!parsedHeaders) {
- totalCount++
+ totalCount++;
parsedHeaders = true;
if (isHash) {
headers = hash.keys(item);
@@ -117,17 +119,32 @@ function wrapWriter(writer, options) {
writerWrite.call(writer, new Buffer(buffer.join("")).toString("utf8"));
buffer.length = 0;
}
- writerWrite.call(writer, null);
+ writer.end();
}
};
return writer;
}
+/**
+ * Transform stream returned by createWriteStream.
+ *
+ * The instance is initialised as a stream.Transform and then handed to
+ * wrapWriter together with the same options.
+ *
+ * @param {Object} opts options forwarded to both Transform and wrapWriter
+ */
+function CsvTransformStream(opts) {
+    Transform.call(this, opts);
+    wrapWriter(this, opts);
+}
+
+util.inherits(CsvTransformStream, Transform);
+
+extended(CsvTransformStream).extend({
+
+    // Hands each incoming chunk to the callback unchanged.
+    _transform: function (chunk, encoding, done) {
+        done(null, chunk);
+    },
+
+    // Issues a final write(null) before reporting that flushing is complete.
+    // NOTE(review): presumably write(null) tells the wrapped writer to drain
+    // its buffer - confirm against wrapWriter.
+    _flush: function (done) {
+        this.write(null);
+        done(null);
+    }
+});
+
function createWriteStream(options) {
- var writer = new stream.Readable();
- writer._read = function () {
- };
- return wrapWriter(writer, options);
+ return new CsvTransformStream(options);
}
function write(arr, options) {
diff --git a/lib/parser_stream.js b/lib/parser_stream.js
index 24592896..b589b39b 100644
--- a/lib/parser_stream.js
+++ b/lib/parser_stream.js
@@ -1,22 +1,42 @@
var extended = require("./extended"),
isUndefined = extended.isUndefined,
- EventEmitter = require("events").EventEmitter,
util = require("util"),
out = process.stdout,
stream = require("stream"),
EMPTY = /^\s*(?:''|"")?\s*(?:,\s*(?:''|"")?\s*)*$/,
- VALUE = /([^,'"\s\\]*(?:\s+[^,'"\s\\]+)*)/,
- LINE_SPLIT = /[\r\n]+/,
DEFAULT_DELIMITER = ",",
createParser = require("./parser");
+/**
+ * Invokes `f` with `scope` as its `this` value, passing the entries of
+ * `args` as individual arguments.
+ *
+ * Lists of up to three entries are dispatched through `call`; anything
+ * else falls back to `apply`.
+ *
+ * @param {Function} f the function to invoke
+ * @param {Array|Arguments} [args] array-like argument list; a missing
+ * value is treated as an empty list
+ * @param {*} [scope] the `this` value used for the invocation
+ * @returns {*} whatever `f` returns
+ */
+function spreadArgs(f, args, scope) {
+    var argCount = (args || []).length;
+    if (argCount === 0) {
+        return f.call(scope);
+    }
+    if (argCount === 1) {
+        return f.call(scope, args[0]);
+    }
+    if (argCount === 2) {
+        return f.call(scope, args[0], args[1]);
+    }
+    if (argCount === 3) {
+        return f.call(scope, args[0], args[1], args[2]);
+    }
+    return f.apply(scope, args);
+}
+
+
function ParserStream(options) {
+ options = options || {};
+ options.objectMode = extended.has(options, "objectMode") ? options.objectMode : true
stream.Transform.call(this, options);
this.lines = "";
this._parsedHeaders = false;
this._rowCount = -1;
this._emitData = false;
- options = options || {};
var delimiter;
if (extended.has(options, "delimiter")) {
delimiter = options.delimiter;
@@ -31,6 +51,7 @@ function ParserStream(options) {
this.parser = createParser(options);
this._headers = options.headers;
this._ignoreEmpty = options.ignoreEmpty;
+ this.__objectMode = options.objectMode;
this.__buffered = [];
return this;
}
@@ -39,16 +60,17 @@ util.inherits(ParserStream, stream.Transform);
var origOn = ParserStream.prototype.on,
origPause = ParserStream.prototype.pause,
- origResume = ParserStream.prototype.resume;
+ origResume = ParserStream.prototype.resume,
+ origEmit = ParserStream.prototype.emit;
function pause() {
- origPause.apply(this, arguments);
+ spreadArgs(origPause, arguments, this);
this.paused = true;
this.pause = pause;
}
function resume() {
- origResume.apply(this, arguments);
+ spreadArgs(origResume, arguments, this);
this.paused = false;
if (this.__pausedDone) {
this.__pausedDone();
@@ -60,6 +82,10 @@ extended(ParserStream).extend({
__pausedDone: null,
+ __endEmitted: false,
+
+ __emittedData: false,
+
__handleLine: function __parseLineData(line, index, ignore) {
var ignoreEmpty = this._ignoreEmpty;
if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) {
@@ -129,7 +155,7 @@ extended(ParserStream).extend({
__emitRecord: function (dataRow, count) {
this.emit("record", dataRow, count);
if (this._emitData) {
- this.push(JSON.stringify(dataRow));
+ this.push(this.__objectMode ? dataRow : JSON.stringify(dataRow));
}
},
@@ -152,7 +178,7 @@ extended(ParserStream).extend({
this._parse(this.lines, false);
}
//increment row count so we aren't 0 based
- this.emit("end", ++this._rowCount);
+ this.emit("end");
callback();
},
@@ -170,6 +196,17 @@ extended(ParserStream).extend({
}
},
+    /**
+     * Emits `event` through the original emit implementation, with special
+     * handling for "end": it is forwarded at most once per stream, and the
+     * forwarded event carries the incremented row count as its argument.
+     *
+     * The result of the underlying emit is returned so callers still get
+     * the boolean that EventEmitter#emit reports.
+     *
+     * @param {String} event name of the event to emit; any further
+     * arguments are passed through for events other than "end"
+     * @returns {Boolean} the underlying emit result, or `false` when a
+     * repeated "end" is suppressed
+     */
+    emit: function (event) {
+        if (event === "end") {
+            if (this.__endEmitted) {
+                // "end" was already delivered; nothing is dispatched
+                return false;
+            }
+            this.__endEmitted = true;
+            // increment row count so we aren't 0 based
+            return spreadArgs(origEmit, ["end", ++this._rowCount], this);
+        }
+        return spreadArgs(origEmit, arguments, this);
+    },
+
resume: function () {
if (this.paused) {
this.paused = false;
@@ -199,7 +236,7 @@ extended(ParserStream).extend({
if (evt === "data" || evt === "readable") {
this._emitData = true;
}
- origOn.apply(this, arguments);
+ spreadArgs(origOn, arguments, this);
return this;
},
diff --git a/package.json b/package.json
index 46cbc961..ba5b446b 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "fast-csv",
- "version": "0.2.5",
+ "version": "0.3.0",
"description": "CSV parser and writer",
"main": "index.js",
"scripts": {
@@ -25,15 +25,15 @@
"it": "~0.2.6",
"grunt-it": "~0.3.1",
"grunt": "~0.4.1",
- "grunt-contrib-jshint": "~0.4.3"
- },
- "dependencies": {
- "is-extended": "0.0.8",
- "object-extended": "0.0.5",
- "extended": "0.0.4",
- "string-extended": "0.0.7"
+ "grunt-contrib-jshint": "~0.10.0"
},
"engines": {
"node": ">=0.10"
+ },
+ "dependencies": {
+ "is-extended": "0.0.10",
+ "object-extended": "0.0.7",
+ "extended": "0.0.6",
+ "string-extended": "0.0.8"
}
}
diff --git a/test/assets/test22.csv b/test/assets/test22.csv
new file mode 100644
index 00000000..6ca6d1a3
--- /dev/null
+++ b/test/assets/test22.csv
@@ -0,0 +1,3 @@
+a,b
+a1,b1
+a2,b2
\ No newline at end of file
diff --git a/test/fast-csv.test.js b/test/fast-csv.test.js
index d0ac7427..7c6d7a7f 100644
--- a/test/fast-csv.test.js
+++ b/test/fast-csv.test.js
@@ -162,6 +162,50 @@ it.describe("fast-csv", function (it) {
});
});
+
+    // With objectMode disabled each "data" payload is stringified JSON, so it
+    // is parsed back before being compared with the expected rows.
+    it.should("emit data as a buffer if objectMode is false", function (next) {
+        var actual = [];
+        csv
+            .fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true, objectMode: false})
+            .on("data", function (data) {
+                actual.push(JSON.parse(data + ""));
+            })
+            .on("end", function () {
+                assert.deepEqual(actual, expected4);
+                // (actual, expected) order so a failure message reads correctly
+                assert.equal(actual.length, 9);
+                next();
+            });
+    });
+
+    // With objectMode explicitly enabled, "data" listeners receive each row
+    // as an object and "end" reports the number of rows.
+    it.should("emit data as an object if objectMode is true", function (next) {
+        var rows = [],
+            file = path.resolve(__dirname, "./assets/test4.csv"),
+            parser = csv.fromPath(file, {headers: true, objectMode: true});
+        parser.on("data", function (row) {
+            rows.push(row);
+        });
+        parser.on("end", function (count) {
+            assert.deepEqual(rows, expected4);
+            assert.equal(count, rows.length);
+            next();
+        });
+    });
+
+    // objectMode is deliberately left out of the options so that the
+    // parser's default (object mode) is what this test exercises.
+    it.should("emit data as an object if objectMode is not specified", function (next) {
+        var actual = [];
+        csv
+            .fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true})
+            .on("data", function (data) {
+                actual.push(data);
+            })
+            .on("end", function (count) {
+                assert.deepEqual(actual, expected4);
+                assert.equal(count, actual.length);
+                next();
+            });
+    });
+
+
it.should("allow piping from a stream", function (next) {
var actual = [];
var stream = csv({headers: true})
@@ -735,4 +779,48 @@ it.describe("fast-csv", function (it) {
stream.write(null);
});
});
+
+    // Round-trip tests: a parsed CSV is piped through the formatter into a
+    // temporary file whose contents are then compared with the expected CSV.
+    it.describe("piping from parser to formatter", function (it) {
+
+        it.should("allow piping from a parser to a formatter", function (next) {
+            var outPath = path.resolve(__dirname, "assets/test.csv"),
+                writable = fs.createWriteStream(outPath, {encoding: "utf8"});
+            csv
+                .fromPath(path.resolve(__dirname, "./assets/test22.csv"), {headers: true, objectMode: true})
+                .on("error", next)
+                .pipe(csv.createWriteStream({headers: true}))
+                .on("error", next)
+                .pipe(writable)
+                .on("error", next);
+
+            writable
+                .on("finish", function () {
+                    var written = fs.readFileSync(outPath).toString();
+                    // remove the temp file before asserting so a failure leaves nothing behind
+                    fs.unlinkSync(outPath);
+                    assert.equal(written, "a,b\na1,b1\na2,b2");
+                    next();
+                });
+        });
+
+        it.should("preserve transforms", function (next) {
+            var outPath = path.resolve(__dirname, "assets/test.csv"),
+                writable = fs.createWriteStream(outPath, {encoding: "utf8"});
+            csv
+                .fromPath(path.resolve(__dirname, "./assets/test22.csv"), {headers: true})
+                .transform(function (obj) {
+                    obj.a = obj.a + "-parsed";
+                    obj.b = obj.b + "-parsed";
+                    return obj;
+                })
+                .on("error", next)
+                .pipe(csv.createWriteStream({headers: true}))
+                .on("error", next)
+                .pipe(writable)
+                .on("error", next);
+
+            writable
+                .on("finish", function () {
+                    var written = fs.readFileSync(outPath).toString();
+                    // remove the temp file before asserting so a failure leaves nothing behind
+                    fs.unlinkSync(outPath);
+                    assert.equal(written, "a,b\na1-parsed,b1-parsed\na2-parsed,b2-parsed");
+                    next();
+                });
+        });
+    });
});
\ No newline at end of file