Skip to content

Commit

Permalink
Use different buffers in different components.
Browse files Browse the repository at this point in the history
  • Loading branch information
amacal committed Mar 19, 2017
1 parent d902d12 commit 77148b0
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 90 deletions.
5 changes: 4 additions & 1 deletion sources/Leak.Client.Swarm/SwarmConnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ private void StartMemory()

Memory =
new MemoryBuilder()
.WithMaxRequestSize(256 * 1024)
.WithThresholds(20 * 1024)
.Build(hooks);
}

Expand All @@ -119,7 +121,7 @@ private void StartNetwork()
.WithPipeline(Pipeline)
.WithWorker(Worker)
.WithMemory(Memory.AsNetwork())
.WithBufferSize(32 * 1024)
.WithBufferSize(256 * 1024)
.Build(hooks);

Network.Start();
Expand Down Expand Up @@ -307,6 +309,7 @@ private void StartDataStore(string destination)
.WithPipeline(Pipeline)
.WithFiles(Files)
.WithMemory(Memory.AsDataStore())
.WithBufferSize(256 * 1024)
.Build(hooks);

DataStore.Start();
Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Client/ClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static NetworkPoolMemory AsNetwork(this MemoryService service)

public static RepositoryMemory AsDataStore(this MemoryService service)
{
return new MemoryToRepository(service);
return new MemoryToDataStore(service);
}

private class MetaGetToGlueForwarder : MetagetGlue
Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Client/Leak.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<ItemGroup>
<Compile Include="MemoryToBlocks.cs" />
<Compile Include="MemoryToNetwork.cs" />
<Compile Include="MemoryToRepository.cs" />
<Compile Include="MemoryToDataStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientExtensions.cs" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

namespace Leak.Client
{
public class MemoryToRepository : RepositoryMemory
public class MemoryToDataStore : RepositoryMemory
{
private readonly MemoryService service;

public MemoryToRepository(MemoryService service)
public MemoryToDataStore(MemoryService service)
{
this.service = service;
}
Expand Down
1 change: 1 addition & 0 deletions sources/Leak.Memory/Leak.Memory.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<ItemGroup>
<Compile Include="MemoryBlock.cs" />
<Compile Include="MemoryBuilder.cs" />
<Compile Include="MemoryCollection.cs" />
<Compile Include="MemoryConfiguration.cs" />
<Compile Include="MemoryContext.cs" />
<Compile Include="MemoryDependencies.cs" />
Expand Down
10 changes: 5 additions & 5 deletions sources/Leak.Memory/MemoryBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ public class MemoryBlock : DataBlock
private readonly int start;
private readonly int count;

private readonly MemoryContext context;
private readonly MemoryCollection collection;

public MemoryBlock(byte[] data, int start, int count, MemoryContext context)
public MemoryBlock(byte[] data, int start, int count, MemoryCollection collection)
{
this.data = data;
this.start = start;
this.count = count;
this.context = context;
this.collection = collection;
}

public int Size
Expand All @@ -40,12 +40,12 @@ public void Write(DataBlockCallback callback)

public DataBlock Scope(int shift)
{
return new MemoryBlock(data, start + shift, count - shift, context);
return new MemoryBlock(data, start + shift, count - shift, collection);
}

public void Release()
{
context?.Buffer.Enqueue(data);
collection.Release(data);
}
}
}
12 changes: 12 additions & 0 deletions sources/Leak.Memory/MemoryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ public MemoryBuilder()
configuration = new MemoryConfiguration();
}

public MemoryBuilder WithMaxRequestSize(int maxRequestSize)
{
configuration.MaxBlockSize = maxRequestSize;
return this;
}

public MemoryBuilder WithThresholds(params int[] thresholds)
{
configuration.Thresholds = thresholds;
return this;
}

public MemoryService Build()
{
return Build(new MemoryHooks());
Expand Down
86 changes: 86 additions & 0 deletions sources/Leak.Memory/MemoryCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Leak.Common;

namespace Leak.Memory
{
public class MemoryCollection
{
private readonly MemoryContext context;
private readonly object synchronizer;

private readonly SortedDictionary<int, ConcurrentQueue<byte[]>> items;

private int count;
private Size allocation;

public MemoryCollection(MemoryContext context)
{
this.count = 0;
this.allocation = new Size(0);
this.synchronizer = new object();

this.context = context;
this.items = new SortedDictionary<int, ConcurrentQueue<byte[]>>();

if (context.Configuration.Thresholds != null)
{
foreach (int threshold in context.Configuration.Thresholds)
{
items.Add(threshold, new ConcurrentQueue<byte[]>());
}
}

if (items.ContainsKey(context.Configuration.MaxBlockSize) == false)
{
items.Add(context.Configuration.MaxBlockSize, new ConcurrentQueue<byte[]>());
}
}

public MemoryBlock Allocate(int size)
{
byte[] data = null;

if (size > context.Configuration.MaxBlockSize)
{
throw new InvalidOperationException();
}

foreach (int key in items.Keys)
{
if (size <= key)
{
if (items[key].TryDequeue(out data) == false)
{
data = new byte[key];

lock (synchronizer)
{
count = count + 1;
allocation = allocation.Increase(data.Length);
}

context.CallSnapshot(count, allocation);
}

break;
}
}

return new MemoryBlock(data, 0, size, this);
}

public void Release(byte[] data)
{
foreach (int key in items.Keys)
{
if (data.Length == key)
{
items[key].Enqueue(data);
break;
}
}
}
}
}
8 changes: 4 additions & 4 deletions sources/Leak.Memory/MemoryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ public class MemoryConfiguration
{
public MemoryConfiguration()
{
Size = 32 * 1024;
Delay = TimeSpan.FromMinutes(1);
MaxBlockSize = 32 * 1024;
Thresholds = new int[0];
}

public int Size;
public TimeSpan Delay;
public int MaxBlockSize;
public int[] Thresholds;
}
}
30 changes: 5 additions & 25 deletions sources/Leak.Memory/MemoryContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System.Collections.Concurrent;
using Leak.Common;

namespace Leak.Memory
namespace Leak.Memory
{
public class MemoryContext
{
Expand All @@ -10,10 +7,7 @@ public class MemoryContext
private readonly MemoryConfiguration configuration;
private readonly MemoryHooks hooks;

private readonly ConcurrentQueue<byte[]> buffer;

private int count;
private Size allocation;
private readonly MemoryCollection collection;

public MemoryContext(MemoryParameters parameters, MemoryDependencies dependencies, MemoryConfiguration configuration, MemoryHooks hooks)
{
Expand All @@ -22,9 +16,7 @@ public MemoryContext(MemoryParameters parameters, MemoryDependencies dependencie
this.configuration = configuration;
this.hooks = hooks;

count = 0;
allocation = new Size(0);
buffer = new ConcurrentQueue<byte[]>();
collection = new MemoryCollection(this);
}

public MemoryParameters Parameters
Expand All @@ -47,21 +39,9 @@ public MemoryHooks Hooks
get { return hooks; }
}

public ConcurrentQueue<byte[]> Buffer
{
get { return buffer; }
}

public int Count
{
get { return count; }
set { count = value; }
}

public Size Allocation
public MemoryCollection Collection
{
get { return allocation; }
set { allocation = value; }
get { return collection; }
}
}
}
40 changes: 4 additions & 36 deletions sources/Leak.Memory/MemoryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,16 @@ public DataBlock Create(byte[] data, int offset, int count)

public MemoryBlock Allocate(int size)
{
byte[] data;

if (size > context.Configuration.Size)
{
throw new InvalidOperationException();
}

if (context.Buffer.TryDequeue(out data) == false)
{
data = new byte[context.Configuration.Size];

lock (this)
{
context.Count = context.Count + 1;
context.Allocation = context.Allocation.Increase(data.Length);
}

context.CallSnapshot(context.Count, context.Allocation);
}

return new MemoryBlock(data, 0, size, context);
return context.Collection.Allocate(size);
}

public DataBlock New(int count, DataBlockCallback callback)
{
byte[] data;

if (context.Buffer.TryDequeue(out data) == false)
{
data = new byte[context.Configuration.Size];

lock (this)
{
context.Count = context.Count + 1;
context.Allocation = context.Allocation.Increase(data.Length);
}
MemoryBlock found = context.Collection.Allocate(count);

context.CallSnapshot(context.Count, context.Allocation);
}
callback?.Invoke(found.Data, 0, count);

callback?.Invoke(data, 0, count);
return new MemoryBlock(data, 0, count, context);
return found;
}
}
}
6 changes: 6 additions & 0 deletions sources/Leak.Repository/RepositoryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public RepositoryBuilder WithMemory(RepositoryMemory memory)
return this;
}

public RepositoryBuilder WithBufferSize(int bufferSize)
{
configuration.BufferSize = bufferSize;
return this;
}

public RepositoryService Build()
{
return Build(new RepositoryHooks());
Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Repository/RepositoryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@
{
public class RepositoryConfiguration
{
public RepositoryConfiguration()
{
BufferSize = 32 * 1024;
}

public int BufferSize;
}
}
12 changes: 8 additions & 4 deletions sources/Leak.Repository/RepositoryTaskVerifyPiece.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ public RepositoryTaskVerifyPiece(PieceInfo piece)
public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted)
{
HashAlgorithm algorithm = SHA1.Create();
RepositoryMemoryBlock block = context.Dependencies.Memory.Allocate(16384);
int bufferSize = context.Configuration.BufferSize;

RepositoryMemoryBlock block = context.Dependencies.Memory.Allocate(bufferSize);
int step = block.Length / context.Metainfo.Properties.BlockSize;

context.View.Read(block.Data, piece.Index, 0, args =>
{
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1))
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + step))
{
context.Queue.Add(new Continue(piece, args, algorithm, block));
}
Expand Down Expand Up @@ -63,10 +66,11 @@ public Continue(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorith
public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted)
{
algorithm.Push(read.Buffer.Data, read.Buffer.Offset, Math.Min(read.Buffer.Count, read.Count));
int step = block.Length / context.Metainfo.Properties.BlockSize;

context.View.Read(block.Data, piece.Index, read.Block + 1, args =>
context.View.Read(block.Data, piece.Index, read.Block + step, args =>
{
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1))
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + step))
{
context.Queue.Add(new Continue(piece, args, algorithm, block));
}
Expand Down
Loading

0 comments on commit 77148b0

Please sign in to comment.