Skip to content

Commit

Permalink
Don't pool buffer objects (#518)
Browse files Browse the repository at this point in the history
* wip

* fixing async stream->buffer
  • Loading branch information
nayato authored Oct 21, 2019
1 parent a9d0723 commit abda435
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 37 deletions.
17 changes: 1 addition & 16 deletions src/DotNetty.Buffers/PooledByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ namespace DotNetty.Buffers
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using DotNetty.Common;
using DotNetty.Common.Utilities;

abstract class PooledByteBuffer<T> : AbstractReferenceCountedByteBuffer
{
readonly ThreadLocalPool.Handle recyclerHandle;

protected internal PoolChunk<T> Chunk;
protected internal long Handle;
protected internal T Memory;
Expand All @@ -22,10 +19,9 @@ abstract class PooledByteBuffer<T> : AbstractReferenceCountedByteBuffer
internal PoolThreadCache<T> Cache;
PooledByteBufferAllocator allocator;

protected PooledByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
protected PooledByteBuffer(int maxCapacity)
: base(maxCapacity)
{
this.recyclerHandle = recyclerHandle;
}

internal virtual void Init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache<T> cache) =>
Expand All @@ -51,14 +47,6 @@ void Init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLengt
/**
* Method must be called before reuse this {@link PooledByteBufAllocator}
*/
internal void Reuse(int maxCapacity)
{
this.SetMaxCapacity(maxCapacity);
this.SetReferenceCount(1);
this.SetIndex0(0, 0);
this.DiscardMarks();
}

public override int Capacity
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -143,12 +131,9 @@ protected internal sealed override void Deallocate()
this.Memory = default(T);
this.Chunk.Arena.Free(this.Chunk, handle, this.MaxLength, this.Cache);
this.Chunk = null;
this.Recycle();
}
}

void Recycle() => this.recyclerHandle.Release(this);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected int Idx(int index) => this.Offset + index;
}
Expand Down
16 changes: 8 additions & 8 deletions src/DotNetty.Buffers/PooledHeapByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,17 @@ namespace DotNetty.Buffers
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;
using DotNetty.Common.Internal;

sealed class PooledHeapByteBuffer : PooledByteBuffer<byte[]>
{
static readonly ThreadLocalPool<PooledHeapByteBuffer> Recycler = new ThreadLocalPool<PooledHeapByteBuffer>(handle => new PooledHeapByteBuffer(handle, 0));

internal static PooledHeapByteBuffer NewInstance(int maxCapacity)
{
PooledHeapByteBuffer buf = Recycler.Take();
buf.Reuse(maxCapacity);
return buf;
return new PooledHeapByteBuffer(maxCapacity);
}

internal PooledHeapByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
: base(recyclerHandle, maxCapacity)
internal PooledHeapByteBuffer(int maxCapacity)
: base(maxCapacity)
{
}

Expand Down Expand Up @@ -108,6 +103,11 @@ public override IByteBuffer SetBytes(int index, IByteBuffer src, int srcIndex, i

public override async Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
if (length == 0)
{
return 0;
}

int readTotal = 0;
int read;
int offset = this.ArrayOffset + index;
Expand Down
22 changes: 12 additions & 10 deletions src/DotNetty.Buffers/PooledUnsafeDirectByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,18 @@ namespace DotNetty.Buffers
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common;

sealed unsafe class PooledUnsafeDirectByteBuffer : PooledByteBuffer<byte[]>
{
static readonly ThreadLocalPool<PooledUnsafeDirectByteBuffer> Recycler = new ThreadLocalPool<PooledUnsafeDirectByteBuffer>(handle => new PooledUnsafeDirectByteBuffer(handle, 0));

byte* memoryAddress;

internal static PooledUnsafeDirectByteBuffer NewInstance(int maxCapacity)
{
PooledUnsafeDirectByteBuffer buf = Recycler.Take();
buf.Reuse(maxCapacity);
return buf;
return new PooledUnsafeDirectByteBuffer(maxCapacity);
}

PooledUnsafeDirectByteBuffer(ThreadLocalPool.Handle recyclerHandle, int maxCapacity)
: base(recyclerHandle, maxCapacity)
PooledUnsafeDirectByteBuffer(int maxCapacity)
: base(maxCapacity)
{
}

Expand All @@ -48,6 +43,14 @@ void InitMemoryAddress()

public override bool IsDirect => true;

internal void Reuse(int maxCapacity)
{
this.SetMaxCapacity(maxCapacity);
this.SetReferenceCount(1);
this.SetIndex0(0, 0);
this.DiscardMarks();
}

protected internal override byte _GetByte(int index) => *(this.memoryAddress + index);

protected internal override short _GetShort(int index) => UnsafeByteBufferUtil.GetShort(this.Addr(index));
Expand Down Expand Up @@ -121,8 +124,7 @@ public override IByteBuffer SetBytes(int index, byte[] src, int srcIndex, int le
public override Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
this.CheckIndex(index, length);
int read = UnsafeByteBufferUtil.SetBytes(this, this.Addr(index), index, src, length);
return Task.FromResult(read);
return UnsafeByteBufferUtil.SetBytesAsync(this, this.Addr(index), index, src, length, cancellationToken);
}

public override IByteBuffer Copy(int index, int length)
Expand Down
6 changes: 3 additions & 3 deletions src/DotNetty.Buffers/UnpooledUnsafeDirectByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ namespace DotNetty.Buffers
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Utilities;

public unsafe class UnpooledUnsafeDirectByteBuffer : AbstractReferenceCountedByteBuffer
{
Expand Down Expand Up @@ -294,10 +296,8 @@ public override IByteBuffer GetBytes(int index, Stream output, int length)
public override Task<int> SetBytesAsync(int index, Stream src, int length, CancellationToken cancellationToken)
{
this.CheckIndex(index, length);
int read;
fixed (byte* addr = &this.Addr(index))
read = UnsafeByteBufferUtil.SetBytes(this, addr, index, src, length);
return Task.FromResult(read);
return UnsafeByteBufferUtil.SetBytesAsync(this, addr, index, src, length, cancellationToken);
}

public override int IoBufferCount => 1;
Expand Down
34 changes: 34 additions & 0 deletions src/DotNetty.Buffers/UnsafeByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ namespace DotNetty.Buffers
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
using DotNetty.Common.Utilities;

static unsafe class UnsafeByteBufferUtil
{
Expand Down Expand Up @@ -214,6 +217,11 @@ internal static IByteBuffer Copy(AbstractByteBuffer buf, byte* addr, int index,

internal static int SetBytes(AbstractByteBuffer buf, byte* addr, int index, Stream input, int length)
{
if (length == 0)
{
return 0;
}

IByteBuffer tmpBuf = buf.Allocator.HeapBuffer(length);
try
{
Expand All @@ -233,6 +241,32 @@ internal static int SetBytes(AbstractByteBuffer buf, byte* addr, int index, Stre
}
}

internal static Task<int> SetBytesAsync(AbstractByteBuffer buf, byte* addr, int index, Stream input, int length, CancellationToken cancellationToken)
{
if (length == 0)
{
return TaskEx.Zero;
}

IByteBuffer tmpBuf = buf.Allocator.HeapBuffer(length);
return tmpBuf.SetBytesAsync(0, input, length, cancellationToken)
.ContinueWith(t => {
try
{
var read = t.Result;
if (read > 0)
{
PlatformDependent.CopyMemory(tmpBuf.Array, tmpBuf.ArrayOffset, addr, read);
}
return read;
}
finally
{
tmpBuf.Release();
}
});
}

internal static void GetBytes(AbstractByteBuffer buf, byte* addr, int index, IByteBuffer dst, int dstIndex, int length)
{
Contract.Requires(dst != null);
Expand Down

0 comments on commit abda435

Please sign in to comment.