diff --git a/Samples/Diagnostics/PackageInspection_Samples.cs b/Samples/Diagnostics/PackageInspection_Samples.cs index 29b634d17..a8daa5657 100644 --- a/Samples/Diagnostics/PackageInspection_Samples.cs +++ b/Samples/Diagnostics/PackageInspection_Samples.cs @@ -17,19 +17,19 @@ public static async Task Inspect_Outgoing_Package() /* * This sample covers the inspection of outgoing packages from the client. */ - + var mqttFactory = new MqttClientFactory(); - + using (var mqttClient = mqttFactory.CreateMqttClient()) { var mqttClientOptions = mqttFactory.CreateClientOptionsBuilder() .WithTcpServer("broker.hivemq.com") .Build(); - + mqttClient.InspectPacketAsync += OnInspectPacket; - + await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None); - + Console.WriteLine("MQTT client is connected."); var mqttClientDisconnectOptions = mqttFactory.CreateClientDisconnectOptionsBuilder() diff --git a/Samples/Server/Server_Retained_Messages_Samples.cs b/Samples/Server/Server_Retained_Messages_Samples.cs index c563aec63..2711868df 100644 --- a/Samples/Server/Server_Retained_Messages_Samples.cs +++ b/Samples/Server/Server_Retained_Messages_Samples.cs @@ -6,6 +6,7 @@ // ReSharper disable UnusedMember.Global // ReSharper disable InconsistentNaming +using System.Buffers; using System.Text.Json; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -112,7 +113,7 @@ public static MqttRetainedMessageModel Create(MqttApplicationMessage message) // Create a copy of the buffer from the payload segment because // it cannot be serialized and deserialized with the JSON serializer. - Payload = message.PayloadSegment.ToArray(), + Payload = message.Payload.Sequence.ToArray(), UserProperties = message.UserProperties, ResponseTopic = message.ResponseTopic, CorrelationData = message.CorrelationData, @@ -130,7 +131,7 @@ public MqttApplicationMessage ToApplicationMessage() return new MqttApplicationMessage { Topic = Topic, - PayloadSegment = new ArraySegment(Payload ?? Array.Empty()), + Payload = Payload != null ? new ReadOnlySequence(Payload) : ReadOnlySequence.Empty, PayloadFormatIndicator = PayloadFormatIndicator, ResponseTopic = ResponseTopic, CorrelationData = CorrelationData, diff --git a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs index 7afd3e215..e60088c0e 100644 --- a/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs +++ b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs @@ -8,6 +8,7 @@ using System; using MQTTnet.Client; using MQTTnet.Diagnostics; +using MQTTnet.Buffers; namespace MQTTnet.AspNetCore.Client { diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index eb801cd5c..dbe46ab6e 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -203,11 +203,11 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat { var buffer = PacketFormatterAdapter.Encode(packet); - if (buffer.Payload.Count == 0) + if (buffer.Packet.IsSingleSegment && buffer.Payload.Length == 0) { // zero copy // https://github.com/dotnet/runtime/blob/e31ddfdc4f574b26231233dc10c9a9c402f40590/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279 - await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false); + await _output.WriteAsync(buffer.Packet.First, cancellationToken).ConfigureAwait(false); } else { @@ -231,8 +231,18 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer) var span = output.GetSpan(buffer.Length); - buffer.Packet.AsSpan().CopyTo(span); - buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count)); + int offset = 0; + foreach (var segment in buffer.Packet) + { + segment.Span.CopyTo(span.Slice(offset)); + offset += segment.Length; + } + + foreach (var segment in buffer.Payload) + { + segment.Span.CopyTo(span.Slice(offset)); + offset += segment.Length; + } output.Advance(buffer.Length); } diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs index 4e1138a40..f17faa4f4 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs @@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Server; diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 0d9522949..786ca25f9 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Http; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; diff --git a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs index d120062ba..01d443c7b 100644 --- a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs @@ -43,7 +43,7 @@ public static bool TryDecode( return false; } - var fixedHeader = copy.First.Span[0]; + var fixedHeader = copy.FirstSpan[0]; copy = copy.Slice(headerLength); if (copy.Length < bodyLength) { @@ -53,7 +53,7 @@ public static bool TryDecode( var bodySlice = copy.Slice(0, bodyLength); var bodySegment = GetArraySegment(ref bodySlice); - var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength); + using var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength); if (formatter.ProtocolVersion == MqttProtocolVersion.Unknown) { formatter.DetectProtocolVersion(receivedMqttPacket); diff --git a/Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs b/Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs index 27a348d4f..96b40c568 100644 --- a/Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs @@ -11,7 +11,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class AsyncLockBenchmark : BaseBenchmark { diff --git a/Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs b/Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs index 3b6a7d858..038427f1f 100644 --- a/Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs @@ -2,15 +2,16 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.IO; -using System.Threading; using BenchmarkDotNet.Attributes; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Packets; using MQTTnet.Tests.Mockups; +using System.Buffers; +using System.IO; +using System.Threading; namespace MQTTnet.Benchmarks { @@ -58,7 +59,7 @@ public void Setup() var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311, new MqttBufferWriter(4096, 65535)); - var serializedPacket = Join(serializer.Encode(_packet).Join()); + var serializedPacket = serializer.Encode(_packet).ToArray(); _iterations = 10000; @@ -75,16 +76,5 @@ public void Setup() _channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetEventLogger()); } - - static byte[] Join(params ArraySegment[] chunks) - { - var buffer = new MemoryStream(); - foreach (var chunk in chunks) - { - buffer.Write(chunk.Array, chunk.Offset, chunk.Count); - } - - return buffer.ToArray(); - } } } \ No newline at end of file diff --git a/Source/MQTTnet.Benchmarks/LoggerBenchmark.cs b/Source/MQTTnet.Benchmarks/LoggerBenchmark.cs index 70f0ff51d..fd635cb35 100644 --- a/Source/MQTTnet.Benchmarks/LoggerBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/LoggerBenchmark.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter] [MemoryDiagnoser] public class LoggerBenchmark : BaseBenchmark diff --git a/Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs b/Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs index 0733e2bb9..991e3577a 100644 --- a/Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs @@ -5,7 +5,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class MemoryCopyBenchmark @@ -33,7 +33,7 @@ public void Array_Copy() [Benchmark] public void Memory_Copy() { - MQTTnet.Internal.MqttMemoryHelper.Copy(source, 0, target, 0, Length); + MQTTnet.Buffers.MqttMemoryHelper.Copy(source, 0, target, 0, Length); } } diff --git a/Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs b/Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs index b0c869244..cf693f154 100644 --- a/Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Benchmarks; -[SimpleJob(RuntimeMoniker.Net60)] +[SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter] [RankColumn] [MemoryDiagnoser] diff --git a/Source/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs b/Source/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs index 4d6e364d4..6f2fa91fb 100644 --- a/Source/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MessageProcessingMqttConnectionContextBenchmark.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class MessageProcessingMqttConnectionContextBenchmark : BaseBenchmark { diff --git a/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs b/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs index bfa3d209c..b5be9a714 100644 --- a/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs @@ -6,11 +6,12 @@ using System.Text; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; +using MQTTnet.Buffers; using MQTTnet.Formatter; namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class MqttBufferReaderBenchmark { diff --git a/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs b/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs index 0efc7ffac..e7a9e912c 100644 --- a/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs @@ -2,20 +2,20 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; -using MQTTnet.Formatter; +using MQTTnet.Buffers; using MQTTnet.Tests.Mockups; +using System; namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class MqttPacketReaderWriterBenchmark : BaseBenchmark { readonly byte[] _demoPayload = new byte[1024]; - + byte[] _readPayload; [GlobalCleanup] @@ -27,7 +27,7 @@ public void GlobalCleanup() public void GlobalSetup() { TestEnvironment.EnableLogger = false; - + var writer = new MqttBufferWriter(4096, 65535); writer.WriteString("A relative short string."); writer.WriteBinary(_demoPayload); @@ -69,7 +69,7 @@ public void Read_100_000_Messages() reader.ReadBinaryData(); } } - + [Benchmark] public void Write_100_000_Messages() { diff --git a/Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs b/Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs index 1dc822c40..209dec63f 100644 --- a/Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs @@ -2,10 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Reflection; -using System.Threading; -using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; using MQTTnet.Channel; @@ -14,10 +10,14 @@ using MQTTnet.Implementations; using MQTTnet.Server; using MQTTnet.Server.Internal.Adapter; +using System.Buffers; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Benchmarks; -[SimpleJob(RuntimeMoniker.Net60)] +[SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class MqttTcpChannelBenchmark : BaseBenchmark { @@ -64,12 +64,13 @@ async Task ReadAsync(int iterations, int size) { await Task.Yield(); + var buffer = new byte[size]; var expected = iterations * size; long read = 0; while (read < expected) { - var readResult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false); + var readResult = await _clientChannel.ReadAsync(buffer, 0, size, CancellationToken.None).ConfigureAwait(false); read += readResult; } } @@ -78,7 +79,7 @@ async Task WriteAsync(int iterations, int size) { await Task.Yield(); - var buffer = new ArraySegment(new byte[size]); + var buffer = new ReadOnlySequence(new byte[size]); for (var i = 0; i < iterations; i++) { diff --git a/Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs b/Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs index 41ba0a821..3bed9d381 100644 --- a/Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs @@ -2,6 +2,7 @@ using BenchmarkDotNet.Jobs; using MQTTnet.Adapter; using MQTTnet.AspNetCore; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Formatter; using MQTTnet.Packets; @@ -14,7 +15,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class ReaderExtensionsBenchmark @@ -35,8 +36,14 @@ public void GlobalSetup() var buffer = mqttPacketFormatter.Encode(packet); stream = new MemoryStream(); - stream.Write(buffer.Packet); - stream.Write(buffer.Payload); + foreach (var segment in buffer.Packet) + { + stream.Write(segment.Span); + } + foreach (var segment in buffer.Payload) + { + stream.Write(segment.Span); + } mqttPacketFormatter.Cleanup(); } @@ -172,7 +179,7 @@ public static bool TryDecode(MqttPacketFormatterAdapter formatter, var bodySlice = copy.Slice(0, bodyLength); var buffer = GetMemory(bodySlice).ToArray(); - var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, new ArraySegment(buffer, 0, buffer.Length), buffer.Length + 2); + using var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, new ArraySegment(buffer, 0, buffer.Length), buffer.Length + 2); if (formatter.ProtocolVersion == MqttProtocolVersion.Unknown) { diff --git a/Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs b/Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs index e3358fb91..69fedda41 100644 --- a/Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs @@ -5,7 +5,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class RoundtripProcessingBenchmark : BaseBenchmark diff --git a/Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs b/Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs index aa5cd0383..7886d184a 100644 --- a/Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class SendPacketAsyncBenchmark : BaseBenchmark @@ -40,9 +40,12 @@ public async ValueTask After() stream.Position = 0; var output = PipeWriter.Create(stream); - if (buffer.Payload.Count == 0) + if (buffer.Payload.Length == 0) { - await output.WriteAsync(buffer.Packet).ConfigureAwait(false); + foreach (var buffer in buffer.Packet) + { + await output.WriteAsync(buffer).ConfigureAwait(false); + } } else { @@ -59,8 +62,18 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer) var span = output.GetSpan(buffer.Length); - buffer.Packet.AsSpan().CopyTo(span); - buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count)); + int offset = 0; + foreach (var segment in buffer.Packet) + { + segment.Span.CopyTo(span.Slice(offset)); + offset += segment.Length; + } + + foreach (var segment in buffer.Payload) + { + segment.Span.CopyTo(span.Slice(offset)); + offset += segment.Length; + } output.Advance(buffer.Length); } diff --git a/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs index de6ed3dc9..f451ce60c 100644 --- a/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -14,10 +14,12 @@ using MQTTnet.Formatter.V3; using BenchmarkDotNet.Jobs; using MQTTnet.Diagnostics; +using System.Buffers; +using MQTTnet.Buffers; namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter] [MemoryDiagnoser] public class SerializerBenchmark : BaseBenchmark @@ -104,7 +106,7 @@ public Task ReadAsync(byte[] buffer, int offset, int count, CancellationTok return Task.FromResult(count); } - public Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, CancellationToken cancellationToken) + public Task WriteAsync(ReadOnlySequence buffer, bool isEndOfPacket, CancellationToken cancellationToken) { throw new NotSupportedException(); } diff --git a/Source/MQTTnet.Benchmarks/ServerProcessingBenchmark.cs b/Source/MQTTnet.Benchmarks/ServerProcessingBenchmark.cs index fbac6dc02..f2e582af7 100644 --- a/Source/MQTTnet.Benchmarks/ServerProcessingBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/ServerProcessingBenchmark.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter, RankColumn] [MemoryDiagnoser] public class ServerProcessingBenchmark : BaseBenchmark diff --git a/Source/MQTTnet.Benchmarks/TcpPipesBenchmark.cs b/Source/MQTTnet.Benchmarks/TcpPipesBenchmark.cs index 32e45a3ac..57845070d 100644 --- a/Source/MQTTnet.Benchmarks/TcpPipesBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/TcpPipesBenchmark.cs @@ -13,7 +13,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [MemoryDiagnoser] public class TcpPipesBenchmark : BaseBenchmark { diff --git a/Source/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs b/Source/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs index a75d3521a..248db82c4 100644 --- a/Source/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Benchmarks { - [SimpleJob(RuntimeMoniker.Net60)] + [SimpleJob(RuntimeMoniker.Net80)] [RPlotExporter] [MemoryDiagnoser] public class TopicFilterComparerBenchmark : BaseBenchmark diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index 663e23846..b667f0c5b 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -132,7 +133,7 @@ Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventAr return CompletedTask.Instance; } - var payloadBuffer = eventArgs.ApplicationMessage.PayloadSegment.ToArray(); + var payloadBuffer = eventArgs.ApplicationMessage.Payload.Sequence.ToArray(); awaitable.TrySetResult(payloadBuffer); // Set this message to handled to that other code can avoid execution etc. diff --git a/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs b/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs index b09a0e468..007a48da2 100644 --- a/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs +++ b/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs @@ -7,6 +7,7 @@ using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Implementations; diff --git a/Source/MQTTnet.Server/Internal/Formatter/MqttPublishPacketFactory.cs b/Source/MQTTnet.Server/Internal/Formatter/MqttPublishPacketFactory.cs index 6e747f4d0..1e2db79e4 100644 --- a/Source/MQTTnet.Server/Internal/Formatter/MqttPublishPacketFactory.cs +++ b/Source/MQTTnet.Server/Internal/Formatter/MqttPublishPacketFactory.cs @@ -4,6 +4,7 @@ using MQTTnet.Exceptions; using MQTTnet.Packets; +using System.Buffers; namespace MQTTnet.Server.Internal.Formatter; @@ -30,7 +31,7 @@ public static MqttPublishPacket Create(MqttConnectPacket connectPacket) var packet = new MqttPublishPacket { Topic = connectPacket.WillTopic, - PayloadSegment = willMessageBuffer, + Payload = new ReadOnlySequence(willMessageBuffer), QualityOfServiceLevel = connectPacket.WillQoS, Retain = connectPacket.WillRetain, ContentType = connectPacket.WillContentType, @@ -56,7 +57,7 @@ public static MqttPublishPacket Create(MqttApplicationMessage applicationMessage var packet = new MqttPublishPacket { Topic = applicationMessage.Topic, - PayloadSegment = applicationMessage.PayloadSegment, + Payload = applicationMessage.Payload, QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel, Retain = applicationMessage.Retain, Dup = applicationMessage.Dup, diff --git a/Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs b/Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs index fb411eca7..2505671bf 100644 --- a/Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs @@ -2,8 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Internal; +using System.Buffers; namespace MQTTnet.Server.Internal { @@ -65,8 +67,8 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat lock (_messages) { - var payloadSegment = applicationMessage.PayloadSegment; - var hasPayload = payloadSegment.Count > 0; + var payload = applicationMessage.Payload; + var hasPayload = payload.Length > 0; if (!hasPayload) { @@ -77,14 +79,15 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat { if (!_messages.TryGetValue(applicationMessage.Topic, out var existingMessage)) { - _messages[applicationMessage.Topic] = applicationMessage; + _messages[applicationMessage.Topic] = applicationMessage.Clone(); saveIsRequired = true; } else { - if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !SequenceEqual(existingMessage.PayloadSegment, payloadSegment)) + if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || + !MqttMemoryHelper.SequenceEqual(existingMessage.Payload.Sequence, payload.Sequence)) { - _messages[applicationMessage.Topic] = applicationMessage; + _messages[applicationMessage.Topic] = applicationMessage.Clone(); saveIsRequired = true; } } @@ -147,10 +150,5 @@ public async Task ClearMessages() await _eventContainer.RetainedMessagesClearedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); } } - - static bool SequenceEqual(ArraySegment source, ArraySegment target) - { - return source.AsSpan().SequenceEqual(target); - } } } \ No newline at end of file diff --git a/Source/MQTTnet.Server/MqttServerExtensions.cs b/Source/MQTTnet.Server/MqttServerExtensions.cs index 59aecebbf..d7ef08a8a 100644 --- a/Source/MQTTnet.Server/MqttServerExtensions.cs +++ b/Source/MQTTnet.Server/MqttServerExtensions.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; using System.Text; using MQTTnet.Internal; using MQTTnet.Packets; @@ -49,7 +50,7 @@ public static Task InjectApplicationMessage( new MqttApplicationMessage { Topic = topic, - PayloadSegment = new ArraySegment(payloadBuffer), + Payload = new ReadOnlySequence(payloadBuffer), QualityOfServiceLevel = qualityOfServiceLevel, Retain = retain })); diff --git a/Source/MQTTnet.TestApp/ClientTest.cs b/Source/MQTTnet.TestApp/ClientTest.cs index 775281037..a278332a5 100644 --- a/Source/MQTTnet.TestApp/ClientTest.cs +++ b/Source/MQTTnet.TestApp/ClientTest.cs @@ -35,12 +35,9 @@ public static async Task RunAsync() client.ApplicationMessageReceivedAsync += e => { var payloadText = string.Empty; - if (e.ApplicationMessage.PayloadSegment.Count > 0) + if (e.ApplicationMessage.Payload.Length > 0) { - payloadText = Encoding.UTF8.GetString( - e.ApplicationMessage.PayloadSegment.Array, - e.ApplicationMessage.PayloadSegment.Offset, - e.ApplicationMessage.PayloadSegment.Count); + payloadText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload.Sequence); } Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); diff --git a/Source/MQTTnet.TestApp/PerformanceTest.cs b/Source/MQTTnet.TestApp/PerformanceTest.cs index cf715e07d..9c27bdf33 100644 --- a/Source/MQTTnet.TestApp/PerformanceTest.cs +++ b/Source/MQTTnet.TestApp/PerformanceTest.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Diagnostics; using System.Linq; using System.Net; @@ -200,7 +201,7 @@ static MqttApplicationMessage CreateMessage() return new MqttApplicationMessage { Topic = "A/B/C", - PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(Payload)), + Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes(Payload)), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }; } @@ -233,7 +234,7 @@ public static async Task RunQoS2Test() var message = new MqttApplicationMessage { Topic = "A/B/C", - PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("Hello World")), + Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")), QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce }; @@ -284,7 +285,7 @@ public static async Task RunQoS1Test() var message = new MqttApplicationMessage { Topic = "A/B/C", - PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("Hello World")), + Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }; @@ -335,7 +336,7 @@ public static async Task RunQoS0Test() var message = new MqttApplicationMessage { Topic = "A/B/C", - PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("Hello World")), + Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; diff --git a/Source/MQTTnet.TestApp/PublicBrokerTest.cs b/Source/MQTTnet.TestApp/PublicBrokerTest.cs index 5124ae967..d1b0cf462 100644 --- a/Source/MQTTnet.TestApp/PublicBrokerTest.cs +++ b/Source/MQTTnet.TestApp/PublicBrokerTest.cs @@ -144,7 +144,7 @@ static async Task ExecuteTestAsync(string name, MqttClientOptions options) MqttApplicationMessage receivedMessage = null; client.ApplicationMessageReceivedAsync += e => { - receivedMessage = e.ApplicationMessage; + receivedMessage = e.TransferApplicationMessageOwnership(true); return CompletedTask.Instance; }; diff --git a/Source/MQTTnet.TestApp/ServerTest.cs b/Source/MQTTnet.TestApp/ServerTest.cs index 0076381e0..b52bfde40 100644 --- a/Source/MQTTnet.TestApp/ServerTest.cs +++ b/Source/MQTTnet.TestApp/ServerTest.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -101,7 +102,7 @@ public static async Task RunAsync() { // Replace the payload with the timestamp. But also extending a JSON // based payload with the timestamp is a suitable use case. - e.ApplicationMessage.PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"))); + e.ApplicationMessage.Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"))); } if (e.ApplicationMessage.Topic == "not_allowed_topic") @@ -145,12 +146,9 @@ public static async Task RunAsync() mqttServer.InterceptingPublishAsync += e => { var payloadText = string.Empty; - if (e.ApplicationMessage.PayloadSegment.Count > 0) + if (e.ApplicationMessage.Payload.Length > 0) { - payloadText = Encoding.UTF8.GetString( - e.ApplicationMessage.PayloadSegment.Array, - e.ApplicationMessage.PayloadSegment.Offset, - e.ApplicationMessage.PayloadSegment.Count); + payloadText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload.Sequence); } MqttNetConsoleLogger.PrintToConsole($"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{payloadText}'", ConsoleColor.Magenta); diff --git a/Source/MQTTnet.Tests/ASP/Mockups/ConnectionHandlerMockup.cs b/Source/MQTTnet.Tests/ASP/Mockups/ConnectionHandlerMockup.cs index b4ca73d1c..38d1f91e7 100644 --- a/Source/MQTTnet.Tests/ASP/Mockups/ConnectionHandlerMockup.cs +++ b/Source/MQTTnet.Tests/ASP/Mockups/ConnectionHandlerMockup.cs @@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Connections; using MQTTnet.Adapter; using MQTTnet.AspNetCore; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Server; diff --git a/Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs b/Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs index cf8075728..b78839ea7 100644 --- a/Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs +++ b/Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs @@ -3,12 +3,14 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Connections; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.AspNetCore; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Formatter; using MQTTnet.Packets; @@ -99,7 +101,7 @@ public async Task TestLargePacket() connection.Transport = pipe; var ctx = new MqttConnectionContext(serializer, connection); - await ctx.SendPacketAsync(new MqttPublishPacket { PayloadSegment = new ArraySegment(new byte[20_000]) }, CancellationToken.None).ConfigureAwait(false); + await ctx.SendPacketAsync(new MqttPublishPacket { Payload = new ReadOnlySequence(new byte[20_000]) }, CancellationToken.None).ConfigureAwait(false); var readResult = await pipe.Send.Reader.ReadAsync(); Assert.IsTrue(readResult.Buffer.Length > 20000); diff --git a/Source/MQTTnet.Tests/ASP/ReaderExtensionsTest.cs b/Source/MQTTnet.Tests/ASP/ReaderExtensionsTest.cs index 6c9cac8f8..ab5e8357f 100644 --- a/Source/MQTTnet.Tests/ASP/ReaderExtensionsTest.cs +++ b/Source/MQTTnet.Tests/ASP/ReaderExtensionsTest.cs @@ -5,6 +5,7 @@ using System.Buffers; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.AspNetCore; +using MQTTnet.Buffers; using MQTTnet.Formatter; using MQTTnet.Packets; @@ -18,7 +19,7 @@ public void TestTryDeserialize() { var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311, new MqttBufferWriter(4096, 65535)); - var buffer = serializer.Encode(new MqttPublishPacket { Topic = "a", PayloadSegment = new byte[5] }).Join(); + var buffer = serializer.Encode(new MqttPublishPacket { Topic = "a", Payload = new ReadOnlySequence(new byte[5]) }).Join(); var sequence = new ReadOnlySequence(buffer.Array, buffer.Offset, buffer.Count); diff --git a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs index 35a1af921..02938911f 100644 --- a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs +++ b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs @@ -2,14 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Net.Sockets; -using System.Text; -using System.Threading; -using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; using MQTTnet.Exceptions; @@ -19,6 +11,14 @@ using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Tests.Mockups; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; // ReSharper disable InconsistentNaming @@ -287,7 +287,7 @@ await receiver.SubscribeAsync( MqttApplicationMessage receivedMessage = null; receiver.ApplicationMessageReceivedAsync += e => { - receivedMessage = e.ApplicationMessage; + receivedMessage = e.ApplicationMessage.Clone(); return CompletedTask.Instance; }; @@ -297,7 +297,7 @@ await receiver.SubscribeAsync( Assert.IsNotNull(receivedMessage); Assert.AreEqual("A", receivedMessage.Topic); - Assert.AreEqual(null, receivedMessage.PayloadSegment.Array); + Assert.AreEqual(0, receivedMessage.Payload.Length); } } @@ -508,7 +508,7 @@ public async Task Publish_QoS_1_In_ApplicationMessageReceiveHandler() client2.ApplicationMessageReceivedAsync += e => { - client2TopicResults.Add(Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray())); + client2TopicResults.Add(Encoding.UTF8.GetString(e.ApplicationMessage.Payload.Sequence)); return CompletedTask.Instance; }; @@ -869,7 +869,7 @@ public async Task Subscribe_In_Callback_Events() { lock (receivedMessages) { - receivedMessages.Add(e.ApplicationMessage); + receivedMessages.Add(e.TransferApplicationMessageOwnership(true)); } return CompletedTask.Instance; diff --git a/Source/MQTTnet.Tests/Diagnostics/PacketInspection_Tests.cs b/Source/MQTTnet.Tests/Diagnostics/PacketInspection_Tests.cs index 7db513f58..cb998e6f9 100644 --- a/Source/MQTTnet.Tests/Diagnostics/PacketInspection_Tests.cs +++ b/Source/MQTTnet.Tests/Diagnostics/PacketInspection_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; diff --git a/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs index cf709af80..386612188 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.IO; using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Formatter; diff --git a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Binary_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Binary_Tests.cs index b92608489..87567ab35 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Binary_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Binary_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -10,6 +11,7 @@ using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Formatter; @@ -111,7 +113,7 @@ public void DeserializeV311_MqttPublishPacket() PacketIdentifier = 123, Dup = true, Retain = true, - PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("HELLO")), + Payload = new ReadOnlySequence(Encoding.ASCII.GetBytes("HELLO")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Topic = "A/B/C" }; @@ -318,7 +320,7 @@ public void Serialize_LargePacket() var publishPacket = new MqttPublishPacket { Topic = "abcdefghijklmnopqrstuvwxyz0123456789", - PayloadSegment = new ArraySegment(payload) + Payload = new ReadOnlySequence(payload) }; var serializationHelper = new MqttPacketSerializationHelper(); @@ -328,17 +330,17 @@ public void Serialize_LargePacket() Assert.IsNotNull(publishPacketCopy); Assert.AreEqual(publishPacket.Topic, publishPacketCopy.Topic); - CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), publishPacketCopy.PayloadSegment.ToArray()); + CollectionAssert.AreEqual(publishPacket.Payload.Sequence.ToArray(), publishPacketCopy.Payload.Sequence.ToArray()); // Now modify the payload and test again. - publishPacket.PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("MQTT")); + publishPacket.Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("MQTT")); buffer = serializationHelper.Encode(publishPacket); var publishPacketCopy2 = serializationHelper.Decode(buffer) as MqttPublishPacket; Assert.IsNotNull(publishPacketCopy2); Assert.AreEqual(publishPacket.Topic, publishPacketCopy2.Topic); - CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), publishPacketCopy2.PayloadSegment.ToArray()); + CollectionAssert.AreEqual(publishPacket.Payload.Sequence.ToArray(), publishPacketCopy2.Payload.Sequence.ToArray()); } [TestMethod] @@ -462,7 +464,7 @@ public void SerializeV311_MqttPublishPacket() PacketIdentifier = 123, Dup = true, Retain = true, - PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("HELLO")), + Payload = new ReadOnlySequence(Encoding.ASCII.GetBytes("HELLO")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Topic = "A/B/C" }; diff --git a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Tests.cs index 3a99ff5fb..c5bf1078c 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V3_Tests.cs @@ -2,15 +2,15 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Exceptions; using MQTTnet.Formatter; using MQTTnet.Packets; using MQTTnet.Protocol; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Text; namespace MQTTnet.Tests.Formatter { @@ -77,7 +77,7 @@ public void Serialize_Full_MqttConnAckPacket_V311() Assert.AreEqual(false, deserialized.WildcardSubscriptionAvailable); Assert.IsNull(deserialized.UserProperties); // Not supported in v3.1.1 } - + [TestMethod] public void Serialize_Full_MqttConnAckPacket_V310() { @@ -178,7 +178,7 @@ public void Serialize_Full_MqttConnectPacket_V311() Assert.AreEqual(connectPacket.ClientId, deserialized.ClientId); CollectionAssert.AreEqual(null, deserialized.AuthenticationData); // Not supported in v3.1.1 Assert.AreEqual(null, deserialized.AuthenticationMethod); // Not supported in v3.1.1 - Assert.AreEqual(connectPacket.CleanSession, deserialized.CleanSession); + Assert.AreEqual(connectPacket.CleanSession, deserialized.CleanSession); Assert.AreEqual(0L, deserialized.ReceiveMaximum); // Not supported in v3.1.1 Assert.AreEqual(connectPacket.WillFlag, deserialized.WillFlag); Assert.AreEqual(connectPacket.WillTopic, deserialized.WillTopic); @@ -298,7 +298,7 @@ public void Serialize_Full_MqttPublishPacket_V311() PacketIdentifier = 123, Dup = true, Retain = true, - PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("Payload")), + Payload = new ReadOnlySequence(Encoding.ASCII.GetBytes("Payload")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Topic = "Topic", ResponseTopic = "/Response", @@ -322,7 +322,7 @@ public void Serialize_Full_MqttPublishPacket_V311() Assert.AreEqual(publishPacket.PacketIdentifier, deserialized.PacketIdentifier); Assert.AreEqual(publishPacket.Dup, deserialized.Dup); Assert.AreEqual(publishPacket.Retain, deserialized.Retain); - CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), deserialized.PayloadSegment.ToArray()); + CollectionAssert.AreEqual(publishPacket.Payload.Sequence.ToArray(), deserialized.Payload.Sequence.ToArray()); Assert.AreEqual(publishPacket.QualityOfServiceLevel, deserialized.QualityOfServiceLevel); Assert.AreEqual(publishPacket.Topic, deserialized.Topic); Assert.AreEqual(null, deserialized.ResponseTopic); // Not supported in v3.1.1. @@ -399,7 +399,7 @@ public void Serialize_Full_MqttSubAckPacket_V311() }; var deserialized = MqttPacketSerializationHelper.EncodeAndDecodePacket(subAckPacket, MqttProtocolVersion.V311); - + Assert.AreEqual(subAckPacket.PacketIdentifier, deserialized.PacketIdentifier); Assert.AreEqual(null, deserialized.ReasonString); // Not supported in v3.1.1 Assert.AreEqual(subAckPacket.ReasonCodes.Count, deserialized.ReasonCodes.Count); diff --git a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V5_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V5_Tests.cs index 1f9634379..766be15d3 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V5_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttPacketSerialization_V5_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Text; @@ -259,7 +260,7 @@ public void Serialize_Full_MqttPublishPacket_V500() PacketIdentifier = 123, Dup = true, Retain = true, - PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("Payload")), + Payload = new ReadOnlySequence(Encoding.ASCII.GetBytes("Payload")), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Topic = "Topic", ResponseTopic = "/Response", @@ -283,7 +284,7 @@ public void Serialize_Full_MqttPublishPacket_V500() Assert.AreEqual(publishPacket.PacketIdentifier, deserialized.PacketIdentifier); Assert.AreEqual(publishPacket.Dup, deserialized.Dup); Assert.AreEqual(publishPacket.Retain, deserialized.Retain); - CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), deserialized.PayloadSegment.ToArray()); + CollectionAssert.AreEqual(publishPacket.Payload.Sequence.ToArray(), deserialized.Payload.Sequence.ToArray()); Assert.AreEqual(publishPacket.QualityOfServiceLevel, deserialized.QualityOfServiceLevel); Assert.AreEqual(publishPacket.Topic, deserialized.Topic); Assert.AreEqual(publishPacket.ResponseTopic, deserialized.ResponseTopic); diff --git a/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs b/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs index 7a16ff30a..37da37aa9 100644 --- a/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs +++ b/Source/MQTTnet.Tests/Formatter/MqttPacketWriter_Tests.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Formatter; diff --git a/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs b/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs index 1c6e1bf11..83020d5ad 100644 --- a/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs +++ b/Source/MQTTnet.Tests/Helpers/MqttPacketWriterExtensions.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Buffers; using MQTTnet.Formatter; using MQTTnet.Protocol; diff --git a/Source/MQTTnet.Tests/MQTTnet.Tests.csproj b/Source/MQTTnet.Tests/MQTTnet.Tests.csproj index 90a2e1d89..f5b034fed 100644 --- a/Source/MQTTnet.Tests/MQTTnet.Tests.csproj +++ b/Source/MQTTnet.Tests/MQTTnet.Tests.csproj @@ -14,18 +14,18 @@ - - - - + + + + - - - - - + + + + + \ No newline at end of file diff --git a/Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs b/Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs index 40def6acd..1c508d1bc 100644 --- a/Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs +++ b/Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -211,7 +212,7 @@ public async Task Subscribe_And_Publish() var client1 = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500).WithClientId("client1")); - var testMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); + using var testMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); await client1.SubscribeAsync("a"); @@ -243,38 +244,45 @@ public async Task Publish_And_Receive_New_Properties() MqttApplicationMessage receivedMessage = null; receiver.ApplicationMessageReceivedAsync += e => { - receivedMessage = e.ApplicationMessage; + receivedMessage = e.TransferApplicationMessageOwnership(true); return CompletedTask.Instance; }; - var sender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); - - var applicationMessage = new MqttApplicationMessageBuilder() - .WithTopic("Hello") - .WithPayload("World") - .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) - .WithUserProperty("x", "1") - .WithUserProperty("y", "2") - .WithResponseTopic("response") - .WithContentType("text") - .WithMessageExpiryInterval(50) - .WithCorrelationData(new byte[12]) - .WithTopicAlias(2) - .Build(); - - await sender.PublishAsync(applicationMessage); - - await Task.Delay(500); - - Assert.IsNotNull(receivedMessage); - Assert.AreEqual(applicationMessage.Topic, receivedMessage.Topic); - Assert.AreEqual(applicationMessage.TopicAlias, receivedMessage.TopicAlias); - Assert.AreEqual(applicationMessage.ContentType, receivedMessage.ContentType); - Assert.AreEqual(applicationMessage.ResponseTopic, receivedMessage.ResponseTopic); - Assert.AreEqual(applicationMessage.MessageExpiryInterval, receivedMessage.MessageExpiryInterval); - CollectionAssert.AreEqual(applicationMessage.CorrelationData, receivedMessage.CorrelationData); - CollectionAssert.AreEqual(applicationMessage.PayloadSegment.ToArray(), receivedMessage.PayloadSegment.ToArray()); - CollectionAssert.AreEqual(applicationMessage.UserProperties, receivedMessage.UserProperties); + try + { + var sender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); + + var applicationMessage = new MqttApplicationMessageBuilder() + .WithTopic("Hello") + .WithPayload("World") + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) + .WithUserProperty("x", "1") + .WithUserProperty("y", "2") + .WithResponseTopic("response") + .WithContentType("text") + .WithMessageExpiryInterval(50) + .WithCorrelationData(new byte[12]) + .WithTopicAlias(2) + .Build(); + + await sender.PublishAsync(applicationMessage); + + await Task.Delay(500); + + Assert.IsNotNull(receivedMessage); + Assert.AreEqual(applicationMessage.Topic, receivedMessage.Topic); + Assert.AreEqual(applicationMessage.TopicAlias, receivedMessage.TopicAlias); + Assert.AreEqual(applicationMessage.ContentType, receivedMessage.ContentType); + Assert.AreEqual(applicationMessage.ResponseTopic, receivedMessage.ResponseTopic); + Assert.AreEqual(applicationMessage.MessageExpiryInterval, receivedMessage.MessageExpiryInterval); + CollectionAssert.AreEqual(applicationMessage.CorrelationData, receivedMessage.CorrelationData); + CollectionAssert.AreEqual(applicationMessage.Payload.Sequence.ToArray(), receivedMessage.Payload.Sequence.ToArray()); + CollectionAssert.AreEqual(applicationMessage.UserProperties, receivedMessage.UserProperties); + } + finally + { + receivedMessage?.Dispose(); + } } } } diff --git a/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs b/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs index 5cb8ead61..32f6f0c1b 100644 --- a/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs +++ b/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.IO; using System.Security.Cryptography.X509Certificates; using System.Threading; @@ -47,9 +48,12 @@ public Task ReadAsync(byte[] buffer, int offset, int count, CancellationTok return _stream.ReadAsync(buffer, offset, count, cancellationToken); } - public Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, CancellationToken cancellationToken) + public async Task WriteAsync(ReadOnlySequence buffer, bool isEndOfPacket, CancellationToken cancellationToken) { - return _stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken); + foreach (var segment in buffer) + { + await _stream.WriteAsync(segment, cancellationToken).ConfigureAwait(false); + } } public void Dispose() diff --git a/Source/MQTTnet.Tests/Mockups/TestApplicationMessageReceivedHandler.cs b/Source/MQTTnet.Tests/Mockups/TestApplicationMessageReceivedHandler.cs index 21dfcfc68..679815b7d 100644 --- a/Source/MQTTnet.Tests/Mockups/TestApplicationMessageReceivedHandler.cs +++ b/Source/MQTTnet.Tests/Mockups/TestApplicationMessageReceivedHandler.cs @@ -13,10 +13,18 @@ namespace MQTTnet.Tests.Mockups { - public sealed class TestApplicationMessageReceivedHandler + public sealed class TestApplicationMessageReceivedHandler : IDisposable { readonly List _receivedEventArgs = new List(); + public void Dispose() + { + foreach (var eventArgs in _receivedEventArgs) + { + eventArgs.ApplicationMessage?.Dispose(); + } + } + public TestApplicationMessageReceivedHandler(IMqttClient mqttClient) { if (mqttClient == null) @@ -76,6 +84,8 @@ Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e { lock (_receivedEventArgs) { + // take ownership of application message to avoid cloning + eventArgs.TransferApplicationMessageOwnership(false); _receivedEventArgs.Add(eventArgs); } diff --git a/Source/MQTTnet.Tests/MqttApplicationMessageBuilder_Tests.cs b/Source/MQTTnet.Tests/MqttApplicationMessageBuilder_Tests.cs index f4f28f0bc..7115280fd 100644 --- a/Source/MQTTnet.Tests/MqttApplicationMessageBuilder_Tests.cs +++ b/Source/MQTTnet.Tests/MqttApplicationMessageBuilder_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.IO; using System.Linq; using System.Text; @@ -30,7 +31,7 @@ public void CreateApplicationMessage_TimeStampPayload() Assert.AreEqual("xyz", message.Topic); Assert.IsFalse(message.Retain); Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); - Assert.AreEqual(Encoding.UTF8.GetString(message.PayloadSegment.ToArray()), "00:06:00"); + Assert.AreEqual(Encoding.UTF8.GetString(message.Payload.Sequence), "00:06:00"); } [TestMethod] @@ -42,7 +43,7 @@ public void CreateApplicationMessage_StreamPayload() Assert.AreEqual("123", message.Topic); Assert.IsFalse(message.Retain); Assert.AreEqual(MqttQualityOfServiceLevel.AtMostOnce, message.QualityOfServiceLevel); - Assert.AreEqual(Encoding.UTF8.GetString(message.PayloadSegment.ToArray()), "Hello"); + Assert.AreEqual(Encoding.UTF8.GetString(message.Payload.Sequence), "Hello"); } [TestMethod] diff --git a/Source/MQTTnet.Tests/MqttPacketSerializationHelper.cs b/Source/MQTTnet.Tests/MqttPacketSerializationHelper.cs index 43d1323d9..87e4fdcd9 100644 --- a/Source/MQTTnet.Tests/MqttPacketSerializationHelper.cs +++ b/Source/MQTTnet.Tests/MqttPacketSerializationHelper.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Packets; diff --git a/Source/MQTTnet.Tests/MqttPacketWriter_Tests.cs b/Source/MQTTnet.Tests/MqttPacketWriter_Tests.cs index 0a3d3f5eb..ba90b1dd9 100644 --- a/Source/MQTTnet.Tests/MqttPacketWriter_Tests.cs +++ b/Source/MQTTnet.Tests/MqttPacketWriter_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Buffers; using MQTTnet.Formatter; namespace MQTTnet.Tests diff --git a/Source/MQTTnet.Tests/Protocol_Tests.cs b/Source/MQTTnet.Tests/Protocol_Tests.cs index 6f5146c8d..c77ee02d1 100644 --- a/Source/MQTTnet.Tests/Protocol_Tests.cs +++ b/Source/MQTTnet.Tests/Protocol_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Buffers; using MQTTnet.Formatter; namespace MQTTnet.Tests diff --git a/Source/MQTTnet.Tests/Server/Cross_Version_Tests.cs b/Source/MQTTnet.Tests/Server/Cross_Version_Tests.cs index 35683a843..e0f23f084 100644 --- a/Source/MQTTnet.Tests/Server/Cross_Version_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Cross_Version_Tests.cs @@ -18,7 +18,7 @@ public async Task Send_V311_Receive_V500() await testEnvironment.StartServer(); var receiver = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500)); - var receivedApplicationMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + using var receivedApplicationMessages = testEnvironment.CreateApplicationMessageHandler(receiver); await receiver.SubscribeAsync("#"); var sender = await testEnvironment.ConnectClient(); @@ -42,7 +42,7 @@ public async Task Send_V500_Receive_V311() await testEnvironment.StartServer(); var receiver = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V311)); - var receivedApplicationMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + using var receivedApplicationMessages = testEnvironment.CreateApplicationMessageHandler(receiver); await receiver.SubscribeAsync("#"); var sender = await testEnvironment.ConnectClient(); diff --git a/Source/MQTTnet.Tests/Server/General.cs b/Source/MQTTnet.Tests/Server/General.cs index 0cf7c43f6..9ef75a155 100644 --- a/Source/MQTTnet.Tests/Server/General.cs +++ b/Source/MQTTnet.Tests/Server/General.cs @@ -2,20 +2,19 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Adapter; using MQTTnet.Client; -using MQTTnet.Formatter; using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet.Tests.Server { @@ -306,7 +305,7 @@ public async Task Intercept_Message() var server = await testEnvironment.StartServer(); server.InterceptingPublishAsync += e => { - e.ApplicationMessage.PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("extended")); + e.ApplicationMessage.Payload = new ReadOnlySequence(Encoding.ASCII.GetBytes("extended")); return CompletedTask.Instance; }; @@ -317,7 +316,7 @@ public async Task Intercept_Message() var isIntercepted = false; c2.ApplicationMessageReceivedAsync += e => { - isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()), StringComparison.Ordinal) == 0; + isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(e.ApplicationMessage.Payload.Sequence), StringComparison.Ordinal) == 0; return CompletedTask.Instance; }; @@ -428,7 +427,7 @@ await server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "/test/1", - PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("true")), + Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("true")), QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce }) { @@ -783,7 +782,7 @@ public async Task Send_Long_Body() var client1 = await testEnvironment.ConnectClient(); client1.ApplicationMessageReceivedAsync += e => { - receivedBody = e.ApplicationMessage.PayloadSegment.ToArray(); + receivedBody = e.ApplicationMessage.Payload.Sequence.ToArray(); return CompletedTask.Instance; }; @@ -1014,7 +1013,7 @@ async Task TestPublishAsync( await testEnvironment.StartServer(); var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("receiver")); - var c1MessageHandler = testEnvironment.CreateApplicationMessageHandler(c1); + using var c1MessageHandler = testEnvironment.CreateApplicationMessageHandler(c1); await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("sender")); diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index 14d30026e..27979a867 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -17,8 +17,8 @@ public async Task Inject_Application_Message_At_Session_Level() var server = await testEnvironment.StartServer(); var receiver1 = await testEnvironment.ConnectClient(); var receiver2 = await testEnvironment.ConnectClient(); - var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); - var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + using var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); + using var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); var status = await server.GetSessionsAsync(); var clientStatus = status[0]; @@ -47,7 +47,7 @@ public async Task Inject_ApplicationMessage_At_Server_Level() var receiver = await testEnvironment.ConnectClient(); - var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + using var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); await receiver.SubscribeAsync("#"); diff --git a/Source/MQTTnet.Tests/Server/No_Local_Tests.cs b/Source/MQTTnet.Tests/Server/No_Local_Tests.cs index 4f223edc4..a909247a7 100644 --- a/Source/MQTTnet.Tests/Server/No_Local_Tests.cs +++ b/Source/MQTTnet.Tests/Server/No_Local_Tests.cs @@ -17,7 +17,7 @@ public Task Subscribe_With_No_Local() { return ExecuteTest(true, 0); } - + [TestMethod] public Task Subscribe_Without_No_Local() { @@ -31,19 +31,19 @@ async Task ExecuteTest( using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500)) { await testEnvironment.StartServer(); - + var client1 = await testEnvironment.ConnectClient(); - var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); + using var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); var topicFilter = testEnvironment.ClientFactory.CreateTopicFilterBuilder().WithTopic("Topic").WithNoLocal(noLocal).Build(); await client1.SubscribeAsync(topicFilter); await LongTestDelay(); applicationMessageHandler.AssertReceivedCountEquals(0); - + // The client will publish a message where it is itself subscribing to. await client1.PublishStringAsync("Topic", "Payload", retain: true); await LongTestDelay(); - + applicationMessageHandler.AssertReceivedCountEquals(expectedCountAfterPublish); } } diff --git a/Source/MQTTnet.Tests/Server/Publishing_Tests.cs b/Source/MQTTnet.Tests/Server/Publishing_Tests.cs index 9ac14aa55..3429fb1cc 100644 --- a/Source/MQTTnet.Tests/Server/Publishing_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Publishing_Tests.cs @@ -79,29 +79,29 @@ public async Task Intercept_Client_Enqueue() using (var testEnvironment = CreateTestEnvironment()) { var server = await testEnvironment.StartServer(); - + var sender = await testEnvironment.ConnectClient(); - + var receiver = await testEnvironment.ConnectClient(); await receiver.SubscribeAsync("A"); - var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); - + using var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce); await LongTestDelay(); - + receivedMessages.AssertReceivedCountEquals(1); - + server.InterceptingClientEnqueueAsync += e => { e.AcceptEnqueue = false; return CompletedTask.Instance; }; - + await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce); await LongTestDelay(); - + // Do not increase because the internal enqueue to the target client is not accepted! receivedMessages.AssertReceivedCountEquals(1); } @@ -118,15 +118,15 @@ public async Task Intercept_Client_Enqueue_Multiple_Clients_Subscribed_Messages_ var receiverOne = await testEnvironment.ConnectClient(o => o.WithClientId("One")); await receiverOne.SubscribeAsync("A"); - var receivedMessagesOne = testEnvironment.CreateApplicationMessageHandler(receiverOne); + using var receivedMessagesOne = testEnvironment.CreateApplicationMessageHandler(receiverOne); var receiverTwo = await testEnvironment.ConnectClient(o => o.WithClientId("Two")); await receiverTwo.SubscribeAsync("A"); - var receivedMessagesTwo = testEnvironment.CreateApplicationMessageHandler(receiverTwo); + using var receivedMessagesTwo = testEnvironment.CreateApplicationMessageHandler(receiverTwo); var receiverThree = await testEnvironment.ConnectClient(o => o.WithClientId("Three")); await receiverThree.SubscribeAsync("A"); - var receivedMessagesThree = testEnvironment.CreateApplicationMessageHandler(receiverThree); + using var receivedMessagesThree = testEnvironment.CreateApplicationMessageHandler(receiverThree); server.InterceptingClientEnqueueAsync += e => { diff --git a/Source/MQTTnet.Tests/Server/QoS_Tests.cs b/Source/MQTTnet.Tests/Server/QoS_Tests.cs index 8f52d614b..e503b3b1b 100644 --- a/Source/MQTTnet.Tests/Server/QoS_Tests.cs +++ b/Source/MQTTnet.Tests/Server/QoS_Tests.cs @@ -148,7 +148,7 @@ public async Task Preserve_Message_Order_For_Queued_Messages() // Create a new client for the existing message. var client = await testEnvironment.ConnectClient(o => o.WithClientId("A").WithCleanSession(false)); - var messages = testEnvironment.CreateApplicationMessageHandler(client); + using var messages = testEnvironment.CreateApplicationMessageHandler(client); await LongTestDelay(); diff --git a/Source/MQTTnet.Tests/Server/Retain_As_Published_Tests.cs b/Source/MQTTnet.Tests/Server/Retain_As_Published_Tests.cs index 74dd9cfe4..29b3d4180 100644 --- a/Source/MQTTnet.Tests/Server/Retain_As_Published_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Retain_As_Published_Tests.cs @@ -31,7 +31,7 @@ async Task ExecuteTest(bool retainAsPublished) await testEnvironment.StartServer(); var client1 = await testEnvironment.ConnectClient(); - var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); + using var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); var topicFilter = testEnvironment.ClientFactory.CreateTopicFilterBuilder().WithTopic("Topic").WithRetainAsPublished(retainAsPublished).Build(); await client1.SubscribeAsync(topicFilter); await LongTestDelay(); diff --git a/Source/MQTTnet.Tests/Server/Retain_Handling_Tests.cs b/Source/MQTTnet.Tests/Server/Retain_Handling_Tests.cs index dcc8702ff..2917b4673 100644 --- a/Source/MQTTnet.Tests/Server/Retain_Handling_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Retain_Handling_Tests.cs @@ -47,7 +47,7 @@ async Task ExecuteTest( await LongTestDelay(); var client2 = await testEnvironment.ConnectClient(); - var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client2); + using var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client2); var topicFilter = testEnvironment.ClientFactory.CreateTopicFilterBuilder().WithTopic("Topic").WithRetainHandling(retainHandling).Build(); await client2.SubscribeAsync(topicFilter); diff --git a/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs b/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs index 579260166..16aa2cee5 100644 --- a/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs @@ -30,7 +30,7 @@ public async Task Clear_Retained_Message_With_Empty_Payload() await c1.DisconnectAsync(); var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await Task.Delay(200); await c2.SubscribeAsync(new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); @@ -55,7 +55,7 @@ public async Task Clear_Retained_Message_With_Null_Payload() await c1.DisconnectAsync(); var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await Task.Delay(200); await c2.SubscribeAsync(new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }); @@ -84,7 +84,7 @@ await c1.PublishAsync( // The second client uses QoS 1 so a downgrade is required. var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await c2.SubscribeAsync(new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); await LongTestDelay(); @@ -114,7 +114,7 @@ await c1.PublishAsync( // The second client uses QoS 2 so an upgrade is expected but according to the MQTT spec this is not supported! var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await c2.SubscribeAsync(new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce }); await LongTestDelay(); @@ -137,7 +137,7 @@ public async Task Receive_No_Retained_Message_After_Subscribe() await c1.DisconnectAsync(); var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained_other").Build()); await Task.Delay(500); @@ -158,7 +158,7 @@ public async Task Receive_Retained_Message_After_Subscribe() await c1.DisconnectAsync(); var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").Build()); @@ -189,7 +189,7 @@ await c1.PublishAsync( // Subscribe using a new client. var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await Task.Delay(200); // Using QoS 2 will lead to 1 instead because the publish was made with QoS level 1 (see 3.8.4 SUBSCRIBE Actions)! @@ -211,7 +211,7 @@ public async Task Retained_Messages_Flow() var c1 = await testEnvironment.ConnectClient(); var c2 = await testEnvironment.ConnectClient(); - var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); + using var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2); await c1.PublishAsync(retainedMessage); await c1.DisconnectAsync(); diff --git a/Source/MQTTnet.Tests/Server/Session_Tests.cs b/Source/MQTTnet.Tests/Server/Session_Tests.cs index 0aed57aba..d1b4c6a1e 100644 --- a/Source/MQTTnet.Tests/Server/Session_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Session_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Linq; using System.Text; using System.Threading; @@ -283,7 +284,7 @@ public async Task Set_Session_Item() server.InterceptingPublishAsync += e => { - e.ApplicationMessage.PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(e.SessionItems["default_payload"] as string ?? string.Empty)); + e.ApplicationMessage.Payload = new ReadOnlySequence(Encoding.UTF8.GetBytes(e.SessionItems["default_payload"] as string ?? string.Empty)); return CompletedTask.Instance; }; diff --git a/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs b/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs index cbedaefe1..e447cd157 100644 --- a/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs @@ -45,7 +45,7 @@ public async Task Subscription_Roundtrip(string topic, string filter, bool shoul var receiver = await testEnvironment.ConnectClient(); await receiver.SubscribeAsync(filter).ConfigureAwait(false); - var receivedMessages = receiver.TrackReceivedMessages(); + using var receivedMessages = receiver.TrackReceivedMessages(); var sender = await testEnvironment.ConnectClient(); await sender.PublishStringAsync(topic, "PAYLOAD").ConfigureAwait(false); @@ -143,7 +143,7 @@ public async Task Enqueue_Message_After_Subscription() }; var client = await testEnvironment.ConnectClient(); - var receivedMessages = testEnvironment.CreateApplicationMessageHandler(client); + using var receivedMessages = testEnvironment.CreateApplicationMessageHandler(client); await client.SubscribeAsync("test_topic"); diff --git a/Source/MQTTnet.Tests/Server/Subscription_Identifier_Tests.cs b/Source/MQTTnet.Tests/Server/Subscription_Identifier_Tests.cs index 257189c4e..d9e818a7b 100644 --- a/Source/MQTTnet.Tests/Server/Subscription_Identifier_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Subscription_Identifier_Tests.cs @@ -36,7 +36,7 @@ public async Task Subscribe_With_Subscription_Identifier() await testEnvironment.StartServer(); var client1 = await testEnvironment.ConnectClient(); - var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); + using var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); var topicFilter = testEnvironment.ClientFactory.CreateTopicFilterBuilder().WithTopic("Topic").Build(); var subscribeOptions = testEnvironment.ClientFactory.CreateSubscribeOptionsBuilder().WithSubscriptionIdentifier(456).WithTopicFilter(topicFilter).Build(); @@ -63,7 +63,7 @@ public async Task Subscribe_With_Multiple_Subscription_Identifiers() await testEnvironment.StartServer(); var client1 = await testEnvironment.ConnectClient(); - var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); + using var applicationMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1); var topicFilter = testEnvironment.ClientFactory.CreateTopicFilterBuilder().WithTopic("Topic/A").Build(); var subscribeOptions = testEnvironment.ClientFactory.CreateSubscribeOptionsBuilder().WithSubscriptionIdentifier(456).WithTopicFilter(topicFilter).Build(); diff --git a/Source/MQTTnet.Tests/Server/Tls_Tests.cs b/Source/MQTTnet.Tests/Server/Tls_Tests.cs index 325463bfb..9bc5b01f6 100644 --- a/Source/MQTTnet.Tests/Server/Tls_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Tls_Tests.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Linq; using System.Net; using System.Security.Authentication; @@ -100,7 +101,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic1", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -108,7 +109,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic1", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) })); certificateProvider.CurrentCertificate = CreateCertificate(secondOid); @@ -136,7 +137,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic2", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -144,7 +145,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic2", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) })); // Ensure first client still works @@ -152,7 +153,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic1", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -160,7 +161,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic1", - PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) + Payload = new ReadOnlySequence(new byte[] { 1, 2, 3, 4 }) })); await Task.Delay(1000); diff --git a/Source/MQTTnet.Tests/Server/Will_Tests.cs b/Source/MQTTnet.Tests/Server/Will_Tests.cs index e9c364ca1..267d97859 100644 --- a/Source/MQTTnet.Tests/Server/Will_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Will_Tests.cs @@ -43,7 +43,7 @@ public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect() var receiver = await testEnvironment.ConnectClient().ConfigureAwait(false); - var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + using var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); @@ -65,7 +65,7 @@ public async Task Will_Message_Send() await testEnvironment.StartServer(); var receiver = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()).ConfigureAwait(false); - var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); + using var receivedMessages = testEnvironment.CreateApplicationMessageHandler(receiver); await receiver.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); var clientOptions = new MqttClientOptionsBuilder().WithWillTopic("My/last/will").WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce); diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 85e5b14b3..ef2cde1bf 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.IO; using System.Net.Sockets; using System.Runtime.InteropServices; @@ -139,12 +140,12 @@ public async Task ReceivePacketAsync(CancellationToken cancellationT cancellationToken.ThrowIfCancellationRequested(); ThrowIfDisposed(); + ReceivedMqttPacket receivedPacket = new(); try { var localPacketInspector = PacketInspector; localPacketInspector?.BeginReceivePacket(); - ReceivedMqttPacket receivedPacket; var receivedPacketTask = ReceiveAsync(cancellationToken); if (receivedPacketTask.IsCompleted) { @@ -195,6 +196,10 @@ public async Task ReceivePacketAsync(CancellationToken cancellationT throw; } } + finally + { + receivedPacket.Dispose(); + } return null; } @@ -228,14 +233,20 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat _logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet); - if (packetBuffer.Payload.Count == 0 || !AllowPacketFragmentation) + if (!AllowPacketFragmentation) { - await _channel.WriteAsync(packetBuffer.Join(), true, cancellationToken).ConfigureAwait(false); + using (var memoryOwner = packetBuffer.ToMemoryOwner()) + { + await _channel.WriteAsync(new ReadOnlySequence(memoryOwner.Memory), true, cancellationToken).ConfigureAwait(false); + } } else { await _channel.WriteAsync(packetBuffer.Packet, false, cancellationToken).ConfigureAwait(false); - await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false); + if (packetBuffer.Payload.Length > 0) + { + await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false); + } } Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length); @@ -403,20 +414,19 @@ async Task ReceiveAsync(CancellationToken cancellationToken) } var bodyLength = fixedHeader.RemainingLength; - var body = new byte[bodyLength]; - + var mqttPacket = new ReceivedMqttPacket(fixedHeader.Flags, bodyLength, fixedHeader.TotalLength); var bodyOffset = 0; var chunkSize = Math.Min(ReadBufferSize, bodyLength); - + var bodyArray = mqttPacket.Body.Array; do { - var bytesLeft = body.Length - bodyOffset; + var bytesLeft = bodyLength - bodyOffset; if (chunkSize > bytesLeft) { chunkSize = bytesLeft; } - var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); + var readBytes = await _channel.ReadAsync(bodyArray, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); if (cancellationToken.IsCancellationRequested) { @@ -431,10 +441,9 @@ async Task ReceiveAsync(CancellationToken cancellationToken) bodyOffset += readBytes; } while (bodyOffset < bodyLength); - PacketInspector?.FillReceiveBuffer(body); + PacketInspector?.FillReceiveBuffer(mqttPacket.Body); - var bodySegment = new ArraySegment(body, 0, bodyLength); - return new ReceivedMqttPacket(fixedHeader.Flags, bodySegment, fixedHeader.TotalLength); + return mqttPacket; } static bool WrapAndThrowException(Exception exception) diff --git a/Source/MQTTnet/Adapter/MqttPacketInspector.cs b/Source/MQTTnet/Adapter/MqttPacketInspector.cs index 88cb1a305..fe0f70858 100644 --- a/Source/MQTTnet/Adapter/MqttPacketInspector.cs +++ b/Source/MQTTnet/Adapter/MqttPacketInspector.cs @@ -3,8 +3,10 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.IO; using System.Threading.Tasks; +using MQTTnet.Buffers; using MQTTnet.Diagnostics; using MQTTnet.Formatter; using MQTTnet.Internal; @@ -62,7 +64,7 @@ public Task BeginSendPacket(MqttPacketBuffer buffer) public Task EndReceivePacket() { - if (!_asyncEvent.HasHandlers) + if (!_asyncEvent.HasHandlers || _receivedPacketBuffer == null) { return CompletedTask.Instance; } @@ -73,14 +75,14 @@ public Task EndReceivePacket() return InspectPacket(buffer, MqttPacketFlowDirection.Inbound); } - public void FillReceiveBuffer(byte[] buffer) + public void FillReceiveBuffer(ReadOnlySpan buffer) { if (!_asyncEvent.HasHandlers) { return; } - _receivedPacketBuffer?.Write(buffer, 0, buffer.Length); + _receivedPacketBuffer?.Write(buffer); } async Task InspectPacket(byte[] buffer, MqttPacketFlowDirection direction) diff --git a/Source/MQTTnet/Adapter/ReceivedMqttPacket.cs b/Source/MQTTnet/Adapter/ReceivedMqttPacket.cs index f290e5656..1bea9d85e 100644 --- a/Source/MQTTnet/Adapter/ReceivedMqttPacket.cs +++ b/Source/MQTTnet/Adapter/ReceivedMqttPacket.cs @@ -3,11 +3,13 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; namespace MQTTnet.Adapter; -public readonly struct ReceivedMqttPacket +public readonly struct ReceivedMqttPacket : IDisposable { + private readonly bool _arrayPool; public static readonly ReceivedMqttPacket Empty = new(); public ReceivedMqttPacket(byte fixedHeader, ArraySegment body, int totalLength) @@ -15,6 +17,23 @@ public ReceivedMqttPacket(byte fixedHeader, ArraySegment body, int totalLe FixedHeader = fixedHeader; Body = body; TotalLength = totalLength; + _arrayPool = false; + } + + public ReceivedMqttPacket(byte fixedHeader, int bodyLength, int totalLength) + { + FixedHeader = fixedHeader; + Body = new ArraySegment(ArrayPool.Shared.Rent(bodyLength), 0, bodyLength); + TotalLength = totalLength; + _arrayPool = true; + } + + public void Dispose() + { + if (_arrayPool) + { + ArrayPool.Shared.Return(Body.Array); + } } public byte FixedHeader { get; } diff --git a/Source/MQTTnet/Buffers/ArrayPoolBufferSegment.cs b/Source/MQTTnet/Buffers/ArrayPoolBufferSegment.cs new file mode 100644 index 000000000..041dfa16a --- /dev/null +++ b/Source/MQTTnet/Buffers/ArrayPoolBufferSegment.cs @@ -0,0 +1,64 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------ + +using System; +using System.Buffers; + +namespace MQTTnet.Buffers +{ + /// + /// Helper to build a ReadOnlySequence from a set of allocated buffers. + /// + public sealed class ArrayPoolBufferSegment : ReadOnlySequenceSegment + { + private T[] _array; + + /// + /// Initializes a new instance of the class. + /// + public ArrayPoolBufferSegment(T[] array, int offset, int length) + { + Memory = new ReadOnlyMemory(array, offset, length); + _array = array; + } + + /// + /// Returns the base array of the buffer. + /// + public T[] Array() => _array; + + /// + /// Rents a buffer from the pool and returns a instance. + /// + /// The length of the segment. + public static ArrayPoolBufferSegment Rent(int length) + { + var array = ArrayPool.Shared.Rent(length); + return new ArrayPoolBufferSegment(array, 0, length); + } + + /// + /// Rents a new buffer and appends it to the sequence. + /// + public ArrayPoolBufferSegment RentAndAppend(int length) + { + var array = ArrayPool.Shared.Rent(length); + return Append(array, 0, length); + } + + /// + /// Appends a buffer to the sequence. + /// + public ArrayPoolBufferSegment Append(T[] array, int offset, int length) + { + var segment = new ArrayPoolBufferSegment(array, offset, length) + { + RunningIndex = RunningIndex + Memory.Length, + }; + Next = segment; + return segment; + } + } +} diff --git a/Source/MQTTnet/Buffers/ArrayPoolMemoryOwner.cs b/Source/MQTTnet/Buffers/ArrayPoolMemoryOwner.cs new file mode 100644 index 000000000..a1d2e9736 --- /dev/null +++ b/Source/MQTTnet/Buffers/ArrayPoolMemoryOwner.cs @@ -0,0 +1,56 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; + +namespace MQTTnet.Buffers +{ + /// + /// Owner of memory rented from that + /// is responsible for disposing the underlying memory appropriately. + /// + public struct ArrayPoolMemoryOwner : IMemoryOwner + { + public static ArrayPoolMemoryOwner Rent(int length) + { + var memory = ArrayPool.Shared.Rent(length); + return new ArrayPoolMemoryOwner(memory, length); + } + + private ArrayPoolMemoryOwner(T[] memory, int length) + { + Initialize(memory, length); + } + + private void Initialize(T[] array, int length) + { + _length = length; + _array = array; + } + + private int _length; + private T[] _array; + + /// + /// Gets the rented memory./>. + /// + public T[] Array => _array; + + /// + public Memory Memory => _array.AsMemory(0, _length); + + /// + /// Returns the underlying memory and sets the to null. + /// + public void Dispose() + { + if (_array != null) + { + ArrayPool.Shared.Return(_array); + _array = null; + } + } + } +} diff --git a/Source/MQTTnet/Buffers/EmptyMemoryOwner.cs b/Source/MQTTnet/Buffers/EmptyMemoryOwner.cs new file mode 100644 index 000000000..66f3463f3 --- /dev/null +++ b/Source/MQTTnet/Buffers/EmptyMemoryOwner.cs @@ -0,0 +1,36 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; + +namespace MQTTnet.Buffers +{ + /// + /// Holds an empty array of type and + /// provides a view of it. + /// The static memory is not disposed. + /// + public struct EmptyMemoryOwner : IMemoryOwner + { + public static IMemoryOwner Empty { get; } = new EmptyMemoryOwner(); + + private T[] _array; + + public EmptyMemoryOwner() + { + _array = Array.Empty(); + } + + /// + public Memory Memory => _array.AsMemory(); + + /// + /// Nothing to do. + /// + public void Dispose() + { + } + } +} diff --git a/Source/MQTTnet/Buffers/IReadOnlySequenceOwner.cs b/Source/MQTTnet/Buffers/IReadOnlySequenceOwner.cs new file mode 100644 index 000000000..5ff5185d3 --- /dev/null +++ b/Source/MQTTnet/Buffers/IReadOnlySequenceOwner.cs @@ -0,0 +1,21 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; + +namespace MQTTnet.Buffers +{ + /// + /// Owner of that is responsible + /// for disposing the underlying memory appropriately. + /// + public interface IReadOnlySequenceOwner : IDisposable + { + /// + /// Gets a . + /// + ReadOnlySequence Sequence { get; } + } +} diff --git a/Source/MQTTnet/Formatter/MqttBufferReader.cs b/Source/MQTTnet/Buffers/MqttBufferReader.cs similarity index 86% rename from Source/MQTTnet/Formatter/MqttBufferReader.cs rename to Source/MQTTnet/Buffers/MqttBufferReader.cs index 1a795c184..87ee5bf8a 100644 --- a/Source/MQTTnet/Formatter/MqttBufferReader.cs +++ b/Source/MQTTnet/Buffers/MqttBufferReader.cs @@ -8,8 +8,9 @@ using MQTTnet.Exceptions; using MQTTnet.Internal; using System.Buffers.Binary; +using System.Buffers; -namespace MQTTnet.Formatter +namespace MQTTnet.Buffers { public sealed class MqttBufferReader { @@ -35,7 +36,7 @@ public byte[] ReadBinaryData() ValidateReceiveBuffer(length); - var result = new byte[length]; + var result = GC.AllocateUninitializedArray(length); MqttMemoryHelper.Copy(_buffer, _position, result, 0, length); _position += length; @@ -66,13 +67,28 @@ public byte[] ReadRemainingData() return EmptyBuffer.Array; } - var buffer = new byte[bufferLength]; + var buffer = GC.AllocateUninitializedArray(bufferLength); MqttMemoryHelper.Copy(_buffer, _position, buffer, 0, bufferLength); _position += bufferLength; return buffer; } + public IMemoryOwner ReadPayload() + { + var bufferLength = BytesLeft; + if (bufferLength == 0) + { + return EmptyMemoryOwner.Empty; + } + + var result = ArrayPoolMemoryOwner.Rent(bufferLength); + MqttMemoryHelper.Copy(_buffer, _position, result.Array, 0, bufferLength); + _position += bufferLength; + + return result; + } + public string ReadString() { var length = ReadTwoByteInteger(); diff --git a/Source/MQTTnet/Formatter/MqttBufferWriter.cs b/Source/MQTTnet/Buffers/MqttBufferWriter.cs similarity index 98% rename from Source/MQTTnet/Formatter/MqttBufferWriter.cs rename to Source/MQTTnet/Buffers/MqttBufferWriter.cs index 14ad707b5..6212fc893 100644 --- a/Source/MQTTnet/Formatter/MqttBufferWriter.cs +++ b/Source/MQTTnet/Buffers/MqttBufferWriter.cs @@ -2,14 +2,13 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Exceptions; +using MQTTnet.Protocol; using System; using System.Runtime.CompilerServices; using System.Text; -using MQTTnet.Exceptions; -using MQTTnet.Internal; -using MQTTnet.Protocol; -namespace MQTTnet.Formatter +namespace MQTTnet.Buffers { /// /// This is a custom implementation of a memory stream which provides only MQTTnet relevant features. @@ -181,9 +180,9 @@ public void WriteString(string value) // UTF8 chars can have a max length of 4 and the used buffer increase *2 every time. // So the buffer should always have much more capacity left so that a correct value // here is only waste of CPU cycles. - var byteCount = value.Length * 4; + var maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length); - EnsureAdditionalCapacity(byteCount + 2); + EnsureAdditionalCapacity(maxByteCount + 2); var writtenBytes = Encoding.UTF8.GetBytes(value, 0, value.Length, _buffer, _position + 2); diff --git a/Source/MQTTnet/Buffers/MqttMemoryHelper.cs b/Source/MQTTnet/Buffers/MqttMemoryHelper.cs new file mode 100644 index 000000000..c7f1062b4 --- /dev/null +++ b/Source/MQTTnet/Buffers/MqttMemoryHelper.cs @@ -0,0 +1,154 @@ +using System; +using System.Buffers; +using System.Runtime.CompilerServices; + +namespace MQTTnet.Buffers +{ + public static class MqttMemoryHelper + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void Copy(byte[] source, int sourceIndex, byte[] destination, int destinationIndex, int length) + { + source.AsSpan(sourceIndex, length).CopyTo(destination.AsSpan(destinationIndex, length)); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void Copy(ReadOnlySequence sequence, int sourceIndex, byte[] destination, int destinationIndex, int length) + { + sequence.Slice(sourceIndex).CopyTo(destination.AsSpan(destinationIndex, length)); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void Copy(ReadOnlySequence sequence, int sourceIndex, Memory destination, int destinationIndex, int length) + { + var offset = destinationIndex; + foreach (var segment in sequence) + { + if (segment.Length < sourceIndex) + { + sourceIndex -= segment.Length; + continue; + } + + var targetLength = Math.Min(segment.Length - sourceIndex, length); + segment.Span.Slice(sourceIndex, targetLength).CopyTo(destination.Span.Slice(offset)); + offset += targetLength; + length -= targetLength; + if (length == 0) + { + break; + } + sourceIndex = 0; + } + } + + public static ReadOnlySequence RentCopy(ReadOnlySequence sequence, int sourceIndex, int length) + { + ArrayPoolBufferSegment firstSegment = null; + ArrayPoolBufferSegment nextSegment = null; + + var offset = sourceIndex; + foreach (var segment in sequence) + { + if (segment.Length >= sourceIndex) + { + sourceIndex -= segment.Length; + continue; + } + + var targetLength = Math.Min(segment.Length - sourceIndex, length); + if (firstSegment == null) + { + firstSegment = ArrayPoolBufferSegment.Rent(targetLength); + nextSegment = firstSegment; + } + else + { + nextSegment = nextSegment.RentAndAppend(targetLength); + } + + segment.Span.Slice(sourceIndex, targetLength).CopyTo(nextSegment.Array().AsSpan()); + offset += targetLength; + length -= targetLength; + if (length == 0) + { + break; + } + sourceIndex = 0; + } + + if (firstSegment == null) + { + return ReadOnlySequence.Empty; + } + return new ReadOnlySequence(firstSegment, 0, nextSegment, nextSegment.Memory.Length); + } + + public static bool SequenceEqual(ArraySegment source, ArraySegment target) + { + return source.AsSpan().SequenceEqual(target); + } + + public static bool SequenceEqual(ReadOnlySequence source, ReadOnlySequence target) + { + if (source.Length != target.Length) + { + return false; + } + + long comparedLength = 0; + long length = source.Length; + + int sourceOffset = 0; + int targetOffset = 0; + + var sourceEnumerator = source.GetEnumerator(); + var targetEnumerator = target.GetEnumerator(); + + ReadOnlyMemory sourceSegment = sourceEnumerator.Current; + ReadOnlyMemory targetSegment = targetEnumerator.Current; + + while (true) + { + int compareLength = Math.Min(sourceSegment.Length - sourceOffset, targetSegment.Length - targetOffset); + + if (compareLength > 0 && + !sourceSegment.Span.Slice(sourceOffset, compareLength).SequenceEqual(targetSegment.Span.Slice(targetOffset, compareLength))) + { + return false; + } + + comparedLength += compareLength; + if (comparedLength >= length) + { + return true; + } + + sourceOffset += compareLength; + if (sourceOffset >= sourceSegment.Length) + { + if (!sourceEnumerator.MoveNext()) + { + return false; + } + + sourceSegment = sourceEnumerator.Current; + sourceOffset = 0; + } + + targetOffset += compareLength; + if (targetOffset >= targetSegment.Length) + { + if (!targetEnumerator.MoveNext()) + { + return false; + } + + targetSegment = targetEnumerator.Current; + targetOffset = 0; + } + } + } + } +} + diff --git a/Source/MQTTnet/Buffers/MqttPayloadOwner.cs b/Source/MQTTnet/Buffers/MqttPayloadOwner.cs new file mode 100644 index 000000000..f9f04c9d5 --- /dev/null +++ b/Source/MQTTnet/Buffers/MqttPayloadOwner.cs @@ -0,0 +1,92 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; + +namespace MQTTnet.Buffers +{ + /// + /// Owner of that is responsible + /// for disposing the underlying payload appropriately. + /// + public struct MqttPayloadOwner : IReadOnlySequenceOwner + { + public MqttPayloadOwner() + { + Initialize(ReadOnlySequence.Empty, null); + } + + public MqttPayloadOwner(T[] memory, IDisposable owner = null) + { + Initialize(new ReadOnlySequence(memory), owner); + } + + public MqttPayloadOwner(ReadOnlySequence sequence, IDisposable owner = null) + { + Initialize(sequence, owner); + } + + public MqttPayloadOwner(ReadOnlyMemory memory, IDisposable owner = null) + { + Initialize(new ReadOnlySequence(memory), owner); + } + + public MqttPayloadOwner(ArraySegment memory, IDisposable owner = null) + { + Initialize(new ReadOnlySequence(memory), owner); + } + + private void Initialize(ReadOnlySequence sequence, IDisposable owner) + { + _sequence = sequence; + _owner = owner; + } + + private ReadOnlySequence _sequence; + private IDisposable _owner; + + /// + /// Gets a . + /// + public ReadOnlySequence Sequence { get => _sequence; } + + /// + /// Gets the owner of the . + /// + public IDisposable Owner => _owner; + + /// + /// Gets the length of the . + /// + public long Length => _sequence.Length; + + /// + /// Frees the underlying memory and sets the to empty. + /// + public void Dispose() + { + _sequence = ReadOnlySequence.Empty; + _owner?.Dispose(); + _owner = null; + } + + /// + /// Returns a new with the same + /// and transfers the ownership + /// to the caller. + /// + public static MqttPayloadOwner TransferOwnership(ref MqttPayloadOwner payloadOwner) + { + var payload = new MqttPayloadOwner(payloadOwner._sequence, payloadOwner._owner); + payloadOwner._owner = null; + return payload; + } + + public static implicit operator MqttPayloadOwner(ReadOnlySequence sequence) => new MqttPayloadOwner(sequence); + public static implicit operator MqttPayloadOwner(ReadOnlyMemory memory) => new MqttPayloadOwner(memory); + public static implicit operator MqttPayloadOwner(ArraySegment memory) => new MqttPayloadOwner(memory); + public static implicit operator MqttPayloadOwner(T[] memory) => new MqttPayloadOwner(memory); + } +} diff --git a/Source/MQTTnet/Channel/IMqttChannel.cs b/Source/MQTTnet/Channel/IMqttChannel.cs index 2f0a6be51..55cd48ba0 100644 --- a/Source/MQTTnet/Channel/IMqttChannel.cs +++ b/Source/MQTTnet/Channel/IMqttChannel.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; @@ -22,5 +23,5 @@ public interface IMqttChannel : IDisposable Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken); - Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, CancellationToken cancellationToken); + Task WriteAsync(ReadOnlySequence buffer, bool isEndOfPacket, CancellationToken cancellationToken); } \ No newline at end of file diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 59e26ec9b..948ad9331 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -273,21 +274,21 @@ public Task PublishAsync(MqttApplicationMessage applica switch (applicationMessage.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: - { - return PublishAtMostOnce(publishPacket, cancellationToken); - } + { + return PublishAtMostOnce(publishPacket, cancellationToken); + } case MqttQualityOfServiceLevel.AtLeastOnce: - { - return PublishAtLeastOnce(publishPacket, cancellationToken); - } + { + return PublishAtLeastOnce(publishPacket, cancellationToken); + } case MqttQualityOfServiceLevel.ExactlyOnce: - { - return PublishExactlyOnce(publishPacket, cancellationToken); - } + { + return PublishExactlyOnce(publishPacket, cancellationToken); + } default: - { - throw new NotSupportedException(); - } + { + throw new NotSupportedException(); + } } } @@ -407,34 +408,34 @@ Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs ev switch (eventArgs.PublishPacket.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: - { - // no response required - break; - } - case MqttQualityOfServiceLevel.AtLeastOnce: - { - if (!eventArgs.ProcessingFailed) { - var pubAckPacket = MqttPubAckPacketFactory.Create(eventArgs); - return Send(pubAckPacket, cancellationToken); + // no response required + break; } + case MqttQualityOfServiceLevel.AtLeastOnce: + { + if (!eventArgs.ProcessingFailed) + { + var pubAckPacket = MqttPubAckPacketFactory.Create(eventArgs); + return Send(pubAckPacket, cancellationToken); + } - break; - } + break; + } case MqttQualityOfServiceLevel.ExactlyOnce: - { - if (!eventArgs.ProcessingFailed) { - var pubRecPacket = MqttPubRecPacketFactory.Create(eventArgs); - return Send(pubRecPacket, cancellationToken); - } + if (!eventArgs.ProcessingFailed) + { + var pubRecPacket = MqttPubRecPacketFactory.Create(eventArgs); + return Send(pubRecPacket, cancellationToken); + } - break; - } + break; + } default: - { - throw new MqttProtocolViolationException("Received a not supported QoS level."); - } + { + throw new MqttProtocolViolationException("Received a not supported QoS level."); + } } return CompletedTask.Instance; @@ -454,22 +455,22 @@ async Task Authenticate(IMqttChannelAdapter channelAdap switch (receivedPacket) { case MqttConnAckPacket connAckPacket: - { - result = MqttClientResultFactory.ConnectResult.Create(connAckPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion); - break; - } + { + result = MqttClientResultFactory.ConnectResult.Create(connAckPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion); + break; + } case MqttAuthPacket _: - { - throw new NotSupportedException("Extended authentication handler is not yet supported"); - } + { + throw new NotSupportedException("Extended authentication handler is not yet supported"); + } case null: - { - throw new MqttCommunicationException("Connection closed."); - } + { + throw new MqttCommunicationException("Connection closed."); + } default: - { - throw new InvalidOperationException($"Received an unexpected MQTT packet ({receivedPacket})."); - } + { + throw new InvalidOperationException($"Received an unexpected MQTT packet ({receivedPacket})."); + } } } catch (Exception exception) @@ -683,6 +684,7 @@ async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { + MqttApplicationMessageReceivedEventArgs eventArgs = null; try { var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); @@ -692,7 +694,7 @@ async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken) } var publishPacket = publishPacketDequeueResult.Item; - var eventArgs = await HandleReceivedApplicationMessage(publishPacket).ConfigureAwait(false); + eventArgs = await HandleReceivedApplicationMessage(publishPacket).ConfigureAwait(false); if (eventArgs.AutoAcknowledge) { @@ -709,6 +711,13 @@ async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken) { _logger.Error(exception, "Error while handling application message"); } + finally + { + if (eventArgs?.DisposeApplicationMessage == true) + { + eventArgs.ApplicationMessage?.Dispose(); + } + } } } @@ -982,14 +991,14 @@ async Task TryProcessReceivedPacket(MqttPacket packet, CancellationToken cancell case MqttPingReqPacket _: throw new MqttProtocolViolationException("The PINGREQ Packet is sent from a client to the server only."); default: - { - if (!_packetDispatcher.TryDispatch(packet)) { - throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time."); - } + if (!_packetDispatcher.TryDispatch(packet)) + { + throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time."); + } - break; - } + break; + } } } catch (Exception exception) diff --git a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedEventArgs.cs index fc4e43bf1..0d37fd55f 100644 --- a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedEventArgs.cs @@ -28,7 +28,33 @@ public MqttApplicationMessageReceivedEventArgs( _acknowledgeHandler = acknowledgeHandler; } - public MqttApplicationMessage ApplicationMessage { get; } + /// + /// The invoked message receiver can take ownership of the application + /// message with payload to avoid cloning. + /// It is then the obligation of the new owner to dispose the obtained + /// application message. + /// + /// + /// If set to true, clones the ApplicationMessage and copies the payload. + /// + public MqttApplicationMessage TransferApplicationMessageOwnership(bool clonePayload) + { + DisposeApplicationMessage = false; + if (clonePayload) + { + var applicationMessage = ApplicationMessage; + // replace application message with a clone + // if the payload is owner managed + if (applicationMessage?.Payload.Owner != null) + { + ApplicationMessage = applicationMessage.Clone(); + applicationMessage.Dispose(); + } + } + return ApplicationMessage; + } + + public MqttApplicationMessage ApplicationMessage { get; private set; } /// /// Gets or sets whether the library should send MQTT ACK packets automatically if required. @@ -41,6 +67,15 @@ public MqttApplicationMessageReceivedEventArgs( /// public string ClientId { get; } + /// + /// Gets or sets whether the ownership of the message payload + /// was handed over to the invoked code. This value determines + /// if the payload can be disposed after the callback returns. + /// If transferred, the new owner of the message is responsible + /// to dispose the payload after processing. + /// + public bool DisposeApplicationMessage { get; private set; } = true; + /// /// Gets or sets whether this message was handled. /// This value can be used in user code for custom control flow. diff --git a/Source/MQTTnet/Formatter/MqttApplicationMessageFactory.cs b/Source/MQTTnet/Formatter/MqttApplicationMessageFactory.cs index 4564ddf84..1fd23bf70 100644 --- a/Source/MQTTnet/Formatter/MqttApplicationMessageFactory.cs +++ b/Source/MQTTnet/Formatter/MqttApplicationMessageFactory.cs @@ -19,7 +19,7 @@ public static MqttApplicationMessage Create(MqttPublishPacket publishPacket) return new MqttApplicationMessage { Topic = publishPacket.Topic, - PayloadSegment = publishPacket.PayloadSegment, + Payload = publishPacket.Payload, QualityOfServiceLevel = publishPacket.QualityOfServiceLevel, Retain = publishPacket.Retain, Dup = publishPacket.Dup, diff --git a/Source/MQTTnet/Formatter/MqttPacketBuffer.cs b/Source/MQTTnet/Formatter/MqttPacketBuffer.cs index 4df92b36e..46b78a937 100644 --- a/Source/MQTTnet/Formatter/MqttPacketBuffer.cs +++ b/Source/MQTTnet/Formatter/MqttPacketBuffer.cs @@ -2,65 +2,79 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Buffers; using System; -using System.Linq; -using MQTTnet.Implementations; -using MQTTnet.Internal; +using System.Buffers; namespace MQTTnet.Formatter { public readonly struct MqttPacketBuffer { - static readonly ArraySegment EmptyPayload = EmptyBuffer.ArraySegment; - - public MqttPacketBuffer(ArraySegment packet, ArraySegment payload) + public MqttPacketBuffer(ReadOnlySequence packet, ReadOnlySequence payload = default) { Packet = packet; Payload = payload; - Length = Packet.Count + Payload.Count; + if (Packet.Length + Payload.Length > int.MaxValue) + { + throw new InvalidOperationException("The packet is too large."); + } + + Length = (int)Packet.Length + (int)Payload.Length; } - + public MqttPacketBuffer(ArraySegment packet) { - Packet = packet; - Payload = EmptyPayload; + Packet = new ReadOnlySequence(packet); + Payload = ReadOnlySequence.Empty; - Length = Packet.Count; + if (Packet.Length > int.MaxValue) + { + throw new InvalidOperationException("The packet is too large."); + } + + Length = (int)Packet.Length; + } + + public MqttPacketBuffer(ReadOnlySequence packet) : this(packet, ReadOnlySequence.Empty) + { } public int Length { get; } - - public ArraySegment Packet { get; } - - public ArraySegment Payload { get; } + + public ReadOnlySequence Packet { get; } + + public ReadOnlySequence Payload { get; } public byte[] ToArray() { - if (Payload.Count == 0) + var buffer = GC.AllocateUninitializedArray(Length); + int packetLength = (int)Packet.Length; + MqttMemoryHelper.Copy(Packet, 0, buffer, 0, packetLength); + if (Payload.Length > 0) { - return Packet.ToArray(); + int payloadLength = (int)Payload.Length; + MqttMemoryHelper.Copy(Payload, 0, buffer, packetLength, payloadLength); } - - var buffer = new byte[Length]; - MqttMemoryHelper.Copy(Packet.Array, Packet.Offset, buffer, 0, Packet.Count); - MqttMemoryHelper.Copy(Payload.Array, Payload.Offset, buffer, Packet.Count, Payload.Count); - return buffer; } - - public ArraySegment Join() + + public IMemoryOwner ToMemoryOwner() { - if (Payload.Count == 0) + var memoryOwner = MemoryPool.Shared.Rent(Length); + int packetLength = (int)Packet.Length; + MqttMemoryHelper.Copy(Packet, 0, memoryOwner.Memory, 0, packetLength); + if (Payload.Length > 0) { - return Packet; + int payloadLength = (int)Payload.Length; + MqttMemoryHelper.Copy(Payload, 0, memoryOwner.Memory, packetLength, payloadLength); } + return memoryOwner; + } - var buffer = new byte[Length]; - MqttMemoryHelper.Copy(Packet.Array, Packet.Offset, buffer, 0, Packet.Count); - MqttMemoryHelper.Copy(Payload.Array, Payload.Offset, buffer, Packet.Count, Payload.Count); - - return new ArraySegment(buffer); + public ArraySegment Join() + { + return new ArraySegment(ToArray()); } } } \ No newline at end of file diff --git a/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs b/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs index af4e4976b..b852adb48 100644 --- a/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs +++ b/Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs @@ -5,6 +5,7 @@ using System; using System.Runtime.CompilerServices; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Formatter.V3; using MQTTnet.Formatter.V5; diff --git a/Source/MQTTnet/Formatter/MqttPublishPacketFactory.cs b/Source/MQTTnet/Formatter/MqttPublishPacketFactory.cs index 9ec30fe59..e63239064 100644 --- a/Source/MQTTnet/Formatter/MqttPublishPacketFactory.cs +++ b/Source/MQTTnet/Formatter/MqttPublishPacketFactory.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System; -using MQTTnet.Exceptions; using MQTTnet.Packets; namespace MQTTnet.Formatter @@ -22,7 +21,7 @@ public static MqttPublishPacket Create(MqttApplicationMessage applicationMessage var packet = new MqttPublishPacket { Topic = applicationMessage.Topic, - PayloadSegment = applicationMessage.PayloadSegment, + Payload = applicationMessage.Payload, QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel, Retain = applicationMessage.Retain, Dup = applicationMessage.Dup, diff --git a/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs b/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs index 808140b42..348845fd5 100644 --- a/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs +++ b/Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs @@ -2,14 +2,16 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.CompilerServices; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; namespace MQTTnet.Formatter.V3 { @@ -102,11 +104,11 @@ public MqttPacketBuffer Encode(MqttPacket packet) var fixedHeader = EncodePacket(packet, _bufferWriter); var remainingLength = (uint)(_bufferWriter.Length - 5); - var publishPacket = packet as MqttPublishPacket; - var payloadSegment = publishPacket?.PayloadSegment; - if (payloadSegment != null) + ReadOnlySequence payload = default; + if (packet is MqttPublishPacket publishPacket) { - remainingLength += (uint)payloadSegment.Value.Count; + payload = publishPacket.Payload.Sequence; + remainingLength += (uint)payload.Length; } var remainingLengthSize = MqttBufferWriter.GetVariableByteIntegerSize(remainingLength); @@ -119,12 +121,11 @@ public MqttPacketBuffer Encode(MqttPacket packet) _bufferWriter.WriteByte(fixedHeader); _bufferWriter.WriteVariableByteInteger(remainingLength); - var buffer = _bufferWriter.GetBuffer(); - var firstSegment = new ArraySegment(buffer, headerOffset, _bufferWriter.Length - headerOffset); + var firstSegment = new ReadOnlySequence(_bufferWriter.GetBuffer(), headerOffset, _bufferWriter.Length - headerOffset); - return payloadSegment == null + return payload.Length == 0 ? new MqttPacketBuffer(firstSegment) - : new MqttPacketBuffer(firstSegment, payloadSegment.Value); + : new MqttPacketBuffer(firstSegment, payload); } MqttPacket DecodeConnAckPacket(ArraySegment body) @@ -278,7 +279,8 @@ MqttPacket DecodePublishPacket(ReceivedMqttPacket receivedMqttPacket) if (!_bufferReader.EndOfStream) { - packet.PayloadSegment = new ArraySegment(_bufferReader.ReadRemainingData()); + IMemoryOwner payloadOwner = _bufferReader.ReadPayload(); + packet.Payload = new MqttPayloadOwner(payloadOwner.Memory, payloadOwner); } return packet; diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PacketDecoder.cs b/Source/MQTTnet/Formatter/V5/MqttV5PacketDecoder.cs index c9010cabe..5643869fa 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PacketDecoder.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PacketDecoder.cs @@ -3,8 +3,10 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Collections.Generic; using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -470,7 +472,6 @@ MqttPacket DecodePubCompPacket(ArraySegment body) return packet; } - MqttPacket DecodePublishPacket(byte header, ArraySegment body) { ThrowIfBodyIsEmpty(body); @@ -540,7 +541,8 @@ MqttPacket DecodePublishPacket(byte header, ArraySegment body) if (!_bufferReader.EndOfStream) { - packet.PayloadSegment = new ArraySegment(_bufferReader.ReadRemainingData()); + IMemoryOwner payloadOwner = _bufferReader.ReadPayload(); + packet.Payload = new MqttPayloadOwner(payloadOwner.Memory, payloadOwner); } return packet; diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs b/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs index 8e6804f3c..070e2ad4a 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs @@ -3,7 +3,9 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Linq; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -30,18 +32,18 @@ public MqttPacketBuffer Encode(MqttPacket packet) } // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) - _bufferWriter.Reset(5); - _bufferWriter.Seek(5); + const int ReservedHeaderSize = 5; + _bufferWriter.Reset(ReservedHeaderSize); + _bufferWriter.Seek(ReservedHeaderSize); var fixedHeader = EncodePacket(packet); - var remainingLength = (uint)_bufferWriter.Length - 5; + uint remainingLength = (uint)_bufferWriter.Length - ReservedHeaderSize; - var publishPacket = packet as MqttPublishPacket; - var payloadSegment = publishPacket?.PayloadSegment; - - if (payloadSegment != null) + ReadOnlySequence payload = default; + if (packet is MqttPublishPacket publishPacket) { - remainingLength += (uint)payloadSegment.Value.Count; + payload = publishPacket.Payload.Sequence; + remainingLength += (uint)payload.Length; } var remainingLengthSize = MqttBufferWriter.GetVariableByteIntegerSize(remainingLength); @@ -54,12 +56,11 @@ public MqttPacketBuffer Encode(MqttPacket packet) _bufferWriter.WriteByte(fixedHeader); _bufferWriter.WriteVariableByteInteger(remainingLength); - var buffer = _bufferWriter.GetBuffer(); - var firstSegment = new ArraySegment(buffer, headerOffset, _bufferWriter.Length - headerOffset); + var firstSegment = new ReadOnlySequence(_bufferWriter.GetBuffer(), headerOffset, _bufferWriter.Length - headerOffset); - return payloadSegment == null + return payload.Length == 0 ? new MqttPacketBuffer(firstSegment) - : new MqttPacketBuffer(firstSegment, payloadSegment.Value); + : new MqttPacketBuffer(firstSegment, payload); } byte EncodeAuthPacket(MqttAuthPacket packet) diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PacketFormatter.cs b/Source/MQTTnet/Formatter/V5/MqttV5PacketFormatter.cs index 371cb868e..35102e0fb 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PacketFormatter.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PacketFormatter.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using MQTTnet.Adapter; +using MQTTnet.Buffers; using MQTTnet.Packets; namespace MQTTnet.Formatter.V5 diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PropertiesReader.cs b/Source/MQTTnet/Formatter/V5/MqttV5PropertiesReader.cs index e5a3efe7e..5b79be99d 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PropertiesReader.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PropertiesReader.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using MQTTnet.Buffers; using MQTTnet.Exceptions; using MQTTnet.Packets; using MQTTnet.Protocol; diff --git a/Source/MQTTnet/Formatter/V5/MqttV5PropertiesWriter.cs b/Source/MQTTnet/Formatter/V5/MqttV5PropertiesWriter.cs index 29c065667..de653ae3e 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV5PropertiesWriter.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV5PropertiesWriter.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using MQTTnet.Buffers; using MQTTnet.Packets; using MQTTnet.Protocol; diff --git a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs index 1749817c4..c27cbf705 100644 --- a/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs +++ b/Source/MQTTnet/Implementations/MqttClientAdapterFactory.cs @@ -8,6 +8,7 @@ using System; using MQTTnet.Channel; using MQTTnet.Client; +using MQTTnet.Buffers; namespace MQTTnet.Implementations { diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 1f1bbf85f..5f8ac61bd 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.IO; using System.Net; using System.Net.Security; @@ -248,7 +249,7 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat } } - public async Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, CancellationToken cancellationToken) + public async Task WriteAsync(ReadOnlySequence buffer, bool isEndOfPacket, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -261,7 +262,10 @@ public async Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, Canc throw new MqttCommunicationException("The TCP connection is closed."); } - await stream.WriteAsync(buffer.AsMemory(), cancellationToken).ConfigureAwait(false); + foreach (var segment in buffer) + { + await stream.WriteAsync(segment, cancellationToken).ConfigureAwait(false); + } } catch (ObjectDisposedException) { diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index 341ea8afb..a8c321734 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; using System.Net; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; @@ -100,13 +101,19 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat return response.Count; } - public async Task WriteAsync(ArraySegment buffer, bool isEndOfPacket, CancellationToken cancellationToken) + public async Task WriteAsync(ReadOnlySequence buffer, bool isEndOfPacket, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); // MQTT Control Packets MUST be sent in WebSocket binary data frames. If any other type of data frame is received the recipient MUST close the Network Connection [MQTT-6.0.0-1]. // A single WebSocket data frame can contain multiple or partial MQTT Control Packets. The receiver MUST NOT assume that MQTT Control Packets are aligned on WebSocket frame boundaries [MQTT-6.0.0-2]. - await _webSocket.SendAsync(buffer, WebSocketMessageType.Binary, isEndOfPacket, cancellationToken).ConfigureAwait(false); + long length = buffer.Length; + foreach (var segment in buffer) + { + length -= segment.Length; + bool endOfPacket = isEndOfPacket && length == 0; + await _webSocket.SendAsync(segment, WebSocketMessageType.Binary, endOfPacket, cancellationToken).ConfigureAwait(false); + } } void Cleanup() diff --git a/Source/MQTTnet/Internal/AsyncEventInvocator.cs b/Source/MQTTnet/Internal/AsyncEventInvocator.cs index f344cb1e6..d46e56a35 100644 --- a/Source/MQTTnet/Internal/AsyncEventInvocator.cs +++ b/Source/MQTTnet/Internal/AsyncEventInvocator.cs @@ -33,15 +33,17 @@ public bool WrapsHandler(Func handler) public Task InvokeAsync(TEventArgs eventArgs) { - if (_handler != null) + var handler = _handler; + if (handler != null) { - _handler.Invoke(eventArgs); + handler.Invoke(eventArgs); return CompletedTask.Instance; } - if (_asyncHandler != null) + var asyncHandler = _asyncHandler; + if (asyncHandler != null) { - return _asyncHandler.Invoke(eventArgs); + return asyncHandler.Invoke(eventArgs); } throw new InvalidOperationException(); diff --git a/Source/MQTTnet/Internal/EmptyBuffer.cs b/Source/MQTTnet/Internal/EmptyBuffer.cs index 2351264dd..e00f8d316 100644 --- a/Source/MQTTnet/Internal/EmptyBuffer.cs +++ b/Source/MQTTnet/Internal/EmptyBuffer.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Buffers; namespace MQTTnet.Internal { diff --git a/Source/MQTTnet/Internal/MqttMemoryHelper.cs b/Source/MQTTnet/Internal/MqttMemoryHelper.cs deleted file mode 100644 index 448e0296c..000000000 --- a/Source/MQTTnet/Internal/MqttMemoryHelper.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Runtime.CompilerServices; - -namespace MQTTnet.Internal -{ - public static class MqttMemoryHelper - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void Copy(byte[] source, int sourceIndex, byte[] destination, int destinationIndex, int length) - { - source.AsSpan(sourceIndex, length).CopyTo(destination.AsSpan(destinationIndex, length)); - } - } -} diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 1d3adc6de..fb3e0e5c6 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -2,7 +2,7 @@ - + @(ReleaseNotes, '%0a') @@ -44,10 +44,11 @@ all true latest-Recommended + latest - Full + portable @@ -61,7 +62,7 @@ - + \ No newline at end of file diff --git a/Source/MQTTnet/MqttApplicationMessage.cs b/Source/MQTTnet/MqttApplicationMessage.cs index 204a71576..82b3bf1bd 100644 --- a/Source/MQTTnet/MqttApplicationMessage.cs +++ b/Source/MQTTnet/MqttApplicationMessage.cs @@ -2,16 +2,67 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using MQTTnet.Internal; +using MQTTnet.Buffers; using MQTTnet.Packets; using MQTTnet.Protocol; using System; +using System.Buffers; using System.Collections.Generic; namespace MQTTnet { - public sealed class MqttApplicationMessage + public sealed class MqttApplicationMessage : IDisposable { + private bool _disposed; + private MqttPayloadOwner _payload; + + /// + /// Create a clone of the . + /// with a deep copy of the Payload allocated from the heap. + /// + public MqttApplicationMessage Clone() + { + return new MqttApplicationMessage() + { + ContentType = this.ContentType, + CorrelationData = this.CorrelationData, + Dup = this.Dup, + MessageExpiryInterval = this.MessageExpiryInterval, + Payload = this.Payload.Sequence.ToArray(), + PayloadFormatIndicator = this.PayloadFormatIndicator, + QualityOfServiceLevel = this.QualityOfServiceLevel, + ResponseTopic = this.ResponseTopic, + Retain = this.Retain, + SubscriptionIdentifiers = this.SubscriptionIdentifiers, + Topic = this.Topic, + TopicAlias = this.TopicAlias, + UserProperties = this.UserProperties + }; + } + + /// + /// Transfers the payload ownership to the caller. + /// + /// + /// This method is used to transfer the ownership of the payload to the caller. + /// It returns the with a reference to the + /// payload owner and sets the owner in this application message to null. + /// After the transfer the caller is responsible to dispose the payload. + /// + public MqttPayloadOwner TransferPayloadOwnership() + { + return MqttPayloadOwner.TransferOwnership(ref _payload); + } + + /// + /// Disposes the payload used by the current instance of the class. + /// + public void Dispose() + { + _disposed = true; + _payload.Dispose(); + } + /// /// Gets or sets the content type. /// The content type must be a UTF-8 encoded string. The content type value identifies the kind of UTF-8 encoded @@ -50,9 +101,32 @@ public sealed class MqttApplicationMessage public uint MessageExpiryInterval { get; set; } /// - /// Get or set ArraySegment style of Payload. + /// Get or set Mqtt Payload owner. /// - public ArraySegment PayloadSegment { get; set; } = EmptyBuffer.ArraySegment; + /// + /// is a struct that wraps a + /// and provides a way to manage the lifetime of the buffers. Special care has to be + /// taken to dispose the object, because it is a struct with a " method + /// which must be called on this instance to properly track the owner. The property always + /// returns a Value type which has no owner, to avoid double dispose and double ownership./> + /// + public MqttPayloadOwner Payload + { + get + { + if (_disposed) + { + throw new ObjectDisposedException("Accessing the MqttApplicationMessage.Payload which is already disposed"); + } + + // Since the payload is a value type, do not pass the owner, + // so we return a new instance which contains only the sequence. + // There is no allocation involved, because the sequence is a value type. + return new MqttPayloadOwner(_payload.Sequence, null); + } + + set => _payload = value; + } /// /// Gets or sets the payload format indicator. diff --git a/Source/MQTTnet/MqttApplicationMessageBuilder.cs b/Source/MQTTnet/MqttApplicationMessageBuilder.cs index 316ab6024..c8eda8adb 100644 --- a/Source/MQTTnet/MqttApplicationMessageBuilder.cs +++ b/Source/MQTTnet/MqttApplicationMessageBuilder.cs @@ -2,16 +2,16 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Buffers; +using MQTTnet.Exceptions; +using MQTTnet.Packets; +using MQTTnet.Protocol; using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; -using System.Runtime.InteropServices; using System.Text; -using MQTTnet.Exceptions; -using MQTTnet.Internal; -using MQTTnet.Packets; -using MQTTnet.Protocol; namespace MQTTnet { @@ -22,7 +22,8 @@ public sealed class MqttApplicationMessageBuilder uint _messageExpiryInterval; MqttPayloadFormatIndicator _payloadFormatIndicator; - ArraySegment _payloadSegment; + ReadOnlySequence _payload; + IDisposable _payloadOwner; MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; string _responseTopic; bool _retain; @@ -41,7 +42,7 @@ public MqttApplicationMessage Build() var applicationMessage = new MqttApplicationMessage { Topic = _topic, - PayloadSegment = _payloadSegment, + Payload = _payload, QualityOfServiceLevel = _qualityOfServiceLevel, Retain = _retain, ContentType = _contentType, @@ -89,13 +90,26 @@ public MqttApplicationMessageBuilder WithMessageExpiryInterval(uint messageExpir public MqttApplicationMessageBuilder WithPayload(byte[] payload) { - _payloadSegment = payload == null || payload.Length == 0 ? EmptyBuffer.ArraySegment : new ArraySegment(payload); + _payload = payload == null || payload.Length == 0 ? ReadOnlySequence.Empty : new ReadOnlySequence(payload); return this; } - public MqttApplicationMessageBuilder WithPayload(ArraySegment payloadSegment) + public MqttApplicationMessageBuilder WithPayload(ArraySegment payload) { - _payloadSegment = payloadSegment; + _payload = new ReadOnlySequence(payload); + return this; + } + + public MqttApplicationMessageBuilder WithPayload(ReadOnlyMemory payload) + { + _payload = new ReadOnlySequence(payload); + return this; + } + + public MqttApplicationMessageBuilder WithPayload(ReadOnlySequence payload, IDisposable payloadOwner = null) + { + _payload = payload; + _payloadOwner = payloadOwner; return this; } @@ -113,7 +127,7 @@ public MqttApplicationMessageBuilder WithPayload(IEnumerable payload) if (payload is ArraySegment arraySegment) { - return WithPayloadSegment(arraySegment); + return WithPayload(arraySegment); } return WithPayload(payload.ToArray()); @@ -131,11 +145,18 @@ public MqttApplicationMessageBuilder WithPayload(Stream payload, long length) return WithPayload(default(byte[])); } - var payloadBuffer = new byte[length]; - var totalRead = 0; + if (length > int.MaxValue || length < 0) + { + throw new ArgumentOutOfRangeException(nameof(length)); + } + + // for most streams the Read(byte[]) method is most efficient way to tread the buffer + int checkedLength = (int)length; + var payloadBuffer = ArrayPoolMemoryOwner.Rent(checkedLength); + int totalRead = 0; do { - var bytesRead = payload.Read(payloadBuffer, totalRead, payloadBuffer.Length - totalRead); + int bytesRead = payload.Read(payloadBuffer.Array, totalRead, checkedLength - totalRead); if (bytesRead == 0) { break; @@ -144,7 +165,7 @@ public MqttApplicationMessageBuilder WithPayload(Stream payload, long length) totalRead += bytesRead; } while (totalRead < length); - return WithPayload(payloadBuffer); + return WithPayload(new ReadOnlySequence(payloadBuffer.Array.AsMemory(0, totalRead)), payloadBuffer); } public MqttApplicationMessageBuilder WithPayload(string payload) @@ -168,17 +189,6 @@ public MqttApplicationMessageBuilder WithPayloadFormatIndicator(MqttPayloadForma return this; } - public MqttApplicationMessageBuilder WithPayloadSegment(ArraySegment payloadSegment) - { - _payloadSegment = payloadSegment; - return this; - } - - public MqttApplicationMessageBuilder WithPayloadSegment(ReadOnlyMemory payloadSegment) - { - return MemoryMarshal.TryGetArray(payloadSegment, out var segment) ? WithPayloadSegment(segment) : WithPayload(payloadSegment.ToArray()); - } - /// /// The quality of service level. /// The Quality of Service (QoS) level is an agreement between the sender of a message and the receiver of a message diff --git a/Source/MQTTnet/MqttApplicationMessageExtensions.cs b/Source/MQTTnet/MqttApplicationMessageExtensions.cs index c2f82dd8d..83ed9306f 100644 --- a/Source/MQTTnet/MqttApplicationMessageExtensions.cs +++ b/Source/MQTTnet/MqttApplicationMessageExtensions.cs @@ -4,7 +4,6 @@ using System; using System.Text; -using MQTTnet.Internal; namespace MQTTnet; @@ -17,17 +16,11 @@ public static string ConvertPayloadToString(this MqttApplicationMessage applicat throw new ArgumentNullException(nameof(applicationMessage)); } - if (applicationMessage.PayloadSegment == EmptyBuffer.ArraySegment) + if (applicationMessage.Payload.Length == 0) { return null; } - if (applicationMessage.PayloadSegment.Array == null) - { - return null; - } - - var payloadSegment = applicationMessage.PayloadSegment; - return Encoding.UTF8.GetString(payloadSegment.Array, payloadSegment.Offset, payloadSegment.Count); + return Encoding.UTF8.GetString(applicationMessage.Payload.Sequence); } } \ No newline at end of file diff --git a/Source/MQTTnet/Packets/MqttPublishPacket.cs b/Source/MQTTnet/Packets/MqttPublishPacket.cs index c07dddda6..d379d0397 100644 --- a/Source/MQTTnet/Packets/MqttPublishPacket.cs +++ b/Source/MQTTnet/Packets/MqttPublishPacket.cs @@ -2,13 +2,15 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Buffers; +using MQTTnet.Protocol; using System; +using System.Buffers; using System.Collections.Generic; -using MQTTnet.Protocol; namespace MQTTnet.Packets; -public sealed class MqttPublishPacket : MqttPacketWithIdentifier +public class MqttPublishPacket : MqttPacketWithIdentifier { public string ContentType { get; set; } @@ -20,7 +22,7 @@ public sealed class MqttPublishPacket : MqttPacketWithIdentifier public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; } = MqttPayloadFormatIndicator.Unspecified; - public ArraySegment PayloadSegment { get; set; } + public MqttPayloadOwner Payload { get; set; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtMostOnce; @@ -39,6 +41,6 @@ public sealed class MqttPublishPacket : MqttPacketWithIdentifier public override string ToString() { return - $"Publish: [Topic={Topic}] [PayloadLength={PayloadSegment.Count}] [QoSLevel={QualityOfServiceLevel}] [Dup={Dup}] [Retain={Retain}] [PacketIdentifier={PacketIdentifier}]"; + $"Publish: [Topic={Topic}] [PayloadLength={Payload.Length}] [QoSLevel={QualityOfServiceLevel}] [Dup={Dup}] [Retain={Retain}] [PacketIdentifier={PacketIdentifier}]"; } } \ No newline at end of file