Skip to content

Commit

Permalink
[q4-working-branch] Q4 working branch bartosz (#3775)
Browse files Browse the repository at this point in the history
* Fixing test

* Increasing POW window

* Fixing errorneous test

* Some in-progress work

* Change genesis account

* Doesn't compile but we're getting there

* Further progress with desharding

* Further work with txns dispatching

* Further progress with desharding

* Comilation fix

---------

Co-authored-by: bzawisto <bartosz@zilliqa.com>
  • Loading branch information
bzawisto and bzawisto committed Sep 12, 2023
1 parent 65b7e0f commit 0b6fc8f
Show file tree
Hide file tree
Showing 31 changed files with 582 additions and 1,145 deletions.
3 changes: 0 additions & 3 deletions src/cmd/genaccounts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ int main(int argc, char** argv) {
while (true) {
PairOfKey keypair = Schnorr::GenKeyPair();
Address addr = CryptoUtils::GetAddressFromPubKey(keypair.second);
if (Transaction::GetShardIndex(addr, numShards) != targetShard) {
continue;
}
cout << "\t\t<account>" << endl;
cout << "\t\t\t<private_key>" << keypair.first << "</private_key>"
<< endl;
Expand Down
8 changes: 4 additions & 4 deletions src/common/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ const unsigned int DS_ANNOUNCEMENT_DELAY_IN_MS{
ReadConstantNumeric("DS_ANNOUNCEMENT_DELAY_IN_MS", "node.epoch_timing.")};
const unsigned int SHARD_ANNOUNCEMENT_DELAY_IN_MS{ReadConstantNumeric(
"SHARD_ANNOUNCEMENT_DELAY_IN_MS", "node.epoch_timing.")};
const unsigned int LOOKUP_DELAY_SEND_TXNPACKET_IN_MS{ReadConstantNumeric(
"LOOKUP_DELAY_SEND_TXNPACKET_IN_MS", "node.epoch_timing.")};
const unsigned int MICROBLOCK_TIMEOUT{
ReadConstantNumeric("MICROBLOCK_TIMEOUT", "node.epoch_timing.")};
const unsigned int NEW_NODE_SYNC_INTERVAL{
Expand Down Expand Up @@ -440,8 +438,10 @@ const unsigned int CONNECTION_ALL_TIMEOUT{
ReadConstantNumeric("CONNECTION_ALL_TIMEOUT", "node.jsonrpc.")};
const unsigned int CONNECTION_CALLBACK_TIMEOUT{
ReadConstantNumeric("CONNECTION_CALLBACK_TIMEOUT", "node.jsonrpc.")};
const size_t REQUEST_PROCESSING_THREADS{ReadConstantNumeric("REQUEST_PROCESSING_THREADS", "node.jsonrpc.", 64)};
const size_t REQUEST_QUEUE_SIZE{ReadConstantNumeric("REQUEST_QUEUE_SIZE", "node.jsonrpc.", 65536)};
const size_t REQUEST_PROCESSING_THREADS{
ReadConstantNumeric("REQUEST_PROCESSING_THREADS", "node.jsonrpc.", 64)};
const size_t REQUEST_QUEUE_SIZE{
ReadConstantNumeric("REQUEST_QUEUE_SIZE", "node.jsonrpc.", 65536)};

// Network composition constants
const unsigned int COMM_SIZE{
Expand Down
1 change: 0 additions & 1 deletion src/common/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ extern const unsigned int DELAY_FIRSTXNEPOCH_IN_MS;
extern const unsigned int FETCHING_MISSING_DATA_TIMEOUT;
extern const unsigned int DS_ANNOUNCEMENT_DELAY_IN_MS;
extern const unsigned int SHARD_ANNOUNCEMENT_DELAY_IN_MS;
extern const unsigned int LOOKUP_DELAY_SEND_TXNPACKET_IN_MS;
extern const unsigned int MICROBLOCK_TIMEOUT;
extern const unsigned int NEW_NODE_SYNC_INTERVAL;
extern const unsigned int POW_SUBMISSION_TIMEOUT;
Expand Down
23 changes: 0 additions & 23 deletions src/libData/AccountData/Transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,29 +313,6 @@ void Transaction::SetSignature(const Signature &signature) {
m_signature = signature;
}

unsigned int Transaction::GetShardIndex(const Address &fromAddr,
unsigned int numShards) {
uint32_t x = 0;

if (numShards == 0) {
LOG_GENERAL(WARNING, "numShards is 0 and trying to calculate shard index");
return 0;
}

// Take the last four bytes of the address
for (unsigned int i = 0; i < 4; i++) {
x = (x << 8) | fromAddr.asArray().at(ACC_ADDR_SIZE - 4 + i);
}

return x % numShards;
}

unsigned int Transaction::GetShardIndex(unsigned int numShards) const {
const auto &fromAddr = GetSenderAddr();

return GetShardIndex(fromAddr, numShards);
}

bool Transaction::Verify(const Transaction &tran) {
zbytes txnData;
tran.SerializeCoreFields(txnData, 0);
Expand Down
6 changes: 0 additions & 6 deletions src/libData/AccountData/Transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,9 @@ class Transaction : public SerializableDataBlock {
/// Returns the EC-Schnorr signature over the transaction data.
const Signature& GetSignature() const;

unsigned int GetShardIndex(unsigned int numShards) const;

/// Set the signature
void SetSignature(const Signature& signature);

/// Identifies the shard number that should process the transaction.
static unsigned int GetShardIndex(const Address& fromAddr,
unsigned int numShards);

enum ContractType {
NON_CONTRACT = 0,
CONTRACT_CREATION,
Expand Down
21 changes: 4 additions & 17 deletions src/libData/AccountStore/AccountStoreSC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
#include <chrono>


#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -250,8 +249,7 @@ bool AccountStoreSC::UpdateAccounts(
.blockTimestamp = extras.block_timestamp,
.blockDifficulty = extras.block_difficulty,
.contractType = Transaction::GetTransactionType(transaction),
.txnHash = transaction.GetTranID()
};
.txnHash = transaction.GetTranID()};

AccountStoreCpsInterface acCpsInterface{*this};
libCps::CpsExecutor cpsExecutor{acCpsInterface, receipt};
Expand All @@ -264,7 +262,6 @@ bool AccountStoreSC::UpdateAccounts(
}
error_code = cpsRunResult.txnStatus;


return cpsRunResult.isSuccess;
}

Expand Down Expand Up @@ -1541,18 +1538,6 @@ bool AccountStoreSC::ParseCallContractJsonOutput(const Json::Value &_json,
gasRemained -= SCILLA_RUNNER_INVOKE_GAS;
}

// check whether the recipient contract is in the same shard with the
// current contract
if (!m_curIsDS &&
(Transaction::GetShardIndex(curContractAddr, m_curNumShards) !=
Transaction::GetShardIndex(recipient, m_curNumShards))) {
LOG_GENERAL(WARNING,
"another contract doesn't belong to the same shard with "
"current contract");
receipt.AddError(CHAIN_CALL_DIFF_SHARD);
return false;
}

if (m_curEdges > MAX_CONTRACT_EDGES) {
LOG_GENERAL(
WARNING,
Expand Down Expand Up @@ -1691,7 +1676,9 @@ bool AccountStoreSC::TransferBalanceAtomic(const Address &from,
const Address &to,
const uint128_t &delta) {
// LOG_MARKER();
LOG_GENERAL(WARNING, "AccountStoreSC::TransferBalanceAtomicTransferBalanceAtomic from " << from << ", to: " << to << ", value: " << delta);
LOG_GENERAL(WARNING,
"AccountStoreSC::TransferBalanceAtomicTransferBalanceAtomic from "
<< from << ", to: " << to << ", value: " << delta);
return m_accountStoreAtomic->TransferBalance(from, to, delta);
}

Expand Down
14 changes: 8 additions & 6 deletions src/libDirectoryService/DSBlockPostProcessing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ void DirectoryService::UpdateMyDSModeAndConsensusId() {
// 2. My node was removed by the DS Committee due to lack of sufficient
// performance.
bool isDropout = true;
for (auto it = m_mediator.m_DSCommittee->begin();
it != m_mediator.m_DSCommittee->end(); ++it) {
for (auto it = m_mediator.m_DSCommittee->cbegin();
it != m_mediator.m_DSCommittee->cend(); ++it) {
// Look for my public key.
if (m_mediator.m_selfKey.second == it->first) {
m_consensusMyID = std::distance(m_mediator.m_DSCommittee->begin(), it);
m_consensusMyID = std::distance(m_mediator.m_DSCommittee->cbegin(), it);
isDropout = false;
break;
}
Expand Down Expand Up @@ -325,12 +325,14 @@ void DirectoryService::UpdateDSCommitteeComposition() {
LOG_MARKER();
std::lock_guard<mutex> g(m_mediator.m_mutexDSCommittee);

auto old_size = m_mediator.m_DSCommittee->size();
const bool leader = m_mediator.m_ds->GetConsensusMyID() ==
m_mediator.m_ds->GetConsensusMyID();
LOG_GENERAL(WARNING, "BZ UpdateDSCommitteeComposition enter, I am leader? : "
<< (leader ? "true" : "false"));

UpdateDSCommitteeCompositionCore(m_mediator.m_selfKey.second,
*m_mediator.m_DSCommittee,
m_mediator.m_dsBlockChain.GetLastBlock());
LOG_EXTRA("m_DSCommittee updated " << old_size << "->"
<< m_mediator.m_DSCommittee->size());
}

void DirectoryService::StartNextTxEpoch() {
Expand Down
6 changes: 3 additions & 3 deletions src/libDirectoryService/DSBlockPreProcessing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ bool DirectoryService::ProcessShardingStructure(
void DirectoryService::SaveDSPerformanceCore(
std::map<uint64_t, std::map<int32_t, std::vector<PubKey>>>&
coinbaseRewardees,
std::map<PubKey, uint32_t>& dsMemberPerformance, DequeOfNode& dsComm,
std::map<PubKey, uint32_t>& dsMemberPerformance, const DequeOfNode& dsComm,
uint64_t currentEpochNum, unsigned int numOfFinalBlock,
int finalblockRewardID) {
LOG_MARKER();
Expand Down Expand Up @@ -1511,7 +1511,7 @@ unsigned int DirectoryService::DetermineByzantineNodesCore(
unsigned int numOfProposedDSMembers,
std::vector<PubKey>& removeDSNodePubkeys, uint64_t currentEpochNum,
unsigned int numOfFinalBlock, double performanceThreshold,
unsigned int maxByzantineRemoved, DequeOfNode& dsComm,
unsigned int maxByzantineRemoved, const DequeOfNode& dsComm,
const std::map<PubKey, uint32_t>& dsMemberPerformance) {
LOG_MARKER();

Expand All @@ -1538,7 +1538,7 @@ unsigned int DirectoryService::DetermineByzantineNodesCore(
INFO, "threshold = " << threshold << " (" << performanceThreshold << ")");
unsigned int numByzantine = 0;
unsigned int index = 0;
for (auto it = dsComm.begin(); it != dsComm.end(); ++it) {
for (auto it = dsComm.cbegin(); it != dsComm.cend(); ++it) {
// Do not evaluate guard nodes.
if (GUARD_MODE && Guard::GetInstance().IsNodeInDSGuardList(it->first)) {
continue;
Expand Down
18 changes: 18 additions & 0 deletions src/libDirectoryService/DSComposition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ void UpdateDSCommitteeCompositionCore(const PubKey& selfKeyPub,
const auto& NewDSMembers = dsblock.GetHeader().GetDSPoWWinners();
unsigned int NumWinners = NewDSMembers.size();

LOG_GENERAL(WARNING, "BZ UpdateDSCommitteeCompositionCore enter, winners: "
<< NumWinners);

// Get the vector of all non-performant nodes to be removed.
const auto& removeDSNodePubkeys = dsblock.GetHeader().GetDSRemovePubKeys();

Expand Down Expand Up @@ -72,6 +75,9 @@ void UpdateDSCommitteeCompositionCore(const PubKey& selfKeyPub,
dsComm.emplace_back(*it);
dsComm.erase(it);

LOG_GENERAL(WARNING, "BZ Pushing node to the end: "
<< it->second.GetPrintableIPAddress());

continue;
}

Expand All @@ -84,23 +90,31 @@ void UpdateDSCommitteeCompositionCore(const PubKey& selfKeyPub,
// Peer() is required because my own node's network information is
// zeroed out.
dsComm.emplace_front(selfKeyPub, Peer());
LOG_GENERAL(WARNING, "BZ Myself Pushing non-guard to front: "
<< DSPowWinner.second.GetPrintableIPAddress());
} else {
// Calculate the position to insert the current winner.
it = dsComm.begin() + (Guard::GetInstance().GetNumOfDSGuard());
// Place my node's information in front of the DS Committee Community
// Nodes.
LOG_GENERAL(WARNING, "BZ Myself Pushing guard to proper position: "
<< DSPowWinner.second.GetPrintableIPAddress());
dsComm.emplace(it, selfKeyPub, Peer());
}
} else {
if (!GUARD_MODE) {
// Place the current winner node's information in front of the DS
// Committee.
dsComm.emplace_front(DSPowWinner);
LOG_GENERAL(WARNING, "BZ Other Pushing non-guard to front: "
<< DSPowWinner.second.GetPrintableIPAddress());
} else {
// Calculate the position to insert the current winner.
it = dsComm.begin() + (Guard::GetInstance().GetNumOfDSGuard());
// Place the winner's information in front of the DS Committee Community
// Nodes.
LOG_GENERAL(WARNING, "BZ Other Pushing guard to proper position: "
<< DSPowWinner.second.GetPrintableIPAddress());
dsComm.emplace(it, DSPowWinner);
}
}
Expand Down Expand Up @@ -132,10 +146,14 @@ void UpdateDSCommitteeCompositionCore(const PubKey& selfKeyPub,
}

if (LOOKUP_NODE_MODE && !bStoreDSCommittee) {
LOG_GENERAL(WARNING, "BZ Adding ejected node: "
<< dsComm.back().second.GetPrintableIPAddress());
minerInfo.m_dsNodesEjected.emplace_back(dsComm.back().first);
}

// Remove this node from blacklist if it exists
LOG_GENERAL(WARNING, "BZ Removing from dsComm node: "
<< dsComm.back().second.GetPrintableIPAddress());
Peer& p = dsComm.back().second;
Blacklist::GetInstance().Remove({p.GetIpAddress(),p.GetListenPortHost(),p.GetNodeIndentifier()});
dsComm.pop_back();
Expand Down
36 changes: 18 additions & 18 deletions src/libDirectoryService/DirectoryService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ bool DirectoryService::CheckState(Action action) {
return true;
}

uint32_t DirectoryService::GetNumShards() const {
lock_guard<mutex> g(m_mutexShards);

return m_shards.size();
}

bool DirectoryService::ProcessSetPrimary(
const zbytes& message, unsigned int offset,
[[gnu::unused]] const Peer& from,
Expand All @@ -206,6 +200,7 @@ bool DirectoryService::ProcessSetPrimary(
// Note: This function should only be invoked during bootstrap sequence
// Message = [Primary node IP] [Primary node port]
LOG_MARKER();
LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 1");

if (m_mediator.m_currentEpochNum > 1) {
// TODO: Get the IP address of who send this message, and deduct its
Expand Down Expand Up @@ -271,7 +266,7 @@ bool DirectoryService::ProcessSetPrimary(
// Load the DS committee, with my own peer set to dummy
m_mediator.m_lookup->SetDSCommitteInfo(true);
}

LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 2");
// Lets start the gossip as earliest as possible
if (BROADCAST_GOSSIP_MODE) {
VectorOfNode peers;
Expand Down Expand Up @@ -321,7 +316,7 @@ bool DirectoryService::ProcessSetPrimary(
Guard::GetInstance().AddDSGuardToBlacklistExcludeList(
*m_mediator.m_DSCommittee);
}

LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 3");
SetConsensusLeaderID(0);
if (m_mediator.m_currentEpochNum > 1) {
LOG_GENERAL(WARNING, "ProcessSetPrimary called in epoch "
Expand Down Expand Up @@ -350,11 +345,12 @@ bool DirectoryService::ProcessSetPrimary(
<< "][" << std::setw(6) << std::left << m_consensusMyID
<< "] DSBK");
}

LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 4");
if ((m_consensusMyID < POW_PACKET_SENDERS) ||
(primary == m_mediator.m_selfPeer)) {
m_powSubmissionWindowExpired = false;
LOG_GENERAL(INFO, "m_consensusMyID: " << m_consensusMyID);
LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 11");
LOG_EPOCH(INFO, m_mediator.m_currentEpochNum,
"Waiting " << POW_WINDOW_IN_SECONDS
<< " seconds, accepting PoW submissions...");
Expand All @@ -366,7 +362,7 @@ bool DirectoryService::ProcessSetPrimary(
this->SendPoWPacketSubmissionToOtherDSComm();
};
DetachedFunction(1, func);

LOG_GENERAL(WARNING, "BZ: ProcessSetPrimary 22");
LOG_EPOCH(INFO, m_mediator.m_currentEpochNum,
"Waiting " << POWPACKETSUBMISSION_WINDOW_IN_SECONDS
<< " seconds, accepting PoW submissions packet from "
Expand Down Expand Up @@ -1010,10 +1006,12 @@ bool DirectoryService::ProcessNewDSGuardNetworkInfo(
if (m_mediator.m_DSCommittee->at(indexOfDSGuard).first == dsGuardPubkey) {
foundDSGuardNode = true;

Blacklist::GetInstance().RemoveFromWhitelist({
m_mediator.m_DSCommittee->at(indexOfDSGuard).second.m_ipAddress,
m_mediator.m_DSCommittee->at(indexOfDSGuard).second.m_listenPortHost,
m_mediator.m_DSCommittee->at(indexOfDSGuard).second.GetNodeIndentifier()} );
Blacklist::GetInstance().RemoveFromWhitelist(
{m_mediator.m_DSCommittee->at(indexOfDSGuard).second.m_ipAddress,
m_mediator.m_DSCommittee->at(indexOfDSGuard)
.second.m_listenPortHost,
m_mediator.m_DSCommittee->at(indexOfDSGuard)
.second.GetNodeIndentifier()});
LOG_GENERAL(INFO,
"Removed "
<< m_mediator.m_DSCommittee->at(indexOfDSGuard).second
Expand All @@ -1027,9 +1025,10 @@ bool DirectoryService::ProcessNewDSGuardNetworkInfo(
dsGuardNewNetworkInfo;

if (GUARD_MODE) {
Blacklist::GetInstance().Whitelist({dsGuardNewNetworkInfo.m_ipAddress,
dsGuardNewNetworkInfo.m_listenPortHost,
dsGuardNewNetworkInfo.GetNodeIndentifier()});
Blacklist::GetInstance().Whitelist(
{dsGuardNewNetworkInfo.m_ipAddress,
dsGuardNewNetworkInfo.m_listenPortHost,
dsGuardNewNetworkInfo.GetNodeIndentifier()});
LOG_GENERAL(INFO, "Added ds guard " << dsGuardNewNetworkInfo
<< " to blacklist exclude list");
}
Expand Down Expand Up @@ -1207,7 +1206,8 @@ bool DirectoryService::Execute(const zbytes& message, unsigned int offset,
LOG_EPOCH(WARNING, m_mediator.m_currentEpochNum, "Ignore DS message");
return false;
}

LOG_GENERAL(WARNING,
"BZ Dispatching DS msg type: " << hex << (unsigned int)ins_byte);
if (ins_byte < ins_handlers_count) {
result =
(this->*ins_handlers[ins_byte])(message, offset + 1, from, startByte);
Expand Down
10 changes: 5 additions & 5 deletions src/libDirectoryService/DirectoryService.h
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ class DirectoryService : public Executable {
bool m_dsEpochAfterUpgrade = false;

// GetShards
uint32_t GetNumShards() const;
uint32_t GetNumShards() const { return 1; }
/// Force multicast when sending block to shard
std::atomic<bool> m_forceMulticast{};

Expand Down Expand Up @@ -713,14 +713,14 @@ class DirectoryService : public Executable {
static void SaveDSPerformanceCore(
std::map<uint64_t, std::map<int32_t, std::vector<PubKey>>>&
coinbaseRewardees,
std::map<PubKey, uint32_t>& dsMemberPerformance, DequeOfNode& dsComm,
uint64_t currentEpochNum, unsigned int numOfFinalBlock,
int finalblockRewardID);
std::map<PubKey, uint32_t>& dsMemberPerformance,
const DequeOfNode& dsComm, uint64_t currentEpochNum,
unsigned int numOfFinalBlock, int finalblockRewardID);
static unsigned int DetermineByzantineNodesCore(
unsigned int numOfProposedDSMembers,
std::vector<PubKey>& removeDSNodePubkeys, uint64_t currentEpochNum,
unsigned int numOfFinalBlock, double performanceThreshold,
unsigned int maxByzantineRemoved, DequeOfNode& dsComm,
unsigned int maxByzantineRemoved, const DequeOfNode& dsComm,
const std::map<PubKey, uint32_t>& dsMemberPerformance);

private:
Expand Down

0 comments on commit 0b6fc8f

Please sign in to comment.