diff --git a/sources/Leak.Client.Peer/PeerConnect.cs b/sources/Leak.Client.Peer/PeerConnect.cs index e5c8cc2a..fff2588c 100644 --- a/sources/Leak.Client.Peer/PeerConnect.cs +++ b/sources/Leak.Client.Peer/PeerConnect.cs @@ -175,7 +175,7 @@ public void OnHandshakeCompleted(HandshakeCompleted data) Glue = new GlueBuilder() .WithHash(Hash) - .WithBlocks(Blocks) + .WithMemory(Blocks) .WithPipeline(Pipeline) .WithPlugin(new MetadataPlugin(metadata)) .Build(new GlueHooks diff --git a/sources/Leak.Client.Swarm/SwarmConnect.cs b/sources/Leak.Client.Swarm/SwarmConnect.cs index e09d854b..b921d605 100644 --- a/sources/Leak.Client.Swarm/SwarmConnect.cs +++ b/sources/Leak.Client.Swarm/SwarmConnect.cs @@ -119,6 +119,7 @@ private void StartNetwork() .WithPipeline(Pipeline) .WithWorker(Worker) .WithMemory(Memory.AsNetwork()) + .WithBufferSize(32 * 1024) .Build(hooks); Network.Start(); @@ -182,7 +183,7 @@ private void StartGlue() Glue = new GlueBuilder() .WithHash(Hash) - .WithBlocks(Memory) + .WithMemory(Memory) .WithPipeline(Pipeline) .WithMetadata(Settings, metadata) .WithExchange(Settings, exchange) @@ -207,7 +208,7 @@ private void StartDataMap() .WithHash(Hash) .WithPipeline(Pipeline) .WithSchedulerThreshold(160) - .WithPoolSize(256) + .WithPoolSize(512) .Build(hooks); DataMap.Start(); @@ -305,6 +306,7 @@ private void StartDataStore(string destination) .WithDestination(Path.Combine(destination, Hash.ToString())) .WithPipeline(Pipeline) .WithFiles(Files) + .WithMemory(Memory.AsDataStore()) .Build(hooks); DataStore.Start(); diff --git a/sources/Leak.Client/ClientExtensions.cs b/sources/Leak.Client/ClientExtensions.cs index 3e32dd78..c10addd0 100644 --- a/sources/Leak.Client/ClientExtensions.cs +++ b/sources/Leak.Client/ClientExtensions.cs @@ -42,7 +42,12 @@ public static RetrieverOmnibus AsDataGet(this OmnibusService service) public static NetworkPoolMemory AsNetwork(this MemoryService service) { - return new NetworkToMemory(service); + return new MemoryToNetwork(service); + } + + public static RepositoryMemory AsDataStore(this MemoryService service) + { + return new MemoryToRepository(service); } private class MetaGetToGlueForwarder : MetagetGlue diff --git a/sources/Leak.Client/Leak.Client.csproj b/sources/Leak.Client/Leak.Client.csproj index d7f43acc..1c60cda0 100644 --- a/sources/Leak.Client/Leak.Client.csproj +++ b/sources/Leak.Client/Leak.Client.csproj @@ -33,7 +33,9 @@ - + + + diff --git a/sources/Leak.Client/MemoryToBlocks.cs b/sources/Leak.Client/MemoryToBlocks.cs new file mode 100644 index 00000000..e54f02e4 --- /dev/null +++ b/sources/Leak.Client/MemoryToBlocks.cs @@ -0,0 +1,6 @@ +namespace Leak.Client +{ + public class MemoryToBlocks + { + } +} \ No newline at end of file diff --git a/sources/Leak.Client/NetworkToMemory.cs b/sources/Leak.Client/MemoryToNetwork.cs similarity index 88% rename from sources/Leak.Client/NetworkToMemory.cs rename to sources/Leak.Client/MemoryToNetwork.cs index 0bbbe243..1ac97e37 100644 --- a/sources/Leak.Client/NetworkToMemory.cs +++ b/sources/Leak.Client/MemoryToNetwork.cs @@ -3,11 +3,11 @@ namespace Leak.Client { - public class NetworkToMemory : NetworkPoolMemory + public class MemoryToNetwork : NetworkPoolMemory { private readonly MemoryService service; - public NetworkToMemory(MemoryService service) + public MemoryToNetwork(MemoryService service) { this.service = service; } diff --git a/sources/Leak.Client/MemoryToRepository.cs b/sources/Leak.Client/MemoryToRepository.cs new file mode 100644 index 00000000..8f339f9f --- /dev/null +++ b/sources/Leak.Client/MemoryToRepository.cs @@ -0,0 +1,45 @@ +using Leak.Data.Store; +using Leak.Memory; + +namespace Leak.Client +{ + public class MemoryToRepository : RepositoryMemory + { + private readonly MemoryService service; + + public MemoryToRepository(MemoryService service) + { + this.service = service; + } + + public RepositoryMemoryBlock Allocate(int size) + { + return new Block(service.Allocate(size)); + } + + private class Block : RepositoryMemoryBlock + { + private readonly MemoryBlock inner; + + public Block(MemoryBlock inner) + { + this.inner = inner; + } + + public byte[] Data + { + get { return inner.Data; } + } + + public int Length + { + get { return inner.Data.Length; } + } + + public void Release() + { + inner.Release(); + } + } + } +} \ No newline at end of file diff --git a/sources/Leak.Datashare.Tests/DatashareFixture.cs b/sources/Leak.Datashare.Tests/DatashareFixture.cs index 4161af8e..b44b6085 100644 --- a/sources/Leak.Datashare.Tests/DatashareFixture.cs +++ b/sources/Leak.Datashare.Tests/DatashareFixture.cs @@ -46,7 +46,7 @@ public DatashareSession Start() GlueService glue = new GlueBuilder() .WithHash(metainfo.Hash) - .WithBlocks(new MemoryBuilder().Build()) + .WithMemory(new MemoryBuilder().Build()) .Build(); RepositoryService repository = diff --git a/sources/Leak.Extensions.Metadata.Tests/MetadataSide.cs b/sources/Leak.Extensions.Metadata.Tests/MetadataSide.cs index 21fd5ecd..2aaec386 100644 --- a/sources/Leak.Extensions.Metadata.Tests/MetadataSide.cs +++ b/sources/Leak.Extensions.Metadata.Tests/MetadataSide.cs @@ -36,7 +36,7 @@ public MetadataInstance Build() GlueService service = new GlueBuilder() .WithHash(handshake.Hash) - .WithBlocks(new MemoryBuilder().Build()) + .WithMemory(new MemoryBuilder().Build()) .WithPlugin(new MetadataPlugin(metadata)) .Build(hooks); diff --git a/sources/Leak.Extensions.Peers.Tests/PeersSide.cs b/sources/Leak.Extensions.Peers.Tests/PeersSide.cs index ab52ac75..6d2e507d 100644 --- a/sources/Leak.Extensions.Peers.Tests/PeersSide.cs +++ b/sources/Leak.Extensions.Peers.Tests/PeersSide.cs @@ -36,7 +36,7 @@ public PeersInstance Build() GlueService service = new GlueBuilder() .WithHash(handshake.Hash) - .WithBlocks(new MemoryBuilder().Build()) + .WithMemory(new MemoryBuilder().Build()) .WithPlugin(new PeersPlugin(peers)) .Build(hooks); diff --git a/sources/Leak.Glue.Tests/GlueSide.cs b/sources/Leak.Glue.Tests/GlueSide.cs index 912179e2..476e729f 100644 --- a/sources/Leak.Glue.Tests/GlueSide.cs +++ b/sources/Leak.Glue.Tests/GlueSide.cs @@ -40,7 +40,7 @@ public GlueInstance Build(params string[] plugins) GlueService service = builder .WithHash(hash) - .WithBlocks(new MemoryBuilder().Build()) + .WithMemory(new MemoryBuilder().Build()) .Build(hooks); return new GlueInstance(service); diff --git a/sources/Leak.Glue/GlueBuilder.cs b/sources/Leak.Glue/GlueBuilder.cs index 5aca761a..4d8ed0f0 100644 --- a/sources/Leak.Glue/GlueBuilder.cs +++ b/sources/Leak.Glue/GlueBuilder.cs @@ -23,7 +23,7 @@ public GlueBuilder WithHash(FileHash hash) return this; } - public GlueBuilder WithBlocks(DataBlockFactory blocks) + public GlueBuilder WithMemory(DataBlockFactory blocks) { dependencies.Blocks = blocks; return this; diff --git a/sources/Leak.Metashare.Tests/MetashareFixture.cs b/sources/Leak.Metashare.Tests/MetashareFixture.cs index 5131c3eb..2428926d 100644 --- a/sources/Leak.Metashare.Tests/MetashareFixture.cs +++ b/sources/Leak.Metashare.Tests/MetashareFixture.cs @@ -56,7 +56,7 @@ public MetashareSession Start() GlueService glue = new GlueBuilder() .WithHash(metainfo.Hash) - .WithBlocks(new MemoryBuilder().Build()) + .WithMemory(new MemoryBuilder().Build()) .WithPlugin(new MetadataPlugin(new MetadataHooks())) .Build(); diff --git a/sources/Leak.Networking/NetworkPoolBuilder.cs b/sources/Leak.Networking/NetworkPoolBuilder.cs index 06e58d90..f5a8de23 100644 --- a/sources/Leak.Networking/NetworkPoolBuilder.cs +++ b/sources/Leak.Networking/NetworkPoolBuilder.cs @@ -37,6 +37,12 @@ public NetworkPoolBuilder WithMemory(NetworkPoolMemory poolMemory) return this; } + public NetworkPoolBuilder WithBufferSize(int bufferSize) + { + configuration.BufferSize = bufferSize; + return this; + } + public NetworkPool Build(NetworkPoolHooks hooks) { return new NetworkPoolInstance(dependencies, configuration, hooks); diff --git a/sources/Leak.Repository/Leak.Data.Store.csproj b/sources/Leak.Repository/Leak.Data.Store.csproj index d8832b0b..55f3eac0 100644 --- a/sources/Leak.Repository/Leak.Data.Store.csproj +++ b/sources/Leak.Repository/Leak.Data.Store.csproj @@ -40,12 +40,14 @@ + + diff --git a/sources/Leak.Repository/RepositoryBuilder.cs b/sources/Leak.Repository/RepositoryBuilder.cs index 73afd180..154508b6 100644 --- a/sources/Leak.Repository/RepositoryBuilder.cs +++ b/sources/Leak.Repository/RepositoryBuilder.cs @@ -41,6 +41,12 @@ public RepositoryBuilder WithPipeline(PipelineService pipeline) return this; } + public RepositoryBuilder WithMemory(RepositoryMemory memory) + { + dependencies.Memory = memory; + return this; + } + public RepositoryService Build() { return Build(new RepositoryHooks()); diff --git a/sources/Leak.Repository/RepositoryContext.cs b/sources/Leak.Repository/RepositoryContext.cs index be1aeb34..ab092468 100644 --- a/sources/Leak.Repository/RepositoryContext.cs +++ b/sources/Leak.Repository/RepositoryContext.cs @@ -59,11 +59,6 @@ public BitfileService Bitfile get { return bitfile; } } - public byte[] Buffer - { - get { return buffer; } - } - public RepositoryView View { get { return view; } diff --git a/sources/Leak.Repository/RepositoryDependencies.cs b/sources/Leak.Repository/RepositoryDependencies.cs index 2b2d87a3..aa508a09 100644 --- a/sources/Leak.Repository/RepositoryDependencies.cs +++ b/sources/Leak.Repository/RepositoryDependencies.cs @@ -1,5 +1,4 @@ -using Leak.Common; -using Leak.Files; +using Leak.Files; using Leak.Tasks; namespace Leak.Data.Store @@ -8,7 +7,7 @@ public class RepositoryDependencies { public FileFactory Files; - public DataBlockFactory Blocks; + public RepositoryMemory Memory; public PipelineService Pipeline; } diff --git a/sources/Leak.Repository/RepositoryMemory.cs b/sources/Leak.Repository/RepositoryMemory.cs new file mode 100644 index 00000000..5acc95de --- /dev/null +++ b/sources/Leak.Repository/RepositoryMemory.cs @@ -0,0 +1,7 @@ +namespace Leak.Data.Store +{ + public interface RepositoryMemory + { + RepositoryMemoryBlock Allocate(int size); + } +} \ No newline at end of file diff --git a/sources/Leak.Repository/RepositoryMemoryBlock.cs b/sources/Leak.Repository/RepositoryMemoryBlock.cs new file mode 100644 index 00000000..402e55dd --- /dev/null +++ b/sources/Leak.Repository/RepositoryMemoryBlock.cs @@ -0,0 +1,11 @@ +namespace Leak.Data.Store +{ + public interface RepositoryMemoryBlock + { + byte[] Data { get; } + + int Length { get; } + + void Release(); + } +} \ No newline at end of file diff --git a/sources/Leak.Repository/RepositoryTaskVerifyPiece.cs b/sources/Leak.Repository/RepositoryTaskVerifyPiece.cs index d7506d48..d577e9f6 100644 --- a/sources/Leak.Repository/RepositoryTaskVerifyPiece.cs +++ b/sources/Leak.Repository/RepositoryTaskVerifyPiece.cs @@ -16,28 +16,29 @@ public RepositoryTaskVerifyPiece(PieceInfo piece) public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted) { HashAlgorithm algorithm = SHA1.Create(); + RepositoryMemoryBlock block = context.Dependencies.Memory.Allocate(16384); - context.View.Read(context.Buffer, piece.Index, 0, args => + context.View.Read(block.Data, piece.Index, 0, args => { if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1)) { - context.Queue.Add(new Continue(piece, args, algorithm)); + context.Queue.Add(new Continue(piece, args, algorithm, block)); } else { - context.Queue.Add(new Complete(piece, args, algorithm)); + context.Queue.Add(new Complete(piece, args, algorithm, block)); } }); } public bool CanExecute(RepositoryTaskQueue queue) { - return queue.IsBlocked("all") == false; + return queue.IsBlocked(piece) == false; } public void Block(RepositoryTaskQueue queue) { - queue.Block("all"); + queue.Block(piece); } public void Release(RepositoryTaskQueue queue) @@ -49,27 +50,29 @@ private class Continue : RepositoryTask private readonly PieceInfo piece; private readonly RepositoryViewRead read; private readonly HashAlgorithm algorithm; + private readonly RepositoryMemoryBlock block; - public Continue(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm) + public Continue(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm, RepositoryMemoryBlock block) { this.piece = piece; this.read = read; this.algorithm = algorithm; + this.block = block; } public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted) { algorithm.Push(read.Buffer.Data, read.Buffer.Offset, Math.Min(read.Buffer.Count, read.Count)); - context.View.Read(context.Buffer, piece.Index, read.Block + 1, args => + context.View.Read(block.Data, piece.Index, read.Block + 1, args => { if (args.Count > 0 && context.View.Exists(piece.Index, args.Block + 1)) { - context.Queue.Add(new Continue(piece, args, algorithm)); + context.Queue.Add(new Continue(piece, args, algorithm, block)); } else { - context.Queue.Add(new Complete(piece, args, algorithm)); + context.Queue.Add(new Complete(piece, args, algorithm, block)); } }); } @@ -93,12 +96,14 @@ private class Complete : RepositoryTask private readonly PieceInfo piece; private readonly RepositoryViewRead read; private readonly HashAlgorithm algorithm; + private readonly RepositoryMemoryBlock block; - public Complete(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm) + public Complete(PieceInfo piece, RepositoryViewRead read, HashAlgorithm algorithm, RepositoryMemoryBlock block) { this.piece = piece; this.read = read; this.algorithm = algorithm; + this.block = block; } public bool CanExecute(RepositoryTaskQueue queue) @@ -120,6 +125,8 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete RejectIfRequired(context, result); algorithm.Dispose(); + block.Release(); + onCompleted.Invoke(this); } @@ -129,7 +136,7 @@ public void Block(RepositoryTaskQueue queue) public void Release(RepositoryTaskQueue queue) { - queue.Release("all"); + queue.Release(piece); } private void AcceptIfRequired(RepositoryContext context, bool valid) diff --git a/sources/Leak.Repository/RepositoryTaskVerifyRange.cs b/sources/Leak.Repository/RepositoryTaskVerifyRange.cs index e87f896f..fed838e2 100644 --- a/sources/Leak.Repository/RepositoryTaskVerifyRange.cs +++ b/sources/Leak.Repository/RepositoryTaskVerifyRange.cs @@ -24,7 +24,10 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete if (next < length) { - context.Queue.Add(new Start(bitfield, reduced, next)); + int blockSize = context.Metainfo.Properties.BlockSize; + RepositoryMemoryBlock block = context.Dependencies.Memory.Allocate(blockSize); + + context.Queue.Add(new Start(bitfield, reduced, next, block)); } else { @@ -78,14 +81,16 @@ private class Start : RepositoryTask private readonly Bitfield bitfield; private readonly Bitfield scope; private readonly int piece; + private readonly RepositoryMemoryBlock block; - public Start(Bitfield bitfield, Bitfield scope, int piece) + public Start(Bitfield bitfield, Bitfield scope, int piece, RepositoryMemoryBlock block) { this.algorithm = SHA1.Create(); this.bitfield = bitfield; this.scope = scope; this.piece = piece; + this.block = block; } public bool CanExecute(RepositoryTaskQueue queue) @@ -95,15 +100,15 @@ public bool CanExecute(RepositoryTaskQueue queue) public void Execute(RepositoryContext context, RepositoryTaskCallback onCompleted) { - context.View.Read(context.Buffer, piece, 0, args => + context.View.Read(block.Data, piece, 0, args => { if (args.Count > 0 && context.View.Exists(args.Piece, args.Block + 1)) { - context.Queue.Add(new Continue(bitfield, scope, algorithm, args)); + context.Queue.Add(new Continue(bitfield, scope, algorithm, args, block)); } else { - context.Queue.Add(new Complete(bitfield, scope, algorithm, args)); + context.Queue.Add(new Complete(bitfield, scope, algorithm, args, block)); } }); } @@ -123,13 +128,15 @@ private class Continue : RepositoryTask private readonly Bitfield scope; private readonly HashAlgorithm algorithm; private readonly RepositoryViewRead read; + private readonly RepositoryMemoryBlock block; - public Continue(Bitfield bitfield, Bitfield scope, HashAlgorithm algorithm, RepositoryViewRead read) + public Continue(Bitfield bitfield, Bitfield scope, HashAlgorithm algorithm, RepositoryViewRead read, RepositoryMemoryBlock block) { this.bitfield = bitfield; this.scope = scope; this.algorithm = algorithm; this.read = read; + this.block = block; } public bool CanExecute(RepositoryTaskQueue queue) @@ -141,15 +148,15 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete { algorithm.Push(read.Buffer.Data, read.Buffer.Offset, Math.Min(read.Buffer.Count, read.Count)); - context.View.Read(context.Buffer, read.Piece, read.Block + 1, args => + context.View.Read(block.Data, read.Piece, read.Block + 1, args => { if (args.Count > 0 && context.View.Exists(args.Piece, args.Block + 1)) { - context.Queue.Add(new Continue(bitfield, scope, algorithm, args)); + context.Queue.Add(new Continue(bitfield, scope, algorithm, args, block)); } else { - context.Queue.Add(new Complete(bitfield, scope, algorithm, args)); + context.Queue.Add(new Complete(bitfield, scope, algorithm, args, block)); } }); } @@ -169,13 +176,15 @@ private class Complete : RepositoryTask private readonly Bitfield scope; private readonly HashAlgorithm algorithm; private readonly RepositoryViewRead read; + private readonly RepositoryMemoryBlock block; - public Complete(Bitfield bitfield, Bitfield scope, HashAlgorithm algorithm, RepositoryViewRead read) + public Complete(Bitfield bitfield, Bitfield scope, HashAlgorithm algorithm, RepositoryViewRead read, RepositoryMemoryBlock block) { this.bitfield = bitfield; this.scope = scope; this.algorithm = algorithm; this.read = read; + this.block = block; } public bool CanExecute(RepositoryTaskQueue queue) @@ -194,6 +203,7 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete bool result = Bytes.Equals(hash, expected); bitfield[read.Piece] = result; + algorithm.Dispose(); int next = Next(scope, read.Piece + 1); @@ -201,11 +211,13 @@ public void Execute(RepositoryContext context, RepositoryTaskCallback onComplete if (exists) { - context.Queue.Add(new Start(bitfield, scope, next)); + context.Queue.Add(new Start(bitfield, scope, next, block)); } else { + block.Release(); onCompleted.Invoke(this); + context.Bitfile.Write(bitfield); context.Hooks.CallDataVerified(metainfo.Hash, bitfield); } diff --git a/sources/Leak.Repository/RepositoryView.cs b/sources/Leak.Repository/RepositoryView.cs index 32ffd0b3..81818fe4 100644 --- a/sources/Leak.Repository/RepositoryView.cs +++ b/sources/Leak.Repository/RepositoryView.cs @@ -24,7 +24,7 @@ public void Read(byte[] buffer, int piece, RepositoryViewReadCallback callback) public void Read(byte[] buffer, int piece, int block, RepositoryViewReadCallback callback) { - new RepositoryViewReadRoutine(cache, piece, block, buffer, callback).Execute(); + new RepositoryViewReadRoutine(cache, piece, block, new FileBuffer(buffer, 0, cache.BlockSize), callback).Execute(); } public void Write(FileBuffer buffer, int piece, int block, RepositoryViewWriteCallback callback)