Skip to content

Commit

Permalink
Speedup SideChain::get_outputs_blob()
Browse files Browse the repository at this point in the history
  • Loading branch information
SChernykh committed Aug 15, 2022
1 parent ea6a19a commit 3af6dcb
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void BlockCache::load_all(SideChain& side_chain, P2PServer& server)
continue;
}

if (block.deserialize(data + sizeof(uint32_t), n, side_chain) == 0) {
if (block.deserialize(data + sizeof(uint32_t), n, side_chain, uv_default_loop_checked()) == 0) {
server.add_cached_block(block);
++blocks_loaded;
}
Expand Down
4 changes: 2 additions & 2 deletions src/block_template.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet
buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end());

PoolBlock check;
const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain());
const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain(), nullptr);
if (result != 0) {
LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result);
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ void BlockTemplate::submit_sidechain_block(uint32_t template_id, uint32_t nonce,
buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end());

PoolBlock check;
const int result = check.deserialize(buf.data(), buf.size(), side_chain);
const int result = check.deserialize(buf.data(), buf.size(), side_chain, nullptr);
if (result != 0) {
LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result);
}
Expand Down
2 changes: 1 addition & 1 deletion src/p2p_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size)
result = m_blockDeserializeResult;
}
else {
result = m_block->deserialize(buf, size, m_pool->side_chain());
result = m_block->deserialize(buf, size, m_pool->side_chain(), &m_loop);
m_blockDeserializeBuf.assign(buf, buf + size);
m_blockDeserializeResult = result;
m_lookForMissingBlocks = true;
Expand Down
2 changes: 1 addition & 1 deletion src/pool_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ struct PoolBlock
void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash);
void serialize_sidechain_data();

int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain);
int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop);
void reset_offchain_data();

bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash);
Expand Down
4 changes: 2 additions & 2 deletions src/pool_block_parser.inl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace p2pool {
// Since data here can come from external and possibly malicious sources, check everything
// Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here
// Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on)
int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain)
int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop)
{
try {
// Sanity check
Expand Down Expand Up @@ -272,7 +272,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si
return __LINE__;
}

if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob)) {
if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob, loop)) {
return __LINE__;
}

Expand Down
72 changes: 71 additions & 1 deletion src/side_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ bool SideChain::get_block_blob(const hash& id, std::vector<uint8_t>& blob) const
return true;
}

bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob) const
bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob, uv_loop_t* loop) const
{
blob.clear();

Expand Down Expand Up @@ -719,6 +719,60 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v

const size_t n = tmpShares.size();

// Helper jobs call get_eph_public_key with indices in descending order
// Current thread will process indices in ascending order so when they meet, everything will be cached

std::atomic<int> counter = 0;
std::atomic<int> num_helper_jobs_finished = 0;
int num_helper_jobs_started = 0;

if (loop) {
constexpr size_t HELPER_JOBS_COUNT = 4;

struct Work
{
uv_work_t req;
const std::vector<MinerShare>& tmpShares;
const hash& txkeySec;
std::atomic<int>& counter;
std::atomic<int>& num_helper_jobs_finished;

// Fix MSVC warnings
Work() = delete;
Work& operator=(Work&&) = delete;
};

counter = static_cast<int>(n) - 1;
num_helper_jobs_started = HELPER_JOBS_COUNT;

for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) {
Work* w = new Work{ {}, tmpShares, block->m_txkeySec, counter, num_helper_jobs_finished };
w->req.data = w;

const int err = uv_queue_work(loop, &w->req,
[](uv_work_t* req)
{
Work* work = reinterpret_cast<Work*>(req->data);
hash eph_public_key;

int index;
while ((index = work->counter.fetch_sub(1)) >= 0) {
uint8_t view_tag;
work->tmpShares[index].m_wallet->get_eph_public_key(work->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag);
}

++work->num_helper_jobs_finished;
delete work;
}, nullptr);

if (err) {
LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err));
--num_helper_jobs_started;
delete w;
}
}
}

blob.reserve(n * 39 + 64);

writeVarint(n, blob);
Expand All @@ -730,6 +784,13 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v

hash eph_public_key;
for (size_t i = 0; i < n; ++i) {
// stop helper jobs when they meet with current thread
const int c = counter.load();
if ((c >= 0) && (static_cast<int>(i) >= c)) {
// this will cause all helper jobs to finish immediately
counter = -1;
}

writeVarint(tmpRewards[i], blob);

blob.emplace_back(tx_type);
Expand All @@ -747,6 +808,15 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
block->m_outputs.emplace_back(tmpRewards[i], eph_public_key, tx_type, view_tag);
}

if (loop) {
// this will cause all helper jobs to finish immediately
counter = -1;

while (num_helper_jobs_finished < num_helper_jobs_started) {
std::this_thread::yield();
}
}

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/side_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SideChain : public nocopy_nomove
void watch_mainchain_block(const ChainMain& data, const hash& possible_id);

bool get_block_blob(const hash& id, std::vector<uint8_t>& blob) const;
bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob) const;
bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob, uv_loop_t* loop) const;

void print_status() const;
double get_reward_share(const Wallet& w) const;
Expand Down
4 changes: 2 additions & 2 deletions tests/src/pool_block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ TEST(pool_block, deserialize)
f.read(reinterpret_cast<char*>(buf.data()), buf.size());
ASSERT_EQ(f.good(), true);

ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain), 0);
ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain, nullptr), 0);

ASSERT_EQ(b.m_mainChainData.size(), 5607);
ASSERT_EQ(b.m_mainChainHeaderSize, 43);
Expand Down Expand Up @@ -121,7 +121,7 @@ TEST(pool_block, verify)
p += sizeof(uint32_t);

ASSERT_TRUE(p + n <= e);
ASSERT_EQ(b.deserialize(p, n, sidechain), 0);
ASSERT_EQ(b.deserialize(p, n, sidechain, nullptr), 0);
p += n;

sidechain.add_block(b);
Expand Down

0 comments on commit 3af6dcb

Please sign in to comment.