diff --git a/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/Abstractions/IConnectionStore.cs b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/Abstractions/IConnectionStore.cs new file mode 100644 index 0000000..3c85a17 --- /dev/null +++ b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/Abstractions/IConnectionStore.cs @@ -0,0 +1,7 @@ +namespace LiteHttp.ConnectionManager.Abstractions; + +public interface IConnectionStore +{ + public bool TryCloseConnection(ConnectionContext connection); + public ConnectionContext InitializeConnection(SocketAsyncEventArgs saea); +} diff --git a/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionManager.cs b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionManager.cs index 8d56043..9918521 100644 --- a/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionManager.cs +++ b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionManager.cs @@ -5,78 +5,75 @@ // // The rest of the code is written without any inspiration, any similarities are purely coincidental. -using System.Collections.Concurrent; - -using LiteHttp.Heartbeat; -using LiteHttp.Helpers; +using LiteHttp.ConnectionManager.Abstractions; namespace LiteHttp.ConnectionManager; #nullable disable #pragma warning disable CS8632 -public sealed class ConnectionManager : IHeartbeatHandler, IDisposable +public sealed class ConnectionManager : IDisposable { - private const int MinimalReceiveSpeed = 1024; // 1 KB/s - private static readonly TimeSpan Second = TimeSpan.FromSeconds(1); - private readonly DefaultObjectPool _saeaPool = new(); - private readonly ConcurrentDictionary _connections; - private readonly ConnectionContextFactory _connectionContextFactory = new(); - + private readonly IConnectionStore _connectionStore; + public ConnectionManager() { const int initObjectsCount = 50000; - _connections = new ConcurrentDictionary(-1, initObjectsCount); - + _connectionStore = new ConnectionStore(); + ObjectPoolInitializationHelper.Initialize(initObjectsCount, _saeaPool, () => { - const int bufferSize = 4 * 1024; // 4 KB - var saea = new SocketAsyncEventArgs(); - - saea.Completed += IoCompleted; - saea.SetBuffer(new byte[bufferSize], 0, bufferSize); + var saea = InitSaea(); return saea; }); } - - public void OnHeartbeat() + + private SocketAsyncEventArgs InitSaea() { - var now = DateTime.UtcNow; - - foreach (var kvp in _connections) - { - var connection = kvp.Value; - - var lifetime = now - connection.CreatedAtUtc; - if (lifetime < Second) continue; - - var speed = connection.BytesReceived / lifetime.TotalSeconds; - if (speed < MinimalReceiveSpeed) - CloseConnection(connection.SocketEventArgs); - } + const int bufferSize = 4 * 1024; // 4 KB + + var saea = new SocketAsyncEventArgs(); + + saea.Completed += IoCompleted; + saea.SetBuffer(new byte[bufferSize], 0, bufferSize); + return saea; } - + + // TODO: implement "walker" service with similar logic. + //public void OnHeartbeat() + //{ + // var now = DateTime.UtcNow; + + // foreach (var kvp in _connections) + // { + // var connection = kvp.Value; + + // var lifetime = now - connection.CreatedAtUtc; + // if (lifetime < Second) continue; + + // var speed = connection.BytesReceived / lifetime.TotalSeconds; + // if (speed < MinimalReceiveSpeed) + // CloseConnection(connection.SocketEventArgs); + // } + //} + public void HandleAccept(SocketAsyncEventArgs acceptEventArg) { - if (!_saeaPool.TryGet(out var saea)) return; - + if (!_saeaPool.TryGet(out var saea)) saea = InitSaea(); + saea.AcceptSocket = acceptEventArg.AcceptSocket; - + ThreadPool.UnsafeQueueUserWorkItem(InitializeConnection, saea, false); } private void InitializeConnection(SocketAsyncEventArgs saea) { - var connectionContext = _connectionContextFactory.Create(saea); + var connectionContext = _connectionStore.InitializeConnection(saea); saea.UserToken = connectionContext; - - // REVIEW: not thread safe. Should be refactored to support multiple accept loops - if (!_connections.TryAdd(connectionContext.Id, connectionContext)) - throw new InvalidOperationException($"Cannot add task {connectionContext.Id}"); - + Receive(saea); } @@ -93,14 +90,14 @@ public void SendResponse(ConnectionContext connectionContext) { var socket = connectionContext.SocketEventArgs.AcceptSocket; bool willRaiseEvent = socket.SendAsync(connectionContext.SocketEventArgs); - + if (!willRaiseEvent) ProcessSend(connectionContext.SocketEventArgs); } - + private void IoCompleted(object? sender, SocketAsyncEventArgs saea) { - switch(saea.LastOperation) + switch (saea.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(saea); @@ -117,17 +114,17 @@ private void ProcessSend(SocketAsyncEventArgs saea) saea.AcceptSocket = null; saea.UserToken = null; saea.SetBuffer(0, saea.Buffer.Length); - + _saeaPool.TryReturn(saea); } private void CloseConnection(SocketAsyncEventArgs saea) { var connectionContext = (ConnectionContext)saea.UserToken; - - if (!_connections.TryRemove(connectionContext.Id, out _)) + + if (!_connectionStore.TryCloseConnection(connectionContext)) throw new InvalidOperationException($"Cannot remove connection {connectionContext.Id}"); - + saea.AcceptSocket.Shutdown(SocketShutdown.Both); saea.AcceptSocket.Close(); } @@ -135,9 +132,9 @@ private void CloseConnection(SocketAsyncEventArgs saea) private void ProcessReceive(SocketAsyncEventArgs saea) { var connectionContext = (ConnectionContext)saea.UserToken; - + connectionContext.IncrementBytesReceived(saea.BytesTransferred); - + saea.SetBuffer(saea.Offset, saea.Offset + saea.BytesTransferred); OnDataReceived(connectionContext); } diff --git a/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionStore.cs b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionStore.cs new file mode 100644 index 0000000..400b333 --- /dev/null +++ b/src/Server/Infrastructure/ConnectionManager/src/LiteHttp.ConnectionManager/LiteHttp/ConnectionManager/ConnectionStore.cs @@ -0,0 +1,35 @@ +using System.Collections.Concurrent; + +using LiteHttp.ConnectionManager.Abstractions; +using LiteHttp.Helpers; + +namespace LiteHttp.ConnectionManager; + +// Used to store active connections and manage them (close, timeout, etc.). +// The motivation to extract Walker service to avoid iterating over connections in ConnectionManager itself, +// and instead delegate this responsibility to a separate service, which will be called in Heartbeat. +// This way we can keep ConnectionManager focused on connection management (creation, closing, etc.) and have a +// separate service that will be responsible for "walking" through connections and performing necessary checks +// (like timeouts, etc.). This separation of concerns can lead to cleaner code and better maintainability. +internal sealed class ConnectionStore : IConnectionStore +{ + private readonly ConcurrentDictionary _connections; + private readonly ConnectionContextFactory _connectionContextFactory = new(); + + public ConnectionStore() + { + const int initialCapacity = 10000; + + _connections = new(-1, initialCapacity); + } + + public bool TryCloseConnection(ConnectionContext connection) => _connections.TryRemove(connection.Id, out _); + + public ConnectionContext InitializeConnection(SocketAsyncEventArgs saea) + { + var connectionContext = _connectionContextFactory.Create(saea); + if (!_connections.TryAdd(connectionContext.Id, connectionContext)) + ; + return connectionContext; + } +} diff --git a/src/Server/Infrastructure/Heartbeat/src/Heartbeat.cs b/src/Server/Infrastructure/Heartbeat/src/Heartbeat.cs index d323c5a..fad46cc 100644 --- a/src/Server/Infrastructure/Heartbeat/src/Heartbeat.cs +++ b/src/Server/Infrastructure/Heartbeat/src/Heartbeat.cs @@ -12,18 +12,22 @@ namespace LiteHttp.Heartbeat; public sealed class Heartbeat : IDisposable { private static readonly TimeSpan Interval = TimeSpan.FromSeconds(1); - private const string NoHandlersExceptionString = "Heartbeat not needed to be initialized if here is no handlers in app"; + private const string NoHandlersString = "Heartbeat handlers were not initialized"; private readonly Action[] _callbacks; private readonly ManualResetEventSlim _timer = new ManualResetEventSlim(false, 0); private readonly Thread _heartbeatThread; private readonly ILogger _logger; - - public Heartbeat(Span heartbeatHandlers, ILogger logger) - { - Debug.Assert(heartbeatHandlers.Length > 0, NoHandlersExceptionString); - ArgumentException.ThrowIfNullOrEmpty(NoHandlersExceptionString); +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. + public Heartbeat(IHeartbeatHandler[] heartbeatHandlers, ILogger logger) + { + if (heartbeatHandlers.Length == 0) + { + logger.LogWarning($"{NoHandlersString}"); + _logger = logger; + return; // we don't have any handlers, so we won't start the heartbeat thread + } _logger = logger; @@ -40,6 +44,7 @@ public Heartbeat(Span heartbeatHandlers, ILogger l _heartbeatThread.Start(); } +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. private void OnHeartbeat() { @@ -52,7 +57,7 @@ private void OnHeartbeat() } catch (Exception ex) { - _logger.LogTrace($"Exception thrown in heartbeat handler"); + _logger.LogError(ex, $"Exception thrown in heartbeat handler"); } } } diff --git a/src/Server/Infrastructure/Heartbeat/src/IHeartbeatHandler.cs b/src/Server/Infrastructure/Heartbeat/src/IHeartbeatHandler.cs index c64d805..5ea5a28 100644 --- a/src/Server/Infrastructure/Heartbeat/src/IHeartbeatHandler.cs +++ b/src/Server/Infrastructure/Heartbeat/src/IHeartbeatHandler.cs @@ -2,5 +2,5 @@ public interface IHeartbeatHandler { - void OnHeartbeat(); + public void OnHeartbeat(); } \ No newline at end of file diff --git a/src/Server/Infrastructure/Listener/src/LiteHttp/Listener/Listener.cs b/src/Server/Infrastructure/Listener/src/LiteHttp/Listener/Listener.cs index 3d2b71c..c30bef5 100644 --- a/src/Server/Infrastructure/Listener/src/LiteHttp/Listener/Listener.cs +++ b/src/Server/Infrastructure/Listener/src/LiteHttp/Listener/Listener.cs @@ -62,13 +62,10 @@ public void Dispose() _isListening = false; } - public bool StartListen(CancellationToken cancellationToken) + public bool StartListen(CancellationToken cancellationToken = default) { _cancellationToken = cancellationToken; - if (_endPoint is null) - throw new InvalidOperationException("Listener endpoint cannot be null"); - if (!Socket.IsBound) { Socket.Bind(_endPoint); @@ -84,7 +81,7 @@ public bool StartListen(CancellationToken cancellationToken) var acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += AcceptEventArg_Completed; - return ThreadPool.UnsafeQueueUserWorkItem(StartAccept, acceptEventArg, false); + return ThreadPool.QueueUserWorkItem(StartAccept, acceptEventArg, false); } catch (Exception ex) { @@ -93,7 +90,6 @@ public bool StartListen(CancellationToken cancellationToken) } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AcceptEventArg_Completed(object? sender, SocketAsyncEventArgs saea) { ProcessAccept(saea); diff --git a/src/Server/Infrastructure/Pipeline/src/Pipeline.cs b/src/Server/Infrastructure/Pipeline/src/Pipeline.cs index 70146f5..97a507b 100644 --- a/src/Server/Infrastructure/Pipeline/src/Pipeline.cs +++ b/src/Server/Infrastructure/Pipeline/src/Pipeline.cs @@ -1,33 +1,31 @@ namespace LiteHttp.Pipeline; -#nullable disable public sealed class Pipeline { private readonly IRouter _router; private readonly Parser _parser; private readonly ResponseBuilder _responseBuilder; - private readonly Executor _executor; - + private readonly ActionInvoker _actionInvoker; + +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. internal Pipeline(PipelineFactory factory) { _router = factory.RouterFactory(); _parser = factory.ParserFactory(); _responseBuilder = factory.ResponseBuilderFactory(); - _executor = factory.ExecutorFactory(); + _actionInvoker = factory.ActionInvokerFactory(); } - +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. + [SkipLocalsInit] public void ProcessRequest(ConnectionContext connectionContext) { Memory buffer = connectionContext.SocketEventArgs.Buffer; var parsingResult = _parser.Parse(buffer); - int responseLength; - + if (!parsingResult.Success) { - responseLength = _responseBuilder.Build(InternalActionResults.BadRequest(), buffer); - connectionContext.SocketEventArgs.SetBuffer(0, responseLength); - ThreadPool.UnsafeQueueUserWorkItem(OnExecuted, connectionContext, false); + SendResponse(connectionContext, buffer, InternalActionResults.BadRequest()); return; } @@ -35,19 +33,23 @@ public void ProcessRequest(ConnectionContext connectionContext) if (action is null) { - responseLength = _responseBuilder.Build(InternalActionResults.NotFound(), buffer); - connectionContext.SocketEventArgs.SetBuffer(0, responseLength); - ThreadPool.UnsafeQueueUserWorkItem(OnExecuted, connectionContext, false); + SendResponse(connectionContext, buffer, InternalActionResults.NotFound()); return; } - var executionResult = _executor.Execute(action); + var executionResult = _actionInvoker.Execute(action); - responseLength = _responseBuilder.Build(executionResult, buffer); + SendResponse(connectionContext, buffer, executionResult); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void SendResponse(ConnectionContext connectionContext, Memory buffer, IActionResult actionResult) + { + int responseLength = _responseBuilder.Build(actionResult, buffer); connectionContext.SocketEventArgs.SetBuffer(0, responseLength); ThreadPool.UnsafeQueueUserWorkItem(OnExecuted, connectionContext, false); } - + private Action _executed; private void OnExecuted(ConnectionContext response) => _executed?.Invoke(response); diff --git a/src/Server/Infrastructure/Pipeline/src/PipelineFactory.cs b/src/Server/Infrastructure/Pipeline/src/PipelineFactory.cs index 87316a1..e1e69fe 100644 --- a/src/Server/Infrastructure/Pipeline/src/PipelineFactory.cs +++ b/src/Server/Infrastructure/Pipeline/src/PipelineFactory.cs @@ -1,26 +1,27 @@ namespace LiteHttp.Pipeline; -#nullable disable public sealed class PipelineFactory { public Func RouterFactory { get; set; } public Func ParserFactory { get; set; } public Func ResponseBuilderFactory { get; set; } - public Func ExecutorFactory { get; set; } + public Func ActionInvokerFactory { get; set; } +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. public PipelineFactory(Action factoryDelegate) { factoryDelegate(this); ThrowIfAnyFactoryIsNull(); } +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. private void ThrowIfAnyFactoryIsNull() { if (RouterFactory is null) throw new ArgumentNullException(nameof(RouterFactory)); if (ParserFactory is null) throw new ArgumentNullException(nameof(ParserFactory)); if (ResponseBuilderFactory is null) throw new ArgumentNullException(nameof(ResponseBuilderFactory)); - if (ExecutorFactory is null) throw new ArgumentNullException(nameof(ExecutorFactory)); + if (ActionInvokerFactory is null) throw new ArgumentNullException(nameof(ActionInvokerFactory)); } public Pipeline Create() => new(this); diff --git a/src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/Executor.cs b/src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/ActionInvoker.cs similarity index 77% rename from src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/Executor.cs rename to src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/ActionInvoker.cs index 65cca94..2aa83dc 100644 --- a/src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/Executor.cs +++ b/src/Server/Infrastructure/RequestProcessors/src/LiteHttp/RequestProcessors/ActionInvoker.cs @@ -1,6 +1,6 @@ namespace LiteHttp.RequestProcessors; -public sealed class Executor +public sealed class ActionInvoker { public IActionResult Execute(Func action) => action(); } diff --git a/src/Server/Infrastructure/Routing/src/LiteHttp/Routing/Router.cs b/src/Server/Infrastructure/Routing/src/LiteHttp/Routing/Router.cs index dc26e29..8ea4ff6 100644 --- a/src/Server/Infrastructure/Routing/src/LiteHttp/Routing/Router.cs +++ b/src/Server/Infrastructure/Routing/src/LiteHttp/Routing/Router.cs @@ -7,6 +7,6 @@ internal sealed class Router : IRouter public Func? GetAction(HttpContext context) => _endpointContext?.EndpointProvider.GetEndpoint(context.Route, context.Method); - public void SetContext(IEndpointContext endpointContext) => + internal void SetContext(IEndpointContext endpointContext) => _endpointContext = endpointContext; } \ No newline at end of file diff --git a/src/Server/Infrastructure/Server/src/LiteHttp/Server/InternalServer.cs b/src/Server/Infrastructure/Server/src/LiteHttp/Server/InternalServer.cs index 2078489..89deda1 100644 --- a/src/Server/Infrastructure/Server/src/LiteHttp/Server/InternalServer.cs +++ b/src/Server/Infrastructure/Server/src/LiteHttp/Server/InternalServer.cs @@ -1,6 +1,4 @@ -using System.Runtime.InteropServices; - -using LiteHttp.Constants; +using LiteHttp.Constants; using LiteHttp.Heartbeat; using LiteHttp.Logging; using LiteHttp.Logging.Abstractions; @@ -29,8 +27,6 @@ public InternalServer(ILogger? logger, IPAddress address, int port) Listener = new(address, logger.ForContext(), port); ConnectionManager = new(); - - heartbeatHandlers.Add(ConnectionManager); _endpointProviderConfiguration = new EndpointProviderConfiguration(); @@ -39,11 +35,10 @@ public InternalServer(ILogger? logger, IPAddress address, int port) factory.ParserFactory = () => Parser.Instance; factory.RouterFactory = () => RouterFactory.Build(_endpointProviderConfiguration.EndpointContext); factory.ResponseBuilderFactory = () => new(); - factory.ExecutorFactory = () => new(); + factory.ActionInvokerFactory = () => new(); }); - Heartbeat = new (CollectionsMarshal.AsSpan(heartbeatHandlers), - _logger.ForContext()); + Heartbeat = new (heartbeatHandlers.ToArray(), _logger.ForContext()); Binder.Bind(this); }