Skip to content

Commit

Permalink
bugfix: more stable when some people have compression/BN disabled
Browse files Browse the repository at this point in the history
bugfix: no longer spams log when unexpected messages received
  • Loading branch information
CW-Jesse committed Feb 28, 2023
1 parent 61f0c4e commit 88ab52b
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 43 deletions.
26 changes: 21 additions & 5 deletions CW_Jesse.BetterNetworking/BN_Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@

namespace CW_Jesse.BetterNetworking {
class BN_Utils {

public static ZNetPeer GetPeer(ZRpc rpc) {
foreach (ZNetPeer peer in ZNet.instance.GetPeers()) {
if (peer.m_rpc == rpc) return peer;
}
return null;
}
public static ZNetPeer GetPeer(ISocket socket) {
foreach (ZNetPeer peer in ZNet.instance.GetPeers()) {
if (peer.m_socket == socket) return peer;
}
return null;
}

public static string GetPeerName(ZNetPeer peer) {
if (peer == null) return "[null]";
if (peer.m_server) return "[server]";
return $"{peer.m_playerName}[{peer.m_socket.GetHostName()}]";
// return $"{peer.m_playerName}[{peer.m_socket.GetEndPointString()}]";
}
public static string GetPeerName(ISocket socket) {
if (socket == null) return "null peer";
if (socket.IsHost()) return "[server]";
if (!string.IsNullOrEmpty(socket.GetHostName())) return socket.GetHostName();
if (!string.IsNullOrEmpty(socket.GetEndPointString())) return socket.GetEndPointString();
return "unknown peer";
return GetPeerName(GetPeer(socket));
}

public static bool isDedicated = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum Options_NetworkCompression {
CompressionStatus.AddSocket(peer.m_socket);

RegisterRPCs(peer);
SendCompressionVersion(peer.m_rpc);
SendCompressionVersion(peer);
}

// /// <summary>
Expand All @@ -84,12 +84,17 @@ public enum Options_NetworkCompression {
// [HarmonyPostfix]
// private static void OnPeerInfoReceived(ref ZRpc rpc) {
// }


[HarmonyPatch(typeof(ZNet), nameof(ZNet.Disconnect))]
[HarmonyPrefix]
private static void OnDisconnectPrefix(ZNetPeer peer) {
BN_Logger.LogMessage($"Compression: {BN_Utils.GetPeerName(peer)} disconnected");
}

[HarmonyPatch(typeof(ZNet), nameof(ZNet.Disconnect))]
[HarmonyPostfix]
private static void OnDisconnect(ZNetPeer peer) {
private static void OnDisconnectPostfix(ZNetPeer peer) {
CompressionStatus.RemoveSocket(peer.m_socket);
BN_Logger.LogMessage($"Compression: {BN_Utils.GetPeerName(peer.m_socket)} disconnected");
}

internal static byte[] Compress(byte[] buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,20 @@ public static class BN_Patch_Compression_PlayFab {
byte[] dataToDecompress = ___m_inDecompress.Dequeue();
try {
___m_outDecompress.Enqueue(Decompress(dataToDecompress));
if (!CompressionStatus.GetReceiveCompressionStarted(socket)) {
BN_Logger.LogMessage($"PlayFab: Received unexpected compressed message from {BN_Utils.GetPeerName(socket)}");
CompressionStatus.SetReceiveCompressionStarted(socket, true);
}
} catch {
if (bnCompression) BN_Logger.LogInfo($"PlayFab: Failed BN decompress");
try {
___m_outDecompress.Enqueue((byte[])AccessTools.Method(typeof(PlayFabZLibWorkQueue), "UncompressOnThisThread").Invoke(__instance, new object[] { dataToDecompress }));
if (CompressionStatus.GetReceiveCompressionStarted(socket)) {
BN_Logger.LogMessage($"PlayFab: Received unexpected vanilla message from {BN_Utils.GetPeerName(socket)}");
CompressionStatus.SetReceiveCompressionStarted(socket, false);
}
} catch {
BN_Logger.LogInfo($"PlayFab: Failed vanilla decompress; keeping data (vanilla behaviour is to throw it away)");
BN_Logger.LogMessage($"PlayFab: Failed vanilla decompress; keeping data (this data would have been lost without Better Networking)");
___m_outDecompress.Enqueue(dataToDecompress);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,75 +12,77 @@ public partial class BN_Patch_Compression {
peer.m_rpc.Register<bool>(RPC_COMPRESSION_STARTED, RPC_CompressionStarted);
}

public static void SendCompressionVersion(ZRpc rpc) {
rpc.Invoke(RPC_COMPRESSION_VERSION, new object[] { CompressionStatus.ourStatus.version });
public static void SendCompressionVersion(ZNetPeer peer) {
peer.m_rpc.Invoke(RPC_COMPRESSION_VERSION, new object[] { CompressionStatus.ourStatus.version });
}
private static void RPC_CompressionVersion(ZRpc rpc, int version) {
ISocket socket = rpc.GetSocket();
CompressionStatus.SetVersion(socket, version);
ZNetPeer peer = BN_Utils.GetPeer(rpc);
CompressionStatus.SetVersion(peer.m_socket, version);

if (CompressionStatus.ourStatus.version == version) {
BN_Logger.LogMessage($"Compression: Compatible with {BN_Utils.GetPeerName(socket)}");
BN_Logger.LogMessage($"Compression: Compatible with {BN_Utils.GetPeerName(peer)}");
} else if (CompressionStatus.ourStatus.version > version) {
BN_Logger.LogWarning($"Compression: {BN_Utils.GetPeerName(socket)} ({version}) has an older version of Better Networking; they should update");
BN_Logger.LogWarning($"Compression: {BN_Utils.GetPeerName(peer)} ({version}) has an older version of Better Networking; they should update");
} else if (version > 0) {
BN_Logger.LogError($"Compression: {BN_Utils.GetPeerName(socket)} ({version}) has a newer version of Better Networking; you should update");
BN_Logger.LogError($"Compression: {BN_Utils.GetPeerName(peer)} ({version}) has a newer version of Better Networking; you should update");
}

if (CompressionStatus.GetIsCompatibleWith(socket)) {
SendCompressionEnabledStatus(rpc);
if (CompressionStatus.GetIsCompatibleWith(peer.m_socket)) {
SendCompressionEnabledStatus(peer);
}
}

private static void SendCompressionEnabledStatus() {
if (ZNet.instance == null) { return; }
foreach (ZNetPeer peer in ZNet.instance.GetPeers()) {
if (CompressionStatus.GetIsCompatibleWith(peer.m_socket)) {
SendCompressionEnabledStatus(peer.m_rpc);
SendCompressionEnabledStatus(peer);
}
}
}
private static void SendCompressionEnabledStatus(ZRpc rpc) {
private static void SendCompressionEnabledStatus(ZNetPeer peer) {
if (ZNet.instance == null) { return; }
rpc.Invoke(RPC_COMPRESSION_ENABLED, new object[] { CompressionStatus.ourStatus.compressionEnabled });
if (CompressionStatus.ourStatus.compressionEnabled && CompressionStatus.GetCompressionEnabled(rpc.GetSocket())) {
SendCompressionStarted(rpc, true);
peer.m_rpc.Invoke(RPC_COMPRESSION_ENABLED, new object[] { CompressionStatus.ourStatus.compressionEnabled });
if (CompressionStatus.ourStatus.compressionEnabled && CompressionStatus.GetCompressionEnabled(peer.m_socket)) {
SendCompressionStarted(peer, true);
} else {
SendCompressionStarted(rpc, false); // don't start compression if either peer has it disabled
SendCompressionStarted(peer, false); // don't start compression if either peer has it disabled
}
}

private static void RPC_CompressionEnabled(ZRpc rpc, bool peerCompressionEnabled) {
CompressionStatus.SetCompressionEnabled(rpc.GetSocket(), peerCompressionEnabled);
ZNetPeer peer = BN_Utils.GetPeer(rpc);
CompressionStatus.SetCompressionEnabled(peer.m_socket, peerCompressionEnabled);
if (CompressionStatus.ourStatus.compressionEnabled && peerCompressionEnabled) {
SendCompressionStarted(rpc, true);
SendCompressionStarted(peer, true);
} else {
SendCompressionStarted(rpc, false); // don't start compression if either peer has it disabled
SendCompressionStarted(peer, false); // don't start compression if either peer has it disabled
}
}

private static void SendCompressionStarted(ZRpc rpc, bool started) {
private static void SendCompressionStarted(ZNetPeer peer, bool started) {
if (ZNet.instance == null) { return; }
if (CompressionStatus.GetSendCompressionStarted(rpc.GetSocket()) == started) { return; } // don't do anything if nothing's changed
rpc.Invoke(RPC_COMPRESSION_STARTED, new object[] { started });
Flush(rpc);
CompressionStatus.SetSendCompressionStarted(rpc.GetSocket(), started);
BN_Logger.LogMessage($"Compression: Compression to {BN_Utils.GetPeerName(rpc.GetSocket())}: {started}");
if (CompressionStatus.GetSendCompressionStarted(peer.m_socket) == started) { return; } // don't do anything if nothing's changed
peer.m_rpc.Invoke(RPC_COMPRESSION_STARTED, new object[] { started });
Flush(peer);
CompressionStatus.SetSendCompressionStarted(peer.m_socket, started);
BN_Logger.LogMessage($"Compression: Compression to {BN_Utils.GetPeerName(peer)}: {started}");
}

private static void Flush(ZRpc rpc) {
private static void Flush(ZNetPeer peer) {
switch (ZNet.m_onlineBackend) {
case OnlineBackendType.Steamworks:
rpc.GetSocket().Flush(); // since we compress the entire send queue, flush existing send queue before starting/stopping compression
peer.m_socket.Flush(); // since we compress the entire send queue, flush existing send queue before starting/stopping compression
break;
case OnlineBackendType.PlayFab:
BN_Patch_Compression_PlayFab.FlushQueue(rpc.GetSocket());
BN_Patch_Compression_PlayFab.FlushQueue(peer.m_socket);
break;
}
}
private static void RPC_CompressionStarted(ZRpc rpc, bool peerCompressionStarted) {
CompressionStatus.SetReceiveCompressionStarted(rpc.GetSocket(), peerCompressionStarted);
BN_Logger.LogMessage($"Compression: Compression from {BN_Utils.GetPeerName(rpc.GetSocket())}: {peerCompressionStarted}");
ZNetPeer peer = BN_Utils.GetPeer(rpc);
CompressionStatus.SetReceiveCompressionStarted(peer.m_socket, peerCompressionStarted);
BN_Logger.LogMessage($"Compression: Compression from {BN_Utils.GetPeerName(peer)}: {peerCompressionStarted}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public static class BN_Patch_Compression_Steamworks {
decompressedResult = Decompress(__result.GetArray());
__result = new ZPackage(decompressedResult);
if (!CompressionStatus.GetReceiveCompressionStarted(__instance)) {
BN_Logger.LogWarning($"Compression (Steamworks): Received unexpected compressed message from {BN_Utils.GetPeerName(__instance)}");
// CompressionStatus.SetReceiveCompressionStarted(__instance, true);
BN_Logger.LogMessage($"Compression (Steamworks): Received unexpected compressed message from {BN_Utils.GetPeerName(__instance)}");
CompressionStatus.SetReceiveCompressionStarted(__instance, true);
}
} catch {
if (CompressionStatus.GetReceiveCompressionStarted(__instance)) {
BN_Logger.LogWarning($"Compression (Steamworks): Could not decompress message from {BN_Utils.GetPeerName(__instance)}");
// CompressionStatus.SetReceiveCompressionStarted(__instance, false);
BN_Logger.LogMessage($"Compression (Steamworks): Received unexpected uncompressed message from {BN_Utils.GetPeerName(__instance)}");
CompressionStatus.SetReceiveCompressionStarted(__instance, false);
}
}
}
Expand Down

0 comments on commit 88ab52b

Please sign in to comment.