Custom frame size computation support in Framing (#5444)
* Custom frame size computation support in Framing

* Speed up of framing spec
ismaelhamed committed Dec 19, 2021
1 parent 985e5ed commit bf581e9
Showing 3 changed files with 173 additions and 28 deletions.
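
For orientation, here is a minimal sketch (not part of the commit) of how the new LengthField overload added by this change might be used. The frame layout, the convention that the single offset byte carries the footer length, and all names below are illustrative assumptions.

using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;

class FramingDemo
{
    // Assumed layout: [1 offset byte][2-byte little-endian body length][body][footer],
    // where the offset byte carries the footer length. The total frame size is therefore
    // offset bytes + length field + body + footer.
    static int ComputeFrameSize(IReadOnlyList<byte> offsetBytes, int bodyLength) =>
        offsetBytes.Count + 2 + bodyLength + offsetBytes[0];

    static void Main()
    {
        var system = ActorSystem.Create("framing-demo");
        var materializer = ActorMaterializer.Create(system);

        // One frame: offset byte 0x01 (1-byte footer), body length 3 (little-endian),
        // body { 0xAA, 0xBB, 0xCC }, footer { 0xFF }.
        var frame = ByteString.FromBytes(new byte[] { 0x01, 0x03, 0x00, 0xAA, 0xBB, 0xCC, 0xFF });

        Source.Single(frame)
            .Via(Framing.LengthField(
                fieldLength: 2,
                fieldOffset: 1,
                maximumFrameLength: 1024,
                byteOrder: ByteOrder.LittleEndian,
                computeFrameSize: ComputeFrameSize))
            .RunForeach(f => Console.WriteLine($"Decoded frame of {f.Count} bytes"), materializer)
            .Wait();

        system.Terminate().Wait();
    }
}

Running this sketch would print a single decoded frame of 7 bytes: the stage emits the whole frame (offset, length field, body and footer) once ComputeFrameSize reports its total size.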
@@ -1488,6 +1488,7 @@ namespace Akka.Streams.Dsl
{
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> Delimiter(Akka.IO.ByteString delimiter, int maximumFrameLength, bool allowTruncation = False) { }
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> LengthField(int fieldLength, int maximumFramelength, int fieldOffset = 0, Akka.IO.ByteOrder byteOrder = 1) { }
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> LengthField(int fieldLength, int fieldOffset, int maximumFrameLength, Akka.IO.ByteOrder byteOrder, System.Func<System.Collections.Generic.IReadOnlyList<byte>, int, int> computeFrameSize) { }
public static Akka.Streams.Dsl.BidiFlow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> SimpleFramingProtocol(int maximumMessageLength) { }
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> SimpleFramingProtocolDecoder(int maximumMessageLength) { }
public static Akka.Streams.Dsl.Flow<Akka.IO.ByteString, Akka.IO.ByteString, Akka.NotUsed> SimpleFramingProtocolEncoder(int maximumMessageLength) { }
133 changes: 111 additions & 22 deletions src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs
@@ -9,7 +9,8 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Text;
using System.Threading.Tasks;
using Akka.IO;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Fusing;
@@ -240,43 +241,98 @@ private static string RandomString(int length)

private static readonly List<int> FieldOffsets = new List<int> {0, 1, 2, 3, 15, 16, 31, 32, 44, 107};

private static ByteString Encode(ByteString payload, int fieldOffset, int fieldLength, ByteOrder byteOrder)
private static ByteString Encode(ByteString payload, int fieldOffset, int fieldLength, ByteOrder byteOrder) =>
EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(new byte[fieldOffset]), ByteString.Empty);

private static ByteString EncodeComplexFrame(
ByteString payload,
int fieldLength,
ByteOrder byteOrder,
ByteString offset,
ByteString tail)
{
var h = ByteString.FromBytes(new byte[4].PutInt(payload.Count, order: byteOrder));
var header = byteOrder == ByteOrder.LittleEndian ? h.Slice(0, fieldLength) : h.Slice(4 - fieldLength);

return ByteString.FromBytes(new byte[fieldOffset]) + header + payload;
return offset + header + payload + tail;
}

[Fact]
public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets()
{
var counter = 1;
foreach (var byteOrder in ByteOrders)
IEnumerable<Task<(IEnumerable<ByteString>, List<ByteString>, (ByteOrder, int, int))>> GetFutureResults()
{
foreach (var byteOrder in ByteOrders)
foreach (var fieldOffset in FieldOffsets)
foreach (var fieldLength in FieldLengths)
{
foreach (var fieldLength in FieldLengths)
var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var payload = ReferenceChunk.Slice(0, length);
return Encode(payload, fieldOffset, fieldLength, byteOrder);
}).ToList();

var task = Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer);

task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(encodedFrames);

_helper.WriteLine($"{counter++} from 80 passed");
var payload = ReferenceChunk.Slice(0, length);
return Encode(payload, fieldOffset, fieldLength, byteOrder);
}).ToList();

yield return Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer)
.ContinueWith(t => (t.Result, encodedFrames, (byteOrder, fieldOffset, fieldLength)));
}
}

Parallel.ForEach(GetFutureResults(), async futureResult =>
{
var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult;
result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}");
});
}

[Fact]
public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize()
{
IEnumerable<Task<(IEnumerable<ByteString>, List<ByteString>, (ByteOrder, int, int))>> GetFutureResults()
{
foreach (var byteOrder in ByteOrders)
foreach (var fieldOffset in FieldOffsets)
foreach (var fieldLength in FieldLengths)
{
int ComputeFrameSize(IReadOnlyList<byte> offset, int length)
{
var sizeWithoutTail = offset.Count + fieldLength + length;
return offset.Count > 0 ? offset[0] + sizeWithoutTail : sizeWithoutTail;
}

var random = new Random();
byte[] Offset()
{
var arr = new byte[fieldOffset];
if (arr.Length > 0) arr[0] = Convert.ToByte(random.Next(128));
return arr;
}

var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var payload = ReferenceChunk.Slice(0, length);
var offsetBytes = Offset();
var tailBytes = offsetBytes.Length > 0 ? new byte[offsetBytes[0]] : Array.Empty<byte>();
return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), ByteString.FromBytes(tailBytes));
}).ToList();

yield return Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, fieldOffset, int.MaxValue, byteOrder, ComputeFrameSize))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer)
.ContinueWith(t => (t.Result, encodedFrames, (byteOrder, fieldOffset, fieldLength)));
}
}

Parallel.ForEach(GetFutureResults(), async futureResult =>
{
var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult;
result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}");
});
}

[Fact]
@@ -384,6 +440,39 @@ public void Length_field_based_framing_must_fail_the_stage_on_negative_length_fi
.Should().Throw<Framing.FramingException>()
.WithMessage("Decoded frame header reported negative size -4");
}

[Fact]
public void Length_field_based_framing_must_ignore_length_field_value_when_provided_computeFrameSize()
{
int ComputeFrameSize(IReadOnlyList<byte> offset, int length) => 8;

var tempArray = new byte[4].PutInt(unchecked((int)0xFF010203), order: ByteOrder.LittleEndian);
Array.Resize(ref tempArray, 8);
var bs = ByteString.FromBytes(tempArray.PutInt(checked(0x04050607), order: ByteOrder.LittleEndian));

var result = Source.Single(bs)
.Via(Flow.Create<ByteString>().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize)))
.RunWith(Sink.Seq<ByteString>(), Materializer);

result.AwaitResult().Should().BeEquivalentTo(ImmutableArray.Create(bs));
}

[Fact]
public void Length_field_based_framing_must_fail_the_stage_on_computeFrameSize_values_less_than_minimum_chunk_size()
{
int ComputeFrameSize(IReadOnlyList<byte> offset, int length) => 3;

// A 4-byte message containing only an Int specifying the length of the payload
var bytes = ByteString.FromBytes(BitConverter.GetBytes(4));

var result = Source.Single(bytes)
.Via(Flow.Create<ByteString>().Via(Framing.LengthField(4, 0, 1000, ByteOrder.LittleEndian, ComputeFrameSize)))
.RunWith(Sink.Seq<ByteString>(), Materializer);

result.Invoking(t => t.AwaitResult())
.Should().Throw<Framing.FramingException>()
.WithMessage("Computed frame size 3 is less than minimum chunk size 4");
}

[Fact]
public void Length_field_based_framing_must_let_zero_length_field_values_pass_through()
67 changes: 61 additions & 6 deletions src/core/Akka.Streams/Dsl/Framing.cs
@@ -47,7 +47,7 @@ public static class Framing
/// Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that
/// incoming frames have a field that encodes their length.
///
/// If the input stream finishes before the last frame has been fully decoded this Flow will fail the stream reporting
/// If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting
/// a truncated frame.
/// </summary>
/// <param name="fieldLength">The length of the "Count" field in bytes</param>
@@ -69,6 +69,43 @@ public static class Framing
.Named("LengthFieldFraming");
}

/// <summary>
/// Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that
/// incoming frames have a field that encodes their length.
/// <para>
/// If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting
/// a truncated frame.
/// </para>
/// </summary>
/// <param name="fieldLength">The length of the "Count" field in bytes</param>
/// <param name="maximumFrameLength">The maximum length of allowed frames while decoding. If the maximum length is exceeded, this Flow will fail the stream. This length *includes* the header (i.e. the offset and the length of the size field)</param>
/// <param name="fieldOffset">The offset of the field from the beginning of the frame in bytes.</param>
/// <param name="byteOrder">The <see cref="ByteOrder"/> to be used when decoding the field.</param>
/// <param name="computeFrameSize">
/// This function can be supplied if the frame size varies or needs to be computed in a special fashion.
/// For example, a frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`.
/// Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`.
/// The actual frame size must be equal to or greater than the sum of `fieldOffset` and `fieldLength`; otherwise the operator fails.
/// </param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="fieldLength"/> is not equal to either 1, 2, 3 or 4.
/// </exception>
/// <returns>TBD</returns>
public static Flow<ByteString, ByteString, NotUsed> LengthField(
int fieldLength,
int fieldOffset,
int maximumFrameLength,
ByteOrder byteOrder,
Func<IReadOnlyList<byte>, int, int> computeFrameSize)
{
if (fieldLength < 1 || fieldLength > 4)
throw new ArgumentException("Length field length must be 1,2,3 or 4", nameof(fieldLength));

return Flow.Create<ByteString>()
.Via(new LengthFieldFramingStage(fieldLength, maximumFrameLength, fieldOffset, byteOrder, computeFrameSize))
.Named("LengthFieldFraming");
}

/// <summary>
/// Returns a BidiFlow that implements a simple framing protocol. This is a convenience wrapper over <see cref="LengthField"/>
/// and simply attaches a length field header of four bytes (using big endian encoding) to outgoing messages, and decodes
@@ -405,14 +442,20 @@ private void TryPushFrame()
{
var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator();
var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength);
_frameSize = parsedLength + _stage._minimumChunkSize;

_frameSize = _stage._computeFrameSize.HasValue
? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength)
: parsedLength + _stage._minimumChunkSize;

if (_frameSize > _stage._maximumFramelength)
FailStage(new FramingException(
$"Maximum allowed frame size is {_stage._maximumFramelength} but decoded frame header reported size {_frameSize}"));
else if (parsedLength < 0)
else if (_stage._computeFrameSize.IsEmpty && parsedLength < 0)
FailStage(new FramingException(
$"Decoded frame header reported negative size {parsedLength}"));
else if (_frameSize < _stage._minimumChunkSize)
FailStage(new FramingException(
$"Computed frame size {_frameSize} is less than minimum chunk size {_stage._minimumChunkSize}"));
else if (bufferSize >= _frameSize)
PushFrame();
else
@@ -438,13 +481,25 @@ private void TryPull()
private readonly int _lengthFieldOffset;
private readonly int _minimumChunkSize;
private readonly Func<IEnumerator<byte>, int, int> _intDecoder;

public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder) : base("LengthFieldFramingStage")
private readonly Option<Func<IReadOnlyList<byte>, int, int>> _computeFrameSize;

// For the sake of binary compatibility
public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder)
: this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, Option<Func<IReadOnlyList<byte>, int, int>>.None)
{ }

public LengthFieldFramingStage(
int lengthFieldLength,
int maximumFrameLength,
int lengthFieldOffset,
ByteOrder byteOrder,
Option<Func<IReadOnlyList<byte>, int, int>> computeFrameSize) : base("LengthFieldFramingStage")
{
_lengthFieldLength = lengthFieldLength;
_maximumFramelength = maximumFramelength;
_maximumFramelength = maximumFrameLength;
_lengthFieldOffset = lengthFieldOffset;
_minimumChunkSize = lengthFieldOffset + lengthFieldLength;
_computeFrameSize = computeFrameSize;
_intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder;
}

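Complementing the XML docs on the new overload above, here is a hedged sketch (not part of the commit) of an encoder and a matching computeFrameSize for the documented layout [offset bytes][body size bytes][body bytes][footer bytes]. The two-byte offset, the convention that its first byte records the footer length, and the helper names are assumptions made for illustration.

using System.Collections.Generic;
using Akka.IO;

static class FrameLayout
{
    public const int FieldOffset = 2;   // two offset bytes precede the length field
    public const int FieldLength = 4;   // 4-byte little-endian body length

    // Builds [offset bytes][body size bytes][body bytes][footer bytes]; the first offset
    // byte records the footer length (an assumed convention for this sketch).
    public static ByteString Encode(ByteString body, byte footerLength)
    {
        var offset = ByteString.FromBytes(new byte[] { footerLength, 0 });
        var sizeField = ByteString.FromBytes(new byte[FieldLength].PutInt(body.Count, order: ByteOrder.LittleEndian));
        var footer = ByteString.FromBytes(new byte[footerLength]);
        return offset + sizeField + body + footer;
    }

    // Receives the offset bytes and the decoded length-field value and returns the total
    // frame size: offset + length field + body + footer.
    public static int ComputeFrameSize(IReadOnlyList<byte> offsetBytes, int bodyLength) =>
        FieldOffset + FieldLength + bodyLength + offsetBytes[0];
}

Under those assumptions, the decoding side would be wired with Framing.LengthField(FrameLayout.FieldLength, FrameLayout.FieldOffset, maximumFrameLength, ByteOrder.LittleEndian, FrameLayout.ComputeFrameSize), mirroring what FramingSpec exercises with randomized offsets and tails.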
