Skip to content

Commit

Permalink
Making stream writing a bit less messy.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gozala committed May 12, 2011
1 parent 3b1f7c0 commit afa8c03
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 60 deletions.
38 changes: 11 additions & 27 deletions lib/fs.js
Expand Up @@ -10,27 +10,19 @@ const { Extendable } = require("./extendables");
// `Namespace` declared by E4X so `const` fails.
let { Namespace } = require("./namespace");

// const ioService = CC("@mozilla.org/network/io-service;1", "nsIIOService")();
// const ioUtils = CC("@mozilla.org/io-util;1", "nsIIOUtil")();

const RawFile = CC("@mozilla.org/file/local;1", "nsILocalFile",
"initWithPath");
const FileOutputStream = CC("@mozilla.org/network/file-output-stream;1",
"nsIFileOutputStream", "init");
const FileInputStream = CC("@mozilla.org/network/file-input-stream;1",
"nsIFileInputStream", "init");
const StreamCopier = CC("@mozilla.org/network/async-stream-copier;1",
"nsIAsyncStreamCopier", "init");
// const StringStream = CC("@mozilla.org/io/string-input-stream;1",
// "nsIStringInputStream", "setData");

const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
"nsIBinaryInputStream", "setInputStream");
const BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1",
"nsIBinaryOutputStream", "setOutputStream");
const StreamPump = CC("@mozilla.org/network/input-stream-pump;1",
"nsIInputStreamPump", "init");
// const StreamPipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init");

const { createOutputTransport, createInputTransport } =
CC("@mozilla.org/network/stream-transport-service;1",
Expand Down Expand Up @@ -88,14 +80,7 @@ function defer(wrapped) {
function getFileName(file) {
return file.QueryInterface(Ci.nsIFile).leafName;
}
/*
function getFileURI(file) {
return ioService.newFileURI(file);
}
function getFileChannel(file) {
return ioService.newChannelFromURI(getFileURI(file));
}
*/

function remove(path, recursive) {
return new RawFile(path).remove(recursive || false);
}
Expand Down Expand Up @@ -165,6 +150,9 @@ const WriteStream = OutputStream.extend({
// If pass was passed we create a file descriptor out of it. Otherwise
// we just use given file descriptor.
let { output } = isString(path) ? openSync(path, flags, mode) : _(path);
// Setting a stream position, unless it's `-1` which means current position.
if (position >= 0)
output.QueryInterface(Ci.nsISeekableStream).seek(NS_SEEK_SET, position);
// We use `nsIStreamTransportService` service to transform blocking
// file output stream into a fully asynchronous stream that can be written
// without blocking the main thread.
Expand All @@ -177,7 +165,10 @@ const WriteStream = OutputStream.extend({
// us to write buffers as byte arrays without any further transcoding.
let binaryOutputStream = BinaryOutputStream(asyncOutputStream);
// Storing output stream so that it can be accessed later.
OutputStream.call(this, { output: binaryOutputStream, transport: transport });
OutputStream.call(this, {
output: binaryOutputStream,
asyncOutputStream: asyncOutputStream
});
},
drainable: true,
flags: 'r',
Expand Down Expand Up @@ -456,15 +447,12 @@ exports.readdir = Async(exports.readdirSync);
exports.closeSync = function closeSync(fd) {
fd = _(fd);

delete fd.rawFile;
// Closing input stream and removing reference.
if (fd.input)
fd.close();
delete fd.input;
fd.input.close();
// Closing output stream and removing reference.
if (fd.output)
fd.close();
delete fd.output;
fd.output.close();
};
/**
* Asynchronous close(2). No arguments other than a possible exception are
Expand Down Expand Up @@ -533,11 +521,7 @@ exports.write = function write(fd, buffer, offset, length, position, callback) {
buffer = buffer.slice(offset, offset + length);
}

let { mode, flags } = _(fd);
let writeStream = new WriteStream(fd, {
mode: mode,
flags: flags
});
let writeStream = new WriteStream(fd, { position: position, length: length });
writeStream.on("error", callback);
writeStream.write(buffer, function onEnd() {
writeStream.destroy();
Expand Down
90 changes: 57 additions & 33 deletions lib/stream.js
Expand Up @@ -4,12 +4,60 @@

const { EventEmitter } = require("./events");
const { Buffer } = require("./buffer");
const { setTimeout } = require("timer")
// `Namespace` declared by E4X so `const` fails.
let { Namespace } = require("./namespace");

function isFunction(value) { return typeof value === "function"; }
const _ = new Namespace();

/**
* Utility function / hack that we use to figure if output stream is closed.
*/
function isClosed(stream) {
// We assume that stream is not closed.
let isClosed = false;
stream.asyncWait({
// If `onClose` callback is called before outer function returns
// (synchronously) `isClosed` will be set to `true` identifying
// that stream is closed.
onOutputStreamReady: function onClose() isClosed = true

// `WAIT_CLOSURE_ONLY` flag overrides the default behavior, causing the
// `onOutputStreamReady` notification to be suppressed until the stream
// becomes closed.
}, stream.WAIT_CLOSURE_ONLY, 0, null);
return isClosed;
}
/**
* Utility function takes output `stream`, `onDrain`, `onClose` callbacks and
* calls one of this callbacks depending on stream state. It is guaranteed
* that only one called will be called and it will be called asynchronously.
* @param {nsIAsyncOutputStream} stream
* @param {Function} onDrain
* callback that is called when stream becomes writable.
* @param {Function} onClose
* callback that is called when stream becomes closed.
*/
function onStateChange(stream, onDrain, onClose) {
let isAsync = false;
stream.asyncWait({
onOutputStreamReady: function onOutputStreamReady() {
// If `isAsync` was not yet set to `true` by the last line we know that
// `onOutputStreamReady` was called synchronously. In such case we just
// defer execution until next turn of event loop.
if (!isAsync)
return setTimeout(onOutputStreamReady, 0);

// As it's not clear what is a state of the stream (TODO: Is there really
// no better way ?) we employ hack (see details in `isClosed`) to verify
// if stream is closed.
isClosed(stream) ? onClose() : onDrain();
}
}, 0, 0, null);
isAsync = true;
}

const Stream = EventEmitter.extend({
constructor: function Stream() {
},
Expand Down Expand Up @@ -139,11 +187,15 @@ exports.InputStream = InputStream;

const OutputStream = Stream.extend({
constructor: function OutputStream(options) {
let { output } = options;
let { output, asyncOutputStream } = options;
_(this).output = output;
_(this).asyncOutputStream = asyncOutputStream;
_(this).drain = this.emit.bind(this, "drain");
_(this).close = this.emit.bind(this, "close");
},
writable: true,
write: function write(content, encoding, callback) {
let { asyncOutputStream, binaryOutputStream } = _(this);
let { asyncOutputStream, output, drain, close } = _(this);
let stream = this;

if (isFunction(encoding))
Expand All @@ -166,37 +218,9 @@ const OutputStream = Stream.extend({
output.writeByteArray(content.valueOf(), content.length);
output.flush();

// Setting an `nsIOutputStreamCallback` to be notified when stream is
// writable again. Which may be synchronously called before we return.
pump.asyncWait({
onOutputStreamReady: function onDrain() {
// If callback is called earlier then outer function returned then
// we know that stream is writable so users don't need to wait for
// "drain" events. In such cases node returns `true` so we override
// `isWritable` to let caller know they can continue writing to this
// stream.
isWritten = stream.writable = true;
stream.emit("drain");
// Calling a callback if one was passed.
if (callback)
callback();
}
}, 0, 0, null);
// Using `nsIOutputStreamCallback` with a special flag that overrides
// the default behavior causing the `OnOutputStreamReady` notification
// to be suppressed until the stream becomes closed (either as a result of
// closeWithStatus/close being called on the stream or possibly due to
// some error in the underlying stream).
pump.asyncWait({
onOutputStreamReady: function onClose() {
stream.writable = false;
stream.emit("close", null);
}
}, pump.WAIT_CLOSURE_ONLY, 0, null);
// Return `true` if the string has been flushed to the kernel buffer.
// Return false to indicate that the kernel buffer is full, and the data
// will be sent out in the future.
return isWritten;
if (callback) this.once("drain", callback);
onStateChange(asyncOutputStream, drain, close);
return true;
} catch (error) {
// If errors occur we emit appropriate event.
stream.emit("error", error);
Expand Down

0 comments on commit afa8c03

Please sign in to comment.