Skip to content
This repository
Browse code

Attachment streams

  • Loading branch information...
commit 7e9f074b042e79764d966820d3d233780cf84b43 1 parent 81ec865
Matt Sergeant authored
71 attachment_stream.js
... ... @@ -0,0 +1,71 @@
  1 +"use strict";
  2 +
  3 +var Stream = require('stream');
  4 +var util = require('util');
  5 +
  6 +function AttachmentStream () {
  7 + Stream.call(this);
  8 + this.encoding = null;
  9 + this.paused = false;
  10 +}
  11 +
  12 +util.inherits(AttachmentStream, Stream);
  13 +
  14 +AttachmentStream.prototype.emit_data = function (data) {
  15 + // console.log("DATA emit");
  16 + if (this.encoding) {
  17 + this.emit('data', data.toString(this.encoding));
  18 + }
  19 + else {
  20 + this.emit('data', data);
  21 + }
  22 +}
  23 +
  24 +AttachmentStream.prototype.pipe = function (dest, options) {
  25 + var self = this;
  26 + this.paused = false;
  27 + Stream.prototype.pipe.call(this, dest, options);
  28 + dest.on('drain', function () {
  29 + // console.log("YYY: DRAIN!!!");
  30 + if (self.paused) self.resume();
  31 + });
  32 + dest.on('end', function () {
  33 + // console.log("YYY: END!!");
  34 + if (self.paused) self.resume();
  35 + })
  36 + dest.on('close', function () {
  37 + // console.log("YYY: CLOSE!!");
  38 + if (self.paused) self.resume();
  39 + })
  40 +}
  41 +
  42 +AttachmentStream.prototype.setEncoding = function (enc) {
  43 + if (enc !== 'binary') {
  44 + throw "Unable to set encoding to anything other than binary";
  45 + }
  46 + this.encoding = enc;
  47 +}
  48 +
  49 +AttachmentStream.prototype.pause = function () {
  50 + this.paused = true;
  51 + if (this.connection && this.connection.client && this.connection.client.pause) {
  52 + // console.log("YYYY: Backpressure pause");
  53 + this.connection.client.pause();
  54 + }
  55 +}
  56 +
  57 +AttachmentStream.prototype.resume = function () {
  58 + if (this.connection && this.connection.client && this.connection.client.resume) {
  59 + // console.log("YYYY: Backpressure resume");
  60 + this.connection.client.resume();
  61 + }
  62 + this.paused = false;
  63 +}
  64 +
  65 +AttachmentStream.prototype.destroy = function () {
  66 + // console.log("YYYY: Stream destroyed");
  67 +}
  68 +
  69 +exports.createStream = function () {
  70 + return new AttachmentStream ();
  71 +}
19 mailbody.js
@@ -6,6 +6,7 @@ var utils = require('./utils');
6 6 var events = require('events');
7 7 var util = require('util');
8 8 var Iconv = require('./mailheader').Iconv;
  9 +var attstr = require('./attachment_stream');
9 10
10 11 var buf_siz = 65536;
11 12
@@ -41,9 +42,9 @@ Body.prototype.parse_child = function (line) {
41 42 // see below for why we create a new buffer here.
42 43 var to_emit = new Buffer(child.buf_fill);
43 44 child.buf.copy(to_emit, 0, 0, child.buf_fill);
44   - this.emit('attachment_data', to_emit);
  45 + child.attachment_stream.emit_data(to_emit);
45 46 }
46   - this.emit('attachment_end');
  47 + child.attachment_stream.emit('end');
47 48 }
48 49
49 50 if (line.substr(this.boundary.length + 2, 2) === '--') {
@@ -114,7 +115,10 @@ Body.prototype.parse_start = function (line) {
114 115 match = ct.match(/name\s*=\s*["']?([^'";]+)["']?/i);
115 116 }
116 117 var filename = match ? match[1] : '';
117   - this.emit('attachment_start', ct, filename, this);
  118 + console.log("XXX Creating attachmentstream");
  119 + this.attachment_stream = attstr.createStream();
  120 + console.log("Attstr: ", this.attachment_stream);
  121 + this.emit('attachment_start', ct, filename, this, this.attachment_stream);
118 122 this.buf_fill = 0;
119 123 this.state = 'attachment';
120 124 }
@@ -279,8 +283,6 @@ Body.prototype.parse_multipart_preamble = function (line) {
279 283 // next section
280 284 var bod = new Body(new Header(), this.options);
281 285 this.listeners('attachment_start').forEach(function (cb) { bod.on('attachment_start', cb) });
282   - this.listeners('attachment_data' ).forEach(function (cb) { bod.on('attachment_data', cb) });
283   - this.listeners('attachment_end' ).forEach(function (cb) { bod.on('attachment_end', cb) });
284 286 this.children.push(bod);
285 287 bod.state = 'headers';
286 288 this.state = 'child';
@@ -307,9 +309,6 @@ Body.prototype.parse_attachment = function (line) {
307 309 }
308 310
309 311 var buf = this.decode_function(line);
310   - //this.emit('attachment_data', buf);
311   - //return;
312   -
313 312 if ((buf.length + this.buf_fill) > buf_siz) {
314 313 // now we have to create a new buffer, because if we write this out
315 314 // using async code, it will get overwritten under us. Creating a new
@@ -317,11 +316,11 @@ Body.prototype.parse_attachment = function (line) {
317 316 // memcpy())
318 317 var to_emit = new Buffer(this.buf_fill);
319 318 this.buf.copy(to_emit, 0, 0, this.buf_fill);
320   - this.emit('attachment_data', to_emit);
  319 + this.attachment_stream.emit_data(to_emit);
321 320 if (buf.length > buf_siz) {
322 321 // this is an unusual case - the base64/whatever data is larger
323 322 // than our buffer size, so we just emit it and reset the counter.
324   - this.emit('attachment_data', buf);
  323 + this.attachment_stream.emit_data(buf);
325 324 this.buf_fill = 0;
326 325 }
327 326 else {
29 tls_socket.js
@@ -26,12 +26,29 @@ function pluggableStream(socket) {
26 26 util.inherits(pluggableStream, stream.Stream);
27 27 util.inherits(pluggableStream, events.EventEmitter);
28 28
29   -pluggableStream.prototype.pipe = function (socket) {
30   - this.on('data', function (data) {
31   - if (socket.write)
32   - socket.write(data);
33   - });
34   -};
  29 +// This should come from Stream in node core.
  30 +// pluggableStream.prototype.pipe = function (socket) {
  31 +// this.on('data', function (data) {
  32 +// if (socket.write)
  33 +// socket.write(data);
  34 +// });
  35 +// };
  36 +
  37 +pluggableStream.prototype.pause = function () {
  38 + if (this.targetsocket.pause) {
  39 + // console.log("XXXX: Got backpressure...");
  40 + this.targetsocket.pause();
  41 + this.readable = false;
  42 + }
  43 +}
  44 +
  45 +pluggableStream.prototype.resume = function () {
  46 + if (this.targetsocket.resume) {
  47 + // console.log("XXXX: Resuming backpressure...");
  48 + this.targetsocket.resume();
  49 + this.readable = true;
  50 + }
  51 +}
35 52
36 53 pluggableStream.prototype.attach = function (socket) {
37 54 var self = this;
4 transaction.js
@@ -88,10 +88,6 @@ Transaction.prototype.attachment_hooks = function (start, data, end) {
88 88 this.parse_body = 1;
89 89 this.body = this.body || new body.Body(this.header, {"banner": this.banner});
90 90 this.body.on('attachment_start', start);
91   - if (data)
92   - this.body.on('attachment_data', data);
93   - if (end)
94   - this.body.on('attachment_end', end);
95 91 };
96 92
97 93 Transaction.prototype.set_banner = function (text, html) {

0 comments on commit 7e9f074

Please sign in to comment.
Something went wrong with that request. Please try again.