Permalink
Browse files

Modified Protocol to handle large packets

  • Loading branch information...
MattTuttle committed Feb 25, 2014
1 parent 65333b9 commit 201c8bbc2d1ccf4d0221a41de6032479d7d60353
View
@@ -1,6 +1,9 @@
package hxnet.base;
import hxnet.interfaces.Connection;
import haxe.io.Bytes;
import haxe.io.BytesInput;
import haxe.io.Eof;
import haxe.io.Input;
class Protocol implements hxnet.interfaces.Protocol
@@ -10,11 +13,72 @@ class Protocol implements hxnet.interfaces.Protocol
public function isConnected():Bool { return this.cnx != null; }
public function dataReceived(input:Input) { }
public function dataReceived(input:Input):Void
{
if (_packetPos > 0)
{
readPacket(input);
}
while (initPacket(input))
{
readPacket(input);
}
}
private function fullPacketReceived(input:Input):Void
{
}
private function initPacket(input:Input):Bool
{
try
{
_packetLength = input.readInt32();
_packet = Bytes.alloc(_packetLength);
}
catch (e:Eof)
{
return false;
}
return true;
}
private inline function readPacket(input:Input):Bool
{
var finish = true, byte:Int = 0;
while (finish)
{
try
{
byte = input.readByte();
}
catch (e:Eof)
{
finish = false;
}
_packet.set(_packetPos, byte);
_packetPos += 1;
if (_packetPos >= _packetLength)
{
var input = new BytesInput(_packet);
fullPacketReceived(input);
_packetPos = 0;
break;
}
}
return finish;
}
public function makeConnection(cnx:Connection) { this.cnx = cnx; }
public function loseConnection(?reason:String) { this.cnx = null; }
private var cnx:Connection;
private var _packetLength:Int = 0;
private var _packetPos:Int = 0;
private var _packet:Bytes;
}
@@ -4,6 +4,6 @@ import haxe.io.Bytes;
interface Connection
{
public function writeBytes(bytes:Bytes):Bool;
public function writeBytes(bytes:Bytes, writeLength:Bool=false):Bool;
public function close():Void;
}
View
@@ -1,8 +1,8 @@
package hxnet.protocols;
import haxe.io.Bytes;
import haxe.io.BytesOutput;
import haxe.io.Input;
import haxe.io.Eof;
#if neko
import neko.Lib;
@@ -15,42 +15,43 @@ class RPC extends hxnet.base.Protocol
public var dispatcher:Dynamic;
public override function dataReceived(input:Input)
override private function fullPacketReceived(input:Input)
{
try
var func = readString(input);
var numArgs = input.readInt16();
var arguments = new Array<Dynamic>();
while (numArgs > 0)
{
if (dispatcher == null) dispatcher = this;
while (true)
switch(input.readInt8())
{
var func = readString(input);
var numArgs = input.readInt16();
var arguments = new Array<Dynamic>();
for (i in 0...numArgs)
{
switch(input.readInt8())
{
case TYPE_INT:
arguments.push(input.readInt32());
case TYPE_FLOAT:
arguments.push(input.readFloat());
case TYPE_BOOL:
arguments.push(input.readInt8() == 1 ? true : false);
case TYPE_STRING:
arguments.push(readString(input));
case TYPE_OBJECT:
arguments.push(haxe.Unserializer.run(readString(input)));
}
}
var rpcCall = Reflect.field(dispatcher, func);
if (rpcCall != null)
{
Reflect.callMethod(dispatcher, rpcCall, arguments);
}
case TYPE_INT:
arguments.push(input.readInt32());
case TYPE_FLOAT:
arguments.push(input.readFloat());
case TYPE_BOOL:
arguments.push(input.readInt8() == 1 ? true : false);
case TYPE_STRING:
arguments.push(readString(input));
case TYPE_OBJECT:
arguments.push(haxe.Unserializer.run(readString(input)));
}
numArgs -= 1;
}
catch (e:Eof)
dispatch(func, arguments);
}
private inline function dispatch(func:String, arguments:Array<Dynamic>)
{
if (dispatcher == null) dispatcher = this;
try
{
// not an error, just end of data
var rpcCall = Reflect.field(dispatcher, func);
if (rpcCall != null)
{
Reflect.callMethod(dispatcher, rpcCall, arguments);
}
}
catch (e:Dynamic)
{
@@ -92,13 +93,19 @@ class RPC extends hxnet.base.Protocol
o.writeInt8(TYPE_STRING);
writeString(o, arg);
}
else if (Std.is(arg, Bytes))
{
o.writeInt32(arg.length);
o.writeFullBytes(arg, 0, arg.length);
}
else
{
o.writeInt8(TYPE_OBJECT);
writeString(o, haxe.Serializer.run(arg));
}
}
cnx.writeBytes(o.getBytes());
cnx.writeBytes(o.getBytes(), true);
}
private inline function readString(i:Input):String
@@ -113,9 +120,11 @@ class RPC extends hxnet.base.Protocol
o.writeString(value);
}
private static inline var TYPE_INT:Int = 0;
private static inline var TYPE_FLOAT:Int = 1;
private static inline var TYPE_BOOL:Int = 2;
private static inline var TYPE_STRING:Int = 3;
private static inline var TYPE_OBJECT:Int = 4;
}
// data types
private static inline var TYPE_INT:Int = 1;
private static inline var TYPE_FLOAT:Int = 2;
private static inline var TYPE_BOOL:Int = 3;
private static inline var TYPE_STRING:Int = 4;
private static inline var TYPE_BYTES:Int = 5;
private static inline var TYPE_OBJECT:Int = 6;
}
@@ -6,7 +6,7 @@ import haxe.io.BytesOutput;
class Telnet extends hxnet.base.Protocol
{
public override function dataReceived(input:Input)
override public function dataReceived(input:Input)
{
var buffer = input.readLine();
// filter out IAC commands for now
View
@@ -11,10 +11,11 @@ class Connection implements hxnet.interfaces.Connection
this.socket = socket;
}
public function writeBytes(bytes:Bytes):Bool
public function writeBytes(bytes:Bytes, writeLength:Bool=false):Bool
{
try
{
if (writeLength) socket.output.writeInt32(bytes.length);
socket.output.writeBytes(bytes, 0, bytes.length);
}
catch (e:Dynamic)
View
@@ -3,6 +3,7 @@ package hxnet.udp;
import sys.net.UdpSocket;
import sys.net.Address;
import haxe.io.Bytes;
import haxe.io.BytesOutput;
class Connection implements hxnet.interfaces.Connection
{
@@ -12,10 +13,18 @@ class Connection implements hxnet.interfaces.Connection
this.address = address.clone();
}
public function writeBytes(bytes:Bytes):Bool
public function writeBytes(bytes:Bytes, writeLength:Bool=false):Bool
{
try
{
if (writeLength)
{
var out = new BytesOutput();
out.prepare(bytes.length);
out.writeInt32(bytes.length);
out.writeBytes(bytes, 0, bytes.length);
bytes = out.getBytes();
}
socket.sendTo(bytes, 0, bytes.length, address);
}
catch (e:Dynamic)
View
@@ -11,7 +11,7 @@ class TcpTest extends haxe.unit.TestCase
public function createRPCServer()
{
var port = Thread.readMessage(true);
var server = new hxnet.tcp.Server(new hxnet.base.Factory(PingPong), port);
var server = new hxnet.tcp.Server(new hxnet.base.Factory(PingPong), port, "localhost");
while (Thread.readMessage(false) == null)
{
server.update();
@@ -39,7 +39,7 @@ class TcpTest extends haxe.unit.TestCase
client.blocking = false;
var rpc = new PingPong();
client.protocol = rpc;
client.connect(serverPort);
client.connect("localhost", serverPort);
rpc.call("ping");
client.update();
@@ -53,7 +53,7 @@ class TcpTest extends haxe.unit.TestCase
client.blocking = false;
var rpc = new PingPong();
client.protocol = rpc;
client.connect(serverPort);
client.connect("localhost", serverPort);
rpc.call("pong", [1, 12.4]);
client.update();
@@ -67,7 +67,7 @@ class TcpTest extends haxe.unit.TestCase
client.blocking = false;
var rpc = new PingPong();
client.protocol = rpc;
client.connect(serverPort);
client.connect("localhost", serverPort);
rpc.call("foo", [1, 20.4, "hi"]); // this call should fail
client.update(0.1);
View
@@ -11,7 +11,7 @@ class UdpTest extends haxe.unit.TestCase
public function createRPCServer()
{
var port = Thread.readMessage(true);
var server = new hxnet.udp.Server(new hxnet.base.Factory(PingPong), port);
var server = new hxnet.udp.Server(new hxnet.base.Factory(PingPong), port, "localhost");
while (Thread.readMessage(false) != "finish")
{
server.update();
@@ -43,7 +43,7 @@ class UdpTest extends haxe.unit.TestCase
var client = new hxnet.udp.Client();
var rpc = new PingPong();
client.protocol = rpc;
client.connect(serverPort);
client.connect("localhost", serverPort);
rpc.call("ping");
updateClient(client);
@@ -56,7 +56,7 @@ class UdpTest extends haxe.unit.TestCase
var client = new hxnet.udp.Client();
var rpc = new PingPong();
client.protocol = rpc;
client.connect(serverPort);
client.connect("localhost", serverPort);
rpc.call("pong", [1, 12.4]);
updateClient(client);

0 comments on commit 201c8bb

Please sign in to comment.