Skip to content
This repository has been archived by the owner on Apr 6, 2020. It is now read-only.

Commit

Permalink
Make BlockProcessor with less dependencies (#354)
Browse files Browse the repository at this point in the history
* RocksDbRepository reach 42% cover

* Rename abstract class ICrypto to Crypto. I'm sure this was once an interface but it's not anymore, threfore should not carry the prefix "I" that identify the interfaces.

* fix some merge problems

* Increase Server tests coverage to 94%.
Black listed peers try to connect test

* fix merge issues

* * Remove protected fields from TestBase and replace with overload usage of the method RandomInt.
* Small clean up on the UtCrypto

* Create UtBlockchain and create first tests.
more tests will be added soon!

* fix issues from the merge

* Adding testing information to the team Contributing page.

* Small CleanUp the test class with properties that aren't been used anymore.

* UtBlockChain: InitializeBlockchain_IsGenesisBlock_InitializeComplete

* Firsts UtBlockProcessor tests

* AddBlock tests added

* * UtBlockProcessor: Run test
* Fix an inverted if logic. The code only enter in the While loop if the cancellation was requested and should be the other way around.

* Cleanup
* Replace BlockProcessor dependency from Prompt and replace with IBlockPool
* Remove unused properties from BlockProcessor
*  ContainsBlock method in BlockProcessor signature changed to Private because is not used anywhere else but internally

* UtBlockProcessor - Loop test: When the received block is not the expected next.

* Remove comment

* Remove async modifier from methods that aren't implemented

* .

* Merge branch 'development' of https://github.com/aboimpinto/neo-sharp into development

* Add test class, #301

* Write tests for Add method.
#301

* Refactor the Block / BlockHeader in Signed and Unsigned objects (mutable and immutable).
The intenstion of this refactor is to bring better readability of the code and remove logic code from the DTO.

* Implement the UpdateHash methods in the creation of the Signed<object>

* Signed and Unsigned strucuture return GenenisBlock

* Draft of the BinaryCustomSeralization of TransactionBase

* Revert "Remove stack logs (#321)"

This reverts commit 34c676a.

* Add BlockOperationsManager and WitnessOperationManager that will be resposable to Sign and Verify the block and the witness

* Initial work, removing logic from the model objects and add it into Witness. Transaction, BlockHeader and Block OperationManager classes.
In this classes the verify method will be implemented.

#306, #307

* Cleanup and remove code that was not merged.

* regreassion test for the case where the current block is null. This happen when the node start without genesis block.

* Remove dependencies from BlockProcessor and remove BlockHeader references. BlockProcessor use only Block object.
#350, #351

* now the Persist consider the genesis block for the case of start a node in a clean environment
  • Loading branch information
aboimpinto authored and osmirnov committed Sep 18, 2018
1 parent 44e1dbe commit 3d872c2
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 101 deletions.
20 changes: 15 additions & 5 deletions src/NeoSharp.Core/Blockchain/Processing/BlockHeaderPersister.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NeoSharp.Core.Extensions;
Expand Down Expand Up @@ -28,11 +29,20 @@ public async Task Persist(params BlockHeader[] blockHeaders)
{
if (blockHeaders == null) throw new ArgumentNullException(nameof(blockHeaders));

var blockHeadersToPersist = blockHeaders
.Where(bh => bh != null && bh.Index > LastBlockHeader?.Index)
.Distinct(bh => bh.Index)
.OrderBy(bh => bh.Index)
.ToList();
var blockHeadersToPersist = new List<BlockHeader>();
if (this.LastBlockHeader == null)
{
// Persisting the Genesis block
blockHeadersToPersist = blockHeaders.ToList();
}
else
{
blockHeadersToPersist = blockHeaders
.Where(bh => bh != null && bh.Index > LastBlockHeader?.Index)
.Distinct(bh => bh.Index)
.OrderBy(bh => bh.Index)
.ToList();
}

foreach (var blockHeader in blockHeadersToPersist)
{
Expand Down
94 changes: 94 additions & 0 deletions src/NeoSharp.Core/Blockchain/Processing/BlockPersister.cs
@@ -0,0 +1,94 @@
using System;
using System.Threading.Tasks;
using NeoSharp.Core.Models;
using NeoSharp.Core.Persistence;

namespace NeoSharp.Core.Blockchain.Processing
{
public class BlockPersister : IBlockPersister
{
#region Private Fields
private readonly IRepository _repository;
private readonly IBlockHeaderPersister _blockHeaderPersister;
private readonly ITransactionPersister<Transaction> _transactionPersister;
private readonly ITransactionPool _transactionPool;
#endregion

#region Constructor
public BlockPersister(
IRepository repository,
IBlockHeaderPersister blockHeaderPersister,
ITransactionPersister<Transaction> transactionPersister,
ITransactionPool transactionPool)
{
this._repository = repository;
this._blockHeaderPersister = blockHeaderPersister;
this._transactionPersister = transactionPersister;
this._transactionPool = transactionPool;
}
#endregion

#region IBlockPersister Implementation
public Block LastPersistedBlock { get; private set; }

public event EventHandler<BlockHeader[]> OnBlockHeadersPersisted;

public async Task Persist(params Block[] blocks)
{
foreach (var block in blocks)
{
this.LastPersistedBlock = block;

var blockHeader = await this._repository.GetBlockHeader(block.Hash);
if (blockHeader == null)
{
await this._blockHeaderPersister.Persist(block.GetBlockHeader());
}

foreach (var transaction in block.Transactions)
{
await this._transactionPersister.Persist(transaction);
this._transactionPool.Remove(transaction.Hash);
}
}
}

public async Task Persist(params BlockHeader[] blockHeaders)
{
try
{
this._blockHeaderPersister.OnBlockHeadersPersisted += this.HandleBlockHandlePersisted;
await this._blockHeaderPersister.Persist(blockHeaders);
}
finally
{
this._blockHeaderPersister.OnBlockHeadersPersisted -= this.HandleBlockHandlePersisted;
}
}

public async Task<bool> IsBlockPersisted(Block block)
{
var blockHeader = await this._repository.GetBlockHeader(block.Hash);

if (blockHeader?.Type == HeaderType.Extended)
{
throw new InvalidOperationException($"The block \"{block.Hash.ToString(true)}\" exists already on the blockchain.");
}

if (blockHeader != null && blockHeader.Hash != block.Hash)
{
throw new InvalidOperationException($"The block \"{block.Hash.ToString(true)}\" has an invalid hash.");
}

return false;
}
#endregion

#region Private Method
private void HandleBlockHandlePersisted(object sender, BlockHeader[] e)
{
this.OnBlockHeadersPersisted?.Invoke(sender, e);
}
#endregion
}
}
52 changes: 11 additions & 41 deletions src/NeoSharp.Core/Blockchain/Processing/BlockProcessor.cs
Expand Up @@ -4,7 +4,6 @@
using NeoSharp.Core.Helpers;
using NeoSharp.Core.Models;
using NeoSharp.Core.Models.OperationManger;
using NeoSharp.Core.Persistence;
using NeoSharp.Core.Types;

namespace NeoSharp.Core.Blockchain.Processing
Expand All @@ -14,33 +13,24 @@ public class BlockProcessor : IBlockProcessor
private static readonly TimeSpan DefaultBlockPollingInterval = TimeSpan.FromMilliseconds(100);

private readonly IBlockPool _blockPool;
private readonly ITransactionPool _transactionPool;
private readonly IRepository _repository;
private readonly IAsyncDelayer _asyncDelayer;
private readonly IBlockHeaderPersister _blockHeaderPersister;
private readonly ITransactionPersister<Transaction> _transactionPersister;
private readonly IBlockOperationsManager _blockOperationsManager;
private readonly IBlockPersister _blockPersister;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private Block _currentBlock;

public event EventHandler<Block> OnBlockProcessed;

public BlockProcessor(
IBlockPool blockPool,
ITransactionPool transactionPool,
IRepository repository,
IAsyncDelayer asyncDelayer,
IBlockHeaderPersister blockHeaderPersister,
ITransactionPersister<Transaction> transactionPersister,
IBlockOperationsManager blockOperationsManager)
IBlockOperationsManager blockOperationsManager,
IBlockPersister blockPersister)
{
_blockPool = blockPool ?? throw new ArgumentNullException(nameof(blockPool));
_transactionPool = transactionPool ?? throw new ArgumentNullException(nameof(transactionPool));
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_asyncDelayer = asyncDelayer ?? throw new ArgumentNullException(nameof(asyncDelayer));
_blockHeaderPersister = blockHeaderPersister ?? throw new ArgumentNullException(nameof(blockHeaderPersister));
_transactionPersister = transactionPersister ?? throw new ArgumentNullException(nameof(transactionPersister));
_blockOperationsManager = blockOperationsManager;
_blockPersister = blockPersister ?? throw new ArgumentNullException(nameof(_blockPersister));
}

// TODO: We will read the current block from Blockchain
Expand All @@ -63,7 +53,7 @@ public void Run(Block currentBlock)
continue;
}
await Persist(block);
await this._blockPersister.Persist(block);
_blockPool.Remove(nextBlockHeight);
_currentBlock = block;
Expand All @@ -78,7 +68,10 @@ public async Task AddBlock(Block block)
{
if (block == null) throw new ArgumentNullException(nameof(block));

_blockOperationsManager.Sign(block);
if (block.Hash == null)
{
_blockOperationsManager.Sign(block);
}

var blockHash = block.Hash;

Expand All @@ -90,39 +83,16 @@ public async Task AddBlock(Block block)
throw new InvalidOperationException($"The block \"{blockHash.ToString(true)}\" was already queued to be added.");
}

var blockHeader = await _repository.GetBlockHeader(blockHash);
if (blockHeader?.Type == HeaderType.Extended)
if (!await this._blockPersister.IsBlockPersisted(block))
{
throw new InvalidOperationException($"The block \"{blockHash.ToString(true)}\" exists already on the blockchain.");
_blockPool.Add(block);
}

if (blockHeader != null && blockHeader.Hash != block.Hash)
{
throw new InvalidOperationException($"The block \"{blockHash.ToString(true)}\" has an invalid hash.");
}

_blockPool.Add(block);
}

public void Dispose()
{
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
}

protected virtual async Task Persist(Block block)
{
var blockHeader = await _repository.GetBlockHeader(block.Hash);
if (blockHeader == null)
{
await _blockHeaderPersister.Persist(block.GetBlockHeader());
}

foreach (var transaction in block.Transactions)
{
await _transactionPersister.Persist(transaction);
_transactionPool.Remove(transaction.Hash);
}
}
}
}
19 changes: 19 additions & 0 deletions src/NeoSharp.Core/Blockchain/Processing/IBlockPersister.cs
@@ -0,0 +1,19 @@
using System;
using System.Threading.Tasks;
using NeoSharp.Core.Models;

namespace NeoSharp.Core.Blockchain.Processing
{
public interface IBlockPersister
{
Block LastPersistedBlock { get; }

event EventHandler<BlockHeader[]> OnBlockHeadersPersisted;

Task Persist(params Block[] block);

Task Persist(params BlockHeader[] blockHeaders);

Task<bool> IsBlockPersisted(Block blocks);
}
}
1 change: 1 addition & 0 deletions src/NeoSharp.Core/DI/Modules/BlockchainModule.cs
Expand Up @@ -15,6 +15,7 @@ public void Register(IContainerBuilder containerBuilder)
containerBuilder.RegisterSingleton<ICoinIndex, CoinIndex>();

#region Processing
containerBuilder.RegisterSingleton<IBlockPersister, BlockPersister>();
containerBuilder.RegisterSingleton<IBlockHeaderPersister, BlockHeaderPersister>();
containerBuilder.RegisterSingleton<IBlockProcessor, BlockProcessor>();
containerBuilder.RegisterSingleton<IBlockPool, BlockPool>();
Expand Down
Expand Up @@ -16,31 +16,30 @@ public class BlockHeadersMessageHandler : IMessageHandler<BlockHeadersMessage>
{
private const int MaxBlocksCountToSync = 500;

private readonly IBlockHeaderPersister _blockHeaderPersister;
private readonly IBlockPersister _blockPersister;
private readonly IBlockchain _blockchain;
private readonly ILogger<BlockHeadersMessageHandler> _logger;

public BlockHeadersMessageHandler(IBlockHeaderPersister blockHeaderPersister, IBlockchain blockchain, ILogger<BlockHeadersMessageHandler> logger)
public BlockHeadersMessageHandler(IBlockPersister blockPersister, IBlockchain blockchain, ILogger<BlockHeadersMessageHandler> logger)
{
_blockHeaderPersister = blockHeaderPersister ?? throw new ArgumentNullException(nameof(blockHeaderPersister));
_blockPersister = blockPersister ?? throw new ArgumentNullException(nameof(blockPersister));
_blockchain = blockchain ?? throw new ArgumentNullException(nameof(blockchain));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task Handle(BlockHeadersMessage message, IPeer sender)
{
EventHandler<BlockHeader[]> blockHeadersPersisted =
async (_, blockHeaders) => await BlockHeadersPersisted(sender, blockHeaders);
async void HeadersPersisted(object _, BlockHeader[] blockHeaders) => await BlockHeadersPersisted(sender, blockHeaders);

try
{
_blockHeaderPersister.OnBlockHeadersPersisted += blockHeadersPersisted;
_blockPersister.OnBlockHeadersPersisted += HeadersPersisted;

await _blockHeaderPersister.Persist(message.Payload.Headers ?? new BlockHeader[0]);
await _blockPersister.Persist(message.Payload.Headers ?? new BlockHeader[0]);
}
finally
{
_blockHeaderPersister.OnBlockHeadersPersisted -= blockHeadersPersisted;
_blockPersister.OnBlockHeadersPersisted -= HeadersPersisted;
}

if (_blockchain.LastBlockHeader.Index < sender.Version.CurrentBlockIndex)
Expand Down Expand Up @@ -76,7 +75,6 @@ private static async Task SynchronizeBlocks(IPeer source, IReadOnlyCollection<UI
await source.Send(new GetDataMessage(InventoryType.Block, blockHashesInBatch));
}
}

#endregion
}
}
48 changes: 2 additions & 46 deletions test/NeoSharp.Core.Test/Blockchain/Processing/UtBlockProcessor.cs
Expand Up @@ -103,41 +103,6 @@ public async Task AddBlock_ValidBlockButInBlockPool_ThrowInvalidOperationExcepti
await testee.AddBlock(block);
}

[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task AddBlock_ValidBlockNotInBlockPoolInBlockchainButNotTheRightBlockHeaderType_ThrowInvalidOperationException()
{
var block = new Block
{
PreviousBlockHash = UInt256.Zero,
Hash = UInt256.Parse("d4dab99ed65c3655a9619b215ab1988561b706b6e5196b6e0ada916aa6601622"),
NextConsensus = UInt160.Zero,
Transactions = new Transaction[]
{
new ContractTransaction
{
Hash = UInt256.Parse("1a259dba256600620c6c91094f3a300b30f0cbaecee19c6114deffd3288957d7")
}
}
};

var expectedBlockHeader = new BlockHeader(HeaderType.Extended);

var blockPoolMock = this.AutoMockContainer.GetMock<IBlockPool>();
blockPoolMock
.Setup(x => x.Contains(block.Hash))
.Returns(false);

var repositoryMock = this.AutoMockContainer.GetMock<IRepository>();
repositoryMock
.Setup(x => x.GetBlockHeader(block.Hash))
.ReturnsAsync(expectedBlockHeader);

var testee = this.AutoMockContainer.Create<BlockProcessor>();

await testee.AddBlock(block);
}

[TestMethod]
public async Task AddBlock_ValidBlockNotInBlockPoolInBlockChainWithTheRightBlockHeaderType_BlockAddedToBlockPool()
{
Expand Down Expand Up @@ -234,9 +199,7 @@ public void Run_WhenAddBlockThisIsProcessed_OnBlockProcessedEventRaised()
.Setup(x => x.TryGet(1, out newBlock))
.Returns(true);

var blockHeaderPersisterMock = this.AutoMockContainer.GetMock<IBlockHeaderPersister>();
var transactionProcessorMock = this.AutoMockContainer.GetMock<ITransactionPersister<Transaction>>();
var repositoryMock = this.AutoMockContainer.GetMock<IRepository>();
var blockPersisterMock = this.AutoMockContainer.GetMock<IBlockPersister>();

var testee = this.AutoMockContainer.Create<BlockProcessor>();

Expand All @@ -254,14 +217,7 @@ public void Run_WhenAddBlockThisIsProcessed_OnBlockProcessedEventRaised()

waitForBlockProcessedEvent.WaitOne();

transactionProcessorMock.Verify(x => x.Persist(transactionInNewBlock));

blockHeaderPersisterMock.Verify(x => x.Persist(It.Is<BlockHeader>(blockHeader =>
blockHeader.ConsensusData == newBlock.ConsensusData &&
blockHeader.Hash == newBlock.Hash &&
blockHeader.Index == newBlock.Index &&
blockHeader.Timestamp == newBlock.Timestamp &&
blockHeader.Version == newBlock.Version)));
blockPersisterMock.Verify(x => x.Persist(newBlock));
}

[TestMethod]
Expand Down

0 comments on commit 3d872c2

Please sign in to comment.