Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'attachment_streams'

  • Loading branch information...
commit 2514cd614e4fb221710bf49068542da8c4f010c3 2 parents 92dd16d + 7e9f074
@baudehlo authored
Showing with 103 additions and 20 deletions.
  1. +71 −0 attachment_stream.js
  2. +9 −10 mailbody.js
  3. +23 −6 tls_socket.js
  4. +0 −4 transaction.js
View
71 attachment_stream.js
@@ -0,0 +1,71 @@
+"use strict";
+
+var Stream = require('stream');
+var util = require('util');
+
+function AttachmentStream () {
+ Stream.call(this);
+ this.encoding = null;
+ this.paused = false;
+}
+
+util.inherits(AttachmentStream, Stream);
+
+AttachmentStream.prototype.emit_data = function (data) {
+ // console.log("DATA emit");
+ if (this.encoding) {
+ this.emit('data', data.toString(this.encoding));
+ }
+ else {
+ this.emit('data', data);
+ }
+}
+
+AttachmentStream.prototype.pipe = function (dest, options) {
+ var self = this;
+ this.paused = false;
+ Stream.prototype.pipe.call(this, dest, options);
+ dest.on('drain', function () {
+ // console.log("YYY: DRAIN!!!");
+ if (self.paused) self.resume();
+ });
+ dest.on('end', function () {
+ // console.log("YYY: END!!");
+ if (self.paused) self.resume();
+ })
+ dest.on('close', function () {
+ // console.log("YYY: CLOSE!!");
+ if (self.paused) self.resume();
+ })
+}
+
+AttachmentStream.prototype.setEncoding = function (enc) {
+ if (enc !== 'binary') {
+ throw "Unable to set encoding to anything other than binary";
+ }
+ this.encoding = enc;
+}
+
+AttachmentStream.prototype.pause = function () {
+ this.paused = true;
+ if (this.connection && this.connection.client && this.connection.client.pause) {
+ // console.log("YYYY: Backpressure pause");
+ this.connection.client.pause();
+ }
+}
+
+AttachmentStream.prototype.resume = function () {
+ if (this.connection && this.connection.client && this.connection.client.resume) {
+ // console.log("YYYY: Backpressure resume");
+ this.connection.client.resume();
+ }
+ this.paused = false;
+}
+
+AttachmentStream.prototype.destroy = function () {
+ // console.log("YYYY: Stream destroyed");
+}
+
+exports.createStream = function () {
+ return new AttachmentStream ();
+}
View
19 mailbody.js
@@ -6,6 +6,7 @@ var utils = require('./utils');
var events = require('events');
var util = require('util');
var Iconv = require('./mailheader').Iconv;
+var attstr = require('./attachment_stream');
var buf_siz = 65536;
@@ -41,9 +42,9 @@ Body.prototype.parse_child = function (line) {
// see below for why we create a new buffer here.
var to_emit = new Buffer(child.buf_fill);
child.buf.copy(to_emit, 0, 0, child.buf_fill);
- this.emit('attachment_data', to_emit);
+ child.attachment_stream.emit_data(to_emit);
}
- this.emit('attachment_end');
+ child.attachment_stream.emit('end');
}
if (line.substr(this.boundary.length + 2, 2) === '--') {
@@ -114,7 +115,10 @@ Body.prototype.parse_start = function (line) {
match = ct.match(/name\s*=\s*["']?([^'";]+)["']?/i);
}
var filename = match ? match[1] : '';
- this.emit('attachment_start', ct, filename, this);
+ console.log("XXX Creating attachmentstream");
+ this.attachment_stream = attstr.createStream();
+ console.log("Attstr: ", this.attachment_stream);
+ this.emit('attachment_start', ct, filename, this, this.attachment_stream);
this.buf_fill = 0;
this.state = 'attachment';
}
@@ -279,8 +283,6 @@ Body.prototype.parse_multipart_preamble = function (line) {
// next section
var bod = new Body(new Header(), this.options);
this.listeners('attachment_start').forEach(function (cb) { bod.on('attachment_start', cb) });
- this.listeners('attachment_data' ).forEach(function (cb) { bod.on('attachment_data', cb) });
- this.listeners('attachment_end' ).forEach(function (cb) { bod.on('attachment_end', cb) });
this.children.push(bod);
bod.state = 'headers';
this.state = 'child';
@@ -307,9 +309,6 @@ Body.prototype.parse_attachment = function (line) {
}
var buf = this.decode_function(line);
- //this.emit('attachment_data', buf);
- //return;
-
if ((buf.length + this.buf_fill) > buf_siz) {
// now we have to create a new buffer, because if we write this out
// using async code, it will get overwritten under us. Creating a new
@@ -317,11 +316,11 @@ Body.prototype.parse_attachment = function (line) {
// memcpy())
var to_emit = new Buffer(this.buf_fill);
this.buf.copy(to_emit, 0, 0, this.buf_fill);
- this.emit('attachment_data', to_emit);
+ this.attachment_stream.emit_data(to_emit);
if (buf.length > buf_siz) {
// this is an unusual case - the base64/whatever data is larger
// than our buffer size, so we just emit it and reset the counter.
- this.emit('attachment_data', buf);
+ this.attachment_stream.emit_data(buf);
this.buf_fill = 0;
}
else {
View
29 tls_socket.js
@@ -26,12 +26,29 @@ function pluggableStream(socket) {
util.inherits(pluggableStream, stream.Stream);
util.inherits(pluggableStream, events.EventEmitter);
-pluggableStream.prototype.pipe = function (socket) {
- this.on('data', function (data) {
- if (socket.write)
- socket.write(data);
- });
-};
+// This should come from Stream in node core.
+// pluggableStream.prototype.pipe = function (socket) {
+// this.on('data', function (data) {
+// if (socket.write)
+// socket.write(data);
+// });
+// };
+
+pluggableStream.prototype.pause = function () {
+ if (this.targetsocket.pause) {
+ // console.log("XXXX: Got backpressure...");
+ this.targetsocket.pause();
+ this.readable = false;
+ }
+}
+
+pluggableStream.prototype.resume = function () {
+ if (this.targetsocket.resume) {
+ // console.log("XXXX: Resuming backpressure...");
+ this.targetsocket.resume();
+ this.readable = true;
+ }
+}
pluggableStream.prototype.attach = function (socket) {
var self = this;
View
4 transaction.js
@@ -94,10 +94,6 @@ Transaction.prototype.attachment_hooks = function (start, data, end) {
this.parse_body = 1;
this.body = this.body || new body.Body(this.header, {"banner": this.banner});
this.body.on('attachment_start', start);
- if (data)
- this.body.on('attachment_data', data);
- if (end)
- this.body.on('attachment_end', end);
};
Transaction.prototype.set_banner = function (text, html) {
Please sign in to comment.
Something went wrong with that request. Please try again.