Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updated to work with newer versions of fstream

  • Loading branch information...
commit d47cd471bcae2e530078fa1d013cfb31f4f416ea 1 parent 6826be7
@DamonOehlman authored
View
2  examples/download-attachments.js
@@ -2,7 +2,7 @@ var attachmate = require('../'),
fstream = require('fstream'),
path = require("path"),
- r = new attachmate.Reader({ path: 'http://10.211.55.4:5984/steelmesh/app::tripplanner' }),
+ r = fstream.Reader({ type: attachmate.Reader, path: 'http://localhost:5984/steelmesh/app::tripplanner' }),
w = fstream.Writer({ path: path.resolve(__dirname, 'output'), type: 'Directory'});
// pipe the attachments to the directory
View
2  examples/download-short.js
@@ -2,7 +2,7 @@ var attachmate = require('../'),
path = require('path');
attachmate.download(
- 'http://10.211.55.4:5984/steelmesh/test',
+ 'http://localhost:5984/steelmesh/app::tripplanner',
path.resolve(__dirname, 'output'),
function(err) {
console.log('done, error = ', err);
View
108 lib/reader.js
@@ -1,7 +1,9 @@
var debug = require('debug')('attachmate'),
request = require('request'),
util = require('util'),
+ url = require('url'),
HttpRequestEntry = require('./requestentry'),
+ Reader = require('fstream').Reader,
reTrailingSlash = /\/$/;
function AttachmentReader(props) {
@@ -11,11 +13,13 @@ function AttachmentReader(props) {
this._entries = null;
this._index = -1;
this._ended = false;
+ this._paused = false;
+ this._currentEntry = null;
this._read();
}
-util.inherits(AttachmentReader, require('fstream').Reader);
+util.inherits(AttachmentReader, Reader);
AttachmentReader.prototype._getEntries = function() {
var reader = this;
@@ -44,36 +48,110 @@ AttachmentReader.prototype._getEntries = function() {
};
AttachmentReader.prototype._read = function() {
- var reader = this, entry, entryPath;
-
+ var reader = this, entry, entryPath,
+ ended = false;
+
if (! this._entries) {
return this._getEntries();
}
+ function __onEnd() {
+ if (ended) return;
+
+ ended = true;
+ reader.emit('entryEnd', entry);
+ reader._currentEntry = null;
+ reader._read();
+ }
+
+ // if paused or processing a current entry, then return
+ debug('attempting read, paused: ' + this._paused);
+ if (this._paused || this._currentEntry) return;
+
+ // increment the index and process the next file
this._index++;
if (this._index >= this._entries.length) {
if (! this._ended) {
+ debug('reading ended: emitting close and end events');
+
this._ended = true;
this.emit('end');
this.emit('close');
}
- return undefined;
+ return;
}
+
+ // create the entry
+ entry = this._currentEntry = new HttpRequestEntry(this.path, this._entries[this._index])
+ .once('ready', function __emitChild() {
+ debug('entry ready: ' + entry.url);
+
+ if (reader._paused) {
+ entry.pause(me);
+
+ return reader.once('resume', __emitChild);
+ }
+
+ reader.emit('entry', entry);
+ })
+
+ .on('pause', function(who) {
+ if (! reader._paused) {
+ reader.pause(who);
+ }
+ })
+
+ .on('resume', function(who) {
+ if (reader._paused) {
+ reader.resume(who);
+ }
+ })
+
+ .on('end', __onEnd)
+ .on('error', function() {
+ debug('encountered error reading entry for url');
+ reader.emit('error', err);
+ });
+
+ return undefined;
+};
- debug('found: ' + this.path.replace(reTrailingSlash, '') + this._entries[this._index]);
- entry = new HttpRequestEntry(this.path.replace(reTrailingSlash, ''), this._entries[this._index]);
- this.emit('entry', entry);
+AttachmentReader.prototype.pause = function (who) {
+ if (this._paused) return;
- entry.on('end', function() {
- reader._read();
- });
- entry.on('error', function(err) {
- reader.emit(err);
- });
+ who = who || this;
+ if (who !== this._currentEntry) {
+ debug('paused requested of reader');
+ }
+
+ this._paused = true;
- return undefined;
-};
+ if (this._currentEntry) {
+ this._currentEntry.pause(who);
+ }
+
+ this.emit('pause', who);
+}
+
+AttachmentReader.prototype.resume = function (who) {
+ if (! this._paused) return;
+
+ who = who || this;
+ if (who !== this._currentEntry) {
+ debug('resume requested of reader');
+ }
+
+ this._paused = false;
+ this.emit('resume', who);
+
+ if (this._currentEntry) {
+ this._currentEntry.resume(who);
+ }
+ else {
+ this._read();
+ }
+}
module.exports = AttachmentReader;
View
141 lib/requestentry.js
@@ -1,79 +1,126 @@
-var Stream = require("stream").Stream,
+var debug = require('debug')('attachmate'),
+ Stream = require('stream').Stream,
+ Reader = require('fstream').Reader,
util = require('util'),
http = require('http'),
url = require('url'),
- request = require('request');
+ reTrailingSlash = /\/$/,
+ request = require('request'),
+ EOF = { EOF: true },
+ CLOSE = { CLOSE: true };
function HttpRequestEntry(targetDoc, path) {
- var entry = this;
-
this.path = path;
+ this.url = targetDoc.replace(reTrailingSlash, '') + '/' + path;
this.props = {};
- this._paused = false;
- this._pipeTarget = null;
+ this._paused = true;
this._buffer = [];
- this._request = http.get(url.parse(targetDoc + '/' + path), function(couchRes) {
- // handle data
- couchRes.on('data', function(chunk) {
- if (entry._paused) {
+ // emit the ready event on the next tick
+ process.nextTick(this.emit.bind(this, 'ready'));
+} // HttpRequestEntry
+
+util.inherits(HttpRequestEntry, Reader);
+
+HttpRequestEntry.prototype._getStream = function() {
+ var entry = this;
+
+ debug('making request for: ' + this.url);
+ this._stream = request(this.url);
+
+ this._stream
+ .on('data', function(chunk) {
+ debug('got data for: ' + entry.url);
+ if (chunk.length == 0) {
+ return;
+ }
+ if (entry._paused || entry._buffer.length) {
entry._buffer.push(chunk);
}
else {
entry.emit('data', chunk);
}
- });
+ })
+ .on('end', function() {
+ debug('request complete: ' + entry.url + ', buffer: ', entry._buffer);
+ if (entry._paused || entry._buffer.length) {
+ entry._buffer.push(EOF);
+ entry._buffer.push(CLOSE);
+ entry._read();
+ }
+ else {
+ entry.emit('end');
+ entry.emit('close');
+ }
+ })
+ .on('error', entry.emit.bind(entry));
- // handle the response end
- couchRes.on('end', function() {
- entry._flush();
- entry.emit('end');
- });
- });
-
- this._request.on('error', function(e) {
- entry.emit('close');
- });
-
- this._request.setSocketKeepAlive(false);
+ return this._stream;
+};
+
+HttpRequestEntry.prototype._read = function() {
+ var entry = this;
- /*
- this._request.setTimeout(10000, function() {
- if (! responseStarted) {
- // flag as timed out
- timedOut = true;
+ if (this._paused) {
+ debug('paused, aborting read');
+ return;
+ }
- docRequest.abort();
- mesh.log.warn('request for \'' + targetDoc + '\' timed out');
- res.send('Timed out: ' + new Date().getTime(), 500);
+ // ensure we have a stream to work with
+ this._stream = this._stream || this._getStream();
+
+ // clear out the buffer, if there is one.
+ if (this._buffer.length) {
+ var buf = this._buffer
+ for (var i = 0, l = buf.length; i < l; i ++) {
+ var c = buf[i]
+ if (c === EOF) {
+ this.emit("end")
+ } else if (c === CLOSE) {
+ this.emit("close")
+ } else {
+ this.emit("data", c)
}
- });
- */
-} // HttpRequestEntry
-util.inherits(HttpRequestEntry, Stream);
-
-HttpRequestEntry.prototype._flush = function() {
- for (var ii = 0, count = this._buffer.length; ii < count; ii++) {
- this.emit('data', this._buffer[ii]);
+ if (this._paused) {
+ this._buffer = buf.slice(i)
+ return
+ }
+ }
+
+ this._buffer.length = 0
}
-
- // reset the buffer
- this._buffer = [];
};
HttpRequestEntry.prototype.pause = function(who) {
+ if (this._paused) return;
+
+ who = who || this;
+
+ debug('pausing: ' + this.url);
+
this._paused = true;
- this.emit('pause', who || this);
+ if (this._stream) {
+ this._stream.pause();
+ }
+
+ this.emit('pause',who);
+
+ // TODO: this should not be required, but for some reason
+ // the stream is never getting resumed... must ask @isaacs about this...
+ setTimeout(this.resume.bind(this), 500);
};
HttpRequestEntry.prototype.resume = function(who) {
- if (this._paused) {
- this._flush();
- }
+ if (! this._paused) return;
+
+ debug('resume: ' + this.url);
- this._paused = false;
this.emit('resume', who || this);
+ this._paused = false;
+
+ if (this._stream) this._stream.resume()
+ this._read();
};
module.exports = HttpRequestEntry;
View
7 package.json
@@ -11,13 +11,12 @@
},
"dependencies": {
"inherits": ">= 1.0.0",
- "request": "2.2.x",
- "fstream": ">= 0.1.3",
- "mime": ">= 1.0.0",
+ "request": "2.9.x",
+ "fstream": "0.1.x",
+ "mime": "1.2.x",
"debug": "*"
},
"devDependencies": {
- "vows": "0.5.x"
},
"repository": {
"type": "git",
Please sign in to comment.
Something went wrong with that request. Please try again.