From 70c3d7cdb94b33fcaf1b6ad21f9c30951c4f7151 Mon Sep 17 00:00:00 2001 From: ME1312 Date: Sat, 27 Apr 2019 00:02:35 -0400 Subject: [PATCH] Allow the Event Scheduler to be changed --- .../Initial/InitPacketChangeEncryption.java | 2 +- .../Initial/InitPacketChangeProtocol.java | 4 +- .../Initial/InitPacketPostDeclaration.java | 4 +- .../ME1312/SubData/Client/SubDataClient.java | 107 ++++++++++-------- .../SubData/Client/SubDataProtocol.java | 27 +++-- .../net/ME1312/SubData/Server/DataClient.java | 5 - .../Initial/InitPacketChangeEncryption.java | 2 +- .../Initial/InitPacketChangeProtocol.java | 5 +- .../ME1312/SubData/Server/SubDataClient.java | 104 +++++++++-------- .../SubData/Server/SubDataProtocol.java | 27 +++-- .../ME1312/SubData/Server/SubDataServer.java | 88 ++++++++------ 11 files changed, 215 insertions(+), 160 deletions(-) diff --git a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeEncryption.java b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeEncryption.java index 7a14eec..64ec4a2 100644 --- a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeEncryption.java +++ b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeEncryption.java @@ -46,7 +46,7 @@ public void receive(SubDataClient client, ObjectMap data) throws Throwa client.getSocket().getOutputStream().flush(); client.sendPacket(this); } else { - DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + cipher + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol())); + DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + cipher + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataClient.class.getDeclaredField("log"), client)); Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.ENCRYPTION_MISMATCH); } } diff --git a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeProtocol.java b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeProtocol.java index db040ca..09f6e7a 100644 --- a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeProtocol.java +++ b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketChangeProtocol.java @@ -25,7 +25,7 @@ public void send(SubDataClient client, OutputStream data) throws Throwable { if (Util.reflect(SubDataClient.class.getDeclaredField("state"), client) == ConnectionState.INITIALIZATION) { Util.reflect(SubDataClient.class.getDeclaredField("state"), client, ConnectionState.READY); - Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()).info("Logged into " + client.getAddress().toString()); + Util.reflect(SubDataClient.class.getDeclaredField("log"), client).info("Logged into " + client.getAddress().toString()); LinkedList queue = Util.reflect(SubDataClient.class.getDeclaredField("prequeue"), client); if (queue.size() > 0) { @@ -38,7 +38,7 @@ public void send(SubDataClient client, OutputStream data) throws Throwable { for (Callback next : events) try { if (next != null) next.run(client); } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol())); + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataClient.class.getDeclaredField("log"), client)); } } } diff --git a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketPostDeclaration.java b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketPostDeclaration.java index bb29094..b7c8c85 100644 --- a/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketPostDeclaration.java +++ b/Client/src/net/ME1312/SubData/Client/Protocol/Initial/InitPacketPostDeclaration.java @@ -34,11 +34,11 @@ public void receive(SubDataClient client, ObjectMap data) throws Throwa Util.reflect(DataClient.class.getDeclaredField("id"), client, clientID); client.sendPacket(this); } else { - DebugUtil.logException(new IllegalArgumentException("Protocol version mismatch: [" + version + "] is not one of " + versions.toString()), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol())); + DebugUtil.logException(new IllegalArgumentException("Protocol version mismatch: [" + version + "] is not one of " + versions.toString()), Util.reflect(SubDataClient.class.getDeclaredField("log"), client)); Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.PROTOCOL_MISMATCH); } } else { - DebugUtil.logException(new IllegalArgumentException("Protocol mismatch: [" + name + "] != [" + client.getProtocol().getName() + "]"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol())); + DebugUtil.logException(new IllegalArgumentException("Protocol mismatch: [" + name + "] != [" + client.getProtocol().getName() + "]"), Util.reflect(SubDataClient.class.getDeclaredField("log"), client)); Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.PROTOCOL_MISMATCH); } } diff --git a/Client/src/net/ME1312/SubData/Client/SubDataClient.java b/Client/src/net/ME1312/SubData/Client/SubDataClient.java index 66b1dce..7951661 100644 --- a/Client/src/net/ME1312/SubData/Client/SubDataClient.java +++ b/Client/src/net/ME1312/SubData/Client/SubDataClient.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.*; +import java.util.logging.Logger; import static net.ME1312.SubData.Client.Library.ConnectionState.*; import static net.ME1312.SubData.Client.Library.DisconnectReason.*; @@ -45,17 +46,21 @@ public class SubDataClient extends DataClient { private Cipher cipher = NEH.get(); private int cipherlevel = 0; private ConnectionState state; + private Callback scheduler; + private Logger log; - SubDataClient(SubDataProtocol protocol, InetAddress address, int port) throws IOException { + SubDataClient(SubDataProtocol protocol, Callback scheduler, Logger log, InetAddress address, int port) throws IOException { if (Util.isNull(address, port)) throw new NullPointerException(); this.protocol = protocol; + this.scheduler = scheduler; + this.log = log; this.state = PRE_INITIALIZATION; this.socket = new Socket(address, port); this.out = socket.getOutputStream(); this.queue = null; this.prequeue = new LinkedList<>(); - protocol.log.info("Connected to " + socket.getRemoteSocketAddress()); + log.info("Connected to " + socket.getRemoteSocketAddress()); read(); } @@ -82,7 +87,7 @@ private void read(Container reset, InputStream data) { // Step 4 // Create a detached data forwarding InputStream if (state != CLOSED && id >= 0 && version >= 0) { - try (InputStream forward = new InputStream() { + InputStream forward = new InputStream() { boolean open = true; @Override @@ -99,38 +104,39 @@ public void close() throws IOException { open = false; while (data.read() != -1); } - }) { - HashMap pIn = (state.asInt() >= READY.asInt())?protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null); - if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); - PacketIn packet = pIn.get(id); - if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); - - // Step 5 // Invoke the Packet - if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) { - throw new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage"); - } else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) { - forward.close(); - } else { - packet.receive(this); - if (packet instanceof PacketStreamIn) { - try { + }; + HashMap pIn = (state.asInt() >= READY.asInt())?protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null); + if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); + PacketIn packet = pIn.get(id); + if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); + + // Step 5 // Invoke the Packet + if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) { + DebugUtil.logException(new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage"), log); + close(PROTOCOL_MISMATCH); + } else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) { + forward.close(); + } else { + scheduler.run(() -> { + try { + packet.receive(this); + + if (packet instanceof PacketStreamIn) { ((PacketStreamIn) packet).receive(this, forward); - } catch (Throwable e) { - throw new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"); - } - } else forward.close(); - } - } catch (Throwable e) { - DebugUtil.logException(e, protocol.log); + } else forward.close(); + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"), log); - if (state.asInt() <= INITIALIZATION.asInt()) - close(PROTOCOL_MISMATCH); // Issues during the init stages are signs of a PROTOCOL_MISMATCH + if (state.asInt() <= INITIALIZATION.asInt()) + Util.isException(() -> close(PROTOCOL_MISMATCH)); // Issues during the init stages are signs of a PROTOCOL_MISMATCH + } + }); } } } catch (Exception e) { if (!reset.get()) try { if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) { - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); } if (!(e instanceof SocketException)) { close(UNHANDLED_EXCEPTION); } else close(CONNECTION_INTERRUPTED); @@ -203,7 +209,7 @@ public void close() throws IOException { } catch (Exception e) { if (!reset.get()) try { if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) { - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); } if (!(e instanceof SocketException)) { if (e instanceof EncryptionException) close(ENCRYPTION_MISMATCH); // Classes that extend EncryptionException being thrown signify an ENCRYPTION_MISMATCH @@ -241,11 +247,17 @@ public void close() throws IOException { data.flush(); // Step 3 // Invoke the Packet - if (next instanceof PacketStreamOut) { - ((PacketStreamOut) next).send(this, forward); - } else forward.close(); + scheduler.run(() -> { + try { + if (next instanceof PacketStreamOut) { + ((PacketStreamOut) next).send(this, forward); + } else forward.close(); + } catch (Throwable e) { + DebugUtil.logException(e, log); + } + }); } catch (Throwable e) { - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); } Util.isException(data::close); } @@ -298,7 +310,7 @@ public void close() throws IOException { } } catch (Throwable e) { Util.isException(() -> queue.remove(0)); - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); if (queue.size() > 0) SubDataClient.this.write(); else queue = null; @@ -419,7 +431,7 @@ public void close() throws IOException { for (ReturnCallback next : events) try { if (next != null) result = next.run(this) != Boolean.FALSE && result; } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log); + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log); } if (result) { @@ -433,7 +445,7 @@ public void run() { if (!socket.isClosed()) try { close(CLOSE_REQUESTED); } catch (IOException e) { - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); } } }, 5000); @@ -446,19 +458,22 @@ void close(DisconnectReason reason) throws IOException { if (state == CLOSING && reason == CONNECTION_INTERRUPTED) reason = CLOSE_REQUESTED; state = CLOSED; if (reason != CLOSE_REQUESTED) { - DebugUtil.logException(new SocketException("Connection closed: " + reason), protocol.log); + DebugUtil.logException(new SocketException("Connection closed: " + reason), log); } socket.close(); - protocol.log.info("Disconnected from " + socket.getRemoteSocketAddress()); - - LinkedList>> events = on.closed; - on.closed = new LinkedList<>(); - for (Callback> next : events) try { - if (next != null) next.run(new NamedContainer<>(reason, this)); - } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log); - } + log.info("Disconnected from " + socket.getRemoteSocketAddress()); + + final DisconnectReason freason = reason; + scheduler.run(() -> { + LinkedList>> events = on.closed; + on.closed = new LinkedList<>(); + for (Callback> next : events) try { + if (next != null) next.run(new NamedContainer<>(freason, this)); + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log); + } + }); } } diff --git a/Client/src/net/ME1312/SubData/Client/SubDataProtocol.java b/Client/src/net/ME1312/SubData/Client/SubDataProtocol.java index 058b49d..ceb4522 100644 --- a/Client/src/net/ME1312/SubData/Client/SubDataProtocol.java +++ b/Client/src/net/ME1312/SubData/Client/SubDataProtocol.java @@ -1,5 +1,7 @@ package net.ME1312.SubData.Client; +import net.ME1312.Galaxi.Library.Callback.Callback; +import net.ME1312.Galaxi.Library.Callback.ReturnCallback; import net.ME1312.Galaxi.Library.Util; import net.ME1312.Galaxi.Library.Version.Version; import net.ME1312.SubData.Client.Encryption.NEH; @@ -25,16 +27,11 @@ public class SubDataProtocol extends DataProtocol { final HashMap pIn = new HashMap(); ArrayList version = new ArrayList(); String name; - Logger log; /** * Create a new Protocol - * - * @param logger SubData Log Channel */ - public SubDataProtocol(Logger logger) { - log = logger; - + public SubDataProtocol() { ciphers.put("NULL", NEH.get()); ciphers.put("NONE", NEH.get()); @@ -54,12 +51,26 @@ public SubDataProtocol(Logger logger) { /** * Launch a SubData Client Instance * + * @param scheduler Event Scheduler + * @param logger Network Logger + * @param address Bind Address (or null for all) + * @param port Port Number + * @throws IOException + */ + public SubDataClient open(Callback scheduler, Logger logger, InetAddress address, int port) throws IOException { + return new SubDataClient(this, scheduler, logger, address, port); + } + + /** + * Launch a SubData Client Instance + * + * @param logger Network Logger * @param address Bind Address (or null for all) * @param port Port Number * @throws IOException */ - public SubDataClient open(InetAddress address, int port) throws IOException { - return new SubDataClient(this, address, port); + public SubDataClient open(Logger logger, InetAddress address, int port) throws IOException { + return open(Runnable::run, logger, address, port); } /** diff --git a/Server/api/src/net/ME1312/SubData/Server/DataClient.java b/Server/api/src/net/ME1312/SubData/Server/DataClient.java index 5b113c5..eb52e51 100644 --- a/Server/api/src/net/ME1312/SubData/Server/DataClient.java +++ b/Server/api/src/net/ME1312/SubData/Server/DataClient.java @@ -60,11 +60,6 @@ public final void closed(Callback>. } } - DataClient(UUID id) { - if (Util.isNull(id)) throw new NullPointerException(); - this.id = id; - } - /** * Send a message to the Client * diff --git a/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeEncryption.java b/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeEncryption.java index f707666..0635c78 100644 --- a/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeEncryption.java +++ b/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeEncryption.java @@ -43,7 +43,7 @@ public ObjectMap send(SubDataClient client) throws Throwable { Util.reflect(SubDataClient.class.getDeclaredField("cipher"), client, next.name()); return data; } else { - DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + ciphers[i] + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol())); + DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + ciphers[i] + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataServer.class.getDeclaredField("log"), client.getServer())); Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.ENCRYPTION_MISMATCH); return null; } diff --git a/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeProtocol.java b/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeProtocol.java index 2f4a56e..d4b34ab 100644 --- a/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeProtocol.java +++ b/Server/src/net/ME1312/SubData/Server/Protocol/Initial/InitPacketChangeProtocol.java @@ -9,6 +9,7 @@ import net.ME1312.SubData.Server.Protocol.PacketOut; import net.ME1312.SubData.Server.SubDataClient; import net.ME1312.SubData.Server.SubDataProtocol; +import net.ME1312.SubData.Server.SubDataServer; import java.lang.reflect.InvocationTargetException; import java.util.LinkedList; @@ -23,7 +24,7 @@ public void receive(SubDataClient client) throws Throwable { if (Util.reflect(SubDataClient.class.getDeclaredField("state"), client) == ConnectionState.INITIALIZATION) { Util.reflect(SubDataClient.class.getDeclaredField("state"), client, ConnectionState.READY); - Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol()).info(client.getAddress().toString() + " has logged in"); + Util.reflect(SubDataServer.class.getDeclaredField("log"), client.getServer()).info(client.getAddress().toString() + " has logged in"); LinkedList queue = Util.reflect(SubDataClient.class.getDeclaredField("prequeue"), client); if (queue.size() > 0) { @@ -36,7 +37,7 @@ public void receive(SubDataClient client) throws Throwable { for (Callback next : events) try { if (next != null) next.run(client); } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol())); + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataServer.class.getDeclaredField("log"), client.getServer())); } } } diff --git a/Server/src/net/ME1312/SubData/Server/SubDataClient.java b/Server/src/net/ME1312/SubData/Server/SubDataClient.java index 6bc442f..ff851db 100644 --- a/Server/src/net/ME1312/SubData/Server/SubDataClient.java +++ b/Server/src/net/ME1312/SubData/Server/SubDataClient.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.*; +import java.util.logging.Logger; import static net.ME1312.SubData.Server.Library.ConnectionState.*; import static net.ME1312.SubData.Server.Library.DisconnectReason.*; @@ -37,7 +38,6 @@ * SubData Client Class */ public class SubDataClient extends DataClient { - private UUID id; private Socket socket; private InetSocketAddress address; private ClientHandler handler; @@ -50,11 +50,9 @@ public class SubDataClient extends DataClient { private ConnectionState state; private Timer timeout; - SubDataClient(SubDataServer subdata, UUID id, Socket client) throws IOException { - super(id); + SubDataClient(SubDataServer subdata, Socket client) throws IOException { if (Util.isNull(subdata, client)) throw new NullPointerException(); this.subdata = subdata; - this.id = id; state = PRE_INITIALIZATION; socket = client; out = client.getOutputStream(); @@ -68,7 +66,7 @@ public void run() { if (state.asInt() < READY.asInt()) try { close(INITIALIZATION_TIMEOUT); } catch (IOException e) { - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); } } }, 15000); @@ -97,7 +95,7 @@ private void read(Container reset, InputStream data) { // Step 4 // Create a detached data forwarding InputStream if (state != CLOSED && id >= 0 && version >= 0) { - try (InputStream forward = new InputStream() { + InputStream forward = new InputStream() { boolean open = true; @Override @@ -114,40 +112,39 @@ public void close() throws IOException { open = false; while (data.read() != -1); } - }) { - HashMap pIn = (state.asInt() >= READY.asInt())?subdata.protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null); - if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); - PacketIn packet = pIn.get(id); - if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); - - // Step 5 // Invoke the Packet - if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) { - throw new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage"); - } else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) { - forward.close(); - } else { - packet.receive(this); - if (packet instanceof PacketStreamIn) { - try { + }; + HashMap pIn = (state.asInt() >= READY.asInt())?subdata.protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null); + if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); + PacketIn packet = pIn.get(id); + if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"); + + // Step 5 // Invoke the Packet + if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) { + DebugUtil.logException(new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage"), subdata.log); + close(PROTOCOL_MISMATCH); + } else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) { + forward.close(); + } else { + subdata.scheduler.run(() -> { + try { + packet.receive(this); + + if (packet instanceof PacketStreamIn) { ((PacketStreamIn) packet).receive(this, forward); - } catch (Throwable e) { - throw new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"); - } - } else { - forward.close(); - } - } - } catch (Throwable e) { - DebugUtil.logException(e, subdata.protocol.log); + } else forward.close(); + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"), subdata.log); - if (state.asInt() <= INITIALIZATION.asInt()) - close(PROTOCOL_MISMATCH); // Issues during the init stages are signs of a PROTOCOL_MISMATCH + if (state.asInt() <= INITIALIZATION.asInt()) + Util.isException(() -> close(PROTOCOL_MISMATCH)); // Issues during the init stages are signs of a PROTOCOL_MISMATCH + } + }); } } } catch (Exception e) { if (!reset.get()) try { if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) { - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); } if (!(e instanceof SocketException)) { close(UNHANDLED_EXCEPTION); } else close(CONNECTION_INTERRUPTED); @@ -220,7 +217,7 @@ public void close() throws IOException { } catch (Exception e) { if (!reset.get()) try { if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) { - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); } if (!(e instanceof SocketException)) { if (e instanceof EncryptionException) close(ENCRYPTION_MISMATCH); // Classes that extend EncryptionException being thrown signify an ENCRYPTION_MISMATCH @@ -258,11 +255,17 @@ public void close() throws IOException { data.flush(); // Step 3 // Invoke the Packet - if (next instanceof PacketStreamOut) { - ((PacketStreamOut) next).send(this, forward); - } else forward.close(); + subdata.scheduler.run(() -> { + try { + if (next instanceof PacketStreamOut) { + ((PacketStreamOut) next).send(this, forward); + } else forward.close(); + } catch (Throwable e) { + DebugUtil.logException(e, subdata.log); + } + }); } catch (Throwable e) { - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); } Util.isException(data::close); } @@ -315,7 +318,7 @@ public void close() throws IOException { } } catch (Throwable e) { Util.isException(() -> queue.remove(0)); - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); if (queue.size() > 0) SubDataClient.this.write(); else queue = null; @@ -400,7 +403,7 @@ public void close() throws IOException { for (ReturnCallback next : events) try { if (next != null) result = next.run(this) != Boolean.FALSE && result; } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), subdata.protocol.log); + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), subdata.log); } if (result) { @@ -414,7 +417,7 @@ public void run() { if (!socket.isClosed()) try { close(CLOSE_REQUESTED); } catch (IOException e) { - DebugUtil.logException(e, subdata.protocol.log); + DebugUtil.logException(e, subdata.log); } } }, 5000); @@ -426,7 +429,7 @@ void close(DisconnectReason reason) throws IOException { if (state == CLOSING && reason == CONNECTION_INTERRUPTED) reason = CLOSE_REQUESTED; state = CLOSED; if (reason != CLOSE_REQUESTED) { - DebugUtil.logException(new SocketException("Connection closed: " + reason), subdata.protocol.log); + DebugUtil.logException(new SocketException("Connection closed: " + reason), subdata.log); } if (!socket.isClosed()) getSocket().close(); @@ -436,13 +439,16 @@ void close(DisconnectReason reason) throws IOException { } if (subdata.getClients().values().contains(this)) subdata.removeClient(this); - LinkedList>> events = on.closed; - on.closed = new LinkedList<>(); - for (Callback> next : events) try { - if (next != null) next.run(new NamedContainer<>(reason, this)); - } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), subdata.protocol.log); - } + final DisconnectReason freason = reason; + subdata.scheduler.run(() -> { + LinkedList>> events = on.closed; + on.closed = new LinkedList<>(); + for (Callback> next : events) try { + if (next != null) next.run(new NamedContainer<>(freason, this)); + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), subdata.log); + } + }); } } diff --git a/Server/src/net/ME1312/SubData/Server/SubDataProtocol.java b/Server/src/net/ME1312/SubData/Server/SubDataProtocol.java index 9b96f52..cece634 100644 --- a/Server/src/net/ME1312/SubData/Server/SubDataProtocol.java +++ b/Server/src/net/ME1312/SubData/Server/SubDataProtocol.java @@ -1,5 +1,6 @@ package net.ME1312.SubData.Server; +import net.ME1312.Galaxi.Library.Callback.Callback; import net.ME1312.Galaxi.Library.Util; import net.ME1312.Galaxi.Library.Version.Version; import net.ME1312.SubData.Server.Encryption.NEH; @@ -25,16 +26,11 @@ public class SubDataProtocol extends DataProtocol { int MAX_QUEUE = 64; Version version; String name; - Logger log; /** * Create a new Protocol - * - * @param logger SubData Log Channel */ - public SubDataProtocol(Logger logger) { - log = logger; - + public SubDataProtocol() { ciphers.put("NULL", NEH.get()); ciphers.put("NONE", NEH.get()); @@ -54,13 +50,28 @@ public SubDataProtocol(Logger logger) { /** * SubData Server Instance * + * @param scheduler Event Scheduler + * @param logger Network Logger + * @param address Bind Address (or null for all) + * @param port Port Number + * @param cipher Cipher (or null for none) + * @throws IOException + */ + public SubDataServer open(Callback scheduler, Logger logger, InetAddress address, int port, String cipher) throws IOException { + return new SubDataServer(this, scheduler, logger, address, port, cipher); + } + + /** + * SubData Server Instance + * + * @param logger Network Logger * @param address Bind Address (or null for all) * @param port Port Number * @param cipher Cipher (or null for none) * @throws IOException */ - public SubDataServer open(InetAddress address, int port, String cipher) throws IOException { - return new SubDataServer(this, address, port, cipher); + public SubDataServer open(Logger logger, InetAddress address, int port, String cipher) throws IOException { + return open(Runnable::run, logger, address, port, cipher); } /** diff --git a/Server/src/net/ME1312/SubData/Server/SubDataServer.java b/Server/src/net/ME1312/SubData/Server/SubDataServer.java index fcf036c..8f444a8 100644 --- a/Server/src/net/ME1312/SubData/Server/SubDataServer.java +++ b/Server/src/net/ME1312/SubData/Server/SubDataServer.java @@ -13,6 +13,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.*; import java.util.*; +import java.util.logging.Logger; /** * SubData Server Class @@ -22,9 +23,11 @@ public class SubDataServer extends DataServer { private ServerSocket server; private String address; SubDataProtocol protocol; + Callback scheduler; String cipher; + Logger log; - SubDataServer(SubDataProtocol protocol, InetAddress address, int port, String cipher) throws IOException { + SubDataServer(SubDataProtocol protocol, Callback scheduler, Logger log, InetAddress address, int port, String cipher) throws IOException { if (Util.isNull(protocol)) throw new NullPointerException(); if (address == null) { this.server = new ServerSocket(port, protocol.MAX_QUEUE); @@ -36,6 +39,8 @@ public class SubDataServer extends DataServer { whitelist(address.getHostAddress()); } this.protocol = protocol; + this.scheduler = scheduler; + this.log = log; this.cipher = cipher = (cipher != null)?cipher:"NULL"; // Validate Cipher String[] ciphers = (cipher.contains("/"))?cipher.split("/"):new String[]{cipher}; @@ -54,14 +59,14 @@ public class SubDataServer extends DataServer { throw new EncryptionException("Unknown encryption type \"" + next + "\" in \"" + this.cipher + '\"'); } - protocol.log.info("Listening on " + this.address); + log.info("Listening on " + this.address); new Thread(() -> { while (!server.isClosed()) { try { addClient(server.accept()); } catch (IOException e) { if (!(e instanceof SocketException)) { - DebugUtil.logException(e, protocol.log); + DebugUtil.logException(e, log); } } } @@ -90,33 +95,42 @@ public SubDataProtocol getProtocol() { private SubDataClient addClient(Socket socket) throws IOException { if (Util.isNull(socket)) throw new NullPointerException(); if (isWhitelisted(socket.getInetAddress())) { - UUID id = Util.getNew(clients.keySet(), UUID::randomUUID); - SubDataClient client = new SubDataClient(this, id, socket); - - boolean result = true; - LinkedList> events = on.connect; - on.connect = new LinkedList<>(); - for (ReturnCallback next : events) try { - if (next != null) result = next.run(client) != Boolean.FALSE && result; - } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log); - } + SubDataClient client = addClient(new SubDataClient(this, socket)); + if (client != null) client.read(); + return client; + } else { + log.info(socket.getInetAddress().toString() + " attempted to connect, but isn't white-listed"); + socket.close(); + return null; + } + } - if (result) { - clients.put(id, client); - protocol.log.info(client.getAddress().toString() + " has connected"); + /** + * Add a Client to the Network + * + * @param client Client to add + * @throws IOException + */ + private SubDataClient addClient(SubDataClient client) throws IOException { + boolean result = true; + Util.isException(() -> Util.reflect(DataClient.class.getDeclaredField("id"), client, Util.getNew(clients.keySet(), UUID::randomUUID))); + LinkedList> events = on.connect; + on.connect = new LinkedList<>(); + for (ReturnCallback next : events) try { + if (next != null) result = next.run(client) != Boolean.FALSE && result; + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log); + } - client.sendPacket(new InitPacketDeclaration()); - client.read(); - return client; - } else { - client.close(DisconnectReason.CLOSE_REQUESTED); - protocol.log.info(socket.getInetAddress().toString() + " attempted to connect, but was blocked"); - return null; - } + if (result) { + clients.put(client.getID(), client); + log.info(client.getAddress().toString() + " has connected"); + + client.sendPacket(new InitPacketDeclaration()); + return client; } else { - protocol.log.info(socket.getInetAddress().toString() + " attempted to connect, but isn't white-listed"); - socket.close(); + client.close(DisconnectReason.CLOSE_REQUESTED); + log.info(client.getAddress().toString() + " attempted to connect, but was blocked"); return null; } } @@ -141,7 +155,7 @@ public void removeClient(UUID id) throws IOException { SubDataClient client = clients.get(id); clients.remove(id); client.close(); - protocol.log.info(client.getAddress().toString() + " has disconnected"); + log.info(client.getAddress().toString() + " has disconnected"); } } @@ -152,7 +166,7 @@ public void close() throws IOException { for (ReturnCallback next : events) try { if (next != null) result = next.run(this) != Boolean.FALSE && result; } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log); + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log); } if (result) { @@ -163,14 +177,16 @@ public void close() throws IOException { } server.close(); - LinkedList> events2 = on.closed; - for (Callback next : events2) try { - if (next != null) next.run(this); - } catch (Throwable e) { - DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log); - } + scheduler.run(() -> { + LinkedList> events2 = on.closed; + for (Callback next : events2) try { + if (next != null) next.run(this); + } catch (Throwable e) { + DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log); + } + }); - protocol.log.info("Listener " + this.address + " has been closed"); + log.info("Listener " + this.address + " has been closed"); } }