From 30adc7a7f8337e6cf2d9093714b0819f54a032e3 Mon Sep 17 00:00:00 2001 From: Exus Altimus Date: Mon, 29 Jan 2024 23:54:07 -0500 Subject: [PATCH 1/6] Updated packets to use implementation used by gorcon (https://github.com/gorcon/rcon) (Fixes auth issues, untested on other servers besides pal world). Added strictCommandPacketIdMatching parameter to RCON constructor. PalWorld does not respect packet id's (Always returns 0) so this was added to ignore the packet ids and match to the most recently sent command instead. Default is true. --- src/CoreRCON/Constants.cs | 7 ++- src/CoreRCON/PacketFormats/RCONPacket.cs | 18 ++++---- src/CoreRCON/RCON.cs | 57 ++++++++++++++---------- src/RconShell/Program.cs | 2 +- src/RconTest/RconTest.cs | 2 +- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/src/CoreRCON/Constants.cs b/src/CoreRCON/Constants.cs index a1fca7a..e56f3b4 100644 --- a/src/CoreRCON/Constants.cs +++ b/src/CoreRCON/Constants.cs @@ -16,7 +16,12 @@ internal class Constants /// /// The size of the header of an RCON packet. /// - internal const int PACKET_HEADER_SIZE = 12; + internal const int PACKET_HEADER_SIZE = 8; + + /// + /// The size of the header of an RCON packet. + /// + internal const int PACKET_PADDING_SIZE = 2; /// /// Special response value when you send a Response.Response to the server. diff --git a/src/CoreRCON/PacketFormats/RCONPacket.cs b/src/CoreRCON/PacketFormats/RCONPacket.cs index 346b0cf..ae9750e 100644 --- a/src/CoreRCON/PacketFormats/RCONPacket.cs +++ b/src/CoreRCON/PacketFormats/RCONPacket.cs @@ -68,14 +68,15 @@ internal static RCONPacket FromBytes(byte[] buffer) /// Byte array with each field. internal byte[] ToBytes() { - int bodyLength = Encoding.UTF8.GetByteCount(Body) + 1; - int totalLength = Constants.PACKET_HEADER_SIZE + bodyLength; - byte[] packetBytes = new byte[totalLength]; + int bodyLength = Encoding.UTF8.GetByteCount(Body); + int size = Constants.PACKET_HEADER_SIZE + Constants.PACKET_PADDING_SIZE + bodyLength; + + byte[] packetBytes = new byte[size+4]; Span packetSpan = packetBytes; // Write packet size // Packet size parameter does not include the size of the size parameter itself - BinaryPrimitives.WriteInt32LittleEndian(packetSpan, totalLength - 4); + BinaryPrimitives.WriteInt32LittleEndian(packetSpan, size); packetSpan = packetSpan.Slice(4); // Write ID @@ -84,12 +85,13 @@ internal byte[] ToBytes() // Write type BinaryPrimitives.WriteInt32LittleEndian(packetSpan, (int)Type); - packetSpan = packetSpan.Slice(4); + //packetSpan = packetSpan.Slice(4); // Write body - Encoding.UTF8.GetBytes(Body, 0, Body.Length, packetBytes, Constants.PACKET_HEADER_SIZE); - packetSpan[bodyLength - 1] = 0; // Null terminator for the body - packetBytes[totalLength - 1] = 0; // Null terminator for the package + Encoding.UTF8.GetBytes(Body, 0, Body.Length, packetBytes, 12); + + packetBytes[packetBytes.Length - 2] = 0; // Null terminator for the body + packetBytes[packetBytes.Length - 1] = 0; // Null terminator for the package return packetBytes; } diff --git a/src/CoreRCON/RCON.cs b/src/CoreRCON/RCON.cs index 5856b25..42d3f1f 100644 --- a/src/CoreRCON/RCON.cs +++ b/src/CoreRCON/RCON.cs @@ -28,6 +28,7 @@ public partial class RCON( string password, uint timeout = 10000, bool sourceMultiPacketSupport = false, + bool strictCommandPacketIdMatching = true, ILogger logger = null) : IDisposable { // Allows us to keep track of when authentication succeeds, so we can block Connect from returning until it does. @@ -71,8 +72,8 @@ public partial class RCON( /// Server address /// Server port public RCON(IPAddress host, ushort port, string password, uint timeout = 10000, - bool sourceMultiPacketSupport = false, ILogger logger = null) - : this(new IPEndPoint(host, port), password, timeout, sourceMultiPacketSupport, logger) + bool sourceMultiPacketSupport = false, bool strictCommandPacketIdMatching = true, ILogger logger = null) + : this(new IPEndPoint(host, port), password, timeout, sourceMultiPacketSupport,strictCommandPacketIdMatching, logger) { } /// @@ -316,6 +317,7 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim throw new SocketException(); } + using var activity = Tracing.ActivitySource.StartActivity("SendCommand", ActivityKind.Client); activity?.AddTag(Tracing.Tags.Address, _endpoint.Address.ToString()); activity?.AddTag(Tracing.Tags.Port, _endpoint.Port.ToString()); @@ -367,39 +369,48 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim private void RCONPacketReceived(RCONPacket packet) { _logger?.LogTrace("RCON packet received: {}", packet.Id); + + TaskCompletionSource taskSource; // Call pending result and remove from map - if (_pendingCommands.TryGetValue(packet.Id, out TaskCompletionSource taskSource)) + if (!_pendingCommands.TryGetValue(packet.Id, out taskSource)) { - if (_multiPacket) + _logger?.LogWarning("Received packet with no matching command id: {} body: {}", packet.Id, packet.Body); + // The server did not respect our id + if (!strictCommandPacketIdMatching && packet.Id == 0) { - //Read any previous messages - _incomingBuffer.TryGetValue(packet.Id, out string body); - - if (packet.Body == Constants.MULTI_PACKET_END_RESPONSE) - { - //Avoid yielding - taskSource.SetResult(body ?? string.Empty); - _pendingCommands.TryRemove(packet.Id, out _); - _incomingBuffer.Remove(packet.Id); - } - else - { - //Append to previous messages - _incomingBuffer[packet.Id] = body + packet.Body; - } + // Get the most recently sent command + taskSource = _pendingCommands + .OrderBy(cmd => cmd.Key) + .Select(cmd => cmd.Value) + .FirstOrDefault(); } - else + } + + if (_multiPacket) + { + //Read any previous messages + _incomingBuffer.TryGetValue(packet.Id, out string body); + + if (packet.Body == Constants.MULTI_PACKET_END_RESPONSE) { //Avoid yielding - taskSource.SetResult(packet.Body); + taskSource.SetResult(body ?? string.Empty); _pendingCommands.TryRemove(packet.Id, out _); + _incomingBuffer.Remove(packet.Id); + } + else + { + //Append to previous messages + _incomingBuffer[packet.Id] = body + packet.Body; } } else { - _logger?.LogWarning("Received packet with no matching command id: {} body: {}", packet.Id, packet.Body); + //Avoid yielding + taskSource.SetResult(packet.Body); + _pendingCommands.TryRemove(packet.Id, out _); } - + OnPacketReceived?.Invoke(packet); } diff --git a/src/RconShell/Program.cs b/src/RconShell/Program.cs index 3a16068..38d0e38 100644 --- a/src/RconShell/Program.cs +++ b/src/RconShell/Program.cs @@ -76,7 +76,7 @@ static async Task Main(string[] args) port ); - rcon = new RCON(endpoint, password, 0); + rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: false); await rcon.ConnectAsync(); bool connected = true; Console.WriteLine("Connected"); diff --git a/src/RconTest/RconTest.cs b/src/RconTest/RconTest.cs index b12b616..e93a880 100644 --- a/src/RconTest/RconTest.cs +++ b/src/RconTest/RconTest.cs @@ -54,7 +54,7 @@ public async Task testInitAsync() }); ILogger logger = loggerFactory.CreateLogger(); - rconClient = new RCON(_ip, _port, _password, 1000, false, logger); + rconClient = new RCON(_ip, _port, _password, 1000, false, true, logger); await rconClient.ConnectAsync(); } From ace6de4058eaad1f67d1317a0cfc458a95ef033b Mon Sep 17 00:00:00 2001 From: Exus Date: Wed, 31 Jan 2024 00:33:44 -0500 Subject: [PATCH 2/6] Update src/CoreRCON/PacketFormats/RCONPacket.cs Co-authored-by: Alexander WB --- src/CoreRCON/PacketFormats/RCONPacket.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/CoreRCON/PacketFormats/RCONPacket.cs b/src/CoreRCON/PacketFormats/RCONPacket.cs index ae9750e..9e3cd33 100644 --- a/src/CoreRCON/PacketFormats/RCONPacket.cs +++ b/src/CoreRCON/PacketFormats/RCONPacket.cs @@ -85,7 +85,6 @@ internal byte[] ToBytes() // Write type BinaryPrimitives.WriteInt32LittleEndian(packetSpan, (int)Type); - //packetSpan = packetSpan.Slice(4); // Write body Encoding.UTF8.GetBytes(Body, 0, Body.Length, packetBytes, 12); From 5b1ee4238afc2a288913c653cd6522e1deeb3d5d Mon Sep 17 00:00:00 2001 From: Exus Altimus Date: Wed, 31 Jan 2024 05:22:11 -0500 Subject: [PATCH 3/6] Updated constants with feedback. Added autoConnect property, fixed dead lock (TaskCompletionSource.SetResult) when server drops connection. Validated multithreaded operation. Updated shell. Added keep alive packets. --- src/CoreRCON/Constants.cs | 6 +- src/CoreRCON/Exceptions.cs | 15 ++ src/CoreRCON/Extensions.cs | 10 +- src/CoreRCON/PacketFormats/RCONPacket.cs | 10 +- src/CoreRCON/RCON.cs | 229 +++++++++++++++-------- src/RconShell/Program.cs | 119 +++++++++--- 6 files changed, 270 insertions(+), 119 deletions(-) diff --git a/src/CoreRCON/Constants.cs b/src/CoreRCON/Constants.cs index e56f3b4..2d4322b 100644 --- a/src/CoreRCON/Constants.cs +++ b/src/CoreRCON/Constants.cs @@ -12,11 +12,12 @@ internal class Constants /// Minimum size of a rcon packet /// internal const int MIN_PACKET_SIZE = 14; - + + /// /// The size of the header of an RCON packet. /// - internal const int PACKET_HEADER_SIZE = 8; + internal const int PACKET_HEADER_SIZE = 12; /// /// The size of the header of an RCON packet. @@ -28,5 +29,6 @@ internal class Constants /// Used to finde the end of a multi packet response. /// internal const string MULTI_PACKET_END_RESPONSE = "\0\u0001\0\0"; + } } diff --git a/src/CoreRCON/Exceptions.cs b/src/CoreRCON/Exceptions.cs index da84521..4bc5a1c 100644 --- a/src/CoreRCON/Exceptions.cs +++ b/src/CoreRCON/Exceptions.cs @@ -19,4 +19,19 @@ public AuthenticationException(string message, Exception innerException) : base( { } } + + public class AuthenticationFailedException : AuthenticationException + { + public AuthenticationFailedException() + { + } + + public AuthenticationFailedException(string message) : base(message) + { + } + + public AuthenticationFailedException(string message, Exception innerException) : base(message, innerException) + { + } + } } diff --git a/src/CoreRCON/Extensions.cs b/src/CoreRCON/Extensions.cs index 637d558..0944570 100644 --- a/src/CoreRCON/Extensions.cs +++ b/src/CoreRCON/Extensions.cs @@ -134,18 +134,16 @@ public static async Task TimeoutAfter(this Task task, TimeSpan? timeout { var delayTask = Task.Delay(timeout.Value, cts.Token); - var resultTask = await Task.WhenAny(task, delayTask); + var resultTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false); if (resultTask == delayTask) { // Operation cancelled throw new TimeoutException(); } - else - { - cts.Cancel(); - } - return await task; + cts.Cancel(); + + return await task.ConfigureAwait(false); } } diff --git a/src/CoreRCON/PacketFormats/RCONPacket.cs b/src/CoreRCON/PacketFormats/RCONPacket.cs index 9e3cd33..ba72c52 100644 --- a/src/CoreRCON/PacketFormats/RCONPacket.cs +++ b/src/CoreRCON/PacketFormats/RCONPacket.cs @@ -69,14 +69,16 @@ internal static RCONPacket FromBytes(byte[] buffer) internal byte[] ToBytes() { int bodyLength = Encoding.UTF8.GetByteCount(Body); - int size = Constants.PACKET_HEADER_SIZE + Constants.PACKET_PADDING_SIZE + bodyLength; + // + int packetSize = Constants.PACKET_HEADER_SIZE + Constants.PACKET_PADDING_SIZE + bodyLength; - byte[] packetBytes = new byte[size+4]; + byte[] packetBytes = new byte[packetSize]; Span packetSpan = packetBytes; // Write packet size // Packet size parameter does not include the size of the size parameter itself - BinaryPrimitives.WriteInt32LittleEndian(packetSpan, size); + var normalizedPacketSize = packetSize - 4; + BinaryPrimitives.WriteInt32LittleEndian(packetSpan, normalizedPacketSize); packetSpan = packetSpan.Slice(4); // Write ID @@ -87,7 +89,7 @@ internal byte[] ToBytes() BinaryPrimitives.WriteInt32LittleEndian(packetSpan, (int)Type); // Write body - Encoding.UTF8.GetBytes(Body, 0, Body.Length, packetBytes, 12); + Encoding.UTF8.GetBytes(Body, 0, Body.Length, packetBytes, Constants.PACKET_HEADER_SIZE); packetBytes[packetBytes.Length - 2] = 0; // Null terminator for the body packetBytes[packetBytes.Length - 1] = 0; // Null terminator for the package diff --git a/src/CoreRCON/RCON.cs b/src/CoreRCON/RCON.cs index 42d3f1f..371cf65 100644 --- a/src/CoreRCON/RCON.cs +++ b/src/CoreRCON/RCON.cs @@ -22,6 +22,8 @@ namespace CoreRCON /// Rcon password /// Timeout to connect and send messages in milliseconds. A value of 0 means no timeout /// Enable source engine trick to receive multi packet responses using trick by Koraktor + /// When true, will only match response packets if a matching command is found. Concurrent commands are disabled when set to false. Disable if server does not respect packet ids + /// When true, will attempt to auto connect to the server if the connection has been dropped /// Logger to use, null means none public partial class RCON( IPEndPoint endpoint, @@ -29,22 +31,31 @@ public partial class RCON( uint timeout = 10000, bool sourceMultiPacketSupport = false, bool strictCommandPacketIdMatching = true, + bool autoConnect = true, ILogger logger = null) : IDisposable { + + public bool Authenticated => _authenticationTask is not null && _authenticationTask.Task.IsCompleted + && _authenticationTask.Task.Result; + public bool Connected => _tcp?.Connected == true && _connected; + // Allows us to keep track of when authentication succeeds, so we can block Connect from returning until it does. private TaskCompletionSource _authenticationTask; private bool _connected = false; - + private readonly IPEndPoint _endpoint = endpoint; // When generating the packet ID, use a never-been-used (for automatic packets) ID. private int _packetId = 0; - private readonly string _password = password; + private string _password = password; private readonly int _timeout = (int)timeout; + private readonly bool _autoConnect = autoConnect; + private readonly bool _strictCommandPacketIdMatching = strictCommandPacketIdMatching; private readonly bool _multiPacket = sourceMultiPacketSupport; + private CancellationTokenSource _pipeCts; // Map of pending command references. These are called when a command with the matching Id (key) is received. Commands are called only once. private ConcurrentDictionary> _pendingCommands { get; } = new ConcurrentDictionary>(); private Dictionary _incomingBuffer { get; } = []; @@ -52,9 +63,11 @@ public partial class RCON( private Socket _tcp { get; set; } private readonly ILogger _logger = logger; + readonly SemaphoreSlim _requestLimiter = new(1, 1); readonly SemaphoreSlim _semaphoreSlim = new(1, 1); private Task _socketWriter; private Task _socketReader; + private Task _requestTask; /// /// Fired if connection is lost @@ -71,9 +84,19 @@ public partial class RCON( /// /// Server address /// Server port - public RCON(IPAddress host, ushort port, string password, uint timeout = 10000, - bool sourceMultiPacketSupport = false, bool strictCommandPacketIdMatching = true, ILogger logger = null) - : this(new IPEndPoint(host, port), password, timeout, sourceMultiPacketSupport,strictCommandPacketIdMatching, logger) + /// + /// + /// + /// + public RCON(IPAddress host, + ushort port, + string password, + uint timeout = 10000, + bool sourceMultiPacketSupport = false, + bool strictCommandPacketIdMatching = true, + bool autoConnect = true, + ILogger logger = null) + : this(new IPEndPoint(host, port), password, timeout, sourceMultiPacketSupport,strictCommandPacketIdMatching,autoConnect, logger) { } /// @@ -84,9 +107,12 @@ public async Task ConnectAsync() { if (_connected) { + if (!Authenticated) + await AuthenticateAsync(); return; } - + _connected = false; + using var activity = Tracing.ActivitySource.StartActivity("Connect", ActivityKind.Client); activity?.AddTag(Tracing.Tags.Address, _endpoint.Address.ToString()); activity?.AddTag(Tracing.Tags.Port, _endpoint.Port.ToString()); @@ -95,33 +121,23 @@ public async Task ConnectAsync() { ReceiveTimeout = _timeout, SendTimeout = _timeout, - NoDelay = true + NoDelay = true, }; + _tcp.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + await _tcp.ConnectAsync(_endpoint) .ConfigureAwait(false); _connected = true; Pipe pipe = new Pipe(); - - _socketWriter = FillPipeAsync(pipe.Writer) + _pipeCts = new CancellationTokenSource(); + _socketWriter = FillPipeAsync(pipe.Writer, _pipeCts.Token) .ContinueWith(LogDisconnect); - _socketReader = ReadPipeAsync(pipe.Reader) + _socketReader = ReadPipeAsync(pipe.Reader, _pipeCts.Token) .ContinueWith(LogDisconnect); - // Wait for successful authentication - _authenticationTask = new TaskCompletionSource(); - await SendPacketAsync(new RCONPacket(0, PacketType.Auth, _password)) - .ConfigureAwait(false); - try - { - await _authenticationTask.Task - .TimeoutAfter(TimeSpan.FromMilliseconds(_timeout)) - .ConfigureAwait(false); - } - catch (TimeoutException) - { - throw new TimeoutException("Timeout while waiting for authentication response from server"); - } + // Wait for successful authentication + await AuthenticateAsync(); } /// @@ -129,7 +145,7 @@ await _authenticationTask.Task /// /// /// Producer Task - async Task FillPipeAsync(PipeWriter writer) + async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { const int minimumBufferSize = Constants.MIN_PACKET_SIZE; @@ -141,8 +157,9 @@ async Task FillPipeAsync(PipeWriter writer) Memory memory = writer.GetMemory(minimumBufferSize); int bytesRead = await _tcp.ReceiveAsync(memory, SocketFlags.None) .ConfigureAwait(false); - if (bytesRead == 0) + if (bytesRead == 0 || cancellationToken.IsCancellationRequested) { + // The server has closed the connection break; } // Tell the PipeWriter how much was read from the Socket @@ -166,6 +183,11 @@ await writer.FlushAsync() await writer.CompleteAsync() .ConfigureAwait(false); _connected = false; + if (_pipeCts != null && !_pipeCts.IsCancellationRequested) + { + _pipeCts.Cancel(); // Tell reader to stop waiting + _pipeCts = null; + } OnDisconnected?.Invoke(); } } @@ -175,27 +197,32 @@ await writer.CompleteAsync() /// /// /// Consumer Task - async Task ReadPipeAsync(PipeReader reader) + async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken) { try { while (true) { - ReadResult result = await reader.ReadAsync() + ReadResult result = await reader.ReadAsync(cancellationToken) .ConfigureAwait(false); ReadOnlySequence buffer = result.Buffer; SequencePosition packetStart = buffer.Start; + if (cancellationToken.IsCancellationRequested) + break; + if (buffer.Length < 4) { if (result.IsCompleted) { break; } + reader.AdvanceTo(packetStart, buffer.End); continue; // Complete header not yet received } + int size = BitConverter.ToInt32(buffer.Slice(packetStart, 4).ToArray(), 0); if (buffer.Length >= size + 4) { @@ -203,24 +230,9 @@ async Task ReadPipeAsync(PipeReader reader) SequencePosition packetEnd = buffer.GetPosition(size + 4, packetStart); byte[] byteArr = buffer.Slice(packetStart, packetEnd).ToArray(); RCONPacket packet = RCONPacket.FromBytes(byteArr); - - if (packet.Type == PacketType.AuthResponse) - { - // Failed auth responses return with an ID of -1 - if (packet.Id == -1) - { - _authenticationTask.SetException( - new AuthenticationException($"Authentication failed for {_tcp.RemoteEndPoint}.") - ); - } - // Tell Connect that authentication succeeded - _authenticationTask.SetResult(true); - } - else - { - // Forward rcon packet to handler - RCONPacketReceived(packet); - } + + // Forward rcon packet to handler + RCONPacketReceived(packet); reader.AdvanceTo(packetEnd); } @@ -240,16 +252,15 @@ async Task ReadPipeAsync(PipeReader reader) } finally { - // If authentication did not complete - _authenticationTask.TrySetException( - new AuthenticationException($"Server did not respond to auth {_tcp.RemoteEndPoint}.") - ); - // Mark the PipeReader as complete await reader.CompleteAsync().ConfigureAwait(false); } } + public void SetPassword(string password) + { + _password = password; + } public void Dispose() { Dispose(true); @@ -268,6 +279,12 @@ protected virtual void Dispose(bool disposing) _tcp.Shutdown(SocketShutdown.Both); _tcp.Dispose(); } + + if (_pipeCts != null) + { + _pipeCts.Cancel(); + _pipeCts = null; + } } } @@ -281,6 +298,7 @@ protected virtual void Dispose(bool disposing) public async Task SendCommandAsync(string command, TimeSpan? overrideTimeout = null) where T : class, IParseable, new() { + string response = await SendCommandAsync(command, overrideTimeout).ConfigureAwait(false); // Se comment about TaskCreationOptions.RunContinuationsAsynchronously in SendComandAsync var source = new TaskCompletionSource(); @@ -298,6 +316,43 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTimeo return (T)parsed; } + public async Task AuthenticateAsync() + { + // ensure mutual execution of SendPacketAsync and RCONPacketReceived + await _semaphoreSlim.WaitAsync(); + + Task completedTask; + try + { + _authenticationTask = new TaskCompletionSource(); + await SendPacketAsync(new RCONPacket(0, PacketType.Auth, _password)) + .ConfigureAwait(false); + + completedTask = await Task.WhenAny(_authenticationTask.Task, _socketWriter, _socketReader) + .TimeoutAfter(TimeSpan.FromMilliseconds(_timeout)) + .ConfigureAwait(false); + } + catch (TimeoutException) + { + throw new TimeoutException("Timeout while waiting for authentication response from server"); + } + finally + { + _semaphoreSlim.Release(); + } + + if (completedTask == _authenticationTask.Task) + { + var success = await _authenticationTask.Task; + if (!success) + throw new AuthenticationFailedException($"Authentication failed for {_tcp.RemoteEndPoint}."); + + } + + await completedTask; + return true; + } + /// /// Send a command to the server, and wait for the response before proceeding. /// @@ -305,19 +360,13 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTimeo /// Connection exceptions public async Task SendCommandAsync(string command, TimeSpan? overrideTimeout = null) { + if (_autoConnect) + await ConnectAsync(); + + if (string.IsNullOrEmpty(command)) + throw new ArgumentException(nameof(command), "Command must be at least one character"); - // This TaskCompletion source could be initialized with TaskCreationOptions.RunContinuationsAsynchronously - // However we this library is designed to be able to run without its own thread - // Read more about this option here: - // https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#always-create-taskcompletionsourcet-with-taskcreationoptionsruncontinuationsasynchronously - var completionSource = new TaskCompletionSource(); int packetId = Interlocked.Increment(ref _packetId); - if (!_pendingCommands.TryAdd(packetId, completionSource)) - { - throw new SocketException(); - } - - using var activity = Tracing.ActivitySource.StartActivity("SendCommand", ActivityKind.Client); activity?.AddTag(Tracing.Tags.Address, _endpoint.Address.ToString()); activity?.AddTag(Tracing.Tags.Port, _endpoint.Port.ToString()); @@ -326,10 +375,22 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim activity?.AddTag(Tracing.Tags.CommandCount, command.Count(c => c == ';')); activity?.AddTag(Tracing.Tags.CommandFirst, new string(command.TakeWhile(c => c != ' ').ToArray())); - RCONPacket packet = new RCONPacket(packetId, PacketType.ExecCommand, command); - - // ensuer mutal execution of SendPacketAsync and RCONPacketReceived + // ensure mutual execution of SendPacketAsync and RCONPacketReceived await _semaphoreSlim.WaitAsync(); + + // This TaskCompletion source could be initialized with TaskCreationOptions.RunContinuationsAsynchronously + // However we this library is designed to be able to run without its own thread + // Read more about this option here: + // https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#always-create-taskcompletionsourcet-with-taskcreationoptionsruncontinuationsasynchronously + var completionSource = new TaskCompletionSource(); + if (!_pendingCommands.TryAdd(packetId, completionSource)) + { + _semaphoreSlim.Release(); + throw new SocketException(); + } + + RCONPacket packet = new RCONPacket(packetId, PacketType.ExecCommand, command); + Task completedTask; try { @@ -344,6 +405,9 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim } finally { + if (!completionSource.Task.IsCompleted) + completionSource.SetCanceled(); + _semaphoreSlim.Release(); _pendingCommands.TryRemove(packet.Id, out _); _incomingBuffer.Remove(packet.Id); @@ -355,12 +419,12 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim activity?.AddTag(Tracing.Tags.ResponseLength, response.Length); return response; } - + // Observe exception await completedTask; throw new SocketException(); } - + /// /// Merges RCON packet bodies and resolves the waiting task /// with the full body when full response has been recived. @@ -369,46 +433,55 @@ public async Task SendCommandAsync(string command, TimeSpan? overrideTim private void RCONPacketReceived(RCONPacket packet) { _logger?.LogTrace("RCON packet received: {}", packet.Id); + + if (packet.Type == PacketType.AuthResponse) + { + // Tell Connect that authentication succeeded (Failed auth responses return with an ID of -1) + _authenticationTask.SetResult(packet.Id != -1); + return; + } - TaskCompletionSource taskSource; + var packetId = packet.Id; + TaskCompletionSource taskSource = default; // Call pending result and remove from map if (!_pendingCommands.TryGetValue(packet.Id, out taskSource)) { _logger?.LogWarning("Received packet with no matching command id: {} body: {}", packet.Id, packet.Body); // The server did not respect our id - if (!strictCommandPacketIdMatching && packet.Id == 0) + if (!_strictCommandPacketIdMatching && packet.Id == 0) { - // Get the most recently sent command - taskSource = _pendingCommands + var nextCommandInQueue = _pendingCommands .OrderBy(cmd => cmd.Key) - .Select(cmd => cmd.Value) .FirstOrDefault(); + // Get the most recently sent command + taskSource = nextCommandInQueue.Value; + packetId = nextCommandInQueue.Key; } } if (_multiPacket) { //Read any previous messages - _incomingBuffer.TryGetValue(packet.Id, out string body); + _incomingBuffer.TryGetValue(packetId, out string body); if (packet.Body == Constants.MULTI_PACKET_END_RESPONSE) { //Avoid yielding taskSource.SetResult(body ?? string.Empty); - _pendingCommands.TryRemove(packet.Id, out _); - _incomingBuffer.Remove(packet.Id); + _pendingCommands.TryRemove(packetId, out _); + _incomingBuffer.Remove(packetId); } else { //Append to previous messages - _incomingBuffer[packet.Id] = body + packet.Body; + _incomingBuffer[packetId] = body + packet.Body; } } else { //Avoid yielding taskSource.SetResult(packet.Body); - _pendingCommands.TryRemove(packet.Id, out _); + _pendingCommands.TryRemove(packetId, out _); } OnPacketReceived?.Invoke(packet); @@ -441,7 +514,7 @@ private void LogDisconnect(Task task) _logger?.LogError("RCON connection closed"); if (task.IsFaulted) { - _logger?.LogError(task.Exception, "conection closed due to exception"); + _logger?.LogError(task.Exception, "connection closed due to exception"); } } } diff --git a/src/RconShell/Program.cs b/src/RconShell/Program.cs index 38d0e38..bcca955 100644 --- a/src/RconShell/Program.cs +++ b/src/RconShell/Program.cs @@ -43,14 +43,15 @@ public static async void ConcurrentTestAsync() static async Task Main(string[] args) { - + bool autoConnect = true; string host; int port = 0; string password; - Console.WriteLine("Enter Server Host/Ip"); + Console.WriteLine("Enter Server Host/Ip (Default 127.0.0.1)"); - host = Console.ReadLine(); + var input = Console.ReadLine(); + host = string.IsNullOrWhiteSpace(input) ? "127.0.0.1" : input; // Split host and port if (host.Contains(":")) @@ -65,8 +66,9 @@ static async Task Main(string[] args) if (port == 0) { - Console.WriteLine("Enter port (default 27055))"); - port = int.Parse(Console.ReadLine() ?? "27055"); + Console.WriteLine("Enter port (default 27055)"); + input = Console.ReadLine(); + port = int.Parse( string.IsNullOrWhiteSpace(input) ? "27055" : input); } Console.WriteLine("Enter password"); password = Console.ReadLine(); @@ -75,42 +77,101 @@ static async Task Main(string[] args) addresses.First(), port ); - - rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: false); - await rcon.ConnectAsync(); - bool connected = true; - Console.WriteLine("Connected"); - - Console.WriteLine("You can now enter commands to send to server:"); + bool connected = false; + + rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: false, autoConnect: autoConnect); rcon.OnDisconnected += () => { Console.WriteLine("RCON Disconnected"); connected = false; }; - - while (connected) + + var tryConnect = true; + do { - string command = Console.ReadLine(); - if (command == "conctest") + try { - completed = 0; - List threadList = new List(ThreadCount); - for (int i = 0; i < ThreadCount; i++) + await rcon.ConnectAsync(); + connected = true; + Console.WriteLine($"Connected ({endpoint})"); + Console.WriteLine("You can now enter commands to send to server:"); + while (connected || autoConnect) { - ThreadStart childref = new ThreadStart(ConcurrentTestAsync); - Thread childThread = new Thread(childref); - childThread.Start(); - threadList.Add(childThread); + string command = Console.ReadLine(); + if (!connected && !autoConnect) + break; + if (command == "conctest") + { + completed = 0; + List threadList = new List(ThreadCount); + for (int i = 0; i < ThreadCount; i++) + { + ThreadStart childref = new ThreadStart(ConcurrentTestAsync); + Thread childThread = new Thread(childref); + childThread.Start(); + threadList.Add(childThread); + } + + while (completed < ThreadCount) + { + await Task.Delay(1); + } + + continue; + } + + try + { + string response = await rcon.SendCommandAsync(command); + Console.WriteLine(response); + } + catch (ArgumentException ex) + { + var prevColor = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine(ex.Message); + Console.ForegroundColor = prevColor; + } } - while (completed < ThreadCount) + } + catch (AuthenticationFailedException ex) + { + var prevColor = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine("Authentication failed: Invalid password"); + Console.ForegroundColor = prevColor; + Console.WriteLine("Enter password"); + password = Console.ReadLine(); + rcon.SetPassword(password); + continue; + } + catch (Exception e) + { + var prevColor = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine(e); + Console.ForegroundColor = prevColor; + + } + + while (true) + { + Console.WriteLine("Attempt to reconnect? (y/n)"); + var retry = Console.ReadLine(); + if (retry.ToLower() == "y") + break; + + if (retry.ToLower() == "y") { - await Task.Delay(1); + tryConnect = false; + break; } - continue; + + Console.WriteLine("Invalid input."); } - string response = await rcon.SendCommandAsync(command); - Console.WriteLine(response); - } + + } while (tryConnect); + } } } From 0109993ae604b37831e096e42490bcfa48e79ff1 Mon Sep 17 00:00:00 2001 From: Exus Altimus Date: Wed, 31 Jan 2024 05:30:49 -0500 Subject: [PATCH 4/6] Moved warning into guard clause --- src/CoreRCON/RCON.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/CoreRCON/RCON.cs b/src/CoreRCON/RCON.cs index 371cf65..b3e079e 100644 --- a/src/CoreRCON/RCON.cs +++ b/src/CoreRCON/RCON.cs @@ -446,7 +446,6 @@ private void RCONPacketReceived(RCONPacket packet) // Call pending result and remove from map if (!_pendingCommands.TryGetValue(packet.Id, out taskSource)) { - _logger?.LogWarning("Received packet with no matching command id: {} body: {}", packet.Id, packet.Body); // The server did not respect our id if (!_strictCommandPacketIdMatching && packet.Id == 0) { @@ -457,6 +456,11 @@ private void RCONPacketReceived(RCONPacket packet) taskSource = nextCommandInQueue.Value; packetId = nextCommandInQueue.Key; } + else + { + _logger?.LogWarning("Received packet with no matching command id: {} body: {}", packet.Id, packet.Body); + return; + } } if (_multiPacket) From babe0f63bfc1ac232df2bc1b06ee32a190f2727a Mon Sep 17 00:00:00 2001 From: Exus Altimus Date: Wed, 31 Jan 2024 05:55:32 -0500 Subject: [PATCH 5/6] Corrected console retry bug --- src/RconShell/Program.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/RconShell/Program.cs b/src/RconShell/Program.cs index bcca955..3a7d374 100644 --- a/src/RconShell/Program.cs +++ b/src/RconShell/Program.cs @@ -79,7 +79,7 @@ static async Task Main(string[] args) ); bool connected = false; - rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: false, autoConnect: autoConnect); + rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: true, autoConnect: autoConnect); rcon.OnDisconnected += () => { Console.WriteLine("RCON Disconnected"); @@ -161,7 +161,7 @@ static async Task Main(string[] args) if (retry.ToLower() == "y") break; - if (retry.ToLower() == "y") + if (retry.ToLower() == "n") { tryConnect = false; break; From dabf8df908fa350d15dd92fe91beb5ada957c110 Mon Sep 17 00:00:00 2001 From: Exus Altimus Date: Thu, 8 Feb 2024 11:31:16 -0500 Subject: [PATCH 6/6] Applied suggestions --- src/CoreRCON/Extensions.cs | 8 +++----- src/CoreRCON/RCON.cs | 5 ----- src/RconShell/Program.cs | 2 +- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/CoreRCON/Extensions.cs b/src/CoreRCON/Extensions.cs index 0944570..a44cdbe 100644 --- a/src/CoreRCON/Extensions.cs +++ b/src/CoreRCON/Extensions.cs @@ -164,11 +164,9 @@ public static async Task TimeoutAfter(this Task task, TimeSpan? timeout) { throw new TimeoutException(); } - else - { - // Cancel the timer task so that it does not fire - cts.Cancel(); - } + + // Cancel the timer task so that it does not fire + cts.Cancel(); await task; } } diff --git a/src/CoreRCON/RCON.cs b/src/CoreRCON/RCON.cs index b3e079e..b5d8796 100644 --- a/src/CoreRCON/RCON.cs +++ b/src/CoreRCON/RCON.cs @@ -183,11 +183,6 @@ await writer.FlushAsync() await writer.CompleteAsync() .ConfigureAwait(false); _connected = false; - if (_pipeCts != null && !_pipeCts.IsCancellationRequested) - { - _pipeCts.Cancel(); // Tell reader to stop waiting - _pipeCts = null; - } OnDisconnected?.Invoke(); } } diff --git a/src/RconShell/Program.cs b/src/RconShell/Program.cs index 3a7d374..ac5bb7c 100644 --- a/src/RconShell/Program.cs +++ b/src/RconShell/Program.cs @@ -79,7 +79,7 @@ static async Task Main(string[] args) ); bool connected = false; - rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: true, autoConnect: autoConnect); + rcon = new RCON(endpoint, password, 0, strictCommandPacketIdMatching: false, autoConnect: autoConnect); rcon.OnDisconnected += () => { Console.WriteLine("RCON Disconnected");