Skip to content

Commit

Permalink
add ResumableTokenStream. fixes #48
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Hodgson committed Jul 24, 2020
1 parent 4141044 commit 851dc1c
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 19 deletions.
37 changes: 37 additions & 0 deletions Pidgin.Tests/ResumableTokenStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.IO;
using Pidgin.TokenStreams;
using Xunit;

namespace Pidgin.Tests
{
/// <summary>
/// Tests for <see cref="ResumableTokenStream{TToken}"/>: tokens handed back via
/// <c>OnParserEnd</c> must be replayed by subsequent <c>Read</c> calls before any
/// tokens from the wrapped stream.
/// </summary>
// NOTE(review): the class is called "Reusable..." while the file and the type under
// test are called "Resumable..." - consider renaming the class to match.
public class ReusableTokenStreamTests
{
    /// <summary>
    /// Reads a chunk, hands the unconsumed tail back, and checks that the next read
    /// yields the returned token followed by the rest of the underlying input.
    /// </summary>
    [Fact]
    public void TestResume()
    {
        var input = "aaabb";
        // The stream rents its replay buffer from an ArrayPool; dispose it so the
        // array is handed back when the test finishes.
        using var stream = new ResumableTokenStream<char>(new ReaderTokenStream(new StringReader(input)));

        // consume two 'a's, reject the third one
        var chunk = new char[3].AsSpan();
        Assert.Equal(3, stream.Read(chunk));
        stream.OnParserEnd(chunk.Slice(2));

        // one replayed 'a' plus the two remaining 'b's from the reader
        Assert.Equal(3, stream.Read(chunk));
        Assert.Equal("abb", chunk.ToString());
    }

    /// <summary>
    /// Hands back two chunks in a row and checks that the most recently returned
    /// chunk is replayed first.
    /// </summary>
    [Fact]
    public void TestReturnMultipleChunks()
    {
        using var stream = new ResumableTokenStream<char>(new ReaderTokenStream(new StringReader("")));
        stream.OnParserEnd("aa");
        stream.OnParserEnd("bb");

        // The underlying reader is empty, so all four tokens come from the replay buffer.
        var chunk = new char[4].AsSpan();
        Assert.Equal(4, stream.Read(chunk));
        Assert.Equal("bbaa", chunk.ToString());
    }
}
}
4 changes: 2 additions & 2 deletions Pidgin/ParseState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private void Buffer(int readAhead)
_bufferStartLocation += keepFrom;
_currentIndex = keepSeenLength;
_bufferedCount = keepLength;
_bufferedCount += _stream!.ReadInto(_buffer.AsSpan().Slice(_bufferedCount, amountToRead));
_bufferedCount += _stream!.Read(_buffer.AsSpan().Slice(_bufferedCount, amountToRead));
}
}

Expand Down Expand Up @@ -248,10 +248,10 @@ public void Dispose()
{
if (_buffer != null)
{
_stream!.OnParserEnd(_buffer.AsSpan().Slice(_currentIndex, _bufferedCount - _currentIndex));
_arrayPool!.Return(_buffer, _needsClear);
_buffer = null;
}
_stream?.Dispose();
_bookmarks.Dispose();
}
}
Expand Down
4 changes: 1 addition & 3 deletions Pidgin/TokenStreams/EnumeratorTokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public EnumeratorTokenStream(IEnumerator<TToken> input)
}


public int ReadInto(Span<TToken> buffer)
public int Read(Span<TToken> buffer)
{
for (var i = 0; i < buffer.Length; i++)
{
Expand All @@ -27,7 +27,5 @@ public int ReadInto(Span<TToken> buffer)
}
return buffer.Length;
}

public void Dispose() { }
}
}
5 changes: 3 additions & 2 deletions Pidgin/TokenStreams/ITokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

namespace Pidgin.TokenStreams
{
internal interface ITokenStream<TToken> : IDisposable
internal interface ITokenStream<TToken>
{
int ChunkSizeHint => 1024;
int ReadInto(Span<TToken> buffer);
int Read(Span<TToken> buffer);
void OnParserEnd(ReadOnlySpan<TToken> unconsumed) { }
}
}
4 changes: 1 addition & 3 deletions Pidgin/TokenStreams/ListTokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public ListTokenStream(IList<TToken> value)
_input = value;
}

public int ReadInto(Span<TToken> buffer)
public int Read(Span<TToken> buffer)
{
var actualLength = Math.Min(_input.Count - _index, buffer.Length);
for (var i = 0; i < actualLength; i++)
Expand All @@ -26,7 +26,5 @@ public int ReadInto(Span<TToken> buffer)
}
return actualLength;
}

public void Dispose() { }
}
}
4 changes: 1 addition & 3 deletions Pidgin/TokenStreams/ReadOnlyListTokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public ReadOnlyListTokenStream(IReadOnlyList<TToken> value)
_input = value;
}

public int ReadInto(Span<TToken> buffer)
public int Read(Span<TToken> buffer)
{
var actualLength = Math.Min(_input.Count - _index, buffer.Length);
for (var i = 0; i < actualLength; i++)
Expand All @@ -26,7 +26,5 @@ public int ReadInto(Span<TToken> buffer)
}
return actualLength;
}

public void Dispose() { }
}
}
4 changes: 1 addition & 3 deletions Pidgin/TokenStreams/ReaderTokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ public ReaderTokenStream(TextReader input)
_input = input;
}

public int ReadInto(Span<char> buffer) => _input.Read(buffer);

public void Dispose() { }
public int Read(Span<char> buffer) => _input.Read(buffer);
}
}
73 changes: 73 additions & 0 deletions Pidgin/TokenStreams/ResumableTokenStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;

namespace Pidgin.TokenStreams
{
/// <summary>
/// An <see cref="ITokenStream{TToken}"/> wrapper which remembers the tokens passed to
/// <see cref="OnParserEnd"/> and replays them from <see cref="Read"/> before reading
/// any further tokens from the wrapped stream.
/// </summary>
/// <typeparam name="TToken">The type of tokens produced by the stream.</typeparam>
internal class ResumableTokenStream<TToken> : ITokenStream<TToken>, IDisposable
{
    // Rented arrays only need clearing on return when they may hold object references.
    private static readonly bool _needsClear = RuntimeHelpers.IsReferenceOrContainsReferences<TToken>();
    private readonly ArrayPool<TToken> _pool;
    private readonly ITokenStream<TToken> _next;
    // Replay buffer. Pending tokens are stored right-aligned: they occupy
    // [_bufferStart, _buffer.Length), so new chunks can be prepended on the left.
    private TToken[]? _buffer = null;
    private int _bufferStart = 0; // amount of empty space at left-hand end of _buffer, aka index of first value

    /// <summary>
    /// Creates a resumable wrapper around <paramref name="next"/>.
    /// </summary>
    /// <param name="next">The stream to read from once the replay buffer is exhausted.</param>
    /// <param name="pool">
    /// The pool from which the replay buffer is rented.
    /// Defaults to <see cref="ArrayPool{T}.Shared"/> when <c>null</c>.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="next"/> is <c>null</c>.</exception>
    public ResumableTokenStream(ITokenStream<TToken> next, ArrayPool<TToken>? pool = null)
    {
        if (next == null)
        {
            throw new ArgumentNullException(nameof(next));
        }
        _next = next;
        _pool = pool ?? ArrayPool<TToken>.Shared;
    }

    /// <summary>
    /// Fills <paramref name="buffer"/> with previously returned tokens first, then
    /// with tokens read from the wrapped stream.
    /// </summary>
    /// <param name="buffer">The destination for the tokens.</param>
    /// <returns>The total number of tokens written to <paramref name="buffer"/>.</returns>
    public int Read(Span<TToken> buffer)
    {
        var bufferedCount = 0;
        if (_buffer != null && _bufferStart < _buffer.Length)
        {
            bufferedCount = Math.Min(_buffer.Length - _bufferStart, buffer.Length);
            _buffer.AsSpan().Slice(_bufferStart, bufferedCount).CopyTo(buffer);
            _bufferStart += bufferedCount;
        }
        if (bufferedCount > 0 && bufferedCount == buffer.Length)
        {
            // The request was satisfied entirely from the replay buffer, so there is
            // no room left and no reason to touch the wrapped stream.
            return bufferedCount;
        }
        return bufferedCount + _next.Read(buffer.Slice(bufferedCount));
    }

    /// <summary>
    /// Stores <paramref name="unconsumed"/> so that it is returned by the next calls
    /// to <see cref="Read"/>. Chunks are prepended, so the most recently returned
    /// chunk is replayed first.
    /// </summary>
    /// <param name="unconsumed">The tokens which were read but not consumed.</param>
    public void OnParserEnd(ReadOnlySpan<TToken> unconsumed)
    {
        if (unconsumed.Length == 0)
        {
            return;
        }
        if (_buffer == null)
        {
            _buffer = _pool.Rent(unconsumed.Length);
            // Rent may hand out a larger array than requested; start from its real
            // end so the contents stay right-aligned.
            _bufferStart = _buffer.Length;
        }
        if (_bufferStart < unconsumed.Length)
        {
            // Not enough free space on the left: move the pending tokens to the
            // right-hand end of a bigger array.
            var bufferedCount = _buffer.Length - _bufferStart;
            var newBuffer = _pool.Rent(bufferedCount + unconsumed.Length);
            var newBufferStart = newBuffer.Length - bufferedCount;

            Array.Copy(_buffer, _bufferStart, newBuffer, newBufferStart, bufferedCount);

            _pool.Return(_buffer, _needsClear);
            _buffer = newBuffer;
            _bufferStart = newBufferStart;
        }
        _bufferStart -= unconsumed.Length;
        unconsumed.CopyTo(_buffer.AsSpan().Slice(_bufferStart));
    }

    /// <summary>
    /// Returns the replay buffer, if any, to the pool. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_buffer != null)
        {
            _pool.Return(_buffer, _needsClear);
            // Drop the reference: the array now belongs to the pool again, so it must
            // not be returned a second time nor read from / written to by later calls.
            _buffer = null;
            _bufferStart = 0;
        }
    }
}
}
4 changes: 1 addition & 3 deletions Pidgin/TokenStreams/StreamTokenStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ public StreamTokenStream(Stream input)
_input = input;
}

public int ReadInto(Span<byte> buffer) => _input.Read(buffer);

public void Dispose() { }
public int Read(Span<byte> buffer) => _input.Read(buffer);
}
}

0 comments on commit 851dc1c

Please sign in to comment.