/
prototype.parseReq.js
217 lines (172 loc) · 6.38 KB
/
prototype.parseReq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/**
* Module dependencies
*/
var _ = require('lodash');
var async = require('async');
var Form = require('multiparty').Form;
var log = require('../../standalone/logger');
var STRINGFILE = require('../../stringfile');
var debug = require('debug')('skipper');
/**
* Begin parsing an incoming HTTP request (`this.req`).
*/
module.exports = function parseReq() {
var self = this;
// Save reference to `form` instance.
var form = this.form = new Form();
/**
* Receive/handle a new `part` stream from a field in the multipart upload.
* @param {stream.Readable} part
*/
form.on('part', _.bind(function(part) {
// Take care of text parameters (i.e. non-files)
if (!part.filename) {
this.onTextParam(part);
return;
}
// Custom handler for fields w/ files
this.onFile(part);
}, this));
// Only one 'error' event can ever be emitted, and if an 'error' event
// is emitted, then 'close' will NOT be emitted.
form.on('error', function(err) {
debug('multiparty form emitted error:',err);
// Emits error on any already-live Upstreams in this request.
_(self.upstreams).each(function(up) {
up.fatalIncomingError(err);
});
// Flag Parser instance with an error
self._multipartyError = err||true;
// // Stop accepting form stuff.
// // (no more new incoming files/textparams)
// self.closed = true;
// // // Informs all Upstreams in this request
// // // that no more files will be sent.
// // _(self.upstreams).each(function(up) {
// // up.noMoreFiles();
});
// Emitted after all parts have been parsed and emitted. Not emitted if an
// `error` event is emitted.
form.on('close', function() {
log((STRINGFILE.get('parser.form.onClose')).grey);
debug('multiparty form closed.');
// Flag this request as closed
// (no more new incoming files/textparams)
self.closed = true;
// Informs all Upstreams in this request
// that no more files will be sent.
_(self.upstreams).each(function(up) {
up.noMoreFiles();
});
// Uncomment this (and comment out the `noMoreFiles` business above)
// to simulate an error for testing purposes:
// up.fatalIncomingError('whee');
});
// Set up 3 conditions under which this Parser will pass control
// to app-level code (i.e. call next())
// ONLY ONE of the following must be satisfied to continue onward.
// (careful! no error allowed in callbacks!)
var timer;
var whichGuard;
async.any([
// (1)
// As soon as request body has been completely parsed.
function requestBodyCompletelyParsed(done) {
form.once('close', function() {
if (!whichGuard){
debug('passed control to app because the request "form" closed (there probably weren\'t any file uploads on this upstream)');
whichGuard = 'requestBodyCompletelyParsed';
}
done(true);
});
},
// (2)
// As soon as at least one file is received on any Upstream.
function receivedFirstFileOfRequest(done) {
self.once('firstFile', function() {
if (!whichGuard){
debug('passed control to app because first file was received');
whichGuard = 'receivedFirstFileOfRequest';
}
done(true);
});
},
// (3)
// If no files have been received by the time
// `maxWaitTimeBeforePassingControlToApp`ms have elapsed,
// go ahead and proceed.
function impatient(done) {
// Note that this is different than "maxTimeToWaitForFirstFile" and "maxTimeToBuffer"-
// rather, this is just the max number of ms that Skipper will wait before passing control
// from the body parser (i.e. calling next()).
//
// It is deliberately not configurable for now, since it's not really
var maxWaitTimeBeforePassingControlToApp = 50;
timer = setTimeout(function() {
if (!whichGuard){
debug('passed control to app because 50ms elapsed');
whichGuard = 'impatient';
}
done(true);
}, maxWaitTimeBeforePassingControlToApp);
}
],
function iterator(guard, done) {
// `guard` is one of the three functions above
guard(done);
},
// ****************************************************************
// Important: Before moving on, there is one last consideration:
//
// We must also wait until all the chunks for every textparam
// detected before the first file has been received?
// This is mostly relevant for files which are extremely small
// compared to their preceding text parameters.
//
function finally_waitForTextParams() {
// Careful: No error argument allowed in this callback!
debug('waiting for any text params');
// Make sure the `impatient` timeout fires no more than once
clearTimeout(timer);
// Take a look at all currently known text params for this Upstream,
// then wait until all of them have been read.
var ms = 5;
var numTries = 0;
async.doUntil(
function setTimer(cb) {
// Catch-all timeout, just in case something goes awry.
// Should never happen, but a good failsafe to prevent holding on to
// control forever. It this timeout was to fire, we should error out and
// cancel things.
numTries++;
if (numTries > 10) {
return cb(new Error(
'EUNFNTEX: Timed out waiting for known text parameters to finish ' +
'streaming their bytes into the server.'
));
}
setTimeout(cb, ms);
// Exponential backoff
// (multiply ms by 2 each time, up to 500)
ms = ms < 500 ? ms * 2 : ms;
},
function checkIfAllDetectedTextParamsAreDone() {
return _(self.textParams).all({
done: true
});
},
function passControlToApp(err) {
// If an error occurs, run the app's error handler.
if (err) return self.next(err);
// At last, pass control to the app.
if (!self._hasPassedControlToApp) {
debug('Passing control to app...'.green);
self._hasPassedControlToApp = true;
self.next();
}
}
);
});
// Lastly, start parsing the incoming multipart upload request.
form.parse(this.req);
};