Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 73 additions & 42 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,75 @@
#include <unordered_map>
#include <utility>

#include <boost/signals2/signal.hpp>

struct ValidationInterfaceConnections {
boost::signals2::scoped_connection UpdatedBlockTip;
boost::signals2::scoped_connection TransactionAddedToMempool;
boost::signals2::scoped_connection BlockConnected;
boost::signals2::scoped_connection BlockDisconnected;
boost::signals2::scoped_connection TransactionRemovedFromMempool;
boost::signals2::scoped_connection ChainStateFlushed;
boost::signals2::scoped_connection BlockChecked;
boost::signals2::scoped_connection NewPoWValidBlock;
};

//! The MainSignalsInstance manages a list of shared_ptr<CValidationInterface>
//! callbacks.
//!
//! A std::unordered_map is used to track what callbacks are currently
//! registered, and a std::list is to used to store the callbacks that are
//! currently registered as well as any callbacks that are just unregistered
//! and about to be deleted when they are done executing.
struct MainSignalsInstance {
boost::signals2::signal<void (const CBlockIndex *, const CBlockIndex *, bool fInitialDownload)> UpdatedBlockTip;
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex)> BlockConnected;
boost::signals2::signal<void (const std::shared_ptr<const CBlock>&, const CBlockIndex* pindex)> BlockDisconnected;
boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
boost::signals2::signal<void (const CBlockLocator &)> ChainStateFlushed;
boost::signals2::signal<void (const CBlock&, const BlockValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;

private:
Mutex m_mutex;
Copy link
Member

@sipa sipa Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all of these member variables/types need to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

Do all of these member variables/types need to be public?

No none do, added private/public sections

//! List entries consist of a callback pointer and reference count. The
//! count is equal to the number of current executions of that entry, plus 1
//! if it's registered. It cannot be 0 because that would imply it is
//! unregistered and also not being executed (so shouldn't exist).
struct ListEntry { std::shared_ptr<CValidationInterface> callbacks; int count = 1; };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit int count{1};

std::list<ListEntry> m_list GUARDED_BY(m_mutex);
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);

public:
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;
std::unordered_map<CValidationInterface*, ValidationInterfaceConnections> m_connMainSignals;

explicit MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}

void Register(std::shared_ptr<CValidationInterface> callbacks)
Copy link
Contributor

@promag promag Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, could avoid incrementing usage count - receive reference.

{
LOCK(m_mutex);
auto inserted = m_map.emplace(callbacks.get(), m_list.end());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we add the same twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

So we add the same twice?

There's two emplaces because of the map and list (if that's the question)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean this is allowing adding the same callbacks. Why not assert(inserted.second)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

I mean this is allowing adding the same callbacks. Why not assert(inserted.second)?

Unit tests fail if the unregister function is not idempotent, so it seems good to me that the register function is idempotent as well. I could imagine an idempotent API here making calling code simpler, even though I could also imagine an assert catching potential bugs. I think asserts tend to work a better at short range within a module instead of being used to make an API rigid in an attempt to catch external bugs. But I don't think there is a right answer here and would be ok with a PR that took a different approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think these shouldn't be idempotent because makes sense to call just once. For instance std::fstream::open is not idempotent, fails if file is already opened. I'd rather fix unit tests. Just an opinion and should not prevent this change going forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

Personally I think these shouldn't be idempotent because makes sense to call just once. For instance std::fstream::open is not idempotent, fails if file is already opened. I'd rather fix unit tests. Just an opinion and should not prevent this change going forward.

Feel free to open a followup. If I wanted to change calling code not to use the same pointer more than once, I would change these to return bool and have calling code assert that they return true, rather having these crash on cases they can reasonably handle and making assumptions about calling code. I do find API's like std::map's insert/erase that are idempotent convenient for avoiding boilerplate code, so that's another reason I like the keeping the boost behavior beyond not wanting to increase scope of this PR and not liking asserts across an api boundary

if (inserted.second) inserted.first->second = m_list.emplace(m_list.end());
inserted.first->second->callbacks = std::move(callbacks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why callbacks are moved into the m_list always, but not only when an insertion into m_map occurs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

Why callbacks are moved into the m_list always, but not only when an insertion into m_map occurs?

Shouldn't make a difference in practice, but it seemed better to me to call operator=(&&) and do empty destroy on the RHS object consistently instead of having different call sequences depending on a basically unrelated condition. Also, this will probably never matter but maybe always updating could be useful and worth guaranteeing in the future if there's internal property of the pointer worth updating like a new custom deleter.

}

void Unregister(CValidationInterface* callbacks)
{
LOCK(m_mutex);
auto it = m_map.find(callbacks);
if (it != m_map.end()) {
if (!--it->second->count) m_list.erase(it->second);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, == 0.

m_map.erase(it);
}
}

//! Clear unregisters every previously registered callback, erasing every
//! map entry. After this call, the list may still contain callbacks that
//! are currently executing, but it will be cleared when they are done
//! executing.
void Clear()
{
LOCK(m_mutex);
for (auto it = m_list.begin(); it != m_list.end();) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct that this iterates over the entire list? I think it should only iterate over entries that are in the map (the count of those that are already unregistered shouldn't be decremented further).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested fix here: #18551

it = --it->count ? std::next(it) : m_list.erase(it);
}
m_map.clear();
}

template<typename F> void Iterate(F&& f)
{
WAIT_LOCK(m_mutex, lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about local copy and then iterate it lock free?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

How about local copy and then iterate it lock free?

I'm not sure what approach could be entirely lock free since the list is global and can be modified from any thread.

I just implemented something simple that can be changed and optimized in the future. The lock is not held when calling CValidation interface methods. It avoids copying so allocations aren't needed just to iterate the list. Assumption is that the list is iterated frequently and modified infrequently

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So at best case it would be lock free I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I wanted to delete my own comment, but it seems I misclicked and deleted @promag's. Restoring from mail:

Assumption is that the list is iterated frequently and modified infrequently

Right. What I had in mind is 2 lists and an atomic book list_changed. On iterating it would then sync the list with the mutex locked.

Copy link
Contributor

@promag promag Apr 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) I was wondering what happened. So draft:

if (list_changed) {
  lock;
  iterate_list = list;
  list_changed = false
}
for (f : iterate_list) f()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

Right. What I had in mind is 2 lists and an atomic book list_changed. On iterating it would then sync the list with the mutex locked.

I don't understand the suggestion from the pseudocode. I don't love the idea of having multiple lists, but even if you have them, I don't see how you avoid locks copying the list if you do create copies, or avoid needing a condition variable to delay modifying the list if you don't create copies. I'm probably just making an incorrect assumption about what you want to accomplish here. Feel free to post sample code, or just change the implementation in an alternate PR or followup PR. I'd be especially interested if it's a simplification and not just an optimization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not possible that there are two invocations of Iterate simultaneously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #18524 (comment)

Is it not possible that there are two invocations of Iterate simultaneously?

It shouldn't be the most common thing but should be possible because notifications like CMainSignals::BlockChecked and CMainSignals::NewPoWValidBlock run on calling thread. Other notifications are sent from the scheduler so shouldn't happen simultaneously.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not possible that there are two invocations of Iterate simultaneously?

My previous suggestion isn't possible in this case.

for (auto it = m_list.begin(); it != m_list.end();) {
++it->count;
{
REVERSE_LOCK(lock);
f(*it->callbacks);
}
it = --it->count ? std::next(it) : m_list.erase(it);
}
}
};

static CMainSignals g_signals;
Expand Down Expand Up @@ -78,15 +117,7 @@ CMainSignals& GetMainSignals()
void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> pwalletIn) {
// Each connection captures pwalletIn to ensure that each callback is
// executed before pwalletIn is destroyed. For more details see #18338.
ValidationInterfaceConnections& conns = g_signals.m_internals->m_connMainSignals[pwalletIn.get()];
conns.UpdatedBlockTip = g_signals.m_internals->UpdatedBlockTip.connect(std::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
conns.TransactionAddedToMempool = g_signals.m_internals->TransactionAddedToMempool.connect(std::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, std::placeholders::_1));
conns.BlockConnected = g_signals.m_internals->BlockConnected.connect(std::bind(&CValidationInterface::BlockConnected, pwalletIn, std::placeholders::_1, std::placeholders::_2));
conns.BlockDisconnected = g_signals.m_internals->BlockDisconnected.connect(std::bind(&CValidationInterface::BlockDisconnected, pwalletIn, std::placeholders::_1, std::placeholders::_2));
conns.TransactionRemovedFromMempool = g_signals.m_internals->TransactionRemovedFromMempool.connect(std::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, std::placeholders::_1));
conns.ChainStateFlushed = g_signals.m_internals->ChainStateFlushed.connect(std::bind(&CValidationInterface::ChainStateFlushed, pwalletIn, std::placeholders::_1));
conns.BlockChecked = g_signals.m_internals->BlockChecked.connect(std::bind(&CValidationInterface::BlockChecked, pwalletIn, std::placeholders::_1, std::placeholders::_2));
conns.NewPoWValidBlock = g_signals.m_internals->NewPoWValidBlock.connect(std::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, std::placeholders::_1, std::placeholders::_2));
g_signals.m_internals->Register(std::move(pwalletIn));
}

void RegisterValidationInterface(CValidationInterface* callbacks)
Expand All @@ -103,15 +134,15 @@ void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> c

void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
if (g_signals.m_internals) {
g_signals.m_internals->m_connMainSignals.erase(pwalletIn);
g_signals.m_internals->Unregister(pwalletIn);
}
}

void UnregisterAllValidationInterfaces() {
if (!g_signals.m_internals) {
return;
}
g_signals.m_internals->m_connMainSignals.clear();
g_signals.m_internals->Clear();
}

void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
Expand Down Expand Up @@ -151,7 +182,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
// in the same critical section where the chain is updated

auto event = [pindexNew, pindexFork, fInitialDownload, this] {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: new block hash=%s fork block hash=%s (in IBD=%s)", __func__,
pindexNew->GetBlockHash().ToString(),
Expand All @@ -161,7 +192,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd

void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
auto event = [ptx, this] {
m_internals->TransactionAddedToMempool(ptx);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(ptx); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
ptx->GetHash().ToString(),
Expand All @@ -170,7 +201,7 @@ void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {

void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef &ptx) {
auto event = [ptx, this] {
m_internals->TransactionRemovedFromMempool(ptx);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(ptx); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
ptx->GetHash().ToString(),
Expand All @@ -179,7 +210,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef &ptx) {

void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
auto event = [pblock, pindex, this] {
m_internals->BlockConnected(pblock, pindex);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(pblock, pindex); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: block hash=%s block height=%d", __func__,
pblock->GetHash().ToString(),
Expand All @@ -189,7 +220,7 @@ void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, c
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
{
auto event = [pblock, pindex, this] {
m_internals->BlockDisconnected(pblock, pindex);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: block hash=%s block height=%d", __func__,
pblock->GetHash().ToString(),
Expand All @@ -198,7 +229,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock

void CMainSignals::ChainStateFlushed(const CBlockLocator &locator) {
auto event = [locator, this] {
m_internals->ChainStateFlushed(locator);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(locator); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: block hash=%s", __func__,
locator.IsNull() ? "null" : locator.vHave.front().ToString());
Expand All @@ -207,10 +238,10 @@ void CMainSignals::ChainStateFlushed(const CBlockLocator &locator) {
void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
LOG_EVENT("%s: block hash=%s state=%s", __func__,
block.GetHash().ToString(), state.ToString());
m_internals->BlockChecked(block, state);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); });
}

void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString());
m_internals->NewPoWValidBlock(pindex, block);
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); });
}
4 changes: 1 addition & 3 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ class CValidationInterface {
* Notifies listeners that a block which builds directly on our current tip
* has been received and connected to the headers tree, though not validated yet */
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
friend void ::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface>);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
friend class CMainSignals;
};

struct MainSignalsInstance;
Expand Down