Skip to content

Commit

Permalink
Allow the Event Scheduler to be changed
Browse files Browse the repository at this point in the history
  • Loading branch information
ME1312 committed Apr 27, 2019
1 parent 90eb3a9 commit 70c3d7c
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void receive(SubDataClient client, ObjectMap<Integer> data) throws Throwa
client.getSocket().getOutputStream().flush();
client.sendPacket(this);
} else {
DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + cipher + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()));
DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + cipher + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataClient.class.getDeclaredField("log"), client));
Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.ENCRYPTION_MISMATCH);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void send(SubDataClient client, OutputStream data) throws Throwable {
if (Util.reflect(SubDataClient.class.getDeclaredField("state"), client) == ConnectionState.INITIALIZATION) {
Util.reflect(SubDataClient.class.getDeclaredField("state"), client, ConnectionState.READY);

Util.<Logger>reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()).info("Logged into " + client.getAddress().toString());
Util.<Logger>reflect(SubDataClient.class.getDeclaredField("log"), client).info("Logged into " + client.getAddress().toString());

LinkedList<PacketOut> queue = Util.reflect(SubDataClient.class.getDeclaredField("prequeue"), client);
if (queue.size() > 0) {
Expand All @@ -38,7 +38,7 @@ public void send(SubDataClient client, OutputStream data) throws Throwable {
for (Callback<DataClient> next : events) try {
if (next != null) next.run(client);
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()));
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataClient.class.getDeclaredField("log"), client));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public void receive(SubDataClient client, ObjectMap<Integer> data) throws Throwa
Util.reflect(DataClient.class.getDeclaredField("id"), client, clientID);
client.sendPacket(this);
} else {
DebugUtil.logException(new IllegalArgumentException("Protocol version mismatch: [" + version + "] is not one of " + versions.toString()), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()));
DebugUtil.logException(new IllegalArgumentException("Protocol version mismatch: [" + version + "] is not one of " + versions.toString()), Util.reflect(SubDataClient.class.getDeclaredField("log"), client));
Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.PROTOCOL_MISMATCH);
}
} else {
DebugUtil.logException(new IllegalArgumentException("Protocol mismatch: [" + name + "] != [" + client.getProtocol().getName() + "]"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getProtocol()));
DebugUtil.logException(new IllegalArgumentException("Protocol mismatch: [" + name + "] != [" + client.getProtocol().getName() + "]"), Util.reflect(SubDataClient.class.getDeclaredField("log"), client));
Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.PROTOCOL_MISMATCH);
}
}
Expand Down
107 changes: 61 additions & 46 deletions Client/src/net/ME1312/SubData/Client/SubDataClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;
import java.util.logging.Logger;

import static net.ME1312.SubData.Client.Library.ConnectionState.*;
import static net.ME1312.SubData.Client.Library.DisconnectReason.*;
Expand All @@ -45,17 +46,21 @@ public class SubDataClient extends DataClient {
private Cipher cipher = NEH.get();
private int cipherlevel = 0;
private ConnectionState state;
private Callback<Runnable> scheduler;
private Logger log;

SubDataClient(SubDataProtocol protocol, InetAddress address, int port) throws IOException {
SubDataClient(SubDataProtocol protocol, Callback<Runnable> scheduler, Logger log, InetAddress address, int port) throws IOException {
if (Util.isNull(address, port)) throw new NullPointerException();
this.protocol = protocol;
this.scheduler = scheduler;
this.log = log;
this.state = PRE_INITIALIZATION;
this.socket = new Socket(address, port);
this.out = socket.getOutputStream();
this.queue = null;
this.prequeue = new LinkedList<>();

protocol.log.info("Connected to " + socket.getRemoteSocketAddress());
log.info("Connected to " + socket.getRemoteSocketAddress());
read();
}

Expand All @@ -82,7 +87,7 @@ private void read(Container<Boolean> reset, InputStream data) {

// Step 4 // Create a detached data forwarding InputStream
if (state != CLOSED && id >= 0 && version >= 0) {
try (InputStream forward = new InputStream() {
InputStream forward = new InputStream() {
boolean open = true;

@Override
Expand All @@ -99,38 +104,39 @@ public void close() throws IOException {
open = false;
while (data.read() != -1);
}
}) {
HashMap<Integer, PacketIn> pIn = (state.asInt() >= READY.asInt())?protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null);
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
PacketIn packet = pIn.get(id);
if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");

// Step 5 // Invoke the Packet
if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) {
throw new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage");
} else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) {
forward.close();
} else {
packet.receive(this);
if (packet instanceof PacketStreamIn) {
try {
};
HashMap<Integer, PacketIn> pIn = (state.asInt() >= READY.asInt())?protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null);
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
PacketIn packet = pIn.get(id);
if (!packet.isCompatable(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");

// Step 5 // Invoke the Packet
if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) {
DebugUtil.logException(new IllegalStateException("Only InitPacketDeclaration may be received during the PRE_INITIALIZATION stage"), log);
close(PROTOCOL_MISMATCH);
} else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) {
forward.close();
} else {
scheduler.run(() -> {
try {
packet.receive(this);

if (packet instanceof PacketStreamIn) {
((PacketStreamIn) packet).receive(this, forward);
} catch (Throwable e) {
throw new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler");
}
} else forward.close();
}
} catch (Throwable e) {
DebugUtil.logException(e, protocol.log);
} else forward.close();
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"), log);

if (state.asInt() <= INITIALIZATION.asInt())
close(PROTOCOL_MISMATCH); // Issues during the init stages are signs of a PROTOCOL_MISMATCH
if (state.asInt() <= INITIALIZATION.asInt())
Util.isException(() -> close(PROTOCOL_MISMATCH)); // Issues during the init stages are signs of a PROTOCOL_MISMATCH
}
});
}
}
} catch (Exception e) {
if (!reset.get()) try {
if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) {
DebugUtil.logException(e, protocol.log);
DebugUtil.logException(e, log);
} if (!(e instanceof SocketException)) {
close(UNHANDLED_EXCEPTION);
} else close(CONNECTION_INTERRUPTED);
Expand Down Expand Up @@ -203,7 +209,7 @@ public void close() throws IOException {
} catch (Exception e) {
if (!reset.get()) try {
if (!(e instanceof SocketException && !Boolean.getBoolean("subdata.debug"))) {
DebugUtil.logException(e, protocol.log);
DebugUtil.logException(e, log);
} if (!(e instanceof SocketException)) {
if (e instanceof EncryptionException)
close(ENCRYPTION_MISMATCH); // Classes that extend EncryptionException being thrown signify an ENCRYPTION_MISMATCH
Expand Down Expand Up @@ -241,11 +247,17 @@ public void close() throws IOException {
data.flush();

// Step 3 // Invoke the Packet
if (next instanceof PacketStreamOut) {
((PacketStreamOut) next).send(this, forward);
} else forward.close();
scheduler.run(() -> {
try {
if (next instanceof PacketStreamOut) {
((PacketStreamOut) next).send(this, forward);
} else forward.close();
} catch (Throwable e) {
DebugUtil.logException(e, log);
}
});
} catch (Throwable e) {
DebugUtil.logException(e, protocol.log);
DebugUtil.logException(e, log);
}
Util.isException(data::close);
}
Expand Down Expand Up @@ -298,7 +310,7 @@ public void close() throws IOException {
}
} catch (Throwable e) {
Util.isException(() -> queue.remove(0));
DebugUtil.logException(e, protocol.log);
DebugUtil.logException(e, log);

if (queue.size() > 0) SubDataClient.this.write();
else queue = null;
Expand Down Expand Up @@ -419,7 +431,7 @@ public void close() throws IOException {
for (ReturnCallback<DataClient, Boolean> next : events) try {
if (next != null) result = next.run(this) != Boolean.FALSE && result;
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log);
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log);
}

if (result) {
Expand All @@ -433,7 +445,7 @@ public void run() {
if (!socket.isClosed()) try {
close(CLOSE_REQUESTED);
} catch (IOException e) {
DebugUtil.logException(e, protocol.log);
DebugUtil.logException(e, log);
}
}
}, 5000);
Expand All @@ -446,19 +458,22 @@ void close(DisconnectReason reason) throws IOException {
if (state == CLOSING && reason == CONNECTION_INTERRUPTED) reason = CLOSE_REQUESTED;
state = CLOSED;
if (reason != CLOSE_REQUESTED) {
DebugUtil.logException(new SocketException("Connection closed: " + reason), protocol.log);
DebugUtil.logException(new SocketException("Connection closed: " + reason), log);
}

socket.close();
protocol.log.info("Disconnected from " + socket.getRemoteSocketAddress());

LinkedList<Callback<NamedContainer<DisconnectReason, DataClient>>> events = on.closed;
on.closed = new LinkedList<>();
for (Callback<NamedContainer<DisconnectReason, DataClient>> next : events) try {
if (next != null) next.run(new NamedContainer<>(reason, this));
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), protocol.log);
}
log.info("Disconnected from " + socket.getRemoteSocketAddress());

final DisconnectReason freason = reason;
scheduler.run(() -> {
LinkedList<Callback<NamedContainer<DisconnectReason, DataClient>>> events = on.closed;
on.closed = new LinkedList<>();
for (Callback<NamedContainer<DisconnectReason, DataClient>> next : events) try {
if (next != null) next.run(new NamedContainer<>(freason, this));
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), log);
}
});
}
}

Expand Down
27 changes: 19 additions & 8 deletions Client/src/net/ME1312/SubData/Client/SubDataProtocol.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package net.ME1312.SubData.Client;

import net.ME1312.Galaxi.Library.Callback.Callback;
import net.ME1312.Galaxi.Library.Callback.ReturnCallback;
import net.ME1312.Galaxi.Library.Util;
import net.ME1312.Galaxi.Library.Version.Version;
import net.ME1312.SubData.Client.Encryption.NEH;
Expand All @@ -25,16 +27,11 @@ public class SubDataProtocol extends DataProtocol {
final HashMap<Integer, PacketIn> pIn = new HashMap<Integer, PacketIn>();
ArrayList<Version> version = new ArrayList<Version>();
String name;
Logger log;

/**
* Create a new Protocol
*
* @param logger SubData Log Channel
*/
public SubDataProtocol(Logger logger) {
log = logger;

public SubDataProtocol() {
ciphers.put("NULL", NEH.get());
ciphers.put("NONE", NEH.get());

Expand All @@ -54,12 +51,26 @@ public SubDataProtocol(Logger logger) {
/**
* Launch a SubData Client Instance
*
* @param scheduler Event Scheduler
* @param logger Network Logger
* @param address Bind Address (or null for all)
* @param port Port Number
* @throws IOException
*/
public SubDataClient open(Callback<Runnable> scheduler, Logger logger, InetAddress address, int port) throws IOException {
return new SubDataClient(this, scheduler, logger, address, port);
}

/**
* Launch a SubData Client Instance
*
* @param logger Network Logger
* @param address Bind Address (or null for all)
* @param port Port Number
* @throws IOException
*/
public SubDataClient open(InetAddress address, int port) throws IOException {
return new SubDataClient(this, address, port);
public SubDataClient open(Logger logger, InetAddress address, int port) throws IOException {
return open(Runnable::run, logger, address, port);
}

/**
Expand Down
5 changes: 0 additions & 5 deletions Server/api/src/net/ME1312/SubData/Server/DataClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public final void closed(Callback<NamedContainer<DisconnectReason, DataClient>>.
}
}

DataClient(UUID id) {
if (Util.isNull(id)) throw new NullPointerException();
this.id = id;
}

/**
* Send a message to the Client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ObjectMap<Integer> send(SubDataClient client) throws Throwable {
Util.reflect(SubDataClient.class.getDeclaredField("cipher"), client, next.name());
return data;
} else {
DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + ciphers[i] + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol()));
DebugUtil.logException(new EncryptionException("Unknown encryption type \"" + ciphers[i] + '\"' + ((i <= 0)?"":" in \"" + last + '\"')), Util.reflect(SubDataServer.class.getDeclaredField("log"), client.getServer()));
Util.reflect(SubDataClient.class.getDeclaredMethod("close", DisconnectReason.class), client, DisconnectReason.ENCRYPTION_MISMATCH);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import net.ME1312.SubData.Server.Protocol.PacketOut;
import net.ME1312.SubData.Server.SubDataClient;
import net.ME1312.SubData.Server.SubDataProtocol;
import net.ME1312.SubData.Server.SubDataServer;

import java.lang.reflect.InvocationTargetException;
import java.util.LinkedList;
Expand All @@ -23,7 +24,7 @@ public void receive(SubDataClient client) throws Throwable {
if (Util.reflect(SubDataClient.class.getDeclaredField("state"), client) == ConnectionState.INITIALIZATION) {
Util.reflect(SubDataClient.class.getDeclaredField("state"), client, ConnectionState.READY);

Util.<Logger>reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol()).info(client.getAddress().toString() + " has logged in");
Util.<Logger>reflect(SubDataServer.class.getDeclaredField("log"), client.getServer()).info(client.getAddress().toString() + " has logged in");

LinkedList<PacketOut> queue = Util.reflect(SubDataClient.class.getDeclaredField("prequeue"), client);
if (queue.size() > 0) {
Expand All @@ -36,7 +37,7 @@ public void receive(SubDataClient client) throws Throwable {
for (Callback<DataClient> next : events) try {
if (next != null) next.run(client);
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataProtocol.class.getDeclaredField("log"), client.getServer().getProtocol()));
DebugUtil.logException(new InvocationTargetException(e, "Unhandled exception while running SubData Event"), Util.reflect(SubDataServer.class.getDeclaredField("log"), client.getServer()));
}
}
}
Expand Down
Loading

0 comments on commit 70c3d7c

Please sign in to comment.