Permalink
Browse files

Factoring out reusable functionality of Input / Output streams into s…

…tream module.
  • Loading branch information...
1 parent 0d3e8cb commit 3b1f7c085bb57ccc1216900f99387f39ae640617 @Gozala committed May 12, 2011
Showing with 282 additions and 182 deletions.
  1. +27 −177 lib/fs.js
  2. +255 −5 lib/stream.js
View
204 lib/fs.js
@@ -1,11 +1,12 @@
/* vim:set ts=2 sw=2 sts=2 expandtab */
"use strict";
+const { Cc, Ci, components: { Constructor: CC } } = require("chrome");
+
const { setTimeout } = require("timer");
-const { Stream } = require("./stream");
+const { Stream, InputStream, OutputStream } = require("./stream");
const { Buffer } = require("./buffer");
const { Extendable } = require("./extendables");
-const { Cc, Ci, components: { Constructor: CC } } = require("chrome");
// `Namespace` declared by E4X so `const` fails.
let { Namespace } = require("./namespace");
@@ -106,12 +107,10 @@ function Flags(flag) {
FLAGS[flag] || new Error('Unknown file open flag: ' + flag);
}
-const ReadStream = Stream.extend({
+const ReadStream = InputStream.extend({
constructor: function ReadStream(path, options) {
options = options || {};
- if ('mode' in options && options.mode)
- this.mode = options.mode;
if ('flags' in options && options.flags)
this.flags = options.flags;
if ('bufferSize' in options && options.bufferSize)
@@ -134,74 +133,24 @@ const ReadStream = Stream.extend({
// non-blocking stream semantics. Also we use defaults for segment size &
// count.
let asyncInputStream = transport.openInputStream(null, 0, 0);
- _(this).asyncInputStream = asyncInputStream;
let binaryInputStream = BinaryInputStream(asyncInputStream);
- _(this).binaryInputStream = binaryInputStream;
let pump = StreamPump(asyncInputStream, position, length, 0, 0, false);
- _(this).pump = pump;
+
+ InputStream.call(this, { input: binaryInputStream, pump: pump });
this.read();
},
- path: null,
- encoding: null,
position: -1,
length: -1,
- readable: true,
- paused: false,
flags: 'r',
mode: FILE_PERMISSION,
- bufferSize: 64 * 1024,
- get status() { return _(this).request.status; },
- setEncoding: function setEncoding(encoding) {
- this.encoding = String(encoding).toUpperCase();
- },
- read: function read() {
- let [ stream, { binaryInputStream, pump } ] = [ this, _(this) ];
- pump.asyncRead({
- onStartRequest: function onStartRequest() { stream.emit("start"); },
- onDataAvailable: function onDataAvailable(req, c, input, offset, count) {
- try {
- let bytes = binaryInputStream.readByteArray(count);
- stream.emit("data", new Buffer(bytes, stream.encoding));
- } catch (error) {
- stream.emit("error", error);
- stream.readable = false;
- }
- },
- onStopRequest: function onStopRequest() {
- stream.readable = false;
- stream.emit("end");
- }
- }, null);
- },
- pause: function pause() {
- this.paused = true;
- _(this).pump.suspend();
- },
- resume: function resume() {
- this.paused = false;
- _(this).pump.resume();
- },
- destroy: function destroy() {
- this.readable = false;
- try {
- this.emit("close", null);
- _(this).pump.cancel(null);
- delete _(this).pump;
- _(this).binaryInputStream.close();
- delete _(this).binaryInputStream;
- _(this).asyncInputStream.close();
- delete _(this).asyncInputStream;
- } catch (error) {
- this.emit("error", error);
- }
- }
+ bufferSize: 64 * 1024
});
exports.ReadStream = ReadStream;
exports.createReadStream = function createReadStream(path, options) {
return new ReadStream(path, options);
};
-const WriteStream = Stream.extend({
+const WriteStream = OutputStream.extend({
constructor: function WriteStream(path, options) {
options = options || {};
@@ -212,127 +161,28 @@ const WriteStream = Stream.extend({
if ('position' in options && options.position !== undefined)
this.position = options.position;
- try {
- let { mode, flags } = this;
- // If pass was passed we create a file descriptor out of it. Otherwise
- // we just use given file descriptor.
- let { output } = isString(path) ? openSync(path, flags, mode) : _(path);
- // We use `nsIStreamTransportService` service to transform blocking
- // file output stream into a fully asynchronous stream that can be written
- // without blocking the main thread.
- let transport = createOutputTransport(output, this.position, -1, false);
- // Open an output stream on a transport. We don't pass flags to guarantee
- // non-blocking stream semantics. Also we use defaults for segment size &
- // count.
- let asyncOutputStream = transport.openOutputStream(null, 0, 0);
- _(this).asyncOutputStream = asyncOutputStream;
- // Finally we create a non-blocking binary output stream. This will allows
- // us to write buffers as byte arrays without any further transcoding.
- let binaryOutputStream = BinaryOutputStream(asyncOutputStream);
- // Storing output stream so that it can be accessed later.
- _(this).binaryOutputStream = binaryOutputStream;
- } catch (error) {
- this.emit("error", error);
- }
+ let { mode, flags, position } = this;
+ // If pass was passed we create a file descriptor out of it. Otherwise
+ // we just use given file descriptor.
+ let { output } = isString(path) ? openSync(path, flags, mode) : _(path);
+ // We use `nsIStreamTransportService` service to transform blocking
+ // file output stream into a fully asynchronous stream that can be written
+ // without blocking the main thread.
+ let transport = createOutputTransport(output, position, -1, false);
+ // Open an output stream on a transport. We don't pass flags to guarantee
+ // non-blocking stream semantics. Also we use defaults for segment size &
+ // count.
+ let asyncOutputStream = transport.openOutputStream(null, 0, 0);
+ // Finally we create a non-blocking binary output stream. This will allows
+ // us to write buffers as byte arrays without any further transcoding.
+ let binaryOutputStream = BinaryOutputStream(asyncOutputStream);
+ // Storing output stream so that it can be accessed later.
+ OutputStream.call(this, { output: binaryOutputStream, transport: transport });
},
- writable: true,
drainable: true,
- encoding: null,
+ flags: 'r',
position: -1,
- flags: 'w',
- mode: FILE_PERMISSION,
- write: function write(content, encoding, callback) {
- let { asyncOutputStream, binaryOutputStream } = _(this);
- let stream = this;
-
- if (isFunction(encoding))
- [ callback, encoding ] = [ encoding, callback ];
-
- // Flag indicating whether or not content has been flushed to the kernel
- // buffer.
- let isWritten = false;
- // If stream is not writable we throw an error.
- if (!stream.writable)
- throw new Error('stream not writable');
-
- try {
- // If content is not a buffer then we create one out of it.
- if (!Buffer.isBuffer(content))
- content = new Buffer(content, encoding);
-
- // We write content as a byte array as this will avoid any transcoding
- // if content was a buffer.
- binaryOutputStream.writeByteArray(content.valueOf(), content.length);
- binaryOutputStream.flush();
-
- // Setting an `nsIOutputStreamCallback` to be notified when stream is
- // writable again. Which may be synchronously called before we return.
- asyncOutputStream.asyncWait({
- onOutputStreamReady: function onDrain() {
- // If callback is called earlier then outer function returned then
- // we know that stream is writable so users don't need to wait for
- // "drain" events. In such cases node returns `true` so we override
- // `isWritable` to let caller know they can continue writing to this
- // stream.
- isWritten = stream.writable = true;
- stream.emit("drain");
- // Calling a callback if one was passed.
- if (callback)
- callback();
- }
- }, 0, 0, null);
- // Using `nsIOutputStreamCallback` with a special flag that overrides
- // the default behavior causing the `OnOutputStreamReady` notification
- // to be suppressed until the stream becomes closed (either as a result of
- // closeWithStatus/close being called on the stream or possibly due to
- // some error in the underlying stream).
- asyncOutputStream.asyncWait({
- onOutputStreamReady: function onClose() {
- stream.writable = false;
- stream.emit("close", null);
- }
- }, asyncOutputStream.WAIT_CLOSURE_ONLY, 0, null);
- // Return `true` if the string has been flushed to the kernel buffer.
- // Return false to indicate that the kernel buffer is full, and the data
- // will be sent out in the future.
- return isWritten;
- } catch (error) {
- // If errors occur we emit appropriate event.
- stream.emit("error", error);
- }
- },
- flush: function flush() {
- _(this).binaryOutputStream.flush();
- },
- end: function end(content, encoding, callback) {
- if (isFunction(content))
- [ callback, content ] = [ content, callback ];
- if (isFunction(encoding))
- [ callback, encoding ] = [ encoding, callback ];
-
- // Setting a listener to "close" event if passed.
- if (isFunction(callback))
- this.once("close", callback);
-
- // If content is passed then we defer closing until we finish with writing.
- if (content)
- this.write(content, encoding, end.bind(this));
- // If we don't write anything, then we close an outputStream.
- else
- _(this).asyncOutputStream.close();
- },
- destroy: function destroy(callback) {
- try {
- this.end(callback);
- delete _(this).asyncOutputStream;
- delete _(this).binaryOutputStream;
- } catch (error) {
- this.emit("error", error);
- }
- },
- destroySoon: function destroySoon() {
- this.destroy();
- }
+ mode: FILE_PERMISSION
});
exports.WriteStream = WriteStream;
exports.createWriteStream = function createWriteStream(path, options) {
Oops, something went wrong.

0 comments on commit 3b1f7c0

Please sign in to comment.