Skip to content

Commit

Permalink
Merge #2203: Call wallet/validation notify callbacks in scheduler thr…
Browse files Browse the repository at this point in the history
…ead (without cs_main)

3d11027 wallet:CreateCoinStake, solving IsSpent() missing cs_main lock. (furszy)
046386b IsNoteSaplingChange: Add missing cs_wallet lock. (furszy)
ded2e8e feature_dbcrash.py using blockmaxsize instead of blockmaxweight that we currently don't support. (furszy)
00cc6ec dumpwallet: Add missing BlockUntilSyncedToCurrentChain (furszy)
bfd9a15 test: sapling_fillblock.py sync mempool every 200 transactions instead of only at the end. (furszy)
53497f0 Validation: DisconnectTip doesn't need to force a flush to disk. (furszy)
f8cd371 [Miner] Sync wallet state before try to solve proof of stake. (furszy)
3ace13b qa: Fix some tests to work on native windows (furszy)
65cf7e1 don't attempt mempool entry for wallet transactions on startup if already in mempool (instagibbs)
756d0fa Handle rename failure in DumpMempool(...) by using RenameOver(...) return value (practicalswift)
1423dba [bugfix] save feeDelta instead of priorityDelta in DumpMempool (Alex Morcos)
d97ace9 [Test] notes_double_spend: sync wallet before check balances. (furszy)
1ed753f Fix wallet_tests.cpp, missing fInMempool flag set. (furszy)
815667d unit test framework: missing scheduler service loop start added. (furszy)
de3c7ae fix wallet_upgrade.py test, wasn't counting the coinbase script. (furszy)
e6770c8 fixing invalid wallet_dump.py, generated PoW blocks use a P2PKH coinbase script that now is properly marked as used inside the wallet. (furszy)
4ed7024 fix invalid numbers in wallet_labels.py (furszy)
b9249c5 Miner: generate RPC, fix coinbase script key not marked as used (furszy)
296c956 wallet: guard null m_last_block_processed (furszy)
0dfebf4 sapling_rpc_wallet_tests: remove unneeded cs_main and cs_wallet locks. (furszy)
c3a281c fix mempool_persist.py dump issue, missing sync with validation interface. (furszy)
67c754a qa: Sync with validationinterface queue in sync_mempools (MarcoFalke)
596056c [validation] Do not check for double spent zerocoins. (furszy)
0c4642c Add helper to wait for validation interface queue to catch up (Matt Corallo)
cc91d44 Block ActivateBestChain to empty validationinterface queue (Matt Corallo)
0c68e2f Add an interface to get the queue depth out of CValidationInterface (Matt Corallo)
31c7974 Decouple block processing cs_main lock from the rest of inv get data requests (furszy)
da7c0f7 Refactor ProcessGetData avoiding to lock cs_main for its entire time. (furszy)
10efbe5 net_processing: making PushTierTwoGetDataRequest return a bool in case of pushing the message. (furszy)
51dea23 net_processing move-only: decouple tier two get data request into its own function. (furszy)
1c9fe10 RPC: listunspent remove redundant wallet check (furszy)
4d927b0 Add a dev notes document describing the new wallet RPC blocking (Matt Corallo)
5f521fd Give ZMQ consistent order with UpdatedBlockTip on scheduler thread (Matt Corallo)
7d05997 Fix wallet RPC race by waiting for callbacks in sendrawtransaction (Matt Corallo)
c7ab490 Also call other wallet notify callbacks in scheduler thread (Matt Corallo)
31a8790 Use callbacks to cache whether wallet transactions are in mempool (Matt Corallo)
f6df6e4 Add calls to CWallet::BlockUntilSyncedToCurrentChain() in RPCs (furszy)
24a3ce4 Add CWallet::BlockUntilSyncedToCurrentChain() (Matt Corallo)
40ed4c4 Add CallFunctionInQueue to wait on validation interface queue drain (Matt Corallo)
268be9c Call TransactionRemovedFromMempool in the CScheduler thread (Matt Corallo)
1fa0d70 Add a CValidationInterface::TransactionRemovedFromMempool (Matt Corallo)

Pull request description:

  Concluding with the validation <--> wallet asynchronous signal processing work started in #2082, #2118, #2150, #2192, #2195.

  Effectively moving every validation interface callback to a background thread without locking `cs_main` for its entire process (each handler can now request `cs_main` lock only when/if they need it).

  This has a direct performance improvement on the synchronization time (which i haven't measured yet because there is one/two more PRs over the wallet and GUI areas, probably large as well, on top of this one and #2201 that should boost the sync time a lot more).

  Containing the following changes:

  * Adaptations coming from bitcoin#10286.
  * Adaptations coming from bitcoin#11824 (this one is different for us, take the base idea when you review it). Essentially solves a severe memory leak introduced previously in 10286 and improves `cs_main` lock acquisitions as well.
  * net_processing: decouple and refactor tier two inv data request processing.
  * bitcoin#12206

ACKs for top commit:
  random-zebra:
    Great job! 🍻 ACK 3d11027
  Fuzzbawls:
    ACK 3d11027

Tree-SHA512: 60a25604fb8a3ad0553ccb074aed99c1b3c6f8a765b40c1b43f25412373cbd2a9e4f0f413d45cf694bd62e48512c936099ffb7a0d23a1b97576cb33283ca05ac
  • Loading branch information
furszy committed Feb 26, 2021
2 parents d41c0da + 3d11027 commit 307d7b1
Show file tree
Hide file tree
Showing 31 changed files with 695 additions and 241 deletions.
13 changes: 13 additions & 0 deletions doc/developer-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,3 +907,16 @@ A few guidelines for introducing and reviewing new RPC interfaces:
- *Exception*: Using RPC method aliases may be appropriate in cases where a
new RPC is replacing a deprecated RPC, to avoid both RPCs confusingly
showing up in the command list.
- Wallet RPCs call BlockUntilSyncedToCurrentChain to maintain consistency with
`getblockchaininfo`'s state immediately prior to the call's execution. Wallet
RPCs whose behavior does *not* depend on the current chainstate may omit this
call.
- *Rationale*: In previous versions of Bitcoin Core, the wallet was always
in-sync with the chainstate (by virtue of them all being updated in the
same cs_main lock). In order to maintain the behavior that wallet RPCs
return results as of at least the highest best-known block an RPC
client may be aware of prior to entering a wallet RPC call, we must block
until the wallet is caught up to the chainstate as of the RPC call's entry.
This also makes the API much easier for RPC clients to reason about.
4 changes: 4 additions & 0 deletions src/blockassembler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ bool SolveProofOfStake(CBlock* pblock, CBlockIndex* pindexPrev, CWallet* pwallet
{
boost::this_thread::interruption_point();
pblock->nBits = GetNextWorkRequired(pindexPrev, pblock);

// Sync wallet before create coinstake
pwallet->BlockUntilSyncedToCurrentChain();

CMutableTransaction txCoinStake;
int64_t nTxNewTime = 0;
if (!pwallet->CreateCoinStake(*pwallet, pindexPrev, pblock->nBits, txCoinStake, nTxNewTime, availableCoins)) {
Expand Down
2 changes: 2 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ void PrepareShutdown()
// Disconnect all slots
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
GetMainSignals().UnregisterWithMempoolSignals(mempool);

#ifndef WIN32
try {
Expand Down Expand Up @@ -1264,6 +1265,7 @@ bool AppInitMain()
threadGroup.create_thread(std::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
GetMainSignals().RegisterWithMempoolSignals(mempool);

// Initialize Sapling circuit parameters
LoadSaplingParams();
Expand Down
6 changes: 3 additions & 3 deletions src/miner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
double dHashesPerSec = 0.0;
int64_t nHPSTimerStart = 0;

std::unique_ptr<CBlockTemplate> CreateNewBlockWithKey(CReserveKey& reservekey, CWallet* pwallet)
std::unique_ptr<CBlockTemplate> CreateNewBlockWithKey(CReserveKey* reservekey, CWallet* pwallet)
{
CPubKey pubkey;
if (!reservekey.GetReservedKey(pubkey))
if (!reservekey->GetReservedKey(pubkey))
return nullptr;

const int nHeightNext = chainActive.Tip()->nHeight + 1;
Expand Down Expand Up @@ -169,7 +169,7 @@ void BitcoinMiner(CWallet* pwallet, bool fProofOfStake)

std::unique_ptr<CBlockTemplate> pblocktemplate((fProofOfStake ?
BlockAssembler(Params(), DEFAULT_PRINTPRIORITY).CreateNewBlock(CScript(), pwallet, true, &availableCoins) :
CreateNewBlockWithKey(*opReservekey, pwallet)));
CreateNewBlockWithKey(opReservekey.get_ptr(), pwallet)));
if (!pblocktemplate) continue;
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(pblocktemplate->block);

Expand Down
2 changes: 1 addition & 1 deletion src/miner.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static const bool DEFAULT_PRINTPRIORITY = false;
/** Run the miner threads */
void GenerateBitcoins(bool fGenerate, CWallet* pwallet, int nThreads);
/** Generate a new block, without valid proof-of-work */
std::unique_ptr<CBlockTemplate> CreateNewBlockWithKey(CReserveKey& reservekey, CWallet* pwallet);
std::unique_ptr<CBlockTemplate> CreateNewBlockWithKey(CReserveKey* reservekey, CWallet* pwallet);

void BitcoinMiner(CWallet* pwallet, bool fProofOfStake);
void ThreadStakeMinter();
Expand Down
345 changes: 192 additions & 153 deletions src/net_processing.cpp

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/rpc/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "utilmoneystr.h"
#include "utilstrencodings.h"
#include "hash.h"
#include "validationinterface.h"
#include "wallet/wallet.h"
#include "zpiv/zpivmodule.h"
#include "zpivchain.h"
Expand Down Expand Up @@ -48,6 +49,21 @@ static CUpdatedBlock latestblock;
extern void TxToJSON(const CTransaction& tx, const uint256 hashBlock, UniValue& entry);
void ScriptPubKeyToJSON(const CScript& scriptPubKey, UniValue& out, bool fIncludeHex);

UniValue syncwithvalidationinterfacequeue(const JSONRPCRequest& request)
{
if (request.fHelp || request.params.size() > 0) {
throw std::runtime_error(
"syncwithvalidationinterfacequeue\n"
"\nWaits for the validation interface queue to catch up on everything that was there when we entered this function.\n"
"\nExamples:\n"
+ HelpExampleCli("syncwithvalidationinterfacequeue","")
+ HelpExampleRpc("syncwithvalidationinterfacequeue","")
);
}
SyncWithValidationInterfaceQueue();
return NullUniValue;
}

double GetDifficulty(const CBlockIndex* blockindex)
{
// Floating point number that is a multiple of the minimum difficulty,
Expand Down Expand Up @@ -1448,6 +1464,7 @@ static const CRPCCommand commands[] =
{ "hidden", "waitfornewblock", &waitfornewblock, true },
{ "hidden", "waitforblock", &waitforblock, true },
{ "hidden", "waitforblockheight", &waitforblockheight, true },
{ "hidden", "syncwithvalidationinterfacequeue", &syncwithvalidationinterfacequeue, true },


};
Expand Down
13 changes: 11 additions & 2 deletions src/rpc/mining.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ UniValue generate(const JSONRPCRequest& request)

const Consensus::Params& consensus = Params().GetConsensus();
bool fPoS = consensus.NetworkUpgradeActive(nHeight + 1, Consensus::UPGRADE_POS);
std::unique_ptr<CReserveKey> reservekey;

if (fPoS) {
// If we are in PoS, wallet must be unlocked.
EnsureWalletIsUnlocked();
} else {
// Coinbase key
reservekey = MakeUnique<CReserveKey>(pwalletMain);
}

UniValue blockHashes(UniValue::VARR);
CReserveKey reservekey(pwalletMain);
unsigned int nExtraNonce = 0;

while (nHeight < nHeightEnd && !ShutdownRequested()) {
Expand All @@ -81,7 +84,7 @@ UniValue generate(const JSONRPCRequest& request)

std::unique_ptr<CBlockTemplate> pblocktemplate((fPoS ?
BlockAssembler(Params(), DEFAULT_PRINTPRIORITY).CreateNewBlock(CScript(), pwalletMain, true, &availableCoins) :
CreateNewBlockWithKey(reservekey, pwalletMain)));
CreateNewBlockWithKey(reservekey.get(), pwalletMain)));
if (!pblocktemplate.get()) break;
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(pblocktemplate->block);

Expand Down Expand Up @@ -114,6 +117,12 @@ UniValue generate(const JSONRPCRequest& request)
if (nGenerated == 0 || (!fPoS && nGenerated < nGenerate))
throw JSONRPCError(RPC_INTERNAL_ERROR, "Couldn't create new blocks");

// mark key as used, only for PoW coinbases
if (reservekey) {
// Remove key from key pool
reservekey->KeepKey();
}

return blockHashes;
}
#endif // ENABLE_WALLET
Expand Down
27 changes: 25 additions & 2 deletions src/rpc/rawtransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "core_io.h"
#include "init.h"
#include "keystore.h"
#include "validationinterface.h"
#include "net.h"
#include "policy/policy.h"
#include "primitives/transaction.h"
Expand All @@ -25,6 +26,7 @@
#include "wallet/wallet.h"
#endif

#include <future>
#include <stdint.h>

#include <univalue.h>
Expand Down Expand Up @@ -518,6 +520,10 @@ UniValue fundrawtransaction(const JSONRPCRequest& request)
if (!pwalletMain)
throw std::runtime_error("wallet not initialized");

// Make sure the results are valid at least up to the most recent block
// the user could have gotten from another RPC command prior to now
pwalletMain->BlockUntilSyncedToCurrentChain();

RPCTypeCheck(request.params, {UniValue::VSTR});

CTxDestination changeAddress = CNoDestination();
Expand Down Expand Up @@ -899,6 +905,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
"\nSend the transaction (signed hex)\n" + HelpExampleCli("sendrawtransaction", "\"signedhex\"") +
"\nAs a json rpc call\n" + HelpExampleRpc("sendrawtransaction", "\"signedhex\""));

std::promise<void> promise;

RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VBOOL});

// parse hex string from parameter
Expand All @@ -911,7 +919,8 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
if (request.params.size() > 1)
fOverrideFees = request.params[1].get_bool();

AssertLockNotHeld(cs_main);
{ // cs_main scope
LOCK(cs_main);
CCoinsViewCache& view = *pcoinsTip;
bool fHaveChain = false;
for (size_t o = 0; !fHaveChain && o < mtx.vout.size(); o++) {
Expand All @@ -923,7 +932,6 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
CValidationState state;
bool fMissingInputs;
{
LOCK(cs_main);
if (!AcceptToMemoryPool(mempool, state, MakeTransactionRef(std::move(mtx)), true, &fMissingInputs, false, !fOverrideFees)) {
if (state.IsInvalid()) {
throw JSONRPCError(RPC_TRANSACTION_REJECTED, strprintf("%i: %s", state.GetRejectCode(), state.GetRejectReason()));
Expand All @@ -933,11 +941,25 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
}
throw JSONRPCError(RPC_TRANSACTION_ERROR, state.GetRejectReason());
}
} else {
// If wallet is enabled, ensure that the wallet has been made aware
// of the new transaction prior to returning. This prevents a race
// where a user might call sendrawtransaction with a transaction
// to/from their wallet, immediately call some wallet RPC, and get
// a stale result because callbacks have not yet been processed.
CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
}
}
} else if (fHaveChain) {
throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain");
}

} // cs_main

promise.get_future().wait();

if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");

Expand All @@ -946,6 +968,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request)
{
pnode->PushInventory(inv);
});

return hashTx.GetHex();
}

Expand Down
2 changes: 1 addition & 1 deletion src/sapling/saplingscriptpubkeyman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ CAmount SaplingScriptPubKeyMan::GetShieldedChange(const CWalletTx& wtx) const

bool SaplingScriptPubKeyMan::IsNoteSaplingChange(const SaplingOutPoint& op, libzcash::SaplingPaymentAddress address) const
{
LOCK(wallet->cs_KeyStore);
LOCK2(wallet->cs_wallet, wallet->cs_KeyStore);
std::set<libzcash::PaymentAddress> shieldedAddresses = {address};
std::set<std::pair<libzcash::PaymentAddress, uint256>> nullifierSet = GetNullifiersForAddresses(shieldedAddresses);
return IsNoteSaplingChange(nullifierSet, address, op);
Expand Down
5 changes: 5 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,8 @@ void SingleThreadedSchedulerClient::EmptyQueue() {
should_continue = !m_callbacks_pending.empty();
}
}

size_t SingleThreadedSchedulerClient::CallbacksPending() {
LOCK(m_cs_callbacks_pending);
return m_callbacks_pending.size();
}
2 changes: 2 additions & 0 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class SingleThreadedSchedulerClient {
// Processes all remaining queue members on the calling thread, blocking until queue is empty
// Must be called after the CScheduler has no remaining processing threads!
void EmptyQueue();

size_t CallbacksPending();
};

#endif
19 changes: 14 additions & 5 deletions src/test/librust/sapling_rpc_wallet_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ BOOST_AUTO_TEST_CASE(rpc_wallet_getbalance)
{
SelectParams(CBaseChainParams::TESTNET);

LOCK2(cs_main, pwalletMain->cs_wallet);
{
LOCK(pwalletMain->cs_wallet);
pwalletMain->SetMinVersion(FEATURE_SAPLING);
pwalletMain->SetupSPKM(false);
}

BOOST_CHECK_THROW(CallRPC("getshieldbalance too many args"), std::runtime_error);
BOOST_CHECK_THROW(CallRPC("getshieldbalance invalidaddress"), std::runtime_error);
Expand Down Expand Up @@ -249,7 +253,11 @@ BOOST_AUTO_TEST_CASE(rpc_shieldsendmany_parameters)
{
SelectParams(CBaseChainParams::TESTNET);

LOCK2(cs_main, pwalletMain->cs_wallet);
{
LOCK(pwalletMain->cs_wallet);
pwalletMain->SetMinVersion(FEATURE_SAPLING);
pwalletMain->SetupSPKM(false);
}

BOOST_CHECK_THROW(CallRPC("shieldsendmany"), std::runtime_error);
BOOST_CHECK_THROW(CallRPC("shieldsendmany toofewargs"), std::runtime_error);
Expand Down Expand Up @@ -299,7 +307,6 @@ BOOST_AUTO_TEST_CASE(rpc_shieldsendmany_parameters)
std::vector<char> v (2 * (ZC_MEMO_SIZE+1)); // x2 for hexadecimal string format
std::fill(v.begin(),v.end(), 'A');
std::string badmemo(v.begin(), v.end());
pwalletMain->SetupSPKM(false);
auto pa = pwalletMain->GenerateNewSaplingZKey();
std::string zaddr1 = KeyIO::EncodePaymentAddress(pa);
BOOST_CHECK_THROW(CallRPC(std::string("shieldsendmany yBYhwgzufrZ6F5VVuK9nEChENArq934mqC ")
Expand Down Expand Up @@ -543,8 +550,10 @@ BOOST_AUTO_TEST_CASE(rpc_listshieldunspent_parameters)
{
SelectParams(CBaseChainParams::TESTNET);

LOCK2(cs_main, pwalletMain->cs_wallet);
pwalletMain->SetupSPKM(false);
{
LOCK(pwalletMain->cs_wallet);
pwalletMain->SetupSPKM(false);
}

UniValue retValue;

Expand Down
7 changes: 7 additions & 0 deletions src/test/test_pivx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ TestingSetup::TestingSetup()
fs::create_directories(pathTemp);
gArgs.ForceSetArg("-datadir", pathTemp.string());

// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(std::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

// Note that because we don't bother running a scheduler thread here,
// callbacks via CValidationInterface are unreliable, but that's OK,
// our unit tests aren't testing multiple parts of the code at once.
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
GetMainSignals().RegisterWithMempoolSignals(mempool);

// Ideally we'd move all the RPC tests to the functional testing framework
// instead of unit tests, but for now we need these here.
Expand Down Expand Up @@ -87,7 +92,9 @@ TestingSetup::~TestingSetup()
threadGroup.interrupt_all();
threadGroup.join_all();
GetMainSignals().FlushBackgroundCallbacks();
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
GetMainSignals().UnregisterWithMempoolSignals(mempool);
UnloadBlockIndex();
delete pcoinsTip;
delete pcoinsdbview;
Expand Down
3 changes: 3 additions & 0 deletions src/txmempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,9 @@ class CTxMemPool
// to track size/count of descendant transactions. First version of
// addUnchecked can be used to have it call CalculateMemPoolAncestors(), and
// then invoke the second version.
// Note that addUnchecked is ONLY called from ATMP outside of tests
// and any other callers may break wallet's in-mempool tracking (due to
// lack of CValidationInterface::TransactionAddedToMempool callbacks).
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry& entry, bool fCurrentEstimate = true);
bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, setEntries &setAncestors, bool fCurrentEstimate = true);

Expand Down

0 comments on commit 307d7b1

Please sign in to comment.