Skip to content

Commit

Permalink
Merge #10286: Call wallet notify callbacks in scheduler thread (witho…
Browse files Browse the repository at this point in the history
…ut cs_main)

89f0312 Remove redundant pwallet nullptr check (Matt Corallo)
c4784b5 Add a dev notes document describing the new wallet RPC blocking (Matt Corallo)
3ea8b75 Give ZMQ consistent order with UpdatedBlockTip on scheduler thread (Matt Corallo)
cb06edf Fix wallet RPC race by waiting for callbacks in sendrawtransaction (Matt Corallo)
e545ded Also call other wallet notify callbacks in scheduler thread (Matt Corallo)
17220d6 Use callbacks to cache whether wallet transactions are in mempool (Matt Corallo)
5d67a78 Add calls to CWallet::BlockUntilSyncedToCurrentChain() in RPCs (Matt Corallo)
5ee3172 Add CWallet::BlockUntilSyncedToCurrentChain() (Matt Corallo)
0b2f42d Add CallFunctionInQueue to wait on validation interface queue drain (Matt Corallo)
2b4b345 Add ability to assert a lock is not held in DEBUG_LOCKORDER (Matt Corallo)
0343676 Call TransactionRemovedFromMempool in the CScheduler thread (Matt Corallo)
a7d3936 Add a CValidationInterface::TransactionRemovedFromMempool (Matt Corallo)

Pull request description:

  Based on #10179, this effectively reverts #9583, regaining most of the original speedups of #7946.

  This concludes the work of #9725, #10178, and #10179.

  See individual commit messages for more information.

Tree-SHA512: eead4809b0a75d1fb33b0765174ff52c972e45040635e38cf3686cef310859c1e6b3c00e7186cbd17374c6ae547bfbd6c1718fe36f26c76ba8a8b052d6ed7bc9
  • Loading branch information
laanwj committed Nov 15, 2017
2 parents aca77a4 + 89f0312 commit 927a1d7
Show file tree
Hide file tree
Showing 12 changed files with 354 additions and 28 deletions.
13 changes: 13 additions & 0 deletions doc/developer-notes.md
Expand Up @@ -675,3 +675,16 @@ A few guidelines for introducing and reviewing new RPC interfaces:

- *Rationale*: If a RPC response is not a JSON object then it is harder to avoid API breakage if
new data in the response is needed.

- Wallet RPCs call BlockUntilSyncedToCurrentChain to maintain consistency with
`getblockchaininfo`'s state immediately prior to the call's execution. Wallet
RPCs whose behavior does *not* depend on the current chainstate may omit this
call.

- *Rationale*: In previous versions of Bitcoin Core, the wallet was always
in-sync with the chainstate (by virtue of them all being updated in the
same cs_main lock). In order to maintain the behavior that wallet RPCs
return results as of at least the highest best-known block an RPC
client may be aware of prior to entering a wallet RPC call, we must block
until the wallet is caught up to the chainstate as of the RPC call's entry.
This also makes the API much easier for RPC clients to reason about.
2 changes: 2 additions & 0 deletions src/init.cpp
Expand Up @@ -261,6 +261,7 @@ void Shutdown()
#endif
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
GetMainSignals().UnregisterWithMempoolSignals(mempool);
#ifdef ENABLE_WALLET
CloseWallets();
#endif
Expand Down Expand Up @@ -1236,6 +1237,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
GetMainSignals().RegisterWithMempoolSignals(mempool);

/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections
Expand Down
23 changes: 22 additions & 1 deletion src/rpc/rawtransaction.cpp
Expand Up @@ -11,6 +11,7 @@
#include "init.h"
#include "keystore.h"
#include "validation.h"
#include "validationinterface.h"
#include "merkleblock.h"
#include "net.h"
#include "policy/policy.h"
Expand All @@ -30,6 +31,7 @@
#include "wallet/wallet.h"
#endif

#include <future>
#include <stdint.h>

#include <univalue.h>
Expand Down Expand Up @@ -917,7 +919,9 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
);

ObserveSafeMode();
LOCK(cs_main);

std::promise<void> promise;

RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VBOOL});

// parse hex string from parameter
Expand All @@ -931,6 +935,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
if (!request.params[1].isNull() && request.params[1].get_bool())
nMaxRawTxFee = 0;

{ // cs_main scope
LOCK(cs_main);
CCoinsViewCache &view = *pcoinsTip;
bool fHaveChain = false;
for (size_t o = 0; !fHaveChain && o < tx->vout.size(); o++) {
Expand All @@ -952,10 +958,24 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
}
throw JSONRPCError(RPC_TRANSACTION_ERROR, state.GetRejectReason());
}
} else {
// If wallet is enabled, ensure that the wallet has been made aware
// of the new transaction prior to returning. This prevents a race
// where a user might call sendrawtransaction with a transaction
// to/from their wallet, immediately call some wallet RPC, and get
// a stale result because callbacks have not yet been processed.
CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
}
} else if (fHaveChain) {
throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain");
}

} // cs_main

promise.get_future().wait();

if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");

Expand All @@ -964,6 +984,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
{
pnode->PushInventory(inv);
});

return hashTx.GetHex();
}

Expand Down
10 changes: 10 additions & 0 deletions src/sync.cpp
Expand Up @@ -155,6 +155,16 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine,
abort();
}

void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs)
{
for (const std::pair<void*, CLockLocation>& i : *lockstack) {
if (i.first == cs) {
fprintf(stderr, "Assertion failed: lock %s held in %s:%i; locks held:\n%s", pszName, pszFile, nLine, LocksHeld().c_str());
abort();
}
}
}

void DeleteLock(void* cs)
{
if (!lockdata.available) {
Expand Down
3 changes: 3 additions & 0 deletions src/sync.h
Expand Up @@ -77,14 +77,17 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs
void LeaveCritical();
std::string LocksHeld();
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
void DeleteLock(void* cs);
#else
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
void static inline LeaveCritical() {}
void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline DeleteLock(void* cs) {}
#endif
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)
#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs)

/**
* Wrapped mutex: supports recursive locking, but no waiting
Expand Down
3 changes: 3 additions & 0 deletions src/txmempool.h
Expand Up @@ -513,6 +513,9 @@ class CTxMemPool
// to track size/count of descendant transactions. First version of
// addUnchecked can be used to have it call CalculateMemPoolAncestors(), and
// then invoke the second version.
// Note that addUnchecked is ONLY called from ATMP outside of tests
// and any other callers may break wallet's in-mempool tracking (due to
// lack of CValidationInterface::TransactionAddedToMempool callbacks).
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, bool validFeeEstimate = true);
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, setEntries &setAncestors, bool validFeeEstimate = true);

Expand Down
2 changes: 1 addition & 1 deletion src/validation.cpp
Expand Up @@ -2494,7 +2494,7 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams,

for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, *trace.conflictedTxs);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs);
}
}
// When we reach this point, we switched to a new tip (stored in pindexNewTip).
Expand Down
51 changes: 44 additions & 7 deletions src/validationinterface.cpp
Expand Up @@ -9,6 +9,7 @@
#include "primitives/block.h"
#include "scheduler.h"
#include "sync.h"
#include "txmempool.h"
#include "util.h"

#include <list>
Expand All @@ -21,6 +22,7 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef>&)> BlockConnected;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
boost::signals2::signal<void (const uint256 &)> Inventory;
boost::signals2::signal<void (int64_t nBestBlockTime, CConnman* connman)> Broadcast;
Expand Down Expand Up @@ -50,6 +52,14 @@ void CMainSignals::FlushBackgroundCallbacks() {
m_internals->m_schedulerClient.EmptyQueue();
}

void CMainSignals::RegisterWithMempoolSignals(CTxMemPool& pool) {
pool.NotifyEntryRemoved.connect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
}

void CMainSignals::UnregisterWithMempoolSignals(CTxMemPool& pool) {
pool.NotifyEntryRemoved.disconnect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2));
}

CMainSignals& GetMainSignals()
{
return g_signals;
Expand All @@ -60,6 +70,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.m_internals->BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.m_internals->BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
g_signals.m_internals->Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
Expand All @@ -75,6 +86,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
g_signals.m_internals->BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
g_signals.m_internals->BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
Expand All @@ -87,32 +99,57 @@ void UnregisterAllValidationInterfaces() {
g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots();
g_signals.m_internals->BlockConnected.disconnect_all_slots();
g_signals.m_internals->BlockDisconnected.disconnect_all_slots();
g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots();
g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots();
g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots();
}

void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
}

void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) {
if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) {
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
m_internals->TransactionRemovedFromMempool(ptx);
});
}
}

void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
m_internals->m_schedulerClient.AddToProcessQueue([pindexNew, pindexFork, fInitialDownload, this] {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
});
}

void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
m_internals->TransactionAddedToMempool(ptx);
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
m_internals->TransactionAddedToMempool(ptx);
});
}

void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) {
m_internals->BlockConnected(pblock, pindex, vtxConflicted);
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] {
m_internals->BlockConnected(pblock, pindex, *pvtxConflicted);
});
}

void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
m_internals->BlockDisconnected(pblock);
m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] {
m_internals->BlockDisconnected(pblock);
});
}

void CMainSignals::SetBestChain(const CBlockLocator &locator) {
m_internals->SetBestChain(locator);
m_internals->m_schedulerClient.AddToProcessQueue([locator, this] {
m_internals->SetBestChain(locator);
});
}

void CMainSignals::Inventory(const uint256 &hash) {
m_internals->Inventory(hash);
m_internals->m_schedulerClient.AddToProcessQueue([hash, this] {
m_internals->Inventory(hash);
});
}

void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {
Expand Down
70 changes: 62 additions & 8 deletions src/validationinterface.h
Expand Up @@ -6,10 +6,11 @@
#ifndef BITCOIN_VALIDATIONINTERFACE_H
#define BITCOIN_VALIDATIONINTERFACE_H

#include <memory>

#include "primitives/transaction.h" // CTransaction(Ref)

#include <functional>
#include <memory>

class CBlock;
class CBlockIndex;
struct CBlockLocator;
Expand All @@ -20,6 +21,8 @@ class CValidationInterface;
class CValidationState;
class uint256;
class CScheduler;
class CTxMemPool;
enum class MemPoolRemovalReason;

// These functions dispatch to one or all registered wallets

Expand All @@ -29,23 +32,66 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn);
void UnregisterValidationInterface(CValidationInterface* pwalletIn);
/** Unregister all wallets from core */
void UnregisterAllValidationInterfaces();
/**
* Pushes a function to callback onto the notification queue, guaranteeing any
* callbacks generated prior to now are finished when the function is called.
*
* Be very careful blocking on func to be called if any locks are held -
* validation interface clients may not be able to make progress as they often
* wait for things like cs_main, so blocking until func is called with cs_main
* will result in a deadlock (that DEBUG_LOCKORDER will miss).
*/
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);

class CValidationInterface {
protected:
/** Notifies listeners of updated block chain tip */
/**
* Notifies listeners of updated block chain tip
*
* Called on a background thread.
*/
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
/** Notifies listeners of a transaction having been added to mempool. */
/**
* Notifies listeners of a transaction having been added to mempool.
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
/**
* Notifies listeners of a transaction leaving mempool.
*
* This only fires for transactions which leave mempool because of expiry,
* size limiting, reorg (changes in lock times/coinbase maturity), or
* replacement. This does not include any transactions which are included
* in BlockConnectedDisconnected either in block->vtx or in txnConflicted.
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
*
* Called on a background thread.
*/
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
/** Notifies listeners of a block being disconnected */
/**
* Notifies listeners of a block being disconnected
*
* Called on a background thread.
*/
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
/** Notifies listeners of the new active block chain on-disk. */
/**
* Notifies listeners of the new active block chain on-disk.
*
* Called on a background thread.
*/
virtual void SetBestChain(const CBlockLocator &locator) {}
/** Notifies listeners about an inventory item being seen on the network. */
/**
* Notifies listeners about an inventory item being seen on the network.
*
* Called on a background thread.
*/
virtual void Inventory(const uint256 &hash) {}
/** Tells listeners to broadcast their data. */
virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
Expand Down Expand Up @@ -73,6 +119,9 @@ class CMainSignals {
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);

void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason);

public:
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
Expand All @@ -82,9 +131,14 @@ class CMainSignals {
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();

/** Register with mempool to call TransactionRemovedFromMempool callbacks */
void RegisterWithMempoolSignals(CTxMemPool& pool);
/** Unregister with mempool */
void UnregisterWithMempoolSignals(CTxMemPool& pool);

void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef &);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef> &);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>> &);
void BlockDisconnected(const std::shared_ptr<const CBlock> &);
void SetBestChain(const CBlockLocator &);
void Inventory(const uint256 &);
Expand Down

0 comments on commit 927a1d7

Please sign in to comment.