Skip to content

Commit

Permalink
Fix: Use custom implementation of fs.WriteStream that supports flush (f…
Browse files Browse the repository at this point in the history
…ixes #189)
  • Loading branch information
phated committed Nov 30, 2017
1 parent 0cec736 commit 483d1cc
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 42 deletions.
69 changes: 31 additions & 38 deletions lib/dest/write-contents/write-stream.js
@@ -1,65 +1,58 @@
'use strict';

var fs = require('graceful-fs');

var fo = require('../../file-operations');
var readStream = require('../../src/get-contents/read-stream');

function writeStream(file, onWritten) {
var opt = {
mode: file.stat.mode,
// TODO: need to test this (node calls this `flags` property)
flag: file.flag,
};

var outStream = fs.createWriteStream(file.path, opt);
var fd = null;

file.contents.once('error', complete);
file.contents.once('end', readStreamEnd);
outStream.once('error', complete);
outStream.once('finish', complete);
outStream.once('open', onOpen);
// TODO: is this the best API?
var outStream = fo.createWriteStream(file.path, opt, onFlush);

// Streams are piped with end disabled, this prevents the
// WriteStream from closing the file descriptor after all
// data is written.
file.contents.pipe(outStream, { end: false });
file.contents.once('error', onComplete);
outStream.once('error', onComplete);
outStream.once('finish', onComplete);

// Obtain the file descriptor from the "open" event.
function onOpen(openFd) {
fd = openFd;
}
// TODO: should this use a clone?
file.contents.pipe(outStream);

function readStreamEnd() {
readStream(file, complete);
}
function onComplete(streamErr) {
// Cleanup event handlers before closing
file.contents.removeListener('error', onComplete);
outStream.removeListener('error', onComplete);
outStream.removeListener('finish', onComplete);

function end(propagatedErr) {
outStream.end(onEnd);
// Need to guarantee the fd is closed before forwarding the error
outStream.once('close', onClose);
outStream.end();

function onEnd(endErr) {
onWritten(propagatedErr || endErr);
function onClose(closeErr) {
onWritten(streamErr || closeErr);
}
}

// Cleanup
function complete(streamErr) {
file.contents.removeListener('error', complete);
file.contents.removeListener('end', readStreamEnd);
outStream.removeListener('error', complete);
outStream.removeListener('finish', complete);
outStream.removeListener('open', onOpen);
function onFlush(fd, callback) {
// TODO: removing this before readStream because it replaces the stream
file.contents.removeListener('error', onComplete);

if (streamErr) {
return end(streamErr);
}
// TODO: this is doing sync stuff & the callback seems unnecessary
// TODO: do we really want to replace the contents stream or should we use a clone
readStream(file, complete);

if (typeof fd !== 'number') {
return end();
}
function complete() {
if (typeof fd !== 'number') {
return callback();
}

fo.updateMetadata(fd, file, end);
fo.updateMetadata(fd, file, callback);
}
}

}

module.exports = writeStream;
101 changes: 101 additions & 0 deletions lib/file-operations.js
@@ -1,10 +1,13 @@
'use strict';

var util = require('util');

var fs = require('graceful-fs');
var path = require('path');
var assign = require('object-assign');
var isEqual = require('lodash.isequal');
var isValidDate = require('vali-date');
var FlushWriteStream = require('flush-write-stream');

var constants = require('./constants');

Expand Down Expand Up @@ -324,6 +327,103 @@ function mkdirp(dirpath, customMode, callback) {
}
}

function createWriteStream(path, options, flush) {
return new WriteStream(path, options, flush);
}

// Taken from node core and altered to receive a flush function and simplified
// To be used for cleanup (like updating times/mode/etc)
function WriteStream(path, options, flush) {
// Not exposed so we can avoid the case where someone doesn't use `new`

if (typeof options === 'function') {
flush = options;
options = null;
}

options = options || {};

FlushWriteStream.call(this, options, worker, cleanup);

this.flush = flush;
this.path = path;

this.mode = options.mode || constants.DEFAULT_FILE_MODE;
this.flag = options.flag || 'w';
this.pos = APPEND_MODE_REGEXP.test(this.flag) ? null : 0;;

// Used by node's `fs.WriteStream`
this.fd = null;
this.start = null;

this.open();

// Dispose on finish.
this.once('finish', this.close);
}

util.inherits(WriteStream, FlushWriteStream);

WriteStream.prototype.open = function() {
var self = this;

fs.open(this.path, this.flag, this.mode, onOpen);

function onOpen(openErr, fd) {
if (openErr) {
self.destroy();
self.emit('error', openErr);
return;
}

self.fd = fd;
self.emit('open', fd);
}
};

// Use our `end` method since it is patched for flush
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
// Use node's `fs.WriteStream` methods
WriteStream.prototype.destroy = fs.WriteStream.prototype.destroy;
WriteStream.prototype.close = fs.WriteStream.prototype.close;

function worker(data, encoding, callback) {
var self = this;

// This is from node core but I have no idea how to get code coverage on it
if (!Buffer.isBuffer(data)) {
return this.emit('error', new Error('Invalid data'));
}

if (typeof this.fd !== 'number') {
return this.once('open', onOpen);
}

fs.write(this.fd, data, 0, data.length, this.pos, onWrite);

function onOpen() {
self._write(data, encoding, callback);
}

function onWrite(writeErr) {
if (writeErr) {
self.destroy();
callback(writeErr);
return;
}

callback();
}
}

function cleanup(callback) {
if (typeof this.flush !== 'function') {
return callback();
}

this.flush(this.fd, callback);
}

module.exports = {
closeFd: closeFd,
isValidUnixId: isValidUnixId,
Expand All @@ -334,4 +434,5 @@ module.exports = {
updateMetadata: updateMetadata,
writeFile: writeFile,
mkdirp: mkdirp,
createWriteStream: createWriteStream,
};
2 changes: 2 additions & 0 deletions package.json
Expand Up @@ -12,6 +12,7 @@
],
"dependencies": {
"duplexify": "^3.2.0",
"flush-write-stream": "^1.0.0",
"glob-stream": "^5.3.2",
"graceful-fs": "^4.0.0",
"gulp-sourcemaps": "1.6.0",
Expand All @@ -36,6 +37,7 @@
"eslint": "^1.10.3",
"eslint-config-gulp": "^2.0.0",
"expect": "^1.14.0",
"from2": "^2.1.1",
"github-changes": "^1.0.1",
"istanbul": "^0.3.0",
"istanbul-coveralls": "^1.0.1",
Expand Down
8 changes: 4 additions & 4 deletions test/dest.js
Expand Up @@ -18,6 +18,7 @@ var expect = require('expect');
var bufEqual = require('buffer-equal');
var through = require('through2');
var File = require('vinyl');
var from = require('from2');

var should = require('should');
require('mocha');
Expand Down Expand Up @@ -1146,7 +1147,9 @@ describe('dest stream', function() {
var inputPath = path.join(__dirname, './fixtures/test.coffee');
var inputBase = path.join(__dirname, './fixtures/');

var contentStream = through.obj();
var contentStream = from(function(size, cb) {
cb(new Error('mocked error'));
});
var expectedFile = new File({
base: inputBase,
cwd: __dirname,
Expand All @@ -1156,9 +1159,6 @@ describe('dest stream', function() {

var stream = vfs.dest('./out-fixtures/', { cwd: __dirname });
stream.write(expectedFile);
setTimeout(function() {
contentStream.emit('error', new Error('mocked error'));
}, 100);
stream.on('error', function(err) {
expect(err).toExist();
done();
Expand Down

0 comments on commit 483d1cc

Please sign in to comment.