Skip to content

Commit

Permalink
Bring Client & Server up to eachother's standards
Browse files Browse the repository at this point in the history
These slightly different classes have slight differences and some of them were unintentional :(
  • Loading branch information
ME1312 committed Jan 24, 2021
1 parent 20ad9ab commit 1127e97
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
40 changes: 21 additions & 19 deletions Client/src/net/ME1312/SubData/Client/SubDataClient.java
Expand Up @@ -38,6 +38,7 @@
*/
public class SubDataClient extends DataClient implements SubDataSender {
private Socket socket;
private InetSocketAddress address;
private InputStreamL1 in;
private OutputStreamL1 out;
private ExecutorService writer;
Expand Down Expand Up @@ -66,10 +67,11 @@ public class SubDataClient extends DataClient implements SubDataSender {
this.state = PRE_INITIALIZATION;
this.isdcr = PROTOCOL_MISMATCH;
this.socket = new Socket(address, port);
this.writer = Executors.newSingleThreadExecutor(r -> new Thread(r, "SubDataClient::Data_Writer(" + socket.getLocalSocketAddress() + ')'));
this.out = new OutputStreamL1(log, socket.getOutputStream(), bs, () -> close(CONNECTION_INTERRUPTED), "SubDataClient::Block_Writer(" + socket.getLocalSocketAddress().toString() + ')');
this.address = new InetSocketAddress(socket.getInetAddress(), socket.getPort());
this.writer = Executors.newSingleThreadExecutor(r -> new Thread(r, "SubDataClient::Data_Writer(" + this.address.toString() + ')'));
this.out = new OutputStreamL1(log, socket.getOutputStream(), bs, () -> close(CONNECTION_INTERRUPTED), "SubDataClient::Block_Writer(" + this.address.toString() + ')');
this.in = new InputStreamL1(new BufferedInputStream(socket.getInputStream(), bs), () -> close(CONNECTION_INTERRUPTED), e -> {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Received invalid L1 control character: " + DebugUtil.toHex(0xFF, e)), log);
DebugUtil.logException(new IllegalStateException(this.address.toString() + ": Received invalid L1 control character: " + DebugUtil.toHex(0xFF, e)), log);
close(PROTOCOL_MISMATCH);
});
this.statequeue = new HashMap<>();
Expand All @@ -80,7 +82,7 @@ public class SubDataClient extends DataClient implements SubDataSender {
port,
login
};
heartbeat = new Timer("SubDataServer::Connection_Heartbeat(" + address.toString() + ')');
heartbeat = new Timer("SubDataClient::Connection_Heartbeat(" + this.address.toString() + ')');
heartbeat.schedule(new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -153,21 +155,21 @@ public void close() throws IOException {
}
};
if (state == PRE_INITIALIZATION && id != 0x0000) {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Only InitPacketDeclaration (0x0000) may be received during the PRE_INITIALIZATION stage: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"), log);
DebugUtil.logException(new IllegalStateException(address.toString() + ": Only InitPacketDeclaration (0x0000) may be received during the PRE_INITIALIZATION stage: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"), log);
close(PROTOCOL_MISMATCH);
} else if (state == CLOSING && id != 0xFFFE) {
forward.close(); // Suppress other packets during the CLOSING stage
} else {
HashMap<Integer, PacketIn> pIn = (state.asInt() >= POST_INITIALIZATION.asInt())?protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null);
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(address.toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
PacketIn packet = pIn.get(id);
if (sender instanceof ForwardedDataSender && !(packet instanceof Forwardable)) throw new IllegalSenderException(getAddress().toString() + ": The handler does not support forwarded packets: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (sender instanceof SubDataClient && packet instanceof ForwardOnly) throw new IllegalSenderException(getAddress().toString() + ": The handler does not support non-forwarded packets: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (!packet.isCompatible(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (sender instanceof ForwardedDataSender && !(packet instanceof Forwardable)) throw new IllegalSenderException(address.toString() + ": The handler does not support forwarded packets: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (sender instanceof SubDataClient && packet instanceof ForwardOnly) throw new IllegalSenderException(address.toString() + ": The handler does not support non-forwarded packets: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (!packet.isCompatible(version)) throw new IllegalPacketException(address.toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");

// Step 5 // Invoke the Packet
if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Only " + InitPacketDeclaration.class.getCanonicalName() + " may be received during the PRE_INITIALIZATION stage: " + packet.getClass().getCanonicalName()), log);
DebugUtil.logException(new IllegalStateException(address.toString() + ": Only " + InitPacketDeclaration.class.getCanonicalName() + " may be received during the PRE_INITIALIZATION stage: " + packet.getClass().getCanonicalName()), log);
close(PROTOCOL_MISMATCH);
} else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) {
forward.close(); // Suppress other packets during the CLOSING stage
Expand All @@ -181,7 +183,7 @@ public void close() throws IOException {
((PacketStreamIn) packet).receive(sender, forward);
} else forward.close();
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"), log);
DebugUtil.logException(new InvocationTargetException(e, address.toString() + ": Exception while running packet handler"), log);
Util.isException(forward::close);

if (state.asInt() <= INITIALIZATION.asInt())
Expand Down Expand Up @@ -218,7 +220,7 @@ void read() {

PipedInputStream data = new PipedInputStream(1024);
PipedOutputStream forward = new PipedOutputStream(data);
Thread reader = new Thread(() -> read(this, reset, data), "SubDataClient::Packet_Reader(" + socket.getLocalSocketAddress().toString() + ')');
Thread reader = new Thread(() -> read(this, reset, data), "SubDataClient::Packet_Reader(" + address.toString() + ')');
readc = reader::interrupt;
reader.start();

Expand All @@ -237,7 +239,7 @@ void read() {
} else close(CONNECTION_INTERRUPTED);
}
}
}, "SubDataClient::Data_Reader(" + socket.getLocalSocketAddress().toString() + ')').start();
}, "SubDataClient::Data_Reader(" + address.toString() + ')').start();
}

private void write(SubDataSender sender, PacketOut next, OutputStream data) {
Expand All @@ -263,8 +265,8 @@ public void close() throws IOException {
};
// Step 2 // Write the Packet Metadata
HashMap<Class<? extends PacketOut>, Integer> pOut = (state.asInt() >= POST_INITIALIZATION.asInt())?protocol.pOut:Util.reflect(InitialProtocol.class.getDeclaredField("pOut"), null);
if (!pOut.keySet().contains(next.getClass())) throw new IllegalMessageException(getAddress().toString() + ": Could not find ID for packet: " + next.getClass().getCanonicalName());
if (next.version() > 65535 || next.version() < 0) throw new IllegalMessageException(getAddress().toString() + ": Packet version is not in range (0x0000 to 0xFFFF): " + next.getClass().getCanonicalName());
if (!pOut.keySet().contains(next.getClass())) throw new IllegalMessageException(address.toString() + ": Could not find ID for packet: " + next.getClass().getCanonicalName());
if (next.version() > 65535 || next.version() < 0) throw new IllegalMessageException(address.toString() + ": Packet version is not in range (0x0000 to 0xFFFF): " + next.getClass().getCanonicalName());

data.write(UnsignedDataHandler.toUnsigned((long) pOut.get(next.getClass()), 2));
data.write(UnsignedDataHandler.toUnsigned((long) next.version(), 2));
Expand All @@ -279,7 +281,7 @@ public void close() throws IOException {

next.sending(sender);
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet writer"), log);
DebugUtil.logException(new InvocationTargetException(e, address.toString() + ": Exception while running packet writer"), log);
Util.isException(forward::close);
}
});
Expand All @@ -296,7 +298,7 @@ void write(PacketOut packet) {
try {
PipedOutputStream data = new PipedOutputStream();
PipedInputStream forward = new PipedInputStream(data, 1024);
new Thread(() -> write(this, packet, data), "SubDataClient::Packet_Writer(" + socket.getLocalSocketAddress().toString() + ')').start();
new Thread(() -> write(this, packet, data), "SubDataClient::Packet_Writer(" + address.toString() + ')').start();

// Step 4 // Encrypt the Data
cipher.encrypt(this, forward, out);
Expand Down Expand Up @@ -465,7 +467,7 @@ public SubDataProtocol getProtocol() {
}

public InetSocketAddress getAddress() {
return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
return address;
}

/**
Expand Down Expand Up @@ -526,7 +528,7 @@ public void close() {
state = CLOSING;
if (!isClosed()) sendPacket(new PacketDisconnect());

Timer timeout = new Timer("SubDataClient::Disconnect_Timeout(" + socket.getLocalSocketAddress().toString() + ')');
Timer timeout = new Timer("SubDataClient::Disconnect_Timeout(" + address.toString() + ')');
timeout.schedule(new TimerTask() {
@Override
public void run() {
Expand Down
28 changes: 14 additions & 14 deletions Server/src/net/ME1312/SubData/Server/SubDataClient.java
Expand Up @@ -61,7 +61,7 @@ public class SubDataClient extends DataClient {
writer = Executors.newSingleThreadExecutor(r -> new Thread(r, "SubDataServer::Data_Writer(" + address.toString() + ')'));
out = new OutputStreamL1(subdata.log, client.getOutputStream(), bs, () -> close(CONNECTION_INTERRUPTED), "SubDataServer::Block_Writer(" + address.toString() + ')');
in = new InputStreamL1(new BufferedInputStream(client.getInputStream(), bs), () -> close(CONNECTION_INTERRUPTED), e -> {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Received invalid L1 control character: " + DebugUtil.toHex(0xFF, e)), subdata.log);
DebugUtil.logException(new IllegalStateException(address.toString() + ": Received invalid L1 control character: " + DebugUtil.toHex(0xFF, e)), subdata.log);
close(PROTOCOL_MISMATCH);
});
statequeue = new HashMap<>();
Expand Down Expand Up @@ -146,19 +146,19 @@ public void close() throws IOException {
}
};
if (state == PRE_INITIALIZATION && id != 0x0000) {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Only InitPacketDeclaration (0x0000) may be received during the PRE_INITIALIZATION stage: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"), subdata.log);
DebugUtil.logException(new IllegalStateException(address.toString() + ": Only InitPacketDeclaration (0x0000) may be received during the PRE_INITIALIZATION stage: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]"), subdata.log);
close(PROTOCOL_MISMATCH);
} else if (state == CLOSING && id != 0xFFFE) {
forward.close(); // Suppress other packets during the CLOSING stage
} else {
HashMap<Integer, PacketIn> pIn = (state.asInt() >= POST_INITIALIZATION.asInt())?subdata.protocol.pIn:Util.reflect(InitialProtocol.class.getDeclaredField("pIn"), null);
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(getAddress().toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (!pIn.keySet().contains(id)) throw new IllegalPacketException(address.toString() + ": Could not find handler for packet: [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
PacketIn packet = pIn.get(id);
if (!packet.isCompatible(version)) throw new IllegalPacketException(getAddress().toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");
if (!packet.isCompatible(version)) throw new IllegalPacketException(address.toString() + ": The handler does not support this packet version (" + DebugUtil.toHex(0xFFFF, packet.version()) + "): [" + DebugUtil.toHex(0xFFFF, id) + ", " + DebugUtil.toHex(0xFFFF, version) + "]");

// Step 5 // Invoke the Packet
if (state == PRE_INITIALIZATION && !(packet instanceof InitPacketDeclaration)) {
DebugUtil.logException(new IllegalStateException(getAddress().toString() + ": Only " + InitPacketDeclaration.class.getCanonicalName() + " may be received during the PRE_INITIALIZATION stage: " + packet.getClass().getCanonicalName()), subdata.log);
DebugUtil.logException(new IllegalStateException(address.toString() + ": Only " + InitPacketDeclaration.class.getCanonicalName() + " may be received during the PRE_INITIALIZATION stage: " + packet.getClass().getCanonicalName()), subdata.log);
close(PROTOCOL_MISMATCH);
} else if (state == CLOSING && !(packet instanceof PacketDisconnectUnderstood)) {
forward.close(); // Suppress other packets during the CLOSING stage
Expand All @@ -172,7 +172,7 @@ public void close() throws IOException {
((PacketStreamIn) packet).receive(this, forward);
} else forward.close();
} catch (Throwable e) {
DebugUtil.logException(new InvocationTargetException(e, getAddress().toString() + ": Exception while running packet handler"), subdata.log);
DebugUtil.logException(new InvocationTargetException(e, address.toString() + ": Exception while running packet handler"), subdata.log);
Util.isException(forward::close);

if (state.asInt() <= INITIALIZATION.asInt())
Expand Down Expand Up @@ -234,7 +234,7 @@ void read() {
private void write(PacketOut next, OutputStream data) {
// Step 1 // Create a detached data forwarding OutputStream
try {
Container<Boolean> open = new Container<Boolean>(true);
Container<Boolean> open = new Container<>(true);
OutputStream forward = new OutputStream() {
@Override
public void write(byte[] b, int off, int len) throws IOException {
Expand All @@ -254,8 +254,8 @@ public void close() throws IOException {
};
// Step 2 // Write the Packet Metadata
HashMap<Class<? extends PacketOut>, Integer> pOut = (state.asInt() >= POST_INITIALIZATION.asInt())?subdata.protocol.pOut:Util.reflect(InitialProtocol.class.getDeclaredField("pOut"), null);
if (!pOut.keySet().contains(next.getClass())) throw new IllegalMessageException(getAddress().toString() + ": Could not find ID for packet: " + next.getClass().getCanonicalName());
if (next.version() > 65535 || next.version() < 0) throw new IllegalMessageException(getAddress().toString() + ": Packet version is not in range (0x0000 to 0xFFFF): " + next.getClass().getCanonicalName());
if (!pOut.keySet().contains(next.getClass())) throw new IllegalMessageException(address.toString() + ": Could not find ID for packet: " + next.getClass().getCanonicalName());
if (next.version() > 65535 || next.version() < 0) throw new IllegalMessageException(address.toString() + ": Packet version is not in range (0x0000 to 0xFFFF): " + next.getClass().getCanonicalName());

data.write(UnsignedDataHandler.toUnsigned((long) pOut.get(next.getClass()), 2));
data.write(UnsignedDataHandler.toUnsigned((long) next.version(), 2));
Expand All @@ -270,7 +270,7 @@ public void close() throws IOException {

next.sending(this);
} catch (Throwable e) {
DebugUtil.logException(e, subdata.log);
DebugUtil.logException(new InvocationTargetException(e, address.toString() + ": Exception while running packet writer"), subdata.log);
Util.isException(forward::close);
}
});
Expand Down Expand Up @@ -300,7 +300,7 @@ void write(PacketOut packet) {
}
} catch (Throwable e) {
if (!(e instanceof SocketException)) {
DebugUtil.logException(e, subdata.log);
DebugUtil.logException(new InvocationTargetException(e, address.toString() + ": Exception while running packet writer"), subdata.log);
}
}
} else {
Expand All @@ -316,7 +316,7 @@ void write(PacketOut packet) {
*
* @param packets Packets to send
*/
public synchronized void sendPacket(PacketOut... packets) {
public void sendPacket(PacketOut... packets) {
for (PacketOut packet : packets) {
if (Util.isNull(packet)) throw new NullPointerException();
if (isClosed() || (state == CLOSING && !(packet instanceof PacketDisconnect || packet instanceof PacketDisconnectUnderstood))) {
Expand Down Expand Up @@ -478,8 +478,8 @@ void close(DisconnectReason reason) {
timeout.cancel();
if (readc != null) readc.run();
if (reason != CLOSE_REQUESTED) {
subdata.log.warning(getAddress().toString() + " has disconnected: " + reason);
} else subdata.log.info(getAddress().toString() + " has disconnected");
subdata.log.warning(address.toString() + " has disconnected: " + reason);
} else subdata.log.info(address.toString() + " has disconnected");

heartbeat.cancel();
heartbeat = null;
Expand Down

0 comments on commit 1127e97

Please sign in to comment.