Skip to content

Commit

Permalink
#1424: zero copy whenever possible in the network layer
Browse files Browse the repository at this point in the history
git-svn-id: https://xpra.org/svn/Xpra/trunk@15183 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Feb 28, 2017
1 parent f1cddc7 commit 2bc84f3
Showing 1 changed file with 102 additions and 63 deletions.
165 changes: 102 additions & 63 deletions src/html5/js/Protocol.js
Expand Up @@ -91,10 +91,9 @@ function XpraProtocol() {
this.cipher_out = null;
this.mode = 'binary'; // Current WebSocket mode: 'binary', 'base64'
this.rQ = []; // Receive queue
this.rQi = 0; // Receive queue index
this.rQmax = 10000; // Max receive queue size before compacting
this.sQ = []; // Send queue
this.mQ = []; // Worker message queue
this.header = [];

//Queue processing via intervals
this.process_interval = 4; //milliseconds
Expand All @@ -107,7 +106,6 @@ XpraProtocol.prototype.open = function(uri) {
var me = this;
// init
this.rQ = [];
this.rQi = 0;
this.sQ = [];
this.websocket = null;
// connect the socket
Expand All @@ -124,14 +122,7 @@ XpraProtocol.prototype.open = function(uri) {
};
this.websocket.onmessage = function (e) {
// push arraybuffer values onto the end
var u8 = new Uint8Array(e.data);
for (var i = 0; i < u8.length; i++) {
me.rQ.push(u8[i]);
}
// wait for 8 bytes
//if (me.rQ.length >= 8) {
//me._process();
//}
me.rQ.push(new Uint8Array(e.data));
};
this.start_processing();
}
Expand All @@ -152,7 +143,7 @@ XpraProtocol.prototype.start_processing = function() {
var me = this;
if(this.rQ_interval_id === null){
this.rQ_interval_id = setInterval(function(){
if (me.rQ.length >= 8) {
if (me.rQ.length > 0) {
me.process_receive_queue();
}
}, this.process_interval);
Expand Down Expand Up @@ -185,66 +176,124 @@ XpraProtocol.prototype.stop_processing = function() {
this.mQ_interval_id = null;
}


XpraProtocol.prototype.process_receive_queue = function() {
// peek at first 8 bytes of buffer
var buf = this._buffer_peek(8);

if (buf[0] !== ord("P")) {
msg = "invalid packet header format: " + buf[0];
if (buf.length>1) {
msg += ": ";
for (c in buf) {
msg += String.fromCharCode(c);
var i = 0, j = 0;
if (this.header.length<8 && this.rQ.length>0) {
//add from receive queue data to header until we get the 8 bytes we need:
while (this.header.length<8 && this.rQ.length>0) {
var slice = this.rQ[0];
var needed = 8-this.header.length;
var n = Math.min(needed, slice.length);
//console.log("header size", this.header.length, ", adding", n, "bytes from", slice.length);
//copy at most n characters:
for (i = 0; i < n; i++) {
this.header.push(slice[i]);
}
if (slice.length>needed) {
//replace the slice with what is left over:
this.rQ[0] = slice.subarray(n);
}
else {
//this slice has been fully consumed already:
this.rQ.shift();
}
}

//verify the header format:
if (this.header[0] !== ord("P")) {
msg = "invalid packet header format: " + this.header[0];
if (this.header.length>1) {
msg += ": ";
for (c in this.header) {
msg += String.fromCharCode(c);
}
}
throw msg;
}
throw msg;
}

var proto_flags = buf[1];
var proto_crypto = proto_flags & 0x2;
if (this.header.length<8) {
//we need more data to continue
return;
}

var proto_flags = this.header[1];
var proto_crypto = proto_flags & 0x2;
if (proto_flags!=0) {
// check for crypto protocol flag
if (!(proto_crypto)) {
throw "we can't handle this protocol flag yet, sorry";
}
}

var level = buf[2];
var index = buf[3];
var level = this.header[2];
if (level & 0x20) {
throw "lzo compression is not supported";
}
var index = this.header[3];
if (index>=20) {
throw "invalid packet index: "+index;
}
var packet_size = 0;
for (var i=0; i<4; i++) {
//debug("size header["+i+"]="+buf[4+i]);
for (i=0; i<4; i++) {
packet_size = packet_size*0x100;
packet_size += buf[4+i];
packet_size += this.header[4+i];
}

// work out padding if necessary
var padding = 0
if (proto_crypto) {
padding = (this.cipher_in_block_size - packet_size % this.cipher_in_block_size);
packet_size += padding;
}
//debug("packet_size="+packet_size+", level="+level+", index="+index);

// wait for packet to be complete
// the header is still on the buffer so wait for packetsize+headersize bytes!
if (this.rQ.length < packet_size+8) {
// we already shifted the header off the buffer?
//debug("packet is not complete yet");
// verify that we have enough data for the full payload:
var rsize = 0;
for (i=0,j=this.rQ.length;i<j;++i) {
rsize += this.rQ[i].length;
}
if (rsize<packet_size) {
return;
}

// packet is complete but header is still on buffer
this._buffer_shift(8);
//debug("got a full packet, shifting off "+packet_size);
var packet_data = this._buffer_shift(packet_size);
// done parsing the header, the next packet will need a new one:
this.header = []

var packet_data = null;
if (this.rQ[0].length==packet_size) {
//exact match: the payload is in a buffer already:
packet_data = this.rQ.shift();
}
else {
//aggregate all the buffers into "packet_data" until we get exactly "packet_size" bytes:
packet_data = new Uint8Array(packet_size);
rsize = 0;
while (rsize < packet_size) {
var slice = this.rQ[0];
var needed = packet_size - rsize;
//console.log("slice:", slice.length, "bytes, needed", needed);
if (slice.length>needed) {
//add part of this slice:
packet_data.set(slice.subarray(0, needed), rsize);
rsize += needed;
this.rQ[0] = slice.subarray(needed);
}
else {
//add this slice in full:
packet_data.set(slice, rsize);
rsize += slice.length;
this.rQ.shift();
}
}
}

// decrypt if needed
if (proto_crypto) {
this.cipher_in.update(forge.util.createBuffer(uintToString(packet_data)));
var decrypted = this.cipher_in.output.getBytes();
packet_data = [];
for (var i=0; i<decrypted.length; i++)
for (i=0; i<decrypted.length; i++)
packet_data.push(decrypted[i].charCodeAt(0));
packet_data = packet_data.slice(0, -1 * padding);
}
Expand All @@ -255,16 +304,18 @@ XpraProtocol.prototype.process_receive_queue = function() {
// lz4
// python-lz4 inserts the length of the uncompressed data as an int
// at the start of the stream
var d = packet_data.splice(0, 4);
// will always be little endian
var d = packet_data.subarray(0, 4);
// output buffer length is stored as little endian
var length = d[0] | (d[1] << 8) | (d[2] << 16) | (d[3] << 24);
// decode the LZ4 block
var inflated = new Buffer(length);
var uncompressedSize = LZ4.decodeBlock(packet_data, inflated);
if(!proto_crypto)
inflated = inflated.slice(0, uncompressedSize);
} else if (level & 0x20) {
// lzo
// console.log("lz4 decompress packet size", packet_size, ", lz4 length=", length);
var inflated = new Uint8Array(length);
var uncompressedSize = LZ4.decodeBlock(packet_data, inflated, 4);
// if lz4 errors out at the end of the buffer, ignore it:
if (uncompressedSize<=0 && packet_size+uncompressedSize!=0) {
console.error("failed to decompress lz4 data, error code:", uncompressedSize);
return;
}
} else {
// zlib
var inflated = new Zlib.Inflate(packet_data).decompress();
Expand All @@ -287,21 +338,17 @@ XpraProtocol.prototype.process_receive_queue = function() {
}
this.raw_packets = {}
// pass to our packet handler

if((packet[0] === 'draw') && (packet[6] !== 'scroll')){
var img_data = packet[7];
if (typeof img_data === 'string') {
var uint = new Uint8Array(img_data.length);
for(var i=0,j=img_data.length;i<j;++i) {
for(i=0,j=img_data.length;i<j;++i) {
uint[i] = img_data.charCodeAt(i);
}
packet[7] = uint;
}
else {
packet[7] = new Uint8Array(packet[7]);
}
}
if(this.is_worker){
if (this.is_worker){
this.mQ[this.mQ.length] = packet;
} else {
this.packet_handler(packet, this.packet_ctx);
Expand All @@ -314,7 +361,7 @@ XpraProtocol.prototype.process_receive_queue = function() {
}

// see if buffer still has unread packets
if (this.rQ.length >= 8) {
if (this.rQ.length > 0) {
this.process_receive_queue();
}
}
Expand Down Expand Up @@ -405,14 +452,6 @@ XpraProtocol.prototype.set_cipher_out = function(caps, key) {
this.cipher_out.start({iv: caps['cipher.iv']});
}

XpraProtocol.prototype._buffer_peek = function(bytes) {
return this.rQ.slice(0, 0+bytes);
}

XpraProtocol.prototype._buffer_shift = function(bytes) {
return this.rQ.splice(0, 0+bytes);;
}


/*
If we are in a web worker, set up an instance of the protocol
Expand Down

0 comments on commit 2bc84f3

Please sign in to comment.