diff --git a/src/OpenRiaServices.DomainServices.Client.Web/Framework/OpenRiaServices.DomainServices.Client.Web.csproj b/src/OpenRiaServices.DomainServices.Client.Web/Framework/OpenRiaServices.DomainServices.Client.Web.csproj index f2dc2e3a2..2eb065523 100644 --- a/src/OpenRiaServices.DomainServices.Client.Web/Framework/OpenRiaServices.DomainServices.Client.Web.csproj +++ b/src/OpenRiaServices.DomainServices.Client.Web/Framework/OpenRiaServices.DomainServices.Client.Web.csproj @@ -5,6 +5,7 @@ net46;netstandard2.0 WCF;RIA;Services;RIAServices;Silverlight;OpenRiaServices false + true @@ -34,9 +35,7 @@ Web\Behaviors\WebHttpQueryStringConverter.cs - - Data\MessageEncoders\PoxBinaryMessageEncodingBindingElement.cs - + diff --git a/src/OpenRiaServices.DomainServices.Hosting/Framework/OpenRiaServices.DomainServices.Hosting.csproj b/src/OpenRiaServices.DomainServices.Hosting/Framework/OpenRiaServices.DomainServices.Hosting.csproj index 581bbfde1..c2555d96a 100644 --- a/src/OpenRiaServices.DomainServices.Hosting/Framework/OpenRiaServices.DomainServices.Hosting.csproj +++ b/src/OpenRiaServices.DomainServices.Hosting/Framework/OpenRiaServices.DomainServices.Hosting.csproj @@ -3,6 +3,7 @@ false net46 SERVERFX;$(DefineConstants) + true diff --git a/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BinaryMessageWriter.cs b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BinaryMessageWriter.cs new file mode 100644 index 000000000..73108e8a3 --- /dev/null +++ b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BinaryMessageWriter.cs @@ -0,0 +1,112 @@ +using System; +using System.ServiceModel.Channels; +using System.Xml; + +#if SERVERFX +namespace OpenRiaServices.DomainServices.Hosting +#else +namespace OpenRiaServices.DomainServices.Client +#endif +{ + /// + /// Helper class to cache and stream in order + /// to not have to allocated all memory used for the writer. + /// It also adds some estimates of the buffer size needde + /// + internal class BinaryMessageWriter + { + private BufferManagerStream _stream; + private XmlDictionaryWriter _writer; + + private const int MaxStreamAllocationSize = 1024 * 1024; + // IMPORTANT: If this is changed then EstimateMessageSize should be changed as well + private const int MessageLengthHistorySize = 4; + private const int InitialBufferSize = 2048; + private readonly int[] _lastMessageLengths = new int[MessageLengthHistorySize] { InitialBufferSize, InitialBufferSize, InitialBufferSize, InitialBufferSize }; + private int _messageLengthIndex = 0; + + // Cache at most one writer per thread + [ThreadStatic] + private static BinaryMessageWriter s_threadInstance; + + /// + /// Prevent creation from outside of this class + /// + private BinaryMessageWriter() + { + } + + /// + /// Writes the specified message to a byte array allocated by the specigied + /// + public static ArraySegment WriteMessage(Message message, BufferManager bufferManager, int messageOffset) + { + var messageWriter = s_threadInstance ?? new BinaryMessageWriter(); + // Reentrancy is not expected, but if the operation throws we dont + // want to reuse the current messageWriter since XmlWriter might not be in starting state + s_threadInstance = null; + + var result = messageWriter.WriteMessageCore(message, bufferManager, messageOffset); + + // Save for later reuse + s_threadInstance = messageWriter; + return result; + } + + /// + /// Writes the specified message to a byte array allocated by the specigied + /// + private ArraySegment WriteMessageCore(Message message, BufferManager bufferManager, int offset) + { + try + { + XmlDictionaryWriter writer = GetXmlWriter(bufferManager, offset); + message.WriteMessage(writer); + writer.Flush(); + + var result = _stream.GetArrayAndClear(); + RecordMessageSize(result.Count); + return result; + } + catch + { + _stream.Clear(); + throw; + } + } + + private void RecordMessageSize(int count) + { + _lastMessageLengths[_messageLengthIndex] = count; + _messageLengthIndex = (_messageLengthIndex + 1) % MessageLengthHistorySize; + } + + /// + /// Get estimate based on maximum buffer size of the last few messages. + /// + private int EstimateMessageSize() + { + int max1 = Math.Max(_lastMessageLengths[3], _lastMessageLengths[2]); + int max2 = Math.Max(_lastMessageLengths[1], _lastMessageLengths[0]); + + return Math.Max(max2, max1) + 256; + } + + private XmlDictionaryWriter GetXmlWriter(BufferManager bufferManager, int offset) + { + int startSize = EstimateMessageSize(); + // Reuse created writer and stream if possible, they are created on first call + if (_writer != null) + { + _stream.Reset(bufferManager, offset, startSize); + } + else + { + _stream = new BufferManagerStream(bufferManager, offset, startSize, MaxStreamAllocationSize); + _writer = XmlDictionaryWriter.CreateBinaryWriter(_stream); + } + + return _writer; + } + } +} diff --git a/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BufferManagerStream.cs b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BufferManagerStream.cs new file mode 100644 index 000000000..44b000c0b --- /dev/null +++ b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/BufferManagerStream.cs @@ -0,0 +1,234 @@ +using System; +using System.IO; +using System.ServiceModel.Channels; + +#if SERVERFX +namespace OpenRiaServices.DomainServices.Hosting +#else +namespace OpenRiaServices.DomainServices.Client +#endif +{ + /// + /// Stream optimized for usage by without unneccessary + /// allocations on LOH. + /// It writes directly to memory pooled by a in order to + /// avoid allocations and be able to return memory directly without additional copies + /// (for small messages). + /// + internal class BufferManagerStream : Stream + { + private static readonly bool Is64BitProcess = Environment.Is64BitProcess; + private BufferManager _bufferManager; + private readonly int _maxSize; + // The offset into the final byte array where our content should start + private int _offset; + // number of bytes written to _buffer, used as offset into _buffer where we write next time + private int _bufferWritten; + // "Current" buffer where the next write should go + private byte[] _buffer; + // Any "previous" buffers already filled + private System.Collections.Generic.List _bufferList; + // String "position" (total size so far) + private int _position; + + + public BufferManagerStream(BufferManager bufferManager, int offset, int minAllocationSize, int maxAllocationSize) + { + _maxSize = maxAllocationSize; + Reset(bufferManager, offset, minAllocationSize); + } + + public void Reset(BufferManager bufferManager, int offset, int minAllocationSize) + { + _bufferManager = bufferManager; + _offset = offset; + _bufferWritten = offset; + _position = 0; + _buffer = bufferManager.TakeBuffer(minAllocationSize + offset); + } + + public override bool CanRead => false; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotImplementedException(); + + public override long Position { get => _position; set => throw new NotImplementedException(); } + + public override void Flush() + { + // Nothing to do + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + // Argument validation is skipped since it is only used by + // BinaryXml writer which we trust to always give valid input + + // Note: BinaryXml buffers up to 512 bytesso we should expect most writes to be around + // 500+ bytes (smaller if the next write is a long string or byte array) + do + { + EnsureBufferCapacity(); + + // Write up to count bytes, but never more than the rest of the buffer + int toCopy = Math.Min(count, _buffer.Length - _bufferWritten); + FastCopy(buffer, offset, _buffer, _bufferWritten, toCopy); + _position += toCopy; + _bufferWritten += toCopy; + offset += toCopy; + count -= toCopy; + } while (count > 0); + } + + /// + /// Allocate more space if buffer is full. + /// Ensures _buffer is non null and has space to write more bytes + /// + private void EnsureBufferCapacity() + { + // There is space left + if (_bufferWritten < _buffer.Length) + return; + + // Save current buffer in list before allocating a new buffer + if (_bufferList == null) + _bufferList = new System.Collections.Generic.List(capacity: 16); + _bufferList.Add(_buffer); + // Ensure we never return buffer twice in case TakeBuffer below throws + _buffer = null; + + int nextSize = Math.Min(_position * 2, _maxSize); + _buffer = _bufferManager.TakeBuffer(nextSize); + _bufferWritten = 0; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Clear(); + } + base.Dispose(disposing); + } + + /// + /// Returns all mmmory to the current BufferManager + /// + public void Clear() + { + if (_buffer != null) + { + _bufferManager.ReturnBuffer(_buffer); + _buffer = null; + } + + if (_bufferList != null) + { + foreach (var buffer in _bufferList) + _bufferManager.ReturnBuffer(buffer); + _bufferList = null; + } + } + + /// + /// Copies bytes from to using fastes + /// copy based on process bitness (x86 / x64) tested on .Net Framework 4.8 + /// + private static unsafe void FastCopy(byte[] src, int srcOffset, byte[] dest, int destOffset, int count) + { + if (count == 0) + return; + + if (Is64BitProcess && count <= 1024) + { + fixed (byte* s = &src[srcOffset], d = &dest[destOffset]) + Buffer.MemoryCopy(s, d, dest.Length - destOffset, count); + } + else + { + // For x86 it is significantly faster to do copying of int's and longs + // or similar in managed code for smaller counts (below 100-200) + // But we expect most copies to be larger since xml writer buffer around 500 bytes + Buffer.BlockCopy(src, srcOffset, dest, destOffset, count); + } + } + + public ArraySegment GetArrayAndClear() + { + // We only have a single segment, return it directly with no copying + if (_bufferList == null) + { + var buffer = _buffer; + _buffer = null; + + System.Diagnostics.Debug.Assert(_bufferWritten == _position + _offset); + Clear(); + return new ArraySegment(buffer, _offset, _position); + } + else + { + // Copy in reverse order from filled to utilize CPU caches better + // _buffer might only be partially filled + int totalSize = _offset + _position; + int destOffset = totalSize - _bufferWritten; + byte[] buffer = null; + + try + { + // Reuse the "current" buffer if it is large enough + if (_position <= _buffer.Length) + { + buffer = _buffer; + FastCopy(_buffer, 0, buffer, destOffset, _bufferWritten); + _buffer = null; + } + else + { + buffer = _bufferManager.TakeBuffer(totalSize); + FastCopy(_buffer, 0, buffer, destOffset, _bufferWritten); + } + + // Buffers in list are all full + for (int i = _bufferList.Count - 1; i > 0; --i) + { + destOffset -= _bufferList[i].Length; + FastCopy(_bufferList[i], 0, buffer, destOffset, _bufferList[i].Length); + } + + // First buffer might have offset + FastCopy(_bufferList[0], _offset, buffer, _offset, _bufferList[0].Length - _offset); + System.Diagnostics.Debug.Assert(destOffset - (_bufferList[0].Length - _offset) == _offset); + + Clear(); + + return new ArraySegment(buffer, _offset, _position); + + } + catch + { + if (buffer != null) + _bufferManager.ReturnBuffer(buffer); + throw; + } + } + } + } +} diff --git a/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/PoxBinaryMessageEncodingBindingElement.cs b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/PoxBinaryMessageEncodingBindingElement.cs index 9fe2a671e..0a00497d0 100644 --- a/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/PoxBinaryMessageEncodingBindingElement.cs +++ b/src/OpenRiaServices.DomainServices.Hosting/Framework/Services/MessageEncoders/PoxBinaryMessageEncodingBindingElement.cs @@ -237,34 +237,17 @@ public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string public override ArraySegment WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset) { if (message == null) - { throw new ArgumentNullException(nameof(message)); - } if (bufferManager == null) - { throw new ArgumentNullException(nameof(bufferManager)); - } if (maxMessageSize < 0) - { throw new ArgumentOutOfRangeException(nameof(maxMessageSize)); - } this.ThrowIfInvalidMessageVersion(message); - message.Properties.Encoder = this; - - using (MemoryStream ms = new MemoryStream()) - using (XmlDictionaryWriter writer = XmlDictionaryWriter.CreateBinaryWriter(ms)) - { - message.WriteMessage(writer); - writer.Flush(); - - byte[] buffer = bufferManager.TakeBuffer((int)ms.Position + messageOffset); - Buffer.BlockCopy(ms.GetBuffer(), 0, buffer, messageOffset, (int)ms.Position); - return new ArraySegment(buffer, messageOffset, (int)ms.Position); - } + return BinaryMessageWriter.WriteMessage(message, bufferManager, messageOffset); } public override void WriteMessage(Message message, Stream stream) diff --git a/src/OpenRiaServices.DomainServices.Hosting/Test/Data/BufferManagerStreamTests.cs b/src/OpenRiaServices.DomainServices.Hosting/Test/Data/BufferManagerStreamTests.cs new file mode 100644 index 000000000..8ed861f40 --- /dev/null +++ b/src/OpenRiaServices.DomainServices.Hosting/Test/Data/BufferManagerStreamTests.cs @@ -0,0 +1,278 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.ServiceModel.Channels; +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using OpenRiaServices.DomainServices.Client.Test; + +namespace OpenRiaServices.DomainServices.Hosting.Test.Data +{ + [TestClass] + public class BufferManagerStreamTests + { + // Buffer with bytes [1..255] + private readonly byte[] _input; + + public BufferManagerStreamTests() + { + _input = new byte[255]; + for (int i = 0; i < _input.Length; ++i) + { + _input[i] = (byte)(i + 1); + } + } + + /// + /// Only partially fill first buffer + /// + [TestMethod] + public void SmallWrite() + { + SmallWrite(offset: 0); + } + + + /// + /// Only partially fill first buffer but start at an offset + /// + [TestMethod] + public void SmallWriteWithOffset() + { + SmallWrite(offset: 6); + } + + public void SmallWrite(int offset) + { + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, offset, 4, 1024)) + { + stream.Write(_input, 0, 2); + + var buffer = VerifyStreamContents(stream, offset, 2, manager); + Assert.AreEqual(1, manager.Allocated.Count, "Should only have allocated a single buffer"); + Assert.AreSame(manager.Allocated[0], buffer.Array, "Should reuse initial array"); + } + } + + /// + /// Only partially fill first buffer + /// + [TestMethod] + public void SmallWriteFullBuffer() + { + int initialOffset = 0, minBufferSize = 4; + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, initialOffset, minBufferSize, 1024)) + { + stream.Write(_input, 0, minBufferSize); + var buffer = VerifyStreamContents(stream, initialOffset, minBufferSize, manager); + Assert.AreEqual(1, manager.Allocated.Count, "Should only have allocated a single buffer"); + Assert.AreSame(manager.Allocated[0], buffer.Array, "Should reuse initial array"); + } + } + + [TestMethod] + public void LargeWrite() + { + LargeWrite(offset: 0); + } + + [TestMethod] + public void LargeWriteWithOffset() + { + LargeWrite(offset: 5); + } + + public void LargeWrite(int offset) + { + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, offset, 4, 1024)) + { + stream.Write(_input, 0, 40); + VerifyStreamContents(stream, offset, 40, manager); + Assert.IsTrue(manager.Allocated.Count > 2, "Multiple buffers should have been used"); + } + } + + [TestMethod] + public void ReuseLastBufferIfPossible() + { + int initialOffset = 0, initalBufferSize = 4; + int buffer2Size = initalBufferSize * 2; + int buffer3Size = initalBufferSize + buffer2Size; + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, initialOffset, initalBufferSize, 1024)) + { + // Fill first and second buffers full + stream.Write(_input, 0, initalBufferSize); + stream.Write(_input, initalBufferSize, buffer2Size); + Assert.AreEqual(buffer2Size, manager.Allocated[1].Length, "This test assumed allocation size was wrong"); + + // Write next one up just a little so that everyhing should fit in the 3rd + int streamOffsetLastBuffer = (int)stream.Position; + stream.Write(_input, streamOffsetLastBuffer, streamOffsetLastBuffer); + + Assert.AreEqual(3, manager.Allocated.Count, "Test assumes 3 buffers"); + Assert.AreEqual((int)stream.Position, manager.Allocated[2].Length, "This test assumes allocation size is enough"); + + var buffer = VerifyStreamContents(stream, initialOffset, streamOffsetLastBuffer * 2, manager); + Assert.AreSame(manager.Allocated[2], buffer.Array); + } + } + + [TestMethod] + public void MultipleWrites() + { + int initialOffset = 0; + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, initialOffset, 4, 1024)) + { + stream.Write(_input, 0, 2); // Write partial buffer + stream.Write(_input, 2, 2); // Fill next buffer, next shoul be 4 + + stream.Write(_input, 4, 2); + stream.Write(_input, 6, manager.Allocated[1].Length); // Write past buffer into next one + + VerifyStreamContents(stream, initialOffset, 6 + manager.Allocated[1].Length, manager); + Assert.IsTrue(manager.Allocated.Count > 2, "Multiple buffers should have been used"); + } + } + + [TestMethod] + public void ShouldHandleThrowingAllocationWithoutMemoryLeak() + { + bool shouldTrow = false; + Func getBufferSize = size => + { + if (shouldTrow) throw new InsufficientMemoryException(); + else return size; + }; + + var manager = new BufferManageMock(getBufferSize); + + using (var stream = new BufferManagerStream(manager, 0, 4, 1024)) + { + stream.Write(_input, 0, 2); // Write some into first buffer + + shouldTrow = true; + ExceptionHelper.ExpectException(() => + stream.Write(_input, 2, 10) + ); + } + + manager.AssertEverythingIsReturned(); + } + + [TestMethod] + public void ResetShouldAllowStreamReuse() + { + int initialOffset = 0; + var manager = new BufferManageMock(); + using (var stream = new BufferManagerStream(manager, initialOffset, 4, 1024)) + { + stream.Write(_input, 0, 4); + manager.ReturnBuffer(stream.GetArrayAndClear().Array); + manager.AssertEverythingIsReturned(); + + initialOffset = 1; + stream.Reset(manager, initialOffset, 4); + + Assert.AreEqual(stream.Position, 0, "Position should be 0 after reset"); + + byte[] otherInput = new byte[] { 3, 2, 1 }; + stream.Write(otherInput, 0, otherInput.Length); + + var array = stream.GetArrayAndClear(); + VerifyBufferContents(array, initialOffset, new ArraySegment(otherInput)); + manager.ReturnBuffer(array.Array); + manager.AssertEverythingIsReturned(); + } + } + + private ArraySegment VerifyStreamContents(BufferManagerStream stream, int expectedOffset, int count, BufferManageMock manager, byte[] expectedContents = null) + { + Assert.AreEqual(count, stream.Position, "Stream position should equal count"); + + var buffer = stream.GetArrayAndClear(); + VerifyBufferContents(buffer, expectedOffset, new ArraySegment(expectedContents ?? _input, 0 , count)); + + // By returning buffer we also ensure that it was allocated through the buffer manager + manager.ReturnBuffer(buffer.Array); + manager.AssertEverythingIsReturned(); + return buffer; + } + + private void VerifyBufferContents(ArraySegment buffer, int expectedOffset, ArraySegment expectedContents) + { + Assert.AreEqual(buffer.Offset, expectedOffset, "Wrong offset"); + Assert.AreEqual(buffer.Count, expectedContents.Count, "Wrong count"); + + for (int i = 0; i < expectedContents.Count; ++i) + { + byte expected = expectedContents.Array[expectedContents.Offset + i]; + byte actual = buffer.Array[buffer.Offset + i]; + if (expected != actual) + { + Dump(buffer.Array, expectedContents.Count); + + Assert.Fail($"Buffer contents is wrong, expected {expected} but buffer[{buffer.Offset} + {i}] = {actual}"); + } + } + } + + private static void Dump(byte[] buf, int count) + { + for (int j = 0; j < count; ++j) + { + Console.Write("{0} ", (int)buf[j]); + } + Console.WriteLine(); + } + + sealed class BufferManageMock : BufferManager + { + private readonly HashSet _rented = new HashSet(); + private readonly List _allocated = new List(); + private readonly Func _getBufferSize; + + public IReadOnlyCollection Rented => _rented; + public IReadOnlyList Allocated => _allocated; + + public BufferManageMock(Func getBufferSize = null) + { + _getBufferSize = getBufferSize ?? ((int i) => i); + } + + public override void Clear() + { + if (_rented.Count > 0) + throw new InvalidOperationException(); + _rented.Clear(); + _allocated.Clear(); + } + + public override void ReturnBuffer(byte[] buffer) + { + if (!_rented.Remove(buffer)) + { + Assert.Fail("Buffer was not rented (returned twice?"); + } + } + + public override byte[] TakeBuffer(int bufferSize) + { + var buff = new byte[_getBufferSize(bufferSize)]; + _rented.Add(buff); + _allocated.Add(buff); + return buff; + } + + public void AssertEverythingIsReturned() + { + Assert.AreEqual(0, _rented.Count, "Not all buffers were returned"); + } + } + } +}