Skip to content

Commit aaba809

Browse files
authored
Move broadcaster from wallet to base and to mempool (#113)
* Move broadcaster from wallet to base and to mempool * Move broadcaster to base project and use signal events instead of an EventArgs, alsointroduce BroadcastCheck interface instead of overriding the broadcast manager
1 parent 8af0043 commit aaba809

28 files changed

Lines changed: 224 additions & 173 deletions

File tree

src/Blockcore/Base/BaseFeature.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Blockcore.Configuration;
1212
using Blockcore.Configuration.Settings;
1313
using Blockcore.Connection;
14+
using Blockcore.Connection.Broadcasting;
1415
using Blockcore.Consensus;
1516
using Blockcore.Consensus.Rules;
1617
using Blockcore.Consensus.Validators;
@@ -109,6 +110,7 @@ public sealed class BaseFeature : FullNodeFeature
109110
private readonly Network network;
110111

111112
private readonly INodeStats nodeStats;
113+
private readonly IBroadcasterManager broadcasterManager;
112114
private readonly IProvenBlockHeaderStore provenBlockHeaderStore;
113115

114116
private readonly IConsensusManager consensusManager;
@@ -148,6 +150,7 @@ public BaseFeature(NodeSettings nodeSettings,
148150
ITipsManager tipsManager,
149151
IKeyValueRepository keyValueRepo,
150152
INodeStats nodeStats,
153+
IBroadcasterManager broadcasterManager,
151154
IProvenBlockHeaderStore provenBlockHeaderStore = null)
152155
{
153156
this.chainState = Guard.NotNull(chainState, nameof(chainState));
@@ -164,6 +167,7 @@ public BaseFeature(NodeSettings nodeSettings,
164167
this.blockStore = blockStore;
165168
this.network = network;
166169
this.nodeStats = nodeStats;
170+
this.broadcasterManager = broadcasterManager;
167171
this.provenBlockHeaderStore = provenBlockHeaderStore;
168172
this.partialValidator = partialValidator;
169173
this.peerBanning = Guard.NotNull(peerBanning, nameof(peerBanning));
@@ -211,6 +215,7 @@ public override async Task InitializeAsync()
211215
connectionParameters.TemplateBehaviors.Add(new PeerBanningBehavior(this.loggerFactory, this.peerBanning, this.nodeSettings));
212216
connectionParameters.TemplateBehaviors.Add(new BlockPullerBehavior(this.blockPuller, this.initialBlockDownloadState, this.dateTimeProvider, this.loggerFactory));
213217
connectionParameters.TemplateBehaviors.Add(new ConnectionManagerBehavior(this.connectionManager, this.loggerFactory));
218+
connectionParameters.TemplateBehaviors.Add(new BroadcasterBehavior(this.broadcasterManager, this.loggerFactory));
214219

215220
this.StartAddressManager(connectionParameters);
216221

@@ -397,6 +402,8 @@ public static IFullNodeBuilder UseBaseFeature(this IFullNodeBuilder fullNodeBuil
397402
services.AddSingleton<IKeyValueRepository, KeyValueRepository>();
398403
services.AddSingleton<ITipsManager, TipsManager>();
399404
services.AddSingleton<IAsyncProvider, AsyncProvider>();
405+
services.AddSingleton<IBroadcasterManager, BroadcasterManager>();
406+
services.AddSingleton<IBroadcastCheck, NoCheckBroadcastCheck>();
400407

401408
// Consensus
402409
services.AddSingleton<ConsensusSettings>();

src/Features/Blockcore.Features.Wallet/Broadcasting/TransactionBroadcastEntry.cs renamed to src/Blockcore/Connection/Broadcasting/BroadcastTransactionStateChanedEntry.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
using System;
2-
using Blockcore.Features.MemoryPool;
32

4-
namespace Blockcore.Features.Wallet.Broadcasting
3+
namespace Blockcore.Connection.Broadcasting
54
{
6-
public class TransactionBroadcastEntry
5+
public class BroadcastTransactionStateChanedEntry
76
{
87
public NBitcoin.Transaction Transaction { get; }
98

109
public TransactionBroadcastState TransactionBroadcastState { get; set; }
1110

12-
public string ErrorMessage => (this.MempoolError == null) ? string.Empty : (this.MempoolError.ConsensusError?.Message ?? this.MempoolError.Code ?? "Failed");
11+
public string ErrorMessage => (this.MempoolError == null) ? string.Empty : $"Failed: {this.ErrorMessage}";
1312

14-
public MempoolError MempoolError { get; set; }
13+
public string MempoolError { get; set; }
1514

16-
public TransactionBroadcastEntry(NBitcoin.Transaction transaction, TransactionBroadcastState transactionBroadcastState, MempoolError mempoolError)
15+
public BroadcastTransactionStateChanedEntry(NBitcoin.Transaction transaction, TransactionBroadcastState transactionBroadcastState, string mempoolError)
1716
{
1817
this.Transaction = transaction ?? throw new ArgumentNullException(nameof(transaction));
1918
this.TransactionBroadcastState = transactionBroadcastState;
2019
this.MempoolError = mempoolError;
2120
}
2221
}
23-
}
22+
}

src/Features/Blockcore.Features.Wallet/Broadcasting/BroadcasterBehavior.cs renamed to src/Blockcore/Connection/Broadcasting/BroadcasterBehavior.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
using System.Diagnostics;
33
using System.Linq;
44
using System.Threading.Tasks;
5-
using Blockcore.Features.Wallet.Interfaces;
5+
using Blockcore.Interfaces;
66
using Blockcore.P2P.Peer;
77
using Blockcore.P2P.Protocol;
88
using Blockcore.P2P.Protocol.Behaviors;
99
using Blockcore.P2P.Protocol.Payloads;
1010
using Microsoft.Extensions.Logging;
1111

12-
namespace Blockcore.Features.Wallet.Broadcasting
12+
namespace Blockcore.Connection.Broadcasting
1313
{
1414
public class BroadcasterBehavior : NetworkPeerBehavior
1515
{
@@ -91,7 +91,7 @@ private void ProcessInvPayload(InvPayload invPayload)
9191
// if node has transaction we broadcast
9292
foreach (InventoryVector inv in invPayload.Inventory.Where(x => x.Type == InventoryType.MSG_TX))
9393
{
94-
TransactionBroadcastEntry txEntry = this.broadcasterManager.GetTransaction(inv.Hash);
94+
BroadcastTransactionStateChanedEntry txEntry = this.broadcasterManager.GetTransaction(inv.Hash);
9595
if (txEntry != null)
9696
{
9797
this.broadcasterManager.AddOrUpdate(txEntry.Transaction, TransactionBroadcastState.Propagated);
@@ -104,7 +104,7 @@ protected async Task ProcessGetDataPayloadAsync(INetworkPeer peer, GetDataPayloa
104104
// If node asks for transaction we want to broadcast.
105105
foreach (InventoryVector inv in getDataPayload.Inventory.Where(x => x.Type == InventoryType.MSG_TX))
106106
{
107-
TransactionBroadcastEntry txEntry = this.broadcasterManager.GetTransaction(inv.Hash);
107+
BroadcastTransactionStateChanedEntry txEntry = this.broadcasterManager.GetTransaction(inv.Hash);
108108
if ((txEntry != null) && (txEntry.TransactionBroadcastState != TransactionBroadcastState.CantBroadcast))
109109
{
110110
await peer.SendMessageAsync(new TxPayload(txEntry.Transaction)).ConfigureAwait(false);

src/Features/Blockcore.Features.Wallet/Broadcasting/BroadcasterManagerBase.cs renamed to src/Blockcore/Connection/Broadcasting/BroadcasterManager.cs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,56 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading.Tasks;
5-
using Blockcore.Connection;
6-
using Blockcore.Features.MemoryPool;
7-
using Blockcore.Features.Wallet.Interfaces;
5+
using Blockcore.Interfaces;
86
using Blockcore.P2P.Peer;
97
using Blockcore.P2P.Protocol.Payloads;
8+
using Blockcore.Signals;
109
using Blockcore.Utilities;
1110
using ConcurrentCollections;
1211
using NBitcoin;
1312

14-
namespace Blockcore.Features.Wallet.Broadcasting
13+
namespace Blockcore.Connection.Broadcasting
1514
{
16-
public abstract class BroadcasterManagerBase : IBroadcasterManager
15+
public class BroadcasterManager : IBroadcasterManager
1716
{
18-
public event EventHandler<TransactionBroadcastEntry> TransactionStateChanged;
19-
20-
/// <summary> Connection manager for managing node connections.</summary>
2117
protected readonly IConnectionManager connectionManager;
18+
private readonly ISignals signals;
19+
private readonly IEnumerable<IBroadcastCheck> broadcastChecks;
2220

23-
public BroadcasterManagerBase(IConnectionManager connectionManager)
21+
public BroadcasterManager(IConnectionManager connectionManager, ISignals signals, IEnumerable<IBroadcastCheck> broadcastChecks)
2422
{
2523
Guard.NotNull(connectionManager, nameof(connectionManager));
2624

2725
this.connectionManager = connectionManager;
28-
this.Broadcasts = new ConcurrentHashSet<TransactionBroadcastEntry>();
26+
this.signals = signals;
27+
this.broadcastChecks = broadcastChecks;
28+
this.Broadcasts = new ConcurrentHashSet<BroadcastTransactionStateChanedEntry>();
2929
}
3030

31-
public void OnTransactionStateChanged(TransactionBroadcastEntry entry)
31+
public void OnTransactionStateChanged(BroadcastTransactionStateChanedEntry entry)
3232
{
33-
this.TransactionStateChanged?.Invoke(this, entry);
33+
this.signals.Publish(new TransactionBroadcastEvent(this, entry));
3434
}
3535

3636
/// <summary>Transactions to broadcast.</summary>
37-
private ConcurrentHashSet<TransactionBroadcastEntry> Broadcasts { get; }
37+
private ConcurrentHashSet<BroadcastTransactionStateChanedEntry> Broadcasts { get; }
3838

3939
/// <summary>Retrieves a transaction with provided hash from the collection of transactions to broadcast.</summary>
4040
/// <param name="transactionHash">Hash of the transaction to retrieve.</param>
41-
public TransactionBroadcastEntry GetTransaction(uint256 transactionHash)
41+
public BroadcastTransactionStateChanedEntry GetTransaction(uint256 transactionHash)
4242
{
43-
TransactionBroadcastEntry txEntry = this.Broadcasts.FirstOrDefault(x => x.Transaction.GetHash() == transactionHash);
43+
BroadcastTransactionStateChanedEntry txEntry = this.Broadcasts.FirstOrDefault(x => x.Transaction.GetHash() == transactionHash);
4444
return txEntry ?? null;
4545
}
4646

4747
/// <summary>Adds or updates a transaction from the collection of transactions to broadcast.</summary>
48-
public void AddOrUpdate(Transaction transaction, TransactionBroadcastState transactionBroadcastState, MempoolError mempoolError = null)
48+
public void AddOrUpdate(Transaction transaction, TransactionBroadcastState transactionBroadcastState, string errorMessage = null)
4949
{
50-
TransactionBroadcastEntry broadcastEntry = this.Broadcasts.FirstOrDefault(x => x.Transaction.GetHash() == transaction.GetHash());
50+
BroadcastTransactionStateChanedEntry broadcastEntry = this.Broadcasts.FirstOrDefault(x => x.Transaction.GetHash() == transaction.GetHash());
5151

5252
if (broadcastEntry == null)
5353
{
54-
broadcastEntry = new TransactionBroadcastEntry(transaction, transactionBroadcastState, mempoolError);
54+
broadcastEntry = new BroadcastTransactionStateChanedEntry(transaction, transactionBroadcastState, errorMessage);
5555
this.Broadcasts.Add(broadcastEntry);
5656
this.OnTransactionStateChanged(broadcastEntry);
5757
}
@@ -62,7 +62,28 @@ public void AddOrUpdate(Transaction transaction, TransactionBroadcastState trans
6262
}
6363
}
6464

65-
public abstract Task BroadcastTransactionAsync(Transaction transaction);
65+
public async Task BroadcastTransactionAsync(Transaction transaction)
66+
{
67+
Guard.NotNull(transaction, nameof(transaction));
68+
69+
if (this.IsPropagated(transaction))
70+
{
71+
return;
72+
}
73+
74+
foreach (IBroadcastCheck broadcastCheck in this.broadcastChecks)
75+
{
76+
string error = await broadcastCheck.CheckTransaction(transaction).ConfigureAwait(false);
77+
78+
if (!string.IsNullOrEmpty(error))
79+
{
80+
this.AddOrUpdate(transaction, TransactionBroadcastState.CantBroadcast, error);
81+
return;
82+
}
83+
}
84+
85+
await this.PropagateTransactionToPeersAsync(transaction, this.connectionManager.ConnectedPeers.ToList()).ConfigureAwait(false);
86+
}
6687

6788
/// <summary>
6889
/// Sends transaction to peers.
@@ -90,8 +111,8 @@ protected async Task PropagateTransactionToPeersAsync(Transaction transaction, L
90111
/// <summary>Checks if transaction was propagated to any peers on the network.</summary>
91112
protected bool IsPropagated(Transaction transaction)
92113
{
93-
TransactionBroadcastEntry broadcastEntry = this.GetTransaction(transaction.GetHash());
114+
BroadcastTransactionStateChanedEntry broadcastEntry = this.GetTransaction(transaction.GetHash());
94115
return (broadcastEntry != null) && (broadcastEntry.TransactionBroadcastState == TransactionBroadcastState.Propagated);
95116
}
96117
}
97-
}
118+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System.Linq;
2+
using System.Threading.Tasks;
3+
using Blockcore.Interfaces;
4+
using Blockcore.Utilities;
5+
using NBitcoin;
6+
7+
namespace Blockcore.Connection.Broadcasting
8+
{
9+
/// <summary>
10+
/// Broadcast that makes not checks.
11+
/// </summary>
12+
public class NoCheckBroadcastCheck : IBroadcastCheck
13+
{
14+
public NoCheckBroadcastCheck()
15+
{
16+
}
17+
18+
public Task<string> CheckTransaction(Transaction transaction)
19+
{
20+
return Task.FromResult(string.Empty);
21+
}
22+
}
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using Blockcore.EventBus;
2+
using Blockcore.Interfaces;
3+
4+
namespace Blockcore.Connection.Broadcasting
5+
{
6+
/// <summary>
7+
/// Event that is executed when a transaction is broadcasted.
8+
/// </summary>
9+
public class TransactionBroadcastEvent : EventBase
10+
{
11+
public BroadcastTransactionStateChanedEntry BroadcastEntry { get; }
12+
13+
public IBroadcasterManager BroadcasterManager { get; }
14+
15+
public TransactionBroadcastEvent(IBroadcasterManager broadcasterManager, BroadcastTransactionStateChanedEntry broadcastEntry)
16+
{
17+
this.BroadcasterManager = broadcasterManager;
18+
this.BroadcastEntry = broadcastEntry;
19+
}
20+
}
21+
}

src/Features/Blockcore.Features.Wallet/Broadcasting/TransactionBroadcastState.cs renamed to src/Blockcore/Connection/Broadcasting/TransactionBroadcastState.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
namespace Blockcore.Features.Wallet.Broadcasting
1+
namespace Blockcore.Connection.Broadcasting
22
{
33
public enum TransactionBroadcastState
44
{

src/Blockcore/EventBus/EventBase.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ namespace Blockcore.EventBus
88
/// <seealso cref="Blockcore.EventBus.IEvent" />
99
public abstract class EventBase
1010
{
11-
/// <inheritdoc />
1211
public Guid CorrelationId { get; }
1312

1413
public EventBase()
@@ -22,4 +21,4 @@ public override string ToString()
2221
return $"{this.CorrelationId.ToString()} - {this.GetType().Name}";
2322
}
2423
}
25-
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Blockcore.Connection.Broadcasting;
4+
using NBitcoin;
5+
6+
namespace Blockcore.Interfaces
7+
{
8+
/// <summary>
9+
/// Allow to check a transaction is valid before broadcasting it.
10+
/// </summary>
11+
public interface IBroadcastCheck
12+
{
13+
Task<string> CheckTransaction(Transaction transaction);
14+
}
15+
16+
public interface IBroadcasterManager
17+
{
18+
Task BroadcastTransactionAsync(Transaction transaction);
19+
20+
BroadcastTransactionStateChanedEntry GetTransaction(uint256 transactionHash);
21+
22+
void AddOrUpdate(Transaction transaction, TransactionBroadcastState transactionBroadcastState, string errorMessage = null);
23+
}
24+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Linq;
3+
using Microsoft.Extensions.DependencyInjection;
4+
5+
namespace Blockcore.Utilities.Extensions
6+
{
7+
public static class ServiceCollectionExtensions
8+
{
9+
/// <summary>
10+
/// Converts a long that represents a number of bytes to be represented in MB.
11+
/// </summary>
12+
public static bool RemoveSingleton<T>(this IServiceCollection services)
13+
{
14+
// Remove the service if it exists.
15+
return services.Remove(services.Where(sd => sd.ServiceType == typeof(T)).FirstOrDefault());
16+
}
17+
}
18+
}

0 commit comments

Comments
 (0)