Skip to content

Commit

Permalink
Allocate buffer once per EventSourceStreamReader lifetime.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidfowl committed Jul 7, 2012
1 parent 7f0fd4d commit 6771b65
Showing 1 changed file with 19 additions and 11 deletions.
Expand Up @@ -16,6 +16,8 @@ public class EventSourceStreamReader
private readonly Stream _stream;
private readonly ChunkBuffer _buffer;
private readonly object _lockObj = new object();
private byte[] _readBuffer;


private int _reading;
private Action _setOpened;
Expand Down Expand Up @@ -66,6 +68,11 @@ public void Start()
OnOpened();
};

if (_readBuffer == null)
{
_readBuffer = new byte[4096];
}

// Start the process loop
Process();
}
Expand All @@ -88,9 +95,7 @@ private void Process()
return;
}

var buffer = new byte[4096];

Task<int> readTask = _stream.ReadAsync(buffer);
Task<int> readTask = _stream.ReadAsync(_readBuffer);

if (readTask.IsCompleted)
{
Expand All @@ -101,7 +106,7 @@ private void Process()

int read = readTask.Result;

if (TryProcessRead(buffer, read))
if (TryProcessRead(read))
{
goto Read;
}
Expand All @@ -113,31 +118,31 @@ private void Process()
}
else
{
ReadAsync(readTask, buffer);
ReadAsync(readTask);
}
}

private void ReadAsync(Task<int> readTask, byte[] buffer)
private void ReadAsync(Task<int> readTask)
{
readTask.Catch(ex => Close(ex))
.Then(read =>
{
if (TryProcessRead(buffer, read))
if (TryProcessRead(read))
{
Process();
}
})
.Catch();
}

private bool TryProcessRead(byte[] buffer, int read)
private bool TryProcessRead(int read)
{
Interlocked.Exchange(ref _setOpened, () => { }).Invoke();

if (read > 0)
{
// Put chunks in the buffer
ProcessBuffer(buffer, read);
ProcessBuffer(read);

return true;
}
Expand All @@ -149,11 +154,11 @@ private bool TryProcessRead(byte[] buffer, int read)
return false;
}

private void ProcessBuffer(byte[] buffer, int read)
private void ProcessBuffer(int read)
{
lock (_lockObj)
{
_buffer.Add(buffer, read);
_buffer.Add(_readBuffer, read);

while (_buffer.HasChunks)
{
Expand Down Expand Up @@ -192,6 +197,9 @@ private void Close(Exception exception)

Closed(exception);
}

// Release the buffer
_readBuffer = null;
}
}

Expand Down

0 comments on commit 6771b65

Please sign in to comment.