Skip to content
This repository has been archived by the owner on May 10, 2018. It is now read-only.

Move Consumed back to IReadableChannel as Advance #89

Merged
merged 4 commits into from Sep 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/Channels.Samples/HttpClient/ChannelHttpContent.cs
Expand Up @@ -62,7 +62,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
}
finally
{
inputBuffer.Consumed(consumed);
_output.Advance(consumed);
}
}
}
Expand Down
Expand Up @@ -219,7 +219,7 @@ private static async Task ProduceResponse(ConnectionState state, UvTcpConnection
}
finally
{
responseBuffer.Consumed(consumed);
connection.Input.Advance(consumed);
}

if (needMoreData)
Expand Down
2 changes: 1 addition & 1 deletion samples/Channels.Samples/HttpServer/HttpConnection.cs
Expand Up @@ -108,7 +108,7 @@ public async Task ProcessAllRequests()
}
finally
{
buffer.Consumed(buffer.Start, buffer.End);
_input.Advance(buffer.Start, buffer.End);
}

var context = _application.CreateContext(this);
Expand Down
Expand Up @@ -79,7 +79,7 @@ public async Task Execute(IReadableChannel input, IWritableChannel output)

inputBuffer = inputBuffer.Slice(0, consumed);

inputBuffer.Consumed();
input.Advance(inputBuffer.End);

await writerBuffer.FlushAsync();
}
Expand Down Expand Up @@ -164,7 +164,7 @@ public async Task Execute(IReadableChannel input, IWritableChannel output)
}
}

inputBuffer.Consumed();
input.Advance(inputBuffer.End);

await writerBuffer.FlushAsync();
}
Expand Down
2 changes: 1 addition & 1 deletion samples/Channels.Samples/RawLibuvHttpClientSample.cs
Expand Up @@ -55,7 +55,7 @@ private static async Task CopyCompletedAsync(IReadableChannel input, IWritableCh
}
finally
{
inputBuffer.Consumed();
input.Advance(inputBuffer.End);
}

var awaiter = input.ReadAsync();
Expand Down
2 changes: 1 addition & 1 deletion samples/Channels.Samples/RawLibuvHttpServerSample.cs
Expand Up @@ -74,7 +74,7 @@ public static void Run()
finally
{
// Consume the input
input.Consumed(input.Start, input.End);
connection.Input.Advance(input.Start, input.End);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Channels.Networking.Libuv/UvTcpConnection.cs
Expand Up @@ -77,7 +77,7 @@ private async Task ProcessWrites()
}
finally
{
buffer.Consumed();
_output.Advance(buffer.End);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Channels.Networking.Sockets/SocketConnection.cs
Expand Up @@ -283,7 +283,7 @@ private async void ProcessWrites()
}
finally
{
buffer.Consumed();
_output.Advance(buffer.End);
}
}
_output.CompleteReader();
Expand Down
2 changes: 1 addition & 1 deletion src/Channels.Networking.Windows.RIO/RioTcpConnection.cs
Expand Up @@ -103,7 +103,7 @@ private async Task ProcessSends()
await SendAsync(current, endOfMessage: true);
}

buffer.Consumed();
_output.Advance(buffer.End);
}

_output.CompleteReader();
Expand Down
13 changes: 4 additions & 9 deletions src/Channels/Channel.cs
Expand Up @@ -285,7 +285,7 @@ internal ReadableBuffer AsReadableBuffer()
return new ReadableBuffer(); // empty
}

return new ReadableBuffer(null, new ReadCursor(_writingHead, _writingHeadIndex), new ReadCursor(_writingTail, _writingTailIndex), isOwner: false);
return new ReadableBuffer(new ReadCursor(_writingHead, _writingHeadIndex), new ReadCursor(_writingTail, _writingTailIndex), isOwner: false);
}

private Task CompleteWriteAsync()
Expand Down Expand Up @@ -319,17 +319,12 @@ private ReadableBuffer Read()
throw new InvalidOperationException("Already consuming.");
}

return new ReadableBuffer(this, new ReadCursor(_head), new ReadCursor(_tail, _tail?.End ?? 0));
return new ReadableBuffer(new ReadCursor(_head), new ReadCursor(_tail, _tail?.End ?? 0));
}

internal void EndRead(ReadCursor end)
{
EndRead(end, end);
}
void IReadableChannel.Advance(ReadCursor consumed, ReadCursor examined) => AdvanceReader(consumed, examined);

internal void EndRead(
ReadCursor consumed,
ReadCursor examined)
public void AdvanceReader(ReadCursor consumed, ReadCursor examined)
{
PooledBufferSegment returnStart = null;
PooledBufferSegment returnEnd = null;
Expand Down
13 changes: 9 additions & 4 deletions src/Channels/ChannelExtensions.cs
Expand Up @@ -29,6 +29,11 @@ public static Task WriteAsync(this IWritableChannel channel, Span<byte> source)

public static class ReadableChannelExtensions
{
public static void Advance(this IReadableChannel input, ReadCursor cursor)
{
input.Advance(cursor, cursor);
}

public static ValueTask<int> ReadAsync(this IReadableChannel input, Span<byte> destination)
{
while (true)
Expand All @@ -46,7 +51,7 @@ public static ValueTask<int> ReadAsync(this IReadableChannel input, Span<byte> d
var sliced = inputBuffer.Slice(0, destination.Length);
sliced.CopyTo(destination);
int actual = sliced.Length;
inputBuffer.Consumed(sliced.End);
input.Advance(sliced.End);

if (actual != 0)
{
Expand Down Expand Up @@ -100,7 +105,7 @@ public static async Task CopyToAsync(this IReadableChannel input, Stream stream,
}
finally
{
inputBuffer.Consumed();
input.Advance(inputBuffer.End);
}
}
}
Expand Down Expand Up @@ -128,7 +133,7 @@ public static async Task CopyToAsync(this IReadableChannel input, IWritableChann
}
finally
{
inputBuffer.Consumed();
input.Advance(inputBuffer.End);
}
}
}
Expand All @@ -144,7 +149,7 @@ private static async Task<int> ReadAsyncAwaited(this IReadableChannel input, Spa
var sliced = inputBuffer.Slice(0, destination.Length);
sliced.CopyTo(destination);
int actual = sliced.Length;
inputBuffer.Consumed(sliced.End);
input.Advance(sliced.End);

if (actual != 0)
{
Expand Down
11 changes: 11 additions & 0 deletions src/Channels/IReadableChannel.cs
Expand Up @@ -20,6 +20,17 @@ public interface IReadableChannel
/// <remarks>This task indicates the producer has completed and will not write anymore data.</remarks>
Task Reading { get; }

/// <summary>
/// Moves forward the channels read cursor to after the consumed data.
/// </summary>
/// <param name="consumed">Marks the extent of the data that has been succesfully proceesed.</param>
/// <param name="examined">Marks the extent of the data that has been read and examined.</param>
/// <remarks>
/// The memory for the consumed data will be released and no longer available.
/// The examined data communicates to the channel when it should signal more data is available.
/// </remarks>
void Advance(ReadCursor consumed, ReadCursor examined);

/// <summary>
/// Signal to the producer that the consumer is done reading.
/// </summary>
Expand Down
54 changes: 6 additions & 48 deletions src/Channels/ReadableBuffer.cs
Expand Up @@ -18,7 +18,6 @@ public struct ReadableBuffer : IDisposable

private readonly PooledBufferSpan _span;
private readonly bool _isOwner;
private readonly Channel _channel;

private ReadCursor _start;
private ReadCursor _end;
Expand Down Expand Up @@ -54,15 +53,14 @@ public struct ReadableBuffer : IDisposable
/// </summary>
public ReadCursor End => _end;

internal ReadableBuffer(Channel channel, ReadCursor start, ReadCursor end) :
this(channel, start, end, isOwner: false)
internal ReadableBuffer(ReadCursor start, ReadCursor end) :
this(start, end, isOwner: false)
{

}

internal ReadableBuffer(Channel channel, ReadCursor start, ReadCursor end, bool isOwner)
internal ReadableBuffer(ReadCursor start, ReadCursor end, bool isOwner)
{
_channel = channel;
_start = start;
_end = end;
_isOwner = isOwner;
Expand All @@ -74,8 +72,6 @@ internal ReadableBuffer(Channel channel, ReadCursor start, ReadCursor end, bool

private ReadableBuffer(ref ReadableBuffer buffer)
{
_channel = buffer._channel;

var begin = buffer._start;
var end = buffer._end;

Expand Down Expand Up @@ -264,7 +260,7 @@ public ReadableBuffer Slice(int start, ReadCursor end)
/// <param name="end">The ending (inclusive) <see cref="ReadCursor"/> of the slice</param>
public ReadableBuffer Slice(ReadCursor start, ReadCursor end)
{
return new ReadableBuffer(_channel, start, end);
return new ReadableBuffer(start, end);
}

/// <summary>
Expand All @@ -283,7 +279,7 @@ public ReadableBuffer Slice(ReadCursor start, int length)
/// <param name="start">The starting (inclusive) <see cref="ReadCursor"/> at which to begin this slice.</param>
public ReadableBuffer Slice(ReadCursor start)
{
return new ReadableBuffer(_channel, start, _end);
return new ReadableBuffer(start, _end);
}

/// <summary>
Expand All @@ -294,7 +290,7 @@ public ReadableBuffer Slice(int start)
{
if (start == 0) return this;

return new ReadableBuffer(_channel, _start.Seek(start), _end);
return new ReadableBuffer(_start.Seek(start), _end);
}

/// <summary>
Expand Down Expand Up @@ -388,44 +384,6 @@ public void Dispose()
_end = default(ReadCursor);
}

private void ThrowIfNotConsumable()
{
if (_channel == null)
{
throw new InvalidOperationException("This data is not from a read operation and cannot be consumed");
}
}

/// <summary>
/// Mark the entire <see cref="ReadableBuffer"/> as consumed.
/// </summary>
public void Consumed()
{
ThrowIfNotConsumable();
_channel.EndRead(End, End);
}

/// <summary>
/// Mark up the the specified <see cref="ReadCursor"/> as consumed.
/// </summary>
/// <param name="consumed">The <see cref="ReadCursor"/> that points to the position in the <see cref="ReadableBuffer"/> up to where bytes were consumed.</param>
public void Consumed(ReadCursor consumed)
{
ThrowIfNotConsumable();
_channel.EndRead(consumed, consumed);
}

/// <summary>
/// Mark up the the specified <see cref="ReadCursor"/> as consumed.
/// </summary>
/// <param name="consumed">The <see cref="ReadCursor"/> that points to the position in the <see cref="ReadableBuffer"/> up to where bytes were consumed.</param>
/// <param name="examined">TODO</param>
public void Consumed(ReadCursor consumed, ReadCursor examined)
{
ThrowIfNotConsumable();
_channel.EndRead(consumed, examined);
}

/// <summary>
///
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions src/Channels/ReadableChannel.cs
Expand Up @@ -28,6 +28,17 @@ protected ReadableChannel(IBufferPool pool)
/// <remarks>This task indicates the producer has completed and will not write anymore data.</remarks>
public Task Reading => _channel.Reading;

/// <summary>
/// Moves forward the channels read cursor to after the consumed data.
/// </summary>
/// <param name="consumed">Marks the extent of the data that has been succesfully proceesed.</param>
/// <param name="examined">Marks the extent of the data that has been read and examined.</param>
/// <remarks>
/// The memory for the consumed data will be released and no longer available.
/// The examined data communicates to the channel when it should signal more data is available.
/// </remarks>
public void Advance(ReadCursor consumed, ReadCursor examined) => _channel.AdvanceReader(consumed, examined);

/// <summary>
/// Signal to the producer that the consumer is done reading.
/// </summary>
Expand Down
20 changes: 10 additions & 10 deletions test/Channels.Tests/SocketsFacts.cs
Expand Up @@ -67,12 +67,12 @@ public async Task CanCreateWorkingEchoServer_ChannelSocketServer_ChannelSocketCl
if (client.Input.Reading.IsCompleted)
{
reply = input.GetUtf8String();
input.Consumed();
client.Input.Advance(input.End);
break;
}
else
{
input.Consumed(input.Start, input.End);
client.Input.Advance(input.Start, input.End);
}
}
}
Expand Down Expand Up @@ -162,12 +162,12 @@ public async Task RunStressPingPongTest_Socket()
var inputBuffer = await channel.Input.ReadAsync();
if (inputBuffer.IsEmpty && channel.Input.Reading.IsCompleted)
{
inputBuffer.Consumed(inputBuffer.End);
channel.Input.Advance(inputBuffer.End);
break;
}
if (inputBuffer.Length < 4)
{
inputBuffer.Consumed(inputBuffer.Start, inputBuffer.End);
channel.Input.Advance(inputBuffer.Start, inputBuffer.End);
}
else
{
Expand All @@ -176,7 +176,7 @@ public async Task RunStressPingPongTest_Socket()
{
count++;
}
inputBuffer.Consumed(inputBuffer.End);
channel.Input.Advance(inputBuffer.End);
break;
}
}
Expand Down Expand Up @@ -207,12 +207,12 @@ private static async void PongServer(IChannel channel)
var inputBuffer = await channel.Input.ReadAsync();
if (inputBuffer.IsEmpty && channel.Input.Reading.IsCompleted)
{
inputBuffer.Consumed(inputBuffer.End);
channel.Input.Advance(inputBuffer.End);
break;
}
if (inputBuffer.Length < 4)
{
inputBuffer.Consumed(inputBuffer.Start, inputBuffer.End);
channel.Input.Advance(inputBuffer.Start, inputBuffer.End);
}
else
{
Expand All @@ -225,7 +225,7 @@ private static async void PongServer(IChannel channel)
{
break;
}
inputBuffer.Consumed(inputBuffer.End);
channel.Input.Advance(inputBuffer.End);
}
}
channel.Input.Complete();
Expand Down Expand Up @@ -271,15 +271,15 @@ private async void Echo(IChannel channel)
ReadableBuffer request = await channel.Input.ReadAsync();
if (request.IsEmpty && channel.Input.Reading.IsCompleted)
{
request.Consumed();
channel.Input.Advance(request.End);
break;
}

int len = request.Length;
var response = channel.Output.Alloc();
response.Append(ref request);
await response.FlushAsync();
request.Consumed();
channel.Input.Advance(request.End);
}
channel.Input.Complete();
channel.Output.Complete();
Expand Down