Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Open streams from the fs.open / fs.close functions.

  • Loading branch information...
commit 21f4325c9d299c6e9f06d75aa8a433db9113416c 1 parent 5c64384
@Gozala authored
Showing with 120 additions and 81 deletions.
  1. +120 −81 lib/fs.js
View
201 lib/fs.js
@@ -67,6 +67,9 @@ const FLAGS = {
const _ = new Namespace();
+function isWritable(mode) { return !!(mode & PR_WRONLY || mode & PR_RDWR); }
+function isReadable(mode) { return !!(mode & PR_RDONLY || mode & PR_RDWR); }
+
function isString(value) { return typeof value === "string"; }
function isFunction(value) { return typeof value === "function"; }
@@ -76,6 +79,14 @@ function toArray(enumerator) {
value.push(enumerator.getNext())
return value
}
+
+function defer(wrapped) {
+ return function deferred() {
+ setTimeout(function(self, args) {
+ wrapped.apply(self, args);
+ }, 0, this, Array.slice(arguments));
+ }
+}
function getFileName(file) {
return file.QueryInterface(Ci.nsIFile).leafName;
}
@@ -111,17 +122,22 @@ const ReadStream = Stream.extend({
if ('position' in options && options.position)
this.position = options.position;
- try {
- let fd = isString(path) ? new FileDescriptor(path) : _(path).value;
- let source = new FileInputStream();
- source.init(fd, Flags(this.flags), Mode(this.mode), fd.DEFER_OPEN);
- let pump = new StreamPump(source, this.position, this.length, 0, 0, true);
- _(this).request = pump;
- this.read();
- } catch (error) {
- this.readable = false;
- this.emit("error", error);
- }
+ let { flags, mode, position, length } = this;
+ let { input } = _(isString(path) ? openSync(path, flags, mode) : path);
+ // We use `nsIStreamTransportService` service to transform blocking
+ // file input stream into a fully asynchronous stream that can be written
+ // without blocking the main thread.
+ let transport = createInputTransport(input, position, length, false);
+ // Open an input stream on a transport. We don't pass flags to guarantee
+ // non-blocking stream semantics. Also we use defaults for segment size &
+ // count.
+ let asyncInputStream = transport.openInputStream(null, 0, 0);
+ _(this).asyncInputStream = asyncInputStream;
+ let pump = StreamPump(asyncInputStream, position, length, 0, 0, false);
+ _(this).pump = pump;
+ let binaryInputStream = BinaryInputStream(asyncInputStream);
+ _(this).binaryInputStream = binaryInputStream;
+ this.read();
},
path: null,
encoding: null,
@@ -137,12 +153,12 @@ const ReadStream = Stream.extend({
this.encoding = String(encoding).toUpperCase();
},
read: function read() {
- let stream = this;
- _(this).request.asyncRead({
+ let [ stream, { binaryInputStream, pump } ] = [ this, _(this) ];
+ pump.asyncRead({
onStartRequest: function onStartRequest() { stream.emit("start"); },
onDataAvailable: function onDataAvailable(req, c, input, offset, count) {
try {
- let bytes = StreamReader(input).readByteArray(count);
+ let bytes = binaryInputStream.readByteArray(count);
stream.emit("data", new Buffer(bytes, stream.encoding));
} catch (error) {
stream.emit("error", error);
@@ -152,24 +168,25 @@ const ReadStream = Stream.extend({
onStopRequest: function onStopRequest() {
stream.readable = false;
stream.emit("end");
- stream.destroy();
}
}, null);
},
pause: function pause() {
this.paused = true;
- _(this).request.suspend();
+ _(this).pump.suspend();
},
resume: function resume() {
this.paused = false;
- _(this).request.resume();
+ _(this).pump.resume();
},
destroy: function destroy() {
this.readable = false;
try {
- _(this).request.cancel(null);
- delete _(this).request;
this.emit("close", null);
+ _(this).pump.cancel(null);
+ delete _(this).pump;
+ _(this).binaryInputStream.close();
+ delete _(this).binaryInputStream;
} catch (error) {
this.emit("error", error);
}
@@ -191,34 +208,33 @@ const WriteStream = Stream.extend({
if ('position' in options && options.position !== undefined)
this.position = options.position;
- // Normalizing `mode` and `flags`.
- let [ mode, flags, stream ] = [ Mode(this.mode), Flags(this.flags), this ];
- // If pass was passed we create a file descriptor out of it. Otherwise
- // we just use given file descriptor.
- let fd = isString(path) ? new FileDescriptor(path) : _(path).value;
- // We create file output stream out of the file.
- let fileStream = new FileOutputStream();
- // We initialize stream with given `flags` `mode` and a behavior flag that
- // defers opening of stream.
- fileStream.init(fd, flags, mode, fileStream.DEFER_OPEN);
- // We use `nsIStreamTransportService` service to transform blocking
- // file output stream into a fully asynchronous stream that can be written
- // without blocking the main thread.
- let transport = createOutputTransport(fileStream, this.position, -1, false);
- // Open an output stream on a transport. We don't pass flags to guarantee
- // non-blocking stream semantics. Also we use defaults for segment size &
- // count.
- let asyncOutputStream = transport.openOutputStream(null, 0, 0);
- _(this).asyncOutputStream = asyncOutputStream;
- // Finally we create a non-blocking binary output stream. This will allows
- // us to write buffers as byte arrays without any further transcoding.
- let binaryOutputStream = new StreamWriter(asyncOutputStream);
- // Storing output stream so that it can be accessed later.
- _(this).binaryOutputStream = binaryOutputStream;
+ try {
+ let { mode, flags } = this;
+ // If pass was passed we create a file descriptor out of it. Otherwise
+ // we just use given file descriptor.
+ let { output } = isString(path) ? openSync(path, flags, mode) : _(path);
+ // We use `nsIStreamTransportService` service to transform blocking
+ // file output stream into a fully asynchronous stream that can be written
+ // without blocking the main thread.
+ let transport = createOutputTransport(output, this.position, -1, false);
+ // Open an output stream on a transport. We don't pass flags to guarantee
+ // non-blocking stream semantics. Also we use defaults for segment size &
+ // count.
+ let asyncOutputStream = transport.openOutputStream(null, 0, 0);
+ _(this).asyncOutputStream = asyncOutputStream;
+ // Finally we create a non-blocking binary output stream. This will allows
+ // us to write buffers as byte arrays without any further transcoding.
+ let binaryOutputStream = BinaryOutputStream(asyncOutputStream);
+ // Storing output stream so that it can be accessed later.
+ _(this).binaryOutputStream = binaryOutputStream;
+ } catch (error) {
+ this.emit("error", error);
+ }
},
writable: true,
drainable: true,
encoding: null,
+ position: -1,
flags: 'w',
mode: FILE_PERMISSION,
write: function write(content, encoding, callback) {
@@ -316,7 +332,7 @@ const WriteStream = Stream.extend({
});
exports.WriteStream = WriteStream;
exports.createWriteStream = function createWriteStream(path, options) {
- return new Write(path, options);
+ return new WriteStream(path, options);
};
var Stats = Extendable.extend({
@@ -584,9 +600,17 @@ exports.readdir = Async(exports.readdirSync);
* Synchronous close(2).
*/
exports.closeSync = function closeSync(fd) {
- delete _(fd).value;
- delete _(fd).mode;
- delete _(fd).flags;
+ fd = _(fd);
+
+ delete fd.rawFile;
+ // Closing input stream and removing reference.
+ if (fd.input)
+ fd.close();
+ delete fd.input;
+ // Closing output stream and removing reference.
+ if (fd.output)
+ fd.close();
+ delete fd.output;
};
/**
* Asynchronous close(2). No arguments other than a possible exception are
@@ -597,13 +621,22 @@ exports.close = Async(exports.closeSync);
/**
* Synchronous open(2).
*/
-exports.openSync = function openSync(path, flags, mode) {
- let fd = { path: path, flags: flags, mode: mode };
- _(fd).value = new FileDescriptor(path);
- _(fd).flags = flags;
- _(fd).mode = mode;
+function openSync(path, flags, mode) {
+ let [ fd, flags, mode, rawFile ] =
+ [ { path: path }, Flags(flags), Mode(mode), RawFile(path) ];
+
+ _(fd).rawFile = rawFile;
+ // If we want to open file in read mode we initialize input stream.
+ _(fd).input = isReadable(flags) ?
+ FileInputStream(rawFile, flags, mode, DEFER_OPEN) : null;
+
+ // If we want to open file in write mode we initialize output stream for it.
+ _(fd).output = isWritable(flags) ?
+ FileOutputStream(rawFile, flags, mode, DEFER_OPEN) : null;
+
return fd;
}
+exports.openSync = openSync;
/**
* Asynchronous file open. See open(2). Flags can be
* `'r', 'r+', 'w', 'w+', 'a'`, or `'a+'`. mode defaults to `0666`.
@@ -680,26 +713,19 @@ exports.readSync = function readSync(fd, buffer, offset, length, position) {
* The callback is given the three arguments, `(error, bytesRead, buffer)`.
*/
exports.read = function read(fd, buffer, offset, length, position, callback) {
- if (!Buffer.isBuffer(buffer)) {
- [ offset, length, position, callback ] = Array.slice(arguments, 1);
- buffer = new Buffer(length);
+ if (!Buffer.isBuffer(buffer)) { // (fd, length, position, encoding, callback)
+ [ length, position, encoding, callback ] = Array.slice(arguments, 1);
+ buffer = new Buffer(length, encoding);
}
let bytesRead = 0;
- let [ mode, flags ] = _(fd);
- let readStream = new ReadStream(fd, {
- mode: mode,
- flags: flags,
- length: length,
- position: position
- });
+ let readStream = new ReadStream(fd, { position: position, length: length });
readStream.on("data", function onData(chunck) {
chunck.copy(buffer, offset + bytesRead);
bytesRead += buffer.length;
+ if (bytesRead >= length) {
+ callback(null, bytesRead, buffer);
+ }
});
- readStream.on("end", function onEnd() {
- callback(null, bytesRead, buffer);
- });
- readStream.on("error", callback);
};
/**
@@ -707,14 +733,23 @@ exports.read = function read(fd, buffer, offset, length, position, callback) {
* The callback is passed two arguments `(error, data)`, where data is the
* contents of the file.
*/
-exports.readFile = function readFile(path, callback) {
+exports.readFile = function readFile(path, encoding, callback) {
+ if (isFunction(encoding))
+ [ callback, encoding ] = [ encoding, null ];
+
let buffer = new Buffer();
- let stream = new ReadStream(path);
- stream.on("data", function(chunck) {
+ let readStream = new ReadStream(path);
+ readStream.on("data", function(chunck) {
chunck.copy(buffer, buffer.length);
});
- stream.on("error", callback);
- stream.on("end", callback.bind(null, null, buffer));
+ readStream.on("error", function onError(error) {
+ callback(error);
+ readStream.destroy();
+ });
+ readStream.on("end", function onEnd() {
+ callback(null, buffer);
+ readStream.destroy();
+ });
};
/**
@@ -732,16 +767,20 @@ exports.readFileSync = function readFileSync(path, encoding) {
*/
exports.writeFile = function writeFile(path, content, encoding, callback) {
try {
- if (!callback)
- [ callback, encoding ] = [ encoding, callback ];
-
- let stream = new WriteStream(path);
- if (callback)
- stream.once("drain", callback.bind(null, null))
-
- if (typeof content === "string")
- content = typeof content === "string" ? new Buffer(content) : content;
- stream.write(content);
+ if (isFunction(encoding))
+ [ callback, encoding ] = [ encoding, null ];
+ if (isString(content))
+ content = new Buffer(content, encoding);
+
+ let writeStream = new WriteStream(path);
+ writeStream.on("error", function onError(error) {
+ callback(error);
+ writeStream.destroy();
+ });
+ writeStream.write(content, function onDrain() {
+ callback(null);
+ writeStream.destroy();
+ });
} catch (error) {
callback(error);
}
Please sign in to comment.
Something went wrong with that request. Please try again.