Skip to content

Commit

Permalink
Use memory-service in the data-store.
Browse files Browse the repository at this point in the history
  • Loading branch information
amacal committed Mar 19, 2017
1 parent b5dc728 commit d902d12
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 44 deletions.
2 changes: 1 addition & 1 deletion sources/Leak.Client.Peer/PeerConnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void OnHandshakeCompleted(HandshakeCompleted data)
Glue =
new GlueBuilder()
.WithHash(Hash)
.WithBlocks(Blocks)
.WithMemory(Blocks)
.WithPipeline(Pipeline)
.WithPlugin(new MetadataPlugin(metadata))
.Build(new GlueHooks
Expand Down
6 changes: 4 additions & 2 deletions sources/Leak.Client.Swarm/SwarmConnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private void StartNetwork()
.WithPipeline(Pipeline)
.WithWorker(Worker)
.WithMemory(Memory.AsNetwork())
.WithBufferSize(32 * 1024)
.Build(hooks);

Network.Start();
Expand Down Expand Up @@ -182,7 +183,7 @@ private void StartGlue()
Glue =
new GlueBuilder()
.WithHash(Hash)
.WithBlocks(Memory)
.WithMemory(Memory)
.WithPipeline(Pipeline)
.WithMetadata(Settings, metadata)
.WithExchange(Settings, exchange)
Expand All @@ -207,7 +208,7 @@ private void StartDataMap()
.WithHash(Hash)
.WithPipeline(Pipeline)
.WithSchedulerThreshold(160)
.WithPoolSize(256)
.WithPoolSize(512)
.Build(hooks);

DataMap.Start();
Expand Down Expand Up @@ -305,6 +306,7 @@ private void StartDataStore(string destination)
.WithDestination(Path.Combine(destination, Hash.ToString()))
.WithPipeline(Pipeline)
.WithFiles(Files)
.WithMemory(Memory.AsDataStore())
.Build(hooks);

DataStore.Start();
Expand Down
7 changes: 6 additions & 1 deletion sources/Leak.Client/ClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ public static RetrieverOmnibus AsDataGet(this OmnibusService service)

public static NetworkPoolMemory AsNetwork(this MemoryService service)
{
return new NetworkToMemory(service);
return new MemoryToNetwork(service);
}

public static RepositoryMemory AsDataStore(this MemoryService service)
{
return new MemoryToRepository(service);
}

private class MetaGetToGlueForwarder : MetagetGlue
Expand Down
4 changes: 3 additions & 1 deletion sources/Leak.Client/Leak.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Compile Include="NetworkToMemory.cs" />
<Compile Include="MemoryToBlocks.cs" />
<Compile Include="MemoryToNetwork.cs" />
<Compile Include="MemoryToRepository.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientExtensions.cs" />
</ItemGroup>
Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Client/MemoryToBlocks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Leak.Client
{
public class MemoryToBlocks
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

namespace Leak.Client
{
public class NetworkToMemory : NetworkPoolMemory
public class MemoryToNetwork : NetworkPoolMemory
{
private readonly MemoryService service;

public NetworkToMemory(MemoryService service)
public MemoryToNetwork(MemoryService service)
{
this.service = service;
}
Expand Down
45 changes: 45 additions & 0 deletions sources/Leak.Client/MemoryToRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Leak.Data.Store;
using Leak.Memory;

namespace Leak.Client
{
public class MemoryToRepository : RepositoryMemory
{
private readonly MemoryService service;

public MemoryToRepository(MemoryService service)
{
this.service = service;
}

public RepositoryMemoryBlock Allocate(int size)
{
return new Block(service.Allocate(size));
}

private class Block : RepositoryMemoryBlock
{
private readonly MemoryBlock inner;

public Block(MemoryBlock inner)
{
this.inner = inner;
}

public byte[] Data
{
get { return inner.Data; }
}

public int Length
{
get { return inner.Data.Length; }
}

public void Release()
{
inner.Release();
}
}
}
}
2 changes: 1 addition & 1 deletion sources/Leak.Datashare.Tests/DatashareFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public DatashareSession Start()
GlueService glue =
new GlueBuilder()
.WithHash(metainfo.Hash)
.WithBlocks(new MemoryBuilder().Build())
.WithMemory(new MemoryBuilder().Build())
.Build();

RepositoryService repository =
Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Extensions.Metadata.Tests/MetadataSide.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MetadataInstance Build()
GlueService service =
new GlueBuilder()
.WithHash(handshake.Hash)
.WithBlocks(new MemoryBuilder().Build())
.WithMemory(new MemoryBuilder().Build())
.WithPlugin(new MetadataPlugin(metadata))
.Build(hooks);

Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Extensions.Peers.Tests/PeersSide.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public PeersInstance Build()
GlueService service =
new GlueBuilder()
.WithHash(handshake.Hash)
.WithBlocks(new MemoryBuilder().Build())
.WithMemory(new MemoryBuilder().Build())
.WithPlugin(new PeersPlugin(peers))
.Build(hooks);

Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Glue.Tests/GlueSide.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public GlueInstance Build(params string[] plugins)
GlueService service =
builder
.WithHash(hash)
.WithBlocks(new MemoryBuilder().Build())
.WithMemory(new MemoryBuilder().Build())
.Build(hooks);

return new GlueInstance(service);
Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Glue/GlueBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public GlueBuilder WithHash(FileHash hash)
return this;
}

public GlueBuilder WithBlocks(DataBlockFactory blocks)
public GlueBuilder WithMemory(DataBlockFactory blocks)
{
dependencies.Blocks = blocks;
return this;
Expand Down
2 changes: 1 addition & 1 deletion sources/Leak.Metashare.Tests/MetashareFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public MetashareSession Start()
GlueService glue =
new GlueBuilder()
.WithHash(metainfo.Hash)
.WithBlocks(new MemoryBuilder().Build())
.WithMemory(new MemoryBuilder().Build())
.WithPlugin(new MetadataPlugin(new MetadataHooks()))
.Build();

Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Networking/NetworkPoolBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public NetworkPoolBuilder WithMemory(NetworkPoolMemory poolMemory)
return this;
}

public NetworkPoolBuilder WithBufferSize(int bufferSize)
{
configuration.BufferSize = bufferSize;
return this;
}

public NetworkPool Build(NetworkPoolHooks hooks)
{
return new NetworkPoolInstance(dependencies, configuration, hooks);
Expand Down
2 changes: 2 additions & 0 deletions sources/Leak.Repository/Leak.Data.Store.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RepositoryAllocation.cs" />
<Compile Include="RepositoryAllocationRange.cs" />
<Compile Include="RepositoryMemoryBlock.cs" />
<Compile Include="RepositoryBuilder.cs" />
<Compile Include="RepositoryConfiguration.cs" />
<Compile Include="RepositoryContext.cs" />
<Compile Include="RepositoryDependencies.cs" />
<Compile Include="RepositoryExtensions.cs" />
<Compile Include="RepositoryHooks.cs" />
<Compile Include="RepositoryMemory.cs" />
<Compile Include="RepositoryParameters.cs" />
<Compile Include="RepositoryService.cs" />
<Compile Include="RepositoryTask.cs" />
Expand Down
6 changes: 6 additions & 0 deletions sources/Leak.Repository/RepositoryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public RepositoryBuilder WithPipeline(PipelineService pipeline)
return this;
}

public RepositoryBuilder WithMemory(RepositoryMemory memory)
{
dependencies.Memory = memory;
return this;
}

public RepositoryService Build()
{
return Build(new RepositoryHooks());
Expand Down
5 changes: 0 additions & 5 deletions sources/Leak.Repository/RepositoryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public BitfileService Bitfile
get { return bitfile; }
}

public byte[] Buffer
{
get { return buffer; }
}

public RepositoryView View
{
get { return view; }
Expand Down
5 changes: 2 additions & 3 deletions sources/Leak.Repository/RepositoryDependencies.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Leak.Common;
using Leak.Files;
using Leak.Files;
using Leak.Tasks;

namespace Leak.Data.Store
Expand All @@ -8,7 +7,7 @@ public class RepositoryDependencies
{
public FileFactory Files;

public DataBlockFactory Blocks;
public RepositoryMemory Memory;

public PipelineService Pipeline;
}
Expand Down
7 changes: 7 additions & 0 deletions sources/Leak.Repository/RepositoryMemory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Leak.Data.Store
{
public interface RepositoryMemory
{
RepositoryMemoryBlock Allocate(int size);
}
}
11 changes: 11 additions & 0 deletions sources/Leak.Repository/RepositoryMemoryBlock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Leak.Data.Store
{
public interface RepositoryMemoryBlock
{
byte[] Data { get; }

int Length { get; }

void Release();
}
}
29 changes: 18 additions & 11 deletions sources/Leak.Repository/RepositoryTaskVerifyPiece.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,29 @@ public RepositoryTaskVerifyPiece(PieceInfo piece)
public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted)
{
HashAlgorithm algorithm = SHA1.Create();
RepositoryMemoryBlock block = context.Dependencies.Memory.Allocate(16384);

context.View.Read(context.Buffer, piece.Index, 0, args =>
context.View.Read(block.Data, piece.Index, 0, args =>
{
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1))
{
context.Queue.Add(new Continue(piece, args, algorithm));
context.Queue.Add(new Continue(piece, args, algorithm, block));
}
else
{
context.Queue.Add(new Complete(piece, args, algorithm));
context.Queue.Add(new Complete(piece, args, algorithm, block));
}
});
}

public bool CanExecute(RepositoryTaskQueue queue)
{
return queue.IsBlocked("all") == false;
return queue.IsBlocked(piece) == false;
}

public void Block(RepositoryTaskQueue queue)
{
queue.Block("all");
queue.Block(piece);
}

public void Release(RepositoryTaskQueue queue)
Expand All @@ -49,27 +50,29 @@ private class Continue : RepositoryTask
private readonly PieceInfo piece;
private readonly RepositoryViewRead read;
private readonly HashAlgorithm algorithm;
private readonly RepositoryMemoryBlock block;

public Continue(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm)
public Continue(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm, RepositoryMemoryBlock block)
{
this.piece = piece;
this.read = read;
this.algorithm = algorithm;
this.block = block;
}

public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted)
{
algorithm.Push(read.Buffer.Data, read.Buffer.Offset, Math.Min(read.Buffer.Count, read.Count));

context.View.Read(context.Buffer, piece.Index, read.Block + 1, args =>
context.View.Read(block.Data, piece.Index, read.Block + 1, args =>
{
if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1))
{
context.Queue.Add(new Continue(piece, args, algorithm));
context.Queue.Add(new Continue(piece, args, algorithm, block));
}
else
{
context.Queue.Add(new Complete(piece, args, algorithm));
context.Queue.Add(new Complete(piece, args, algorithm, block));
}
});
}
Expand All @@ -93,12 +96,14 @@ private class Complete : RepositoryTask
private readonly PieceInfo piece;
private readonly RepositoryViewRead read;
private readonly HashAlgorithm algorithm;
private readonly RepositoryMemoryBlock block;

public Complete(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm)
public Complete(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm, RepositoryMemoryBlock block)
{
this.piece = piece;
this.read = read;
this.algorithm = algorithm;
this.block = block;
}

public bool CanExecute(RepositoryTaskQueue queue)
Expand All @@ -120,6 +125,8 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete
RejectIfRequired(context, result);

algorithm.Dispose();
block.Release();

onCompleted.Invoke(this);
}

Expand All @@ -129,7 +136,7 @@ public void Block(RepositoryTaskQueue queue)

public void Release(RepositoryTaskQueue queue)
{
queue.Release("all");
queue.Release(piece);
}

private void AcceptIfRequired(RepositoryContext context, bool valid)
Expand Down
Loading

0 comments on commit d902d12

Please sign in to comment.