Merge #2207: Use non-atomic flushing with block replay
aab15d7 ReplayBlocks: use find instead of brackets operator to access to the element. (furszy)
e898353 [Refactoring] Use const CBlockIndex* where appropriate (random-zebra)
c76fa04 qa: Extract rpc_timewait as test param (furszy)
0f832e3 shutdown: Stop threads before resetting ptrs (MarcoFalke)
67aebbf http: Remove numThreads and ThreadCounter (Wladimir J. van der Laan)
e24c710 http: Remove WaitExit from WorkQueue (Wladimir J. van der Laan)
b8f7364 http: Join worker threads before deleting work queue (Wladimir J. van der Laan)
7d68769 rpc: further constrain the libevent workaround (Cory Fields)
75af065 rpc: work-around an upstream libevent bug (Cory Fields)
50e5833 Always return true if AppInitMain got to the end (Matt Corallo)
bd70dcc [qa] Test non-atomic chainstate writes (furszy)
8f04970 Dont create pcoinsTip until after ReplayBlocks. (Matt Corallo)
93f2b15 Random db flush crash simulator (Pieter Wuille)
72f3b17 Adapt memory usage estimation for flushing (Pieter Wuille)
8540113 Non-atomic flushing using the blockchain as replay journal (Pieter Wuille)
8d6625f [MOVEONLY] Move LastCommonAncestor to chain (Pieter Wuille)

Pull request description:

  > This patch adds an extra "head blocks" record to the chainstate, which gives the range of blocks for which writes may be incomplete. At the start of a flush we write this record, then write the dirty dbcache entries in 16 MiB batches, and at the end we remove the record again. If it is present at startup, it means we crashed during a flush, and we roll back / roll forward the blocks inside it to get a consistent tip on disk before proceeding.

  > If a flush completes successfully, the resulting database is compatible with previous versions. If the node crashes in the middle of a flush, a version of the code with this patch is needed to recover.
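
As a rough illustration of the sequence described above, the following sketch uses stand-in types and record names only; the real logic lives in the coins database's `BatchWrite` and in `ReplayBlocks`:

```cpp
// Sketch only: stand-in types and record names, not the actual
// CCoinsViewDB::BatchWrite / ReplayBlocks implementation.
#include <cstddef>
#include <map>
#include <string>
#include <vector>

using Hash = std::string;                         // stand-in for uint256
using Db   = std::map<std::string, std::string>;  // stand-in for the LevelDB chainstate

struct DirtyCoin { std::string key; std::string value; std::size_t usage; };

constexpr std::size_t BATCH_SIZE = 16 << 20;      // dirty entries written in ~16 MiB batches

// Flush the dirty cache to disk without one big atomic commit.
void FlushWithJournal(Db& db, std::vector<DirtyCoin>& dirty,
                      const Hash& old_tip, const Hash& new_tip)
{
    // 1. Journal the range of blocks whose writes may end up incomplete
    //    (new tip first, then old tip, matching the GetHeadBlocks ordering).
    db["head_blocks"] = new_tip + "|" + old_tip;

    // 2. Write dirty entries in bounded batches. A crash anywhere in this loop
    //    leaves the coins database inconsistent, but the journal records that.
    std::size_t batched = 0;
    for (const DirtyCoin& c : dirty) {
        db[c.key] = c.value;
        batched += c.usage;
        if (batched >= BATCH_SIZE) batched = 0;   // the real code commits a db batch here
    }
    dirty.clear();

    // 3. Record the new best block and drop the journal: the database is
    //    consistent again and readable by versions without this patch.
    db["best_block"] = new_tip;
    db.erase("head_blocks");
}

// At startup, a surviving "head_blocks" record means the node crashed mid-flush,
// so the recorded block range must be rolled back / rolled forward first.
bool CrashedDuringFlush(const Db& db)
{
    return db.count("head_blocks") > 0;
}
```

A crash at any point before step 3 completes leaves the `head_blocks` record behind, which is what the updated `feature_dbcrash.py` test exercises via the random flush-crash simulator.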

  An adaptation of the following PRs, with further modifications to the `feature_dbcrash.py` test to bring it up to date with upstream and to fix RPC-related bugs.

  * bitcoin#10148
  * Increase RPC wait time.
  * bitcoin#11831
  * bitcoin#11593
  * bitcoin#12366
  * bitcoin#13837
  * bitcoin#13894

ACKs for top commit:
  random-zebra:
    ACK aab15d7
  Fuzzbawls:
    ACK aab15d7

Tree-SHA512: 898806746f581a9eb8deb0155c558481abf4454c6f3b3c3ad505c557938d0700fe9796e98e36492286ae869378647072c3ad77ad65e9dd7075108ff96469ade1
random-zebra committed Feb 21, 2021
2 parents 8a65d7d + aab15d7 commit ac52366
Showing 19 changed files with 572 additions and 119 deletions.
2 changes: 1 addition & 1 deletion contrib/devtools/check-doc.py
@@ -22,7 +22,7 @@
REGEX_ARG = re.compile(r'(?:map(?:Multi)?Args(?:\.count\(|\[)|Get(?:Bool)?Arg\()\"(\-[^\"]+?)\"')
REGEX_DOC = re.compile(r'HelpMessageOpt\(\"(\-[^\"=]+?)(?:=|\")')
# list unsupported, deprecated and duplicate args as they need no documentation
SET_DOC_OPTIONAL = set(['-rpcssl', '-benchmark', '-h', '-help', '-socks', '-tor', '-debugnet', '-whitelistalwaysrelay', '-prematurewitness', '-walletprematurewitness', '-promiscuousmempoolflags', '-blockminsize', '-sendfreetransactions', '-checklevel', '-liquidityprovider', '-anonymizepivxamount'])
SET_DOC_OPTIONAL = set(['-rpcssl', '-benchmark', '-h', '-help', '-socks', '-tor', '-debugnet', '-whitelistalwaysrelay', '-prematurewitness', '-walletprematurewitness', '-promiscuousmempoolflags', '-blockminsize', '-sendfreetransactions', '-checklevel', '-liquidityprovider', '-anonymizepivxamount', '-dbcrashratio'])

def main():
used = check_output(CMD_GREP_ARGS, shell=True, universal_newlines=True)
22 changes: 19 additions & 3 deletions src/chain.cpp
@@ -326,8 +326,24 @@ bool CBlockIndex::RaiseValidity(enum BlockStatus nUpTo)
return false;
}

/*
* CBlockIndex - Legacy Zerocoin
*/
/** Find the last common ancestor two blocks have.
* Both pa and pb must be non-NULL. */
const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb)
{
if (pa->nHeight > pb->nHeight) {
pa = pa->GetAncestor(pb->nHeight);
} else if (pb->nHeight > pa->nHeight) {
pb = pb->GetAncestor(pa->nHeight);
}

while (pa != pb && pa && pb) {
pa = pa->pprev;
pb = pb->pprev;
}

// Eventually all chain branches meet at the genesis block.
assert(pa == pb);
return pa;
}
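
For context, a hedged sketch of how replay logic can use LastCommonAncestor to split recovery into a disconnect and a connect phase (Disconnect/Connect are hypothetical placeholders, not the real validation functions; assumes chain.h is included):

```cpp
// Illustrative only: split a replay/reorg into "undo" and "re-apply" phases
// around the fork point returned by LastCommonAncestor.
void ReplayRange(const CBlockIndex* old_tip, const CBlockIndex* new_tip)
{
    const CBlockIndex* fork = LastCommonAncestor(old_tip, new_tip);

    // Undo blocks from the stale tip down to (but not including) the fork point.
    for (const CBlockIndex* p = old_tip; p != fork; p = p->pprev) {
        // Disconnect(p);   // hypothetical helper
    }

    // Re-apply blocks from just above the fork point up to the new tip.
    for (int h = fork->nHeight + 1; h <= new_tip->nHeight; ++h) {
        // Connect(new_tip->GetAncestor(h));   // hypothetical helper
    }
}
```

In the patch itself this role is played by ReplayBlocks(), operating directly on the coins database.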


3 changes: 3 additions & 0 deletions src/chain.h
@@ -284,6 +284,9 @@ class CBlockIndex
const CBlockIndex* GetAncestor(int height) const;
};

/** Find the forking point between two chain tips. */
const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb);

/** Used to marshal pointers into hashes for db storage. */

// New serialization introduced with 4.0.99
7 changes: 5 additions & 2 deletions src/coins.cpp
@@ -16,6 +16,7 @@
bool CCoinsView::GetCoin(const COutPoint& outpoint, Coin& coin) const { return false; }
bool CCoinsView::HaveCoin(const COutPoint& outpoint) const { return false; }
uint256 CCoinsView::GetBestBlock() const { return UINT256_ZERO; }
std::vector<uint256> CCoinsView::GetHeadBlocks() const { return std::vector<uint256>(); }
CCoinsViewCursor *CCoinsView::Cursor() const { return 0; }

bool CCoinsView::BatchWrite(CCoinsMap& mapCoins,
@@ -33,6 +34,7 @@ CCoinsViewBacked::CCoinsViewBacked(CCoinsView* viewIn) : base(viewIn) {}
bool CCoinsViewBacked::GetCoin(const COutPoint& outpoint, Coin& coin) const { return base->GetCoin(outpoint, coin); }
bool CCoinsViewBacked::HaveCoin(const COutPoint& outpoint) const { return base->HaveCoin(outpoint); }
uint256 CCoinsViewBacked::GetBestBlock() const { return base->GetBestBlock(); }
std::vector<uint256> CCoinsViewBacked::GetHeadBlocks() const { return base->GetHeadBlocks(); }
void CCoinsViewBacked::SetBackend(CCoinsView& viewIn) { base = &viewIn; }
CCoinsViewCursor *CCoinsViewBacked::Cursor() const { return base->Cursor(); }
size_t CCoinsViewBacked::EstimateSize() const { return base->EstimateSize(); }
@@ -111,13 +113,14 @@ void CCoinsViewCache::AddCoin(const COutPoint& outpoint, Coin&& coin, bool possi
cachedCoinsUsage += it->second.coin.DynamicMemoryUsage();
}

void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight)
void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight, bool check)
{
bool fCoinbase = tx.IsCoinBase();
bool fCoinstake = tx.IsCoinStake();
const uint256& txid = tx.GetHash();
for (size_t i = 0; i < tx.vout.size(); ++i) {
cache.AddCoin(COutPoint(txid, i), Coin(tx.vout[i], nHeight, fCoinbase, fCoinstake), false);
bool overwrite = check && cache.HaveCoin(COutPoint(txid, i));
cache.AddCoin(COutPoint(txid, i), Coin(tx.vout[i], nHeight, fCoinbase, fCoinstake), overwrite);
}
}

13 changes: 11 additions & 2 deletions src/coins.h
@@ -210,6 +210,12 @@ class CCoinsView
//! Retrieve the block hash whose state this CCoinsView currently represents
virtual uint256 GetBestBlock() const;

//! Retrieve the range of blocks that may have been only partially written.
//! If the database is in a consistent state, the result is the empty vector.
//! Otherwise, a two-element vector is returned consisting of the new and
//! the old block hash, in that order.
virtual std::vector<uint256> GetHeadBlocks() const;

//! Do a bulk modification (multiple Coin changes + BestBlock change).
//! The passed mapCoins can be modified.
virtual bool BatchWrite(CCoinsMap& mapCoins,
@@ -250,6 +256,7 @@ class CCoinsViewBacked : public CCoinsView
bool GetCoin(const COutPoint& outpoint, Coin& coin) const override;
bool HaveCoin(const COutPoint& outpoint) const override;
uint256 GetBestBlock() const override;
std::vector<uint256> GetHeadBlocks() const override;
void SetBackend(CCoinsView& viewIn);
CCoinsViewCursor* Cursor() const override;
size_t EstimateSize() const override;
@@ -438,8 +445,10 @@ class CCoinsViewCache : public CCoinsViewBacked
};

//! Utility function to add all of a transaction's outputs to a cache.
// PIVX: It assumes that overwrites are never possible due to BIP34 always in effect
void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight);
// PIVX: When check is false, this assumes that overwrites are never possible due to BIP34 always in effect
// When check is true, the underlying view may be queried to determine whether an addition is
// an overwrite.
void AddCoins(CCoinsViewCache& cache, const CTransaction& tx, int nHeight, bool check = false);

//! Utility function to find any unspent output with a given txid.
const Coin& AccessByTxid(const CCoinsViewCache& cache, const uint256& txid);
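
To make the GetHeadBlocks() contract concrete, a minimal sketch of how startup code can interpret its result (stand-in types; the real consumer is ReplayBlocks). The new `check` parameter of AddCoins serves the same recovery path: blocks rolled forward during replay may contain outputs that were already partially flushed, so additions must be allowed to overwrite.

```cpp
// Hedged illustration of the GetHeadBlocks() contract; simplified types,
// not the actual ReplayBlocks code.
#include <cassert>
#include <string>
#include <vector>

using Hash = std::string;  // stand-in for uint256

// Empty result => database is consistent; two elements => {new tip, old tip}
// of an interrupted flush whose block range must be replayed.
bool NeedsReplay(const std::vector<Hash>& head_blocks, Hash& new_tip, Hash& old_tip)
{
    if (head_blocks.empty()) return false;  // clean state on disk
    assert(head_blocks.size() == 2);        // contract documented above
    new_tip = head_blocks[0];
    old_tip = head_blocks[1];
    return true;
}
```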
78 changes: 38 additions & 40 deletions src/httpserver.cpp
@@ -26,6 +26,7 @@
#include <event2/http.h>
#include <event2/thread.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>

@@ -73,34 +74,13 @@ class WorkQueue
std::deque<WorkItem*> queue;
bool running;
size_t maxDepth;
int numThreads;

/** RAII object to keep track of number of running worker threads */
class ThreadCounter
{
public:
WorkQueue &wq;
ThreadCounter(WorkQueue &w): wq(w)
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads += 1;
}
~ThreadCounter()
{
std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads -= 1;
wq.cond.notify_all();
}
};

public:
WorkQueue(size_t maxDepth) : running(true),
maxDepth(maxDepth),
numThreads(0)
explicit WorkQueue(size_t _maxDepth) : running(true),
maxDepth(_maxDepth)
{
}
/** Precondition: worker threads have all stopped
* (call WaitExit)
/** Precondition: worker threads have all stopped (they have been joined).
*/
~WorkQueue()
{
@@ -123,9 +103,8 @@ class WorkQueue
/** Thread function */
void Run()
{
ThreadCounter count(*this);
while (running) {
WorkItem* i = 0;
while (true) {
WorkItem* i = nullptr;
{
std::unique_lock<std::mutex> lock(cs);
while (running && queue.empty())
@@ -146,13 +125,6 @@ class WorkQueue
running = false;
cond.notify_all();
}
/** Wait for worker threads to exit */
void WaitExit()
{
std::unique_lock<std::mutex> lock(cs);
while (numThreads > 0)
cond.wait(lock);
}

/** Return current depth of queue */
size_t Depth()
@@ -251,6 +223,16 @@ static std::string RequestMethodString(HTTPRequest::RequestMethod m)
/** HTTP request callback */
static void http_request_cb(struct evhttp_request* req, void* arg)
{
// Disable reading to work around a libevent bug, fixed in 2.2.0.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
evhttp_connection* conn = evhttp_request_get_connection(req);
if (conn) {
bufferevent* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
bufferevent_disable(bev, EV_READ);
}
}
}
std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));

LogPrint(BCLog::HTTP, "Received a %s request for %s from %s\n",
@@ -455,6 +437,7 @@ bool UpdateHTTPServerLogging(bool enable) {

std::thread threadHTTP;
std::future<bool> threadResult;
static std::vector<std::thread> g_thread_http_workers;

bool StartHTTPServer()
{
@@ -466,8 +449,7 @@ bool StartHTTPServer()
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);

for (int i = 0; i < rpcThreads; i++) {
std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
}
return true;
}
@@ -490,7 +472,10 @@ void StopHTTPServer()
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (workQueue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
workQueue->WaitExit();
for (auto& thread: g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
delete workQueue;
}
MilliSleep(500); // Avoid race condition while the last HTTP-thread is exiting
@@ -615,9 +600,22 @@ void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
HTTPEvent* ev = new HTTPEvent(eventBase, true,
std::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
ev->trigger(0);
auto req_copy = req;
HTTPEvent* ev = new HTTPEvent(eventBase, true, [req_copy, nStatus]{
evhttp_send_reply(req_copy, nStatus, nullptr, nullptr);
// Re-enable reading from the socket. This is the second part of the libevent
// workaround above.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
evhttp_connection* conn = evhttp_request_get_connection(req_copy);
if (conn) {
bufferevent* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
}
}
});
ev->trigger(nullptr);
replySent = true;
req = 0; // transferred back to main thread
}
24 changes: 18 additions & 6 deletions src/init.cpp
@@ -237,8 +237,6 @@ void PrepareShutdown()
// using the other before destroying them.
if (peerLogic) UnregisterValidationInterface(peerLogic.get());
if (g_connman) g_connman->Stop();
peerLogic.reset();
g_connman.reset();

StopTorControl();

@@ -247,6 +245,11 @@ void PrepareShutdown()
threadGroup.interrupt_all();
threadGroup.join_all();

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
peerLogic.reset();
g_connman.reset();

DumpMasternodes();
DumpBudgets(g_budgetman);
DumpMasternodePayments();
@@ -431,6 +434,9 @@ std::string HelpMessage(HelpMessageMode mode)
#endif
}
strUsage += HelpMessageOpt("-datadir=<dir>", _("Specify data directory"));
if (showDebug) {
strUsage += HelpMessageOpt("-dbbatchsize", strprintf("Maximum database write batch size in bytes (default: %u)", nDefaultDbBatchSize));
}
strUsage += HelpMessageOpt("-paramsdir=<dir>", strprintf(_("Specify zk params directory (default: %s)"), ZC_GetParamsDir().string()));
strUsage += HelpMessageOpt("-debuglogfile=<file>", strprintf(_("Specify location of debug log file: this can be an absolute path or a path relative to the data directory (default: %s)"), DEFAULT_DEBUGLOGFILE));
strUsage += HelpMessageOpt("-disablesystemnotifications", strprintf(_("Disable OS notifications for incoming transactions (default: %u)"), 0));
@@ -1580,7 +1586,6 @@ bool AppInitMain()
pblocktree = new CBlockTreeDB(nBlockTreeDBCache, false, fReindex);
pcoinsdbview = new CCoinsViewDB(nCoinDBCache, false, fReindex);
pcoinscatcher = new CCoinsViewErrorCatcher(pcoinsdbview);
pcoinsTip = new CCoinsViewCache(pcoinscatcher);

if (fReindex) {
pblocktree->WriteReindexing(true);
@@ -1609,7 +1614,8 @@
break;
}

const Consensus::Params& consensus = Params().GetConsensus();
const CChainParams& chainparams = Params();
const Consensus::Params& consensus = chainparams.GetConsensus();

// If the loaded chain has a wrong genesis, bail out immediately
// (we're likely using a testnet datadir, or the other way around).
@@ -1628,6 +1634,13 @@
break;
}

if (!ReplayBlocks(chainparams, pcoinsdbview)) {
strLoadError = _("Unable to replay blocks. You will need to rebuild the database using -reindex.");
break;
}
pcoinsTip = new CCoinsViewCache(pcoinscatcher);
LoadChainTip(chainparams);

// Populate list of invalid/fraudulent outpoints that are banned from the chain
invalid_out::LoadOutpoints();
invalid_out::LoadSerials();
@@ -1939,6 +1952,5 @@ bool AppInitMain()
}
#endif


return !fRequestShutdown;
return true;
}
2 changes: 1 addition & 1 deletion src/legacy/validation_zerocoin_legacy.cpp
@@ -98,7 +98,7 @@ bool DisconnectZerocoinTx(const CTransaction& tx, CAmount& nValueIn, CZerocoinDB

// Legacy Zerocoin DB: used for performance during IBD
// (between Zerocoin_Block_V2_Start and Zerocoin_Block_Last_Checkpoint)
void DataBaseAccChecksum(CBlockIndex* pindex, bool fWrite)
void DataBaseAccChecksum(const CBlockIndex* pindex, bool fWrite)
{
const Consensus::Params& consensus = Params().GetConsensus();
if (!pindex ||
2 changes: 1 addition & 1 deletion src/legacy/validation_zerocoin_legacy.h
@@ -12,6 +12,6 @@

bool AcceptToMemoryPoolZerocoin(const CTransaction& tx, CAmount& nValueIn, int chainHeight, CValidationState& state, const Consensus::Params& consensus);
bool DisconnectZerocoinTx(const CTransaction& tx, CAmount& nValueIn, CZerocoinDB* zerocoinDB);
void DataBaseAccChecksum(CBlockIndex* pindex, bool fWrite);
void DataBaseAccChecksum(const CBlockIndex* pindex, bool fWrite);

#endif //VALIDATION_ZEROCOIN_LEGACY_H
[Diffs for the remaining changed files not shown]
