Skip to content

Commit

Permalink
followProgress, JSONstream
Browse files Browse the repository at this point in the history
  • Loading branch information
apocas committed Feb 4, 2020
1 parent 419cd89 commit 8be3b43
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 215 deletions.
123 changes: 69 additions & 54 deletions lib/modem.js
Expand Up @@ -10,10 +10,9 @@ var querystring = require('querystring'),
util = require('util'),
url = require('url'),
splitca = require('split-ca'),
JSONStream = require('JSONStream'),
isWin = require('os').type() === 'Windows_NT';

var defaultOpts = function() {
var defaultOpts = function () {
var host;
var opts = {};

Expand All @@ -31,12 +30,12 @@ var defaultOpts = function() {
opts.socketPath = process.env.DOCKER_HOST.substring(8) || '//./pipe/docker_engine';
} else {
var hostStr = process.env.DOCKER_HOST;
if(hostStr.indexOf('\/\/') < 0) {
if (hostStr.indexOf('\/\/') < 0) {
hostStr = 'tcp://' + hostStr;
}
try {
host = new url.URL(hostStr);
} catch(err) {
} catch (err) {
throw new Error('DOCKER_HOST env variable should be something like tcp://localhost:1234');
}

Expand Down Expand Up @@ -69,7 +68,7 @@ var defaultOpts = function() {
};


var Modem = function(options) {
var Modem = function (options) {
var opts = Object.assign({}, defaultOpts(), options);

this.socketPath = opts.socketPath;
Expand All @@ -93,7 +92,7 @@ var Modem = function(options) {
this.protocol = opts.protocol || this.protocol || 'http';
};

Modem.prototype.dial = function(options, callback) {
Modem.prototype.dial = function (options, callback) {
var opts, address, data;
var self = this;

Expand Down Expand Up @@ -166,7 +165,7 @@ Modem.prototype.dial = function(options, callback) {
optionsf.headers['Content-Type'] = 'application/tar';
} else if (opts && options.method === 'POST') {
data = JSON.stringify(opts._body || opts);
if(options.allowEmpty) {
if (options.allowEmpty) {
optionsf.headers['Content-Type'] = 'application/json';
} else {
if (data !== '{}' && data !== '""') {
Expand Down Expand Up @@ -202,33 +201,33 @@ Modem.prototype.dial = function(options, callback) {
this.buildRequest(optionsf, options, data, callback);
};

Modem.prototype.buildRequest = function(options, context, data, callback) {
Modem.prototype.buildRequest = function (options, context, data, callback) {
var self = this;
var connectionTimeoutTimer;

var opts = self.protocol === 'ssh' ? Object.assign(options, {
agent: ssh({'host': self.host, 'port': self.port, 'username': self.username, 'agent': self.sshAuthAgent}),
agent: ssh({ 'host': self.host, 'port': self.port, 'username': self.username, 'agent': self.sshAuthAgent }),
protocol: 'http:'
}) : options;

var req = http[self.protocol === 'ssh' ? 'http': self.protocol].request(opts, function() {});
var req = http[self.protocol === 'ssh' ? 'http' : self.protocol].request(opts, function () { });

debug('Sending: %s', util.inspect(options, {
showHidden: true,
depth: null
}));

if (self.connectionTimeout) {
connectionTimeoutTimer = setTimeout(function() {
connectionTimeoutTimer = setTimeout(function () {
debug('Connection Timeout of %s ms exceeded', self.connectionTimeout);
req.abort();
}, self.connectionTimeout);
}

if (self.timeout) {
req.on('socket', function(socket) {
req.on('socket', function (socket) {
socket.setTimeout(self.timeout);
socket.on('timeout', function() {
socket.on('timeout', function () {
debug('Timeout of %s ms exceeded', self.timeout);
req.abort();
});
Expand All @@ -237,30 +236,30 @@ Modem.prototype.buildRequest = function(options, context, data, callback) {

if (context.hijack === true) {
clearTimeout(connectionTimeoutTimer);
req.on('upgrade', function(res, sock, head) {
req.on('upgrade', function (res, sock, head) {
return callback(null, sock);
});
}

req.on('connect', function() {
req.on('connect', function () {
clearTimeout(connectionTimeoutTimer);
});

req.on('disconnect', function() {
req.on('disconnect', function () {
clearTimeout(connectionTimeoutTimer);
});

req.on('response', function(res) {
req.on('response', function (res) {
clearTimeout(connectionTimeoutTimer);
if (context.isStream === true) {
self.buildPayload(null, context.isStream, context.statusCodes, context.openStdin, req, res, null, callback);
} else {
var chunks = [];
res.on('data', function(chunk) {
res.on('data', function (chunk) {
chunks.push(chunk);
});

res.on('end', function() {
res.on('end', function () {
var buffer = Buffer.concat(chunks);
var result = buffer.toString();

Expand All @@ -272,7 +271,7 @@ Modem.prototype.buildRequest = function(options, context, data, callback) {
}
});

req.on('error', function(error) {
req.on('error', function (error) {
clearTimeout(connectionTimeoutTimer);
self.buildPayload(error, context.isStream, context.statusCodes, false, {}, {}, null, callback);
});
Expand All @@ -288,11 +287,11 @@ Modem.prototype.buildRequest = function(options, context, data, callback) {
}
};

Modem.prototype.buildPayload = function(err, isStream, statusCodes, openStdin, req, res, json, cb) {
Modem.prototype.buildPayload = function (err, isStream, statusCodes, openStdin, req, res, json, cb) {
if (err) return cb(err, null);

if (statusCodes[res.statusCode] !== true) {
getCause(isStream, res, json, function(err, cause) {
getCause(isStream, res, json, function (err, cause) {
var msg = new Error(
'(HTTP code ' + res.statusCode + ') ' +
(statusCodes[res.statusCode] || 'unexpected') + ' - ' +
Expand All @@ -316,10 +315,10 @@ Modem.prototype.buildPayload = function(err, isStream, statusCodes, openStdin, r
function getCause(isStream, res, json, callback) {
var chunks = '';
if (isStream) {
res.on('data', function(chunk) {
res.on('data', function (chunk) {
chunks += chunk;
});
res.on('end', function() {
res.on('end', function () {
callback(null, utils.parseJSON(chunks) || chunks);
});
} else {
Expand All @@ -328,7 +327,7 @@ Modem.prototype.buildPayload = function(err, isStream, statusCodes, openStdin, r
}
};

Modem.prototype.demuxStream = function(stream, stdout, stderr) {
Modem.prototype.demuxStream = function (stream, stdout, stderr) {
var nextDataType = null;
var nextDataLength = null;
var buffer = Buffer.from('');
Expand Down Expand Up @@ -361,7 +360,7 @@ Modem.prototype.demuxStream = function(stream, stdout, stderr) {
}
}

function bufferSlice (end) {
function bufferSlice(end) {
var out = buffer.slice(0, end);
buffer = Buffer.from(buffer.slice(end, buffer.length));
return out;
Expand All @@ -370,49 +369,65 @@ Modem.prototype.demuxStream = function(stream, stdout, stderr) {
stream.on('data', processData);
};

Modem.prototype.followProgress = function(stream, onFinished, onProgress) {
var parser = JSONStream.parse(),
output = [];

parser.on('data', onStreamEvent);
parser.on('error', onStreamError);
parser.on('end', onStreamEnd);

stream.pipe(parser);

function onStreamEvent(evt) {
if (!(evt instanceof Object)) {
evt = {};
}

output.push(evt);

if (evt.error) {
return onStreamError(evt.error);
Modem.prototype.followProgress = function (stream, onFinished, onProgress) {
var buf = '';
var output = [];
var finished = false;

stream.on('data', onStreamEvent);
stream.on('error', onStreamError);
stream.on('end', onStreamEnd);
stream.on('close', onStreamEnd);

function onStreamEvent(data) {
buf += data.toString();
pump();

function pump() {
var pos;
while ((pos = buf.indexOf('\n')) >= 0) {
if (pos == 0) {
buf = buf.slice(1);
continue;
}
processLine(buf.slice(0, pos));
buf = buf.slice(pos + 1);
}
}

if (onProgress) {
onProgress(evt);
function processLine(line) {
if (line[line.length - 1] == '\r') line = line.substr(0, line.length - 1);
if (line.length > 0) {
var obj = JSON.parse(line);
output.push(obj);
if (onProgress) {
console.log(obj)
onProgress(obj);
}
}
}
}
};

function onStreamError(err) {
parser.removeListener('data', onStreamEvent);
parser.removeListener('error', onStreamError);
parser.removeListener('end', onStreamEnd);
finished = true;
stream.removeListener('data', onStreamEvent);
stream.removeListener('error', onStreamError);
stream.removeListener('end', onStreamEnd);
stream.removeListener('close', onStreamEnd);
onFinished(err, output);
}

function onStreamEnd() {
onFinished(null, output);
if(!finished) onFinished(null, output);
finished = true;
}
};

Modem.prototype.buildQuerystring = function(opts) {
Modem.prototype.buildQuerystring = function (opts) {
var clone = {};

// serialize map values as JSON strings, else querystring truncates.
Object.keys(opts).map(function(key, i) {
Object.keys(opts).map(function (key, i) {
clone[key] = opts[key] && typeof opts[key] === 'object' && key !== 't' ?
JSON.stringify(opts[key]) : opts[key];
});
Expand Down

0 comments on commit 8be3b43

Please sign in to comment.