Skip to content

Commit

Permalink
Fix: Use custom implementation of fs.WriteStream that supports flush (f…
Browse files Browse the repository at this point in the history
…ixes #189)
  • Loading branch information
phated committed Jul 8, 2016
1 parent d905bcd commit 8bc7363
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 39 deletions.
67 changes: 28 additions & 39 deletions lib/dest/write-contents/write-stream.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,54 @@
'use strict';

var fs = require('graceful-fs');

var fo = require('../../file-operations');
var readStream = require('../../src/get-contents/read-stream');

function writeStream(file, onWritten) {
var opt = {
mode: file.stat.mode,
// TODO: need to test this (node calls this `flags` property)
flag: file.flag,
};

var outStream = fs.createWriteStream(file.path, opt);
var fd = null;

file.contents.once('error', complete);
file.contents.once('end', readStreamEnd);
outStream.once('error', complete);
outStream.once('finish', complete);
outStream.once('open', onOpen);
// TODO: is this the best API?
var outStream = fo.createWriteStream(file.path, opt, onFlush);

// Streams are piped with end disabled, this prevents the
// WriteStream from closing the file descriptor after all
// data is written.
file.contents.pipe(outStream, { end: false });
file.contents.once('error', onComplete);
outStream.once('error', onComplete);
outStream.once('finish', onComplete);

// Obtain the file descriptor from the "open" event.
function onOpen(openFd) {
fd = openFd;
}
// TODO: should this use a clone?
file.contents.pipe(outStream);

function readStreamEnd() {
readStream(file, complete);
function cleanup() {
file.contents.removeListener('error', onComplete);
outStream.removeListener('error', onComplete);
outStream.removeListener('finish', onComplete);
}

function end(propagatedErr) {
outStream.end(onEnd);

function onEnd(endErr) {
onWritten(propagatedErr || endErr);
}
function onComplete(streamErr) {
cleanup();
onWritten(streamErr);
}

// Cleanup
function complete(streamErr) {
file.contents.removeListener('error', complete);
file.contents.removeListener('end', readStreamEnd);
outStream.removeListener('error', complete);
outStream.removeListener('finish', complete);
outStream.removeListener('open', onOpen);
function onFlush(fd, callback) {
// TODO: removing this before readStream because it replaces the stream
file.contents.removeListener('error', onComplete);

if (streamErr) {
return end(streamErr);
}
// TODO: this is doing sync stuff & the callback seems unnecessary
// TODO: do we really want to replace the contents stream or should we use a clone
readStream(file, complete);

if (typeof fd !== 'number') {
return end();
}
function complete() {
if (typeof fd !== 'number') {
return callback();
}

fo.updateMetadata(fd, file, end);
fo.updateMetadata(fd, file, callback);
}
}

}

module.exports = writeStream;
90 changes: 90 additions & 0 deletions lib/file-operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,95 @@ function mkdirp(dirpath, customMode, callback) {
}
}

function createWriteStream(path, options, flush) {
return new WriteStream(path, options, flush);
}

var FlushWriteStream = require('flush-write-stream');
var util = require('util');

function WriteStream(path, options, flush) {
if (!(this instanceof WriteStream)) {
return new WriteStream(path, options, flush);
}

options = options || {};

FlushWriteStream.call(this, options, worker, cleanup);

this.flush = flush;
this.path = path;

// TODO: in node, this is named flags
this.flag = options.flag || 'w';
this.mode = options.mode || constants.DEFAULT_FILE_MODE;

this.fd = null;
this.start = undefined;
this.pos = APPEND_MODE_REGEXP.test(this.flag) ? null : 0;;

this.open();

// Dispose on finish.
this.once('finish', this.close);
}

util.inherits(WriteStream, FlushWriteStream);

WriteStream.prototype.open = function() {
var self = this;

fs.open(this.path, this.flag, this.mode, onOpen);

function onOpen(openErr, fd) {
if (openErr) {
self.destroy();
self.emit('error', openErr);
return;
}

self.fd = fd;
self.emit('open', fd);
}
};

// Like `fs.WriteStream`
WriteStream.prototype.destroy = fs.WriteStream.prototype.destroy;
WriteStream.prototype.close = fs.WriteStream.prototype.close;
WriteStream.prototype.destroySoon = WriteStream.prototype.end;

function worker(data, encoding, callback) {
var self = this;

if (!Buffer.isBuffer(data)) {
return this.emit('error', new Error('Invalid data'));
}

if (typeof this.fd !== 'number') {
return this.once('open', onOpen);
}

fs.write(this.fd, data, 0, data.length, this.pos, onWrite);

function onOpen() {
self._write(data, encoding, callback);
}

function onWrite(writeErr) {
if (writeErr) {
self.destroy();
callback(writeErr);
return;
}

callback();
}
}

function cleanup(callback) {
this.flush(this.fd, callback);
}

module.exports = {
closeFd: closeFd,
isValidUnixId: isValidUnixId,
Expand All @@ -334,4 +423,5 @@ module.exports = {
updateMetadata: updateMetadata,
writeFile: writeFile,
mkdirp: mkdirp,
createWriteStream: createWriteStream,
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
],
"dependencies": {
"duplexify": "^3.2.0",
"flush-write-stream": "^1.0.0",
"glob-stream": "^5.3.2",
"graceful-fs": "^4.0.0",
"gulp-sourcemaps": "^1.5.2",
Expand Down

0 comments on commit 8bc7363

Please sign in to comment.