Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate validator stream #293

Open
wants to merge 19 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/Lachain.Console/Application.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ public void Start(RunOptions options)

blockSynchronizer.Start();
Logger.LogInformation("Synchronizing blocks...");
var validators = validatorManager.GetValidatorsPublicKeys((long) blockManager.GetHeight()).ToList();
blockSynchronizer.SynchronizeWith(
validatorManager.GetValidatorsPublicKeys((long) blockManager.GetHeight())
.Where(key => !key.Equals(wallet.EcdsaKeyPair.PublicKey))
validators.Where(key => !key.Equals(wallet.EcdsaKeyPair.PublicKey))
);
Logger.LogInformation("Block synchronization finished, starting consensus...");
if (validators.Contains(wallet.EcdsaKeyPair.PublicKey))
{
networkManager.ConnectValidatorChannel(
validators.Where(key => !key.Equals(wallet.EcdsaKeyPair.PublicKey)).ToList()
);
}
consensusManager.Start(blockManager.GetHeight() + 1);
validatorStatusManager.Start(false);

Expand Down
27 changes: 26 additions & 1 deletion src/Lachain.Core/Blockchain/Validators/ValidatorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
using System.Collections.Generic;
using System.Linq;
using Lachain.Consensus;
using Lachain.Core.Blockchain.Interface;
using Lachain.Core.Blockchain.SystemContracts;
using Lachain.Crypto.ThresholdSignature;
using Lachain.Logger;
using Lachain.Networking;
using Lachain.Proto;
using Lachain.Storage.Repositories;
using NLog.Fluent;
Expand All @@ -14,12 +17,34 @@ namespace Lachain.Core.Blockchain.Validators
public class ValidatorManager : IValidatorManager
{
private readonly ISnapshotIndexRepository _snapshotIndexRepository;
private readonly INetworkManager _networkManager;
private static readonly ILogger<ValidatorManager> Logger = LoggerFactory.GetLoggerForClass<ValidatorManager>();
private readonly Dictionary<long, IReadOnlyCollection<ECDSAPublicKey>> _pubkeyCache = new Dictionary<long, IReadOnlyCollection<ECDSAPublicKey>>();

public ValidatorManager(ISnapshotIndexRepository snapshotIndexRepository)
public ValidatorManager(
ISnapshotIndexRepository snapshotIndexRepository,
IBlockManager blockManager,
INetworkManager networkManager
)
{
_snapshotIndexRepository = snapshotIndexRepository;
_networkManager = networkManager;

blockManager.OnBlockPersisted += OnBlockPersisted;
}

private void OnBlockPersisted(object? sender, Block block)
{
if (block.Header.Index % StakingContract.CycleDuration == 0)
{
var validators = _snapshotIndexRepository.GetSnapshotForBlock(block.Header.Index).Validators
.GetValidatorsPublicKeys().ToList();

var myPublicKey = _networkManager.MessageFactory.GetPublicKey();
if (validators.Contains(myPublicKey))
_networkManager.ConnectValidatorChannel(validators.Where(pubKey => !pubKey.Equals(myPublicKey)).ToList());
else _networkManager.DisconnectValidatorChannel();
}
}

public IPublicConsensusKeySet? GetValidators(long afterBlock)
Expand Down
4 changes: 2 additions & 2 deletions src/Lachain.Core/Network/NetworkManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace Lachain.Core.Network
{
public class NetworkManager : NetworkManagerBase
{
public const int MyVersion = 14;
public const int MinCompatiblePeerVersion = 14;
public const int MyVersion = 15;
public const int MinCompatiblePeerVersion = 15;

public NetworkManager(
IConfigManager configManager, IPrivateWallet privateWallet
Expand Down
15 changes: 14 additions & 1 deletion src/Lachain.Networking/Hub/ClientWorker.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using Google.Protobuf;
using Lachain.Consensus;
Expand Down Expand Up @@ -33,6 +34,7 @@ public class ClientWorker : IDisposable
private readonly HubConnector _hubConnector;
private readonly Thread _worker;
private bool _isConnected = false;
private bool _isValidator = false;
private int _eraMsgCounter;

private readonly C5.IntervalHeap<(NetworkMessagePriority, NetworkMessage)> _messageQueue
Expand Down Expand Up @@ -79,6 +81,11 @@ public void AddMsgToQueue(NetworkMessage message, NetworkMessagePriority priorit
}
}

public void SetValidator(bool isValidator)
{
_isValidator = isValidator;
}

private void Worker()
{
_isConnected = true;
Expand Down Expand Up @@ -128,7 +135,7 @@ private void Worker()
.Inc(message.CalculateSize());
}

_hubConnector.Send(PeerPublicKey, megaBatchBytes);
Send(megaBatchBytes);
_eraMsgCounter += 1;
}

Expand All @@ -142,6 +149,12 @@ private void Worker()
}
}

private void Send(byte[] megaBatchBytes)
{
if (_isValidator) _hubConnector.SendToValidator(PeerPublicKey, megaBatchBytes);
else _hubConnector.Send(PeerPublicKey, megaBatchBytes);
}

public void Dispose()
{
lock (_messageQueue)
Expand Down
20 changes: 20 additions & 0 deletions src/Lachain.Networking/Hub/HubConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ public void Send(byte[] publicKey, byte[] message)
CommunicationHub.Net.Hub.Send(publicKey, CompressUtils.DeflateCompress(message).ToArray());
}

public void SendToValidator(byte[] publicKey, byte[] message)
{
CommunicationHub.Net.Hub.SendToValidator(publicKey, CompressUtils.DeflateCompress(message).ToArray());
}

public void StartValidatorChannel(byte[] publicKyes)
{
CommunicationHub.Net.Hub.StartValidatorChannel(publicKyes);
}

public void StopValidatorChannel()
{
CommunicationHub.Net.Hub.StopValidatorChannel();
}

public void DisconnectValidators(byte[] publicKyes)
{
CommunicationHub.Net.Hub.DisconnectValidators(publicKyes);
}

public void Dispose()
{
_running = false;
Expand Down
2 changes: 2 additions & 0 deletions src/Lachain.Networking/INetworkManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface INetworkManager : IDisposable
IMessageFactory MessageFactory { get; }
void SendTo(ECDSAPublicKey publicKey, NetworkMessage message, NetworkMessagePriority priority);
void Start();
void ConnectValidatorChannel(List<ECDSAPublicKey> validators);
void DisconnectValidatorChannel();
void BroadcastLocalTransaction(TransactionReceipt receipt);
void AdvanceEra(ulong era);
Node LocalNode { get; }
Expand Down
104 changes: 103 additions & 1 deletion src/Lachain.Networking/NetworkManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Lachain.Logger;
using Lachain.Networking.Hub;
using Lachain.Proto;
using Lachain.Storage.Repositories;
using Lachain.Utility;
using Lachain.Utility.Utils;
using PingReply = Lachain.Proto.PingReply;
Expand All @@ -28,10 +29,13 @@ public abstract class NetworkManagerBase : INetworkManager, INetworkBroadcaster,
private readonly MessageFactory _messageFactory;
private readonly HubConnector _hubConnector;
private readonly ClientWorker _broadcaster;
private bool _started = false;

private readonly IDictionary<ECDSAPublicKey, ClientWorker> _clientWorkers =
new ConcurrentDictionary<ECDSAPublicKey, ClientWorker>();

private List<ECDSAPublicKey> _connectedValidators = new List<ECDSAPublicKey>();

protected NetworkManagerBase(NetworkConfig networkConfig, EcdsaKeyPair keyPair, byte[] hubPrivateKey,
int version, int minPeerVersion)
{
Expand Down Expand Up @@ -70,8 +74,106 @@ public void SendTo(ECDSAPublicKey publicKey, NetworkMessage message, NetworkMess

public void Start()
{
_broadcaster.Start();
_started = true;
_hubConnector.Start();
_broadcaster.Start();
}

public void ConnectValidatorChannel(List<ECDSAPublicKey> validators)
{
if (!_started) return;

validators = validators.OrderBy(
x => x, new ComparisonUtils.ECDSAPublicKeyComparer()).ToList();
_connectedValidators = _connectedValidators.OrderBy(
x => x, new ComparisonUtils.ECDSAPublicKeyComparer()).ToList();

var validatorsToDisconnect = RemovePublicKeys(_connectedValidators, validators);

if (validatorsToDisconnect.Count > 0)
{
foreach (var publicKey in validatorsToDisconnect)
{
GetClientWorker(publicKey)?.SetValidator(false);
}
_hubConnector.DisconnectValidators(
validatorsToDisconnect.Select(pubKey => pubKey.EncodeCompressed()).Flatten().ToArray()
);
Logger.LogTrace(
$"Disconnected validators: [{string.Join(", ", validatorsToDisconnect.Select(k => k.ToHex()))}] from validator channel"
);
}

var validatorsToConnect = RemovePublicKeys(validators, _connectedValidators);

if (validatorsToConnect.Count > 0)
{
_hubConnector.StartValidatorChannel(
validatorsToConnect.Select(pubKey => pubKey.EncodeCompressed()).Flatten().ToArray()
);
Logger.LogTrace(
$"Connected to validator channel with validators: [{string.Join(", ", validatorsToConnect.Select(k => k.ToHex()))}]"
);

foreach (var publicKey in validatorsToConnect)
{
GetClientWorker(publicKey)?.SetValidator(true);
}
}

lock (_connectedValidators)
{
_connectedValidators.Clear();
_connectedValidators = new List<ECDSAPublicKey>(validators);
}
}

public void DisconnectValidatorChannel()
{
if (!_started || _connectedValidators.Count == 0) return;

Logger.LogTrace("Disconnecting from validator channel");

foreach (var publicKey in _connectedValidators)
{
GetClientWorker(publicKey)?.SetValidator(false);
}
lock (_connectedValidators)
_connectedValidators.Clear();

_hubConnector.StopValidatorChannel();
Logger.LogTrace(
$"Disconnected validators: [{string.Join(", ", _connectedValidators.Select(k => k.ToHex()))}] from validator channel"
);
}

// both input lists need to be sorted and no duplicate element allowed
// removing items from source which is present in keysToRemove
// in O(source.Count + keysToRemove.Count) complexity
private List<ECDSAPublicKey> RemovePublicKeys(
List<ECDSAPublicKey> source,
List<ECDSAPublicKey> keysToRemove
)
{
var res = new List<ECDSAPublicKey>();
int iter = 0;
foreach (var publicKey in source)
{
bool found = false;
while (iter < keysToRemove.Count)
{
var key = keysToRemove[iter];
var compare = key.Buffer.Cast<IComparable<byte>>().
CompareLexicographically(publicKey.Buffer);
if (compare == 0) found = true;
else if (compare > 0) break;

iter++;
}

if (!found) res.Add(publicKey);
}
return res;
}

private ClientWorker? GetClientWorker(ECDSAPublicKey publicKey)
Expand Down
12 changes: 12 additions & 0 deletions src/Lachain.Utility/Utils/ComparisonUtils.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Lachain.Proto;

namespace Lachain.Utility.Utils
{
Expand All @@ -21,5 +23,15 @@ public static int CompareLexicographically<T>(this IEnumerable<IComparable<T>> x
if (res != 0) return res;
}
}

public class ECDSAPublicKeyComparer : IComparer<ECDSAPublicKey?>
{
public int Compare(ECDSAPublicKey? x, ECDSAPublicKey? y)
{
if (x is null) return y is null ? 0 : -1;
if (y is null) return 1;
return x.Buffer.Cast<IComparable<byte>>().CompareLexicographically(y.Buffer);
}
}
}
}
10 changes: 5 additions & 5 deletions test/Lachain.CoreTest/IntegrationTests/ValidatorStatusTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public ValidatorStatusTest()
containerBuilder.RegisterModule<BlockchainModule>();
containerBuilder.RegisterModule<ConfigModule>();
containerBuilder.RegisterModule<StorageModule>();
containerBuilder.RegisterModule<ConsensusModule>();
containerBuilder.RegisterModule<NetworkModule>();

_container = containerBuilder.Build();
}
Expand All @@ -69,6 +71,8 @@ public void Setup()
containerBuilder.RegisterModule<BlockchainModule>();
containerBuilder.RegisterModule<ConfigModule>();
containerBuilder.RegisterModule<StorageModule>();
containerBuilder.RegisterModule<ConsensusModule>();
containerBuilder.RegisterModule<NetworkModule>();
_container = containerBuilder.Build();
_blockManager = _container.Resolve<IBlockManager>();
_stateManager = _container.Resolve<IStateManager>();
Expand All @@ -84,11 +88,7 @@ public void Setup()
HardforkHeights.SetHardforkHeights(_configManager.GetConfig<HardforkConfig>("hardfork") ?? throw new InvalidOperationException());
StakingContract.Initialize(_configManager.GetConfig<NetworkConfig>("network")!);
}
_validatorStatusManager = new ValidatorStatusManager(
_transactionPool, _container.Resolve<ITransactionSigner>(), _container.Resolve<ITransactionBuilder>(),
_wallet, _stateManager, _container.Resolve<IValidatorAttendanceRepository>(),
_container.Resolve<ISystemContractReader>()
);
_validatorStatusManager = _container.Resolve<IValidatorStatusManager>();
}

[TearDown]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public void Setup()
containerBuilder.RegisterModule<BlockchainModule>();
containerBuilder.RegisterModule<ConfigModule>();
containerBuilder.RegisterModule<StorageModule>();
containerBuilder.RegisterModule<ConsensusModule>();
containerBuilder.RegisterModule<NetworkModule>();
_container = containerBuilder.Build();

_configManager = _container.Resolve<IConfigManager>();
Expand All @@ -61,11 +63,7 @@ public void Setup()
_localTransactionRepository = _container.Resolve<ILocalTransactionRepository>();
_privateWallet = _container.Resolve<IPrivateWallet>();
_transactionManager = _container.Resolve<ITransactionManager>();
_validatorStatusManager = _validatorStatusManager = new ValidatorStatusManager(
_transactionPool, _container.Resolve<ITransactionSigner>(), _container.Resolve<ITransactionBuilder>(),
_privateWallet, _stateManager, _container.Resolve<IValidatorAttendanceRepository>(),
_container.Resolve<ISystemContractReader>()
);
_validatorStatusManager = _container.Resolve<IValidatorStatusManager>();
ServiceBinder.BindService<GenericParameterAttributes>();
_fes = new FrontEndService(_stateManager, _transactionPool, _transactionSigner,
_systemContractReader, _localTransactionRepository, _validatorStatusManager, _privateWallet, _transactionManager);
Expand Down