diff --git a/src/html5/js/Protocol.js b/src/html5/js/Protocol.js index 69e32c5549..374ecd93dd 100644 --- a/src/html5/js/Protocol.js +++ b/src/html5/js/Protocol.js @@ -91,10 +91,9 @@ function XpraProtocol() { this.cipher_out = null; this.mode = 'binary'; // Current WebSocket mode: 'binary', 'base64' this.rQ = []; // Receive queue - this.rQi = 0; // Receive queue index - this.rQmax = 10000; // Max receive queue size before compacting this.sQ = []; // Send queue this.mQ = []; // Worker message queue + this.header = []; //Queue processing via intervals this.process_interval = 4; //milliseconds @@ -107,7 +106,6 @@ XpraProtocol.prototype.open = function(uri) { var me = this; // init this.rQ = []; - this.rQi = 0; this.sQ = []; this.websocket = null; // connect the socket @@ -124,14 +122,7 @@ XpraProtocol.prototype.open = function(uri) { }; this.websocket.onmessage = function (e) { // push arraybuffer values onto the end - var u8 = new Uint8Array(e.data); - for (var i = 0; i < u8.length; i++) { - me.rQ.push(u8[i]); - } - // wait for 8 bytes - //if (me.rQ.length >= 8) { - //me._process(); - //} + me.rQ.push(new Uint8Array(e.data)); }; this.start_processing(); } @@ -152,7 +143,7 @@ XpraProtocol.prototype.start_processing = function() { var me = this; if(this.rQ_interval_id === null){ this.rQ_interval_id = setInterval(function(){ - if (me.rQ.length >= 8) { + if (me.rQ.length > 0) { me.process_receive_queue(); } }, this.process_interval); @@ -185,24 +176,50 @@ XpraProtocol.prototype.stop_processing = function() { this.mQ_interval_id = null; } + XpraProtocol.prototype.process_receive_queue = function() { - // peek at first 8 bytes of buffer - var buf = this._buffer_peek(8); - - if (buf[0] !== ord("P")) { - msg = "invalid packet header format: " + buf[0]; - if (buf.length>1) { - msg += ": "; - for (c in buf) { - msg += String.fromCharCode(c); + var i = 0, j = 0; + if (this.header.length<8 && this.rQ.length>0) { + //add from receive queue data to header until we get the 8 bytes we need: + while (this.header.length<8 && this.rQ.length>0) { + var slice = this.rQ[0]; + var needed = 8-this.header.length; + var n = Math.min(needed, slice.length); + //console.log("header size", this.header.length, ", adding", n, "bytes from", slice.length); + //copy at most n characters: + for (i = 0; i < n; i++) { + this.header.push(slice[i]); + } + if (slice.length>needed) { + //replace the slice with what is left over: + this.rQ[0] = slice.subarray(n); + } + else { + //this slice has been fully consumed already: + this.rQ.shift(); + } + } + + //verify the header format: + if (this.header[0] !== ord("P")) { + msg = "invalid packet header format: " + this.header[0]; + if (this.header.length>1) { + msg += ": "; + for (c in this.header) { + msg += String.fromCharCode(c); + } } + throw msg; } - throw msg; } - var proto_flags = buf[1]; - var proto_crypto = proto_flags & 0x2; + if (this.header.length<8) { + //we need more data to continue + return; + } + var proto_flags = this.header[1]; + var proto_crypto = proto_flags & 0x2; if (proto_flags!=0) { // check for crypto protocol flag if (!(proto_crypto)) { @@ -210,41 +227,73 @@ XpraProtocol.prototype.process_receive_queue = function() { } } - var level = buf[2]; - var index = buf[3]; + var level = this.header[2]; + if (level & 0x20) { + throw "lzo compression is not supported"; + } + var index = this.header[3]; + if (index>=20) { + throw "invalid packet index: "+index; + } var packet_size = 0; - for (var i=0; i<4; i++) { - //debug("size header["+i+"]="+buf[4+i]); + for (i=0; i<4; i++) { packet_size = packet_size*0x100; - packet_size += buf[4+i]; + packet_size += this.header[4+i]; } + // work out padding if necessary var padding = 0 if (proto_crypto) { padding = (this.cipher_in_block_size - packet_size % this.cipher_in_block_size); packet_size += padding; } - //debug("packet_size="+packet_size+", level="+level+", index="+index); - // wait for packet to be complete - // the header is still on the buffer so wait for packetsize+headersize bytes! - if (this.rQ.length < packet_size+8) { - // we already shifted the header off the buffer? - //debug("packet is not complete yet"); + // verify that we have enough data for the full payload: + var rsize = 0; + for (i=0,j=this.rQ.length;ineeded) { + //add part of this slice: + packet_data.set(slice.subarray(0, needed), rsize); + rsize += needed; + this.rQ[0] = slice.subarray(needed); + } + else { + //add this slice in full: + packet_data.set(slice, rsize); + rsize += slice.length; + this.rQ.shift(); + } + } + } // decrypt if needed if (proto_crypto) { this.cipher_in.update(forge.util.createBuffer(uintToString(packet_data))); var decrypted = this.cipher_in.output.getBytes(); packet_data = []; - for (var i=0; i= 8) { + if (this.rQ.length > 0) { this.process_receive_queue(); } } @@ -405,14 +452,6 @@ XpraProtocol.prototype.set_cipher_out = function(caps, key) { this.cipher_out.start({iv: caps['cipher.iv']}); } -XpraProtocol.prototype._buffer_peek = function(bytes) { - return this.rQ.slice(0, 0+bytes); -} - -XpraProtocol.prototype._buffer_shift = function(bytes) { - return this.rQ.splice(0, 0+bytes);; -} - /* If we are in a web worker, set up an instance of the protocol