Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make PooledArrayBufferWriter more versatile, rename #8453

Merged
merged 1 commit into from Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Orleans.Core/Messaging/CachingSiloAddressCodec.cs
Expand Up @@ -131,7 +131,7 @@ private static SiloAddress ReadSiloAddressInner<TInput>(ref Reader<TInput> reade
return;
}

var innerWriter = Writer.Create(new PooledArrayBufferWriter(), null);
var innerWriter = Writer.Create(new PooledBuffer(), null);
innerWriter.WriteInt32(value.GetConsistentHashCode());
WriteSiloAddressInner(ref innerWriter, value);
innerWriter.Commit();
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Messaging/MessageSerializer.cs
Expand Up @@ -121,7 +121,7 @@ internal sealed class MessageSerializer
finally
{
input = input.Slice(requiredBytes);
_deserializationSession.FullReset();
_deserializationSession.Reset();
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ private ResponseCodec GetRawCodec(Type fieldType)
finally
{
_bufferWriter.Reset();
_serializationSession.FullReset();
_serializationSession.Reset();
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/Orleans.Serialization.SystemTextJson/JsonCodec.cs
Expand Up @@ -65,9 +65,9 @@ void IFieldCodec.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uin
writer.Session.TypeCodec.WriteLengthPrefixed(ref writer, value.GetType());

// Write the serialized payload
// Note that the Utf8JsonWriter and PooledArrayBufferWriter could be pooled as long as they're correctly
// Note that the Utf8JsonWriter and PooledBuffer could be pooled as long as they're correctly
// reset at the end of each use.
var bufferWriter = new BufferWriterBox<PooledArrayBufferWriter>(new PooledArrayBufferWriter());
var bufferWriter = new BufferWriterBox<PooledBuffer>(new PooledBuffer());
try
{
var jsonWriter = new Utf8JsonWriter(bufferWriter);
Expand Down Expand Up @@ -126,7 +126,7 @@ object IFieldCodec.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
var length = reader.ReadVarUInt32();

// To possibly improve efficiency, this could be converted to read a ReadOnlySequence<byte> instead of a byte array.
var tempBuffer = new PooledArrayBufferWriter();
var tempBuffer = new PooledBuffer();
try
{
reader.ReadBytes(ref tempBuffer, (int)length);
Expand Down Expand Up @@ -181,7 +181,7 @@ object IDeepCopier.DeepCopy(object input, CopyContext context)
return result;


var bufferWriter = new BufferWriterBox<PooledArrayBufferWriter>(new PooledArrayBufferWriter());
var bufferWriter = new BufferWriterBox<PooledBuffer>(new PooledBuffer());
try
{
var jsonWriter = new Utf8JsonWriter(bufferWriter);
Expand Down
29 changes: 29 additions & 0 deletions src/Orleans.Serialization.TestKit/BufferTestHelper.cs
Expand Up @@ -20,6 +20,7 @@ public static IBufferTestSerializer[] GetTestSerializers(IServiceProvider servic
}

result.Add(ActivatorUtilities.CreateInstance<StructBufferWriterTester>(serviceProvider));
result.Add(ActivatorUtilities.CreateInstance<PooledBufferWriterTester>(serviceProvider));
return result.ToArray();
}

Expand Down Expand Up @@ -90,5 +91,33 @@ public StructBufferWriterTester(IServiceProvider serviceProvider) : base(service

public override string ToString() => $"{nameof(TestBufferWriterStruct)}";
}

private struct PooledOutputBuffer : IBufferWriter<byte>, IOutputBuffer, IDisposable
{
private PooledBuffer _buffer;

public PooledOutputBuffer()
{
_buffer = new();
}

public void Advance(int count) => _buffer.Advance(count);
public void Dispose() => _buffer.Dispose();
public Memory<byte> GetMemory(int sizeHint = 0) => _buffer.GetMemory(sizeHint);
public ReadOnlySequence<byte> GetReadOnlySequence(int maxSegmentSize) => _buffer.AsReadOnlySequence();
public Span<byte> GetSpan(int sizeHint = 0) => _buffer.GetSpan(sizeHint);
}

[ExcludeFromCodeCoverage]
private class PooledBufferWriterTester : BufferTester<PooledOutputBuffer>
{
public PooledBufferWriterTester(IServiceProvider serviceProvider) : base(serviceProvider)
{
}

protected override PooledOutputBuffer CreateBufferWriter() => new();

public override string ToString() => $"{nameof(PooledBufferWriterTester)}";
}
}
}
80 changes: 41 additions & 39 deletions src/Orleans.Serialization.TestKit/FieldCodecTester.cs
Expand Up @@ -521,7 +521,7 @@ public void CanRoundTripCollectionViaSerializer()
var writer = Writer.CreatePooled(writerSession);
serializer.Serialize(original, ref writer);
using var readerSession = _sessionPool.GetSession();
var reader = Reader.Create(writer.Output.AsReadOnlySequence(), readerSession);
var reader = Reader.Create(writer.Output, readerSession);
var deserialized = serializer.Deserialize(ref reader);

Assert.Equal(original.Count, deserialized.Count);
Expand Down Expand Up @@ -655,7 +655,7 @@ public void CanRoundTripTupleViaSerializer()
var writer = Writer.CreatePooled(writerSession);
serializer.Serialize(original, ref writer);
using var readerSession = _sessionPool.GetSession();
var reader = Reader.Create(writer.Output.AsReadOnlySequence(), readerSession);
var reader = Reader.Create(writer.Output, readerSession);
var deserialized = serializer.Deserialize(ref reader);

var isEqual = Equals(original.Item1, deserialized.Item1);
Expand Down Expand Up @@ -860,60 +860,62 @@ private void TestRoundTrippedValue(TValue original)
protected T RoundTripThroughCodec<T>(T original)
{
T result;
var pipe = new Pipe();
using (var readerSession = SessionPool.GetSession())
using (var writeSession = SessionPool.GetSession())
{
var writer = Writer.Create(pipe.Writer, writeSession);
var codec = ServiceProvider.GetRequiredService<ICodecProvider>().GetCodec<T>();
codec.WriteField(
ref writer,
0,
null,
original);
writer.Commit();
_ = pipe.Writer.FlushAsync().AsTask().GetAwaiter().GetResult();
pipe.Writer.Complete();

_ = pipe.Reader.TryRead(out var readResult);
var reader = Reader.Create(readResult.Buffer, readerSession);

var previousPos = reader.Position;
var initialHeader = reader.ReadFieldHeader();
Assert.True(reader.Position > previousPos);

result = codec.ReadValue(ref reader, initialHeader);
pipe.Reader.AdvanceTo(readResult.Buffer.End);
pipe.Reader.Complete();
var writer = Writer.CreatePooled(writeSession);
try
{
var codec = ServiceProvider.GetRequiredService<ICodecProvider>().GetCodec<T>();
codec.WriteField(
ref writer,
0,
null,
original);
writer.Commit();

var output = writer.Output.AsReadOnlySequence();
var reader = Reader.Create(output, readerSession);

var previousPos = reader.Position;
var initialHeader = reader.ReadFieldHeader();
Assert.True(reader.Position > previousPos);

result = codec.ReadValue(ref reader, initialHeader);
}
finally
{
writer.Dispose();
}
}

return result;
}

protected object RoundTripThroughUntypedSerializer(object original, out string formattedBitStream)
{
var pipe = new Pipe();
object result;
using (var readerSession = SessionPool.GetSession())
using (var writeSession = SessionPool.GetSession())
{
var writer = Writer.Create(pipe.Writer, writeSession);
var serializer = ServiceProvider.GetService<Serializer<object>>();
serializer.Serialize(original, ref writer);

_ = pipe.Writer.FlushAsync().AsTask().GetAwaiter().GetResult();
pipe.Writer.Complete();

_ = pipe.Reader.TryRead(out var readResult);
var writer = Writer.CreatePooled(writeSession);
try
{
var serializer = ServiceProvider.GetService<Serializer<object>>();
serializer.Serialize(original, ref writer);

using var analyzerSession = SessionPool.GetSession();
formattedBitStream = BitStreamFormatter.Format(readResult.Buffer, analyzerSession);
using var analyzerSession = SessionPool.GetSession();
var output = writer.Output.Slice();
formattedBitStream = BitStreamFormatter.Format(output, analyzerSession);

var reader = Reader.Create(readResult.Buffer, readerSession);
var reader = Reader.Create(output, readerSession);

result = serializer.Deserialize(ref reader);
pipe.Reader.AdvanceTo(readResult.Buffer.End);
pipe.Reader.Complete();
result = serializer.Deserialize(ref reader);
}
finally
{
writer.Dispose();
}
}

return result;
Expand Down
111 changes: 111 additions & 0 deletions src/Orleans.Serialization/Buffers/Adaptors/BufferSliceReaderInput.cs
@@ -0,0 +1,111 @@
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using static Orleans.Serialization.Buffers.PooledBuffer;

namespace Orleans.Serialization.Buffers.Adaptors;

/// <summary>
/// Input type for <see cref="Reader{TInput}"/> to support <see cref="BufferSlice"/> buffers.
/// </summary>
public struct BufferSliceReaderInput
{
private static readonly SequenceSegment InitialSegmentSentinel = new();
private static readonly SequenceSegment FinalSegmentSentinel = new();
private readonly BufferSlice _slice;
private SequenceSegment _segment;
private int _position;

/// <summary>
/// Initializes a new instance of the <see cref="BufferSliceReaderInput"/> type.
/// </summary>
/// <param name="slice">The underlying buffer.</param>
public BufferSliceReaderInput(in BufferSlice slice)
{
_slice = slice;
_segment = InitialSegmentSentinel;
}

internal readonly PooledBuffer Buffer => _slice._buffer;
internal readonly int Position => _position;
internal readonly int Offset => _slice._offset;
internal readonly int Length => _slice._length;
internal long PreviousBuffersSize;

internal BufferSliceReaderInput ForkFrom(int position)
{
var sliced = _slice.Slice(position);
return new BufferSliceReaderInput(in sliced);
}

internal ReadOnlySpan<byte> GetNext()
{
if (ReferenceEquals(_segment, InitialSegmentSentinel))
{
_segment = _slice._buffer._first;
}

var endPosition = Offset + Length;
while (_segment != null && _segment != FinalSegmentSentinel)
{
var segment = _segment.CommittedMemory.Span;

// Find the starting segment and the offset to copy from.
int segmentOffset;
if (_position < Offset)
{
if (_position + segment.Length <= Offset)
{
// Start is in a subsequent segment
_position += segment.Length;
_segment = _segment.Next as SequenceSegment;
continue;
}
else
{
// Start is in this segment
segmentOffset = Offset;
}
}
else
{
segmentOffset = 0;
}

var segmentLength = Math.Min(segment.Length - segmentOffset, endPosition - (_position + segmentOffset));
if (segmentLength == 0)
{
ThrowInsufficientData();
return default;
}

var result = segment.Slice(segmentOffset, segmentLength);
_position += segmentOffset + segmentLength;
_segment = _segment.Next as SequenceSegment;
return result;
}

if (_segment != FinalSegmentSentinel && Buffer._currentPosition > 0 && Buffer._writeHead is { } head && _position < endPosition)
{
var finalOffset = Math.Max(Offset - _position, 0);
var finalLength = Math.Min(Buffer._currentPosition, endPosition - (_position + finalOffset));
if (finalLength == 0)
{
ThrowInsufficientData();
return default;
}

var result = head.Array.AsSpan(finalOffset, finalLength);
_position += finalOffset + finalLength;
Debug.Assert(_position == endPosition);
_segment = FinalSegmentSentinel;
return result;
}

ThrowInsufficientData();
return default;
}

[DoesNotReturn]
private static void ThrowInsufficientData() => throw new InvalidOperationException("Insufficient data present in buffer.");
}
@@ -0,0 +1,54 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;

namespace Orleans.Serialization.Buffers.Adaptors;

internal static class BufferWriterExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Write<TBufferWriter>(this ref TBufferWriter writer, ReadOnlySequence<byte> input) where TBufferWriter : struct, IBufferWriter<byte>
{
foreach (var segment in input)
{
writer.Write(segment.Span);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Write<TBufferWriter>(ref this TBufferWriter writer, ReadOnlySpan<byte> value) where TBufferWriter : struct, IBufferWriter<byte>
{
var destination = writer.GetSpan();

// Fast path, try copying to the available memory directly
if (value.Length <= destination.Length)
{
value.CopyTo(destination);
writer.Advance(value.Length);
}
else
{
WriteMultiSegment(ref writer, value, destination);
}
}

private static void WriteMultiSegment<TBufferWriter>(ref TBufferWriter writer, in ReadOnlySpan<byte> source, Span<byte> destination) where TBufferWriter : struct, IBufferWriter<byte>
{
var input = source;
while (true)
{
var writeSize = Math.Min(destination.Length, input.Length);
input.Slice(0, writeSize).CopyTo(destination);
writer.Advance(writeSize);
input = input.Slice(writeSize);
if (input.Length > 0)
{
destination = writer.GetSpan();

continue;
}

return;
}
}
}