diff --git a/lib/fs.js b/lib/fs.js index 9ba02ec..d25fd0d 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -10,8 +10,6 @@ const { Extendable } = require("./extendables"); // `Namespace` declared by E4X so `const` fails. let { Namespace } = require("./namespace"); -// const ioService = CC("@mozilla.org/network/io-service;1", "nsIIOService")(); -// const ioUtils = CC("@mozilla.org/io-util;1", "nsIIOUtil")(); const RawFile = CC("@mozilla.org/file/local;1", "nsILocalFile", "initWithPath"); @@ -19,18 +17,12 @@ const FileOutputStream = CC("@mozilla.org/network/file-output-stream;1", "nsIFileOutputStream", "init"); const FileInputStream = CC("@mozilla.org/network/file-input-stream;1", "nsIFileInputStream", "init"); -const StreamCopier = CC("@mozilla.org/network/async-stream-copier;1", - "nsIAsyncStreamCopier", "init"); -// const StringStream = CC("@mozilla.org/io/string-input-stream;1", -// "nsIStringInputStream", "setData"); - const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", "nsIBinaryInputStream", "setInputStream"); const BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1", "nsIBinaryOutputStream", "setOutputStream"); const StreamPump = CC("@mozilla.org/network/input-stream-pump;1", "nsIInputStreamPump", "init"); -// const StreamPipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init"); const { createOutputTransport, createInputTransport } = CC("@mozilla.org/network/stream-transport-service;1", @@ -88,14 +80,7 @@ function defer(wrapped) { function getFileName(file) { return file.QueryInterface(Ci.nsIFile).leafName; } -/* -function getFileURI(file) { - return ioService.newFileURI(file); -} -function getFileChannel(file) { - return ioService.newChannelFromURI(getFileURI(file)); -} -*/ + function remove(path, recursive) { return new RawFile(path).remove(recursive || false); } @@ -165,6 +150,9 @@ const WriteStream = OutputStream.extend({ // If pass was passed we create a file descriptor out of it. Otherwise // we just use given file descriptor. let { output } = isString(path) ? openSync(path, flags, mode) : _(path); + // Setting a stream position, unless it's `-1` which means current position. + if (position >= 0) + output.QueryInterface(Ci.nsISeekableStream).seek(NS_SEEK_SET, position); // We use `nsIStreamTransportService` service to transform blocking // file output stream into a fully asynchronous stream that can be written // without blocking the main thread. @@ -177,7 +165,10 @@ const WriteStream = OutputStream.extend({ // us to write buffers as byte arrays without any further transcoding. let binaryOutputStream = BinaryOutputStream(asyncOutputStream); // Storing output stream so that it can be accessed later. - OutputStream.call(this, { output: binaryOutputStream, transport: transport }); + OutputStream.call(this, { + output: binaryOutputStream, + asyncOutputStream: asyncOutputStream + }); }, drainable: true, flags: 'r', @@ -456,15 +447,12 @@ exports.readdir = Async(exports.readdirSync); exports.closeSync = function closeSync(fd) { fd = _(fd); - delete fd.rawFile; // Closing input stream and removing reference. if (fd.input) - fd.close(); - delete fd.input; + fd.input.close(); // Closing output stream and removing reference. if (fd.output) - fd.close(); - delete fd.output; + fd.output.close(); }; /** * Asynchronous close(2). No arguments other than a possible exception are @@ -533,11 +521,7 @@ exports.write = function write(fd, buffer, offset, length, position, callback) { buffer = buffer.slice(offset, offset + length); } - let { mode, flags } = _(fd); - let writeStream = new WriteStream(fd, { - mode: mode, - flags: flags - }); + let writeStream = new WriteStream(fd, { position: position, length: length }); writeStream.on("error", callback); writeStream.write(buffer, function onEnd() { writeStream.destroy(); diff --git a/lib/stream.js b/lib/stream.js index 12ac460..02d72c8 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -4,12 +4,60 @@ const { EventEmitter } = require("./events"); const { Buffer } = require("./buffer"); +const { setTimeout } = require("timer") // `Namespace` declared by E4X so `const` fails. let { Namespace } = require("./namespace"); function isFunction(value) { return typeof value === "function"; } const _ = new Namespace(); +/** + * Utility function / hack that we use to figure if output stream is closed. + */ +function isClosed(stream) { + // We assume that stream is not closed. + let isClosed = false; + stream.asyncWait({ + // If `onClose` callback is called before outer function returns + // (synchronously) `isClosed` will be set to `true` identifying + // that stream is closed. + onOutputStreamReady: function onClose() isClosed = true + + // `WAIT_CLOSURE_ONLY` flag overrides the default behavior, causing the + // `onOutputStreamReady` notification to be suppressed until the stream + // becomes closed. + }, stream.WAIT_CLOSURE_ONLY, 0, null); + return isClosed; +} +/** + * Utility function takes output `stream`, `onDrain`, `onClose` callbacks and + * calls one of this callbacks depending on stream state. It is guaranteed + * that only one called will be called and it will be called asynchronously. + * @param {nsIAsyncOutputStream} stream + * @param {Function} onDrain + * callback that is called when stream becomes writable. + * @param {Function} onClose + * callback that is called when stream becomes closed. + */ +function onStateChange(stream, onDrain, onClose) { + let isAsync = false; + stream.asyncWait({ + onOutputStreamReady: function onOutputStreamReady() { + // If `isAsync` was not yet set to `true` by the last line we know that + // `onOutputStreamReady` was called synchronously. In such case we just + // defer execution until next turn of event loop. + if (!isAsync) + return setTimeout(onOutputStreamReady, 0); + + // As it's not clear what is a state of the stream (TODO: Is there really + // no better way ?) we employ hack (see details in `isClosed`) to verify + // if stream is closed. + isClosed(stream) ? onClose() : onDrain(); + } + }, 0, 0, null); + isAsync = true; +} + const Stream = EventEmitter.extend({ constructor: function Stream() { }, @@ -139,11 +187,15 @@ exports.InputStream = InputStream; const OutputStream = Stream.extend({ constructor: function OutputStream(options) { - let { output } = options; + let { output, asyncOutputStream } = options; + _(this).output = output; + _(this).asyncOutputStream = asyncOutputStream; + _(this).drain = this.emit.bind(this, "drain"); + _(this).close = this.emit.bind(this, "close"); }, writable: true, write: function write(content, encoding, callback) { - let { asyncOutputStream, binaryOutputStream } = _(this); + let { asyncOutputStream, output, drain, close } = _(this); let stream = this; if (isFunction(encoding)) @@ -166,37 +218,9 @@ const OutputStream = Stream.extend({ output.writeByteArray(content.valueOf(), content.length); output.flush(); - // Setting an `nsIOutputStreamCallback` to be notified when stream is - // writable again. Which may be synchronously called before we return. - pump.asyncWait({ - onOutputStreamReady: function onDrain() { - // If callback is called earlier then outer function returned then - // we know that stream is writable so users don't need to wait for - // "drain" events. In such cases node returns `true` so we override - // `isWritable` to let caller know they can continue writing to this - // stream. - isWritten = stream.writable = true; - stream.emit("drain"); - // Calling a callback if one was passed. - if (callback) - callback(); - } - }, 0, 0, null); - // Using `nsIOutputStreamCallback` with a special flag that overrides - // the default behavior causing the `OnOutputStreamReady` notification - // to be suppressed until the stream becomes closed (either as a result of - // closeWithStatus/close being called on the stream or possibly due to - // some error in the underlying stream). - pump.asyncWait({ - onOutputStreamReady: function onClose() { - stream.writable = false; - stream.emit("close", null); - } - }, pump.WAIT_CLOSURE_ONLY, 0, null); - // Return `true` if the string has been flushed to the kernel buffer. - // Return false to indicate that the kernel buffer is full, and the data - // will be sent out in the future. - return isWritten; + if (callback) this.once("drain", callback); + onStateChange(asyncOutputStream, drain, close); + return true; } catch (error) { // If errors occur we emit appropriate event. stream.emit("error", error);