Skip to content

Commit

Permalink
refactor rserve.js out of rclient.js
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Scheidegger committed Mar 15, 2013
1 parent 479bb72 commit 4de8ffc
Show file tree
Hide file tree
Showing 8 changed files with 1,982 additions and 1,808 deletions.
5 changes: 1 addition & 4 deletions htdocs/js/Makefile
Expand Up @@ -6,12 +6,9 @@ rcloud_bundle.js: \
chart.js \
facet_chart.js \
websockets.js \
rserve.js \
rclient.js \
buffer.js \
parse.js \
peg_parser.js \
objs.js \
Rsrv.js \
rcloud.js \
notebook/_begin.js \
notebook/cell/_begin.js \
Expand Down
10 changes: 5 additions & 5 deletions htdocs/js/parse.js
Expand Up @@ -111,7 +111,7 @@ function reader(m)
l -= attr_result[1];
}
if (handlers[t] === undefined) {
throw "Unimplemented " + t;
throw ["Unimplemented " + t, -1];
} else {
var result = handlers[t].call(this, attributes, l);
return [result[0], total_read + result[1]];
Expand All @@ -134,7 +134,7 @@ function reader(m)
for (var i=0; i<lst.length; i+=2) {
var value = lst[i], tag = lst[i+1];
if (tag.type !== "symbol")
throw "Unexpected type " + tag.type + " as tag for tagged_list";
throw ["Unexpected type " + tag.type + " as tag for tagged_list", -1];
result[tag.value] = value;
}
return Robj.tagged_list(result, attributes);
Expand Down Expand Up @@ -177,12 +177,12 @@ function parse(msg)
var header = new Int32Array(msg, 0, 4);
if (header[0] !== Rsrv.RESP_OK && header[0] !== Rsrv.OOB_SEND) {
var status_code = header[0] >> 24;
throw("ERROR FROM R SERVER: " + (Rsrv.status_codes[status_code] ||
throw ["ERROR FROM R SERVER: " + (Rsrv.status_codes[status_code] ||
status_code)
+ " " + header[0] + " " + header[1] + " " + header[2] + " " + header[3]
+ " " + msg.byteLength
+ " " + msg
); // not too helpful, but better than undefined
+ " " + msg,
status_code];
}

var payload = my_ArrayBufferView(msg, 16, msg.byteLength - 16);
Expand Down
189 changes: 66 additions & 123 deletions htdocs/js/rclient.js
Expand Up @@ -14,104 +14,68 @@ function escape_r_literal_string(s) {
// return "\"" + s.replace(/"/g, "\\\"") + "\"";
}

RClient = {
create: function(opts) {
var host = opts.host;
var onconnect = opts.on_connect;
function NoCallbackError() {
this.name = "NoCallbackError";
}

var socket = new WebSocket(host);
NoCallbackError.prototype = Object.create(Error);
NoCallbackError.prototype.constructor = NoCallbackError;

var _debug = true;
var _capturing_answers = false;
var _capturing_callback = undefined;
var _received_handshake = false;
function no_callback() { throw new NoCallbackError(); }

var result;
var command_counter = 0;

socket.binaryType = 'arraybuffer';

function hand_shake(msg)
{
msg = msg.data;
if (msg.substr(0,4) !== 'Rsrv') {
result.post_error("server is not an RServe instance");
} else if (msg.substr(4, 4) !== '0103') {
result.post_error("sorry, rclient only speaks the 0103 version of the R server protocol");
} else if (msg.substr(8, 4) !== 'QAP1') {
result.post_error("sorry, rclient only speaks QAP1");
} else {
_received_handshake = true;
result.running = true;
// FIXME: there should be a better way to handle this ...
// FIXME: can we use r_funcall? r_funcall does
// not support named parameters (for now)
var cookies = $.cookies.get();
result.login(cookies.token);
result.send("rcloud.support::session.init(username=" + escape_r_literal_string(rcloud.username()) + ")");
onconnect && onconnect.call(result);
}
RClient = {
create: function(opts) {

function on_connect() {
result.running = true;
result.send("rcloud.support::session.init(username=" + escape_r_literal_string(rcloud.username()) + ")");
opts.on_connect && opts.on_connect.call(result);
}

socket.onmessage = function(msg) {
var v = null;
if (_capturing_answers) {
try {
v = parse(msg.data);
if (v !== null)
v = result.eval(v);
_capturing_callback(v);
} catch (e) {
_capturing_answers = false;
_capturing_callback = undefined;
throw e;
}
function on_error(msg, status_code) {
if (status_code === 65) {
// Authentication failed.
result.post_error("Authentication failed. Login first!");
result.post_error(msg);

} else {
if (!_received_handshake) {
hand_shake(msg);
return;
}
if (typeof msg.data === 'string')
result.post_response(msg.data);
else {
v = parse(msg.data);
if (v !== null)
result.eval(v);
}
result.post_error(msg);
}
};
}

socket.onclose = function(msg) {
function on_close(msg) {
result.post_response("Socket was closed. Goodbye!");
result.running = false;
};
var token = $.cookies.get().token;
var rserve = Rserve.create({
host: opts.host,
on_connect: on_connect,
on_error: on_error,
on_close: on_close,
login: token + "\n" + token
});

var _debug = true;
var _capturing_answers = false;
var _capturing_callback = undefined;

var result;
var command_counter = 0;

result = {
handlers: {
"eval": function(v) {
if (v.value.length === 3) {
var command_id = v.value[2].value[0];
var cb = this.result_handlers[command_id];
// if there's a callback attached, call it.
// otherwise, display it.
if (cb) {
cb(command_id, v.value[1]);
} else {
result.display_response(v.value[1]);
}
result.display_response(v.value[1]);
}
return v.value[1];
},
"markdown.eval": function(v) {
if (v.value.length === 3) {
var command_id = v.value[2].value[0];
var cb = this.result_handlers[command_id];
// if there's a callback attached, call it.
// otherwise, display it.
if (cb) {
cb(command_id, v.value[1]);
} else {
result.display_markdown_response(v.value[1]);
}
result.display_markdown_response(v.value[1]);
}
return v.value[1];
},
Expand Down Expand Up @@ -207,7 +171,7 @@ RClient = {
try {
this.display_response(parse(msg));
} catch (e) {
this.post_error("Uncaught exception: " + e);
this.post_error("Uncaught exception: " + e.message + " - " + e.status_code);
}
}
},
Expand All @@ -233,6 +197,7 @@ RClient = {
//////////////////////////////////////////////////////////////////

post_error: function (msg) {
debugger;
var d = $("<div class='error-message'></div>").html(msg);
$("#output").append(d);
window.scrollTo(0, document.body.scrollHeight);
Expand Down Expand Up @@ -290,67 +255,45 @@ RClient = {
this.send(command);
},

login: function(token) {
var command = token + '\n' + token;
var buffer = new ArrayBuffer(command.length + 21);
var view = new EndianAwareDataView(buffer);
view.setInt32(0, 1);
view.setInt32(4, 5 + command.length);
view.setInt32(8, 0);
view.setInt32(12, 0);
view.setInt32(16, 4 + ((1 + command.length) << 8));
for (var i=0; i<command.length; ++i) {
view.setUint8(20 + i, command.charCodeAt(i));
}
view.setUint8(buffer.byteLength - 1, 0);
socket.send(buffer);
},

send: function(command, wrap) {
if (!result.running) {
alert("Init failed, cannot communicate with R process");
return;
}
if (!_.isUndefined(wrap)) command = wrap(command)[0];
var buffer = new ArrayBuffer(command.length + 21);
var view = new EndianAwareDataView(buffer);
view.setInt32(0, 3);
view.setInt32(4, 5 + command.length);
view.setInt32(8, 0);
view.setInt32(12, 0);
view.setInt32(16, 4 + ((1 + command.length) << 8));
for (var i=0; i<command.length; ++i) {
view.setUint8(20 + i, command.charCodeAt(i));
}
view.setUint8(buffer.byteLength - 1, 0);

socket.send(buffer);
},

record_cell_execution: function(cell_model) {
var json_rep = JSON.stringify(cell_model.json());
var call = this.r_funcall("rcloud.record.cell.execution",
rcloud.username(), json_rep);
this.send(call);
rserve.eval(call);
},

send: function(command, wrap) {
this.send_and_callback(command, no_callback, wrap);
},

send_and_callback: function(command, callback, wrap) {
var that = this;
if (_.isUndefined(callback))
callback = _.identity;
callback = no_callback;
var t;
if (wrap) {
t = wrap(command);
} else {
t = this.wrap_command(command, true);
}
var command_id = t[1];
command = t[0];
var that = this;
this.result_handlers[command_id] = function(id, data) {
delete that.result_handlers[id];
callback(data);
};
this.send(command);
if (_debug)
console.log(command);
function unwrap(v) {
if (_debug) {
debugger;
console.log(v);
}
try {
callback(v.value.value[1]);
} catch (e) {
if (e.constructor === NoCallbackError) {
debugger;
that.handlers[v.value.value[0].value[0]](v.value);
}
}
}
rserve.eval(command, unwrap);
},

// FIXME this needs hardening
Expand Down

0 comments on commit 4de8ffc

Please sign in to comment.