Skip to content

Commit

Permalink
Fix db leaks in LLMQ db (#2914)
Browse files Browse the repository at this point in the history
* Store rs_t key time in big endian

Also implement ConvertInvalidTimeKeys to convert old entries. We can remove
this later when we know that most MNs have run this code on testnet.

The way we stored the time field in the past lead to CleanupOldRecoveredSigs
iterating the keys in a strange order, causing no deletion at all and the
LLMQ DB filling up.

* Write batch in CleanupOldRecoveredSigs when it gets too large

This avoids RAM filling up and OOM getting triggered.

* Keep track of when a vote was written to the DB and clean up after week

Instead of only deleting when the corresponding recovered sig is deleted.
It sometimes happens that a masternode votes on something but a recovered
sig is never created, which leaves us with a vote that will never be
deleted.

* Apply suggestions from code review

Co-Authored-By: PastaPastaPasta <pasta@dashboost.org>
  • Loading branch information
codablock and PastaPastaPasta committed May 13, 2019
1 parent 56f3119 commit 240238b
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 14 deletions.
160 changes: 146 additions & 14 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,90 @@ UniValue CRecoveredSig::ToJson() const
CRecoveredSigsDb::CRecoveredSigsDb(CDBWrapper& _db) :
db(_db)
{
if (Params().NetworkIDString() == CBaseChainParams::TESTNET) {
// TODO this can be completely removed after some time (when we're pretty sure the conversion has been run on most testnet MNs)
if (db.Exists(std::string("rs_upgraded"))) {
return;
}

ConvertInvalidTimeKeys();
AddVoteTimeKeys();

db.Write(std::string("rs_upgraded"), (uint8_t)1);
}
}

// This converts time values in "rs_t" from host endiannes to big endiannes, which is required to have proper ordering of the keys
void CRecoveredSigsDb::ConvertInvalidTimeKeys()
{
LogPrintf("CRecoveredSigsDb::%s -- converting invalid rs_t keys\n", __func__);

std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

auto start = std::make_tuple(std::string("rs_t"), (uint32_t)0, (uint8_t)0, uint256());
pcursor->Seek(start);

CDBBatch batch(db);
size_t cnt = 0;
while (pcursor->Valid()) {
decltype(start) k;

if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_t") {
break;
}

batch.Erase(k);
std::get<1>(k) = htobe32(std::get<1>(k));
batch.Write(k, (uint8_t)1);

cnt++;

pcursor->Next();
}
pcursor.reset();

db.WriteBatch(batch);

LogPrintf("CRecoveredSigsDb::%s -- converted %d invalid rs_t keys\n", __func__, cnt);
}

// This adds rs_vt keys for every rs_v entry to the DB. The time in the key is set to the current time.
// This causes cleanup of all these votes a week later.
void CRecoveredSigsDb::AddVoteTimeKeys()
{
LogPrintf("CRecoveredSigsDb::%s -- adding rs_vt keys with current time\n", __func__);

auto curTime = GetAdjustedTime();

std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

auto start = std::make_tuple(std::string("rs_v"), (uint8_t)0, uint256());
pcursor->Seek(start);

CDBBatch batch(db);
size_t cnt = 0;
while (pcursor->Valid()) {
decltype(start) k;

if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_v") {
break;
}

uint8_t llmqType = std::get<1>(k);
const uint256& id = std::get<2>(k);

auto k2 = std::make_tuple(std::string("rs_vt"), (uint32_t)htobe32(curTime), llmqType, id);
batch.Write(k2, (uint8_t)1);

cnt++;

pcursor->Next();
}
pcursor.reset();

db.WriteBatch(batch);

LogPrintf("CRecoveredSigsDb::%s -- added %d rs_vt entries\n", __func__, cnt);
}

bool CRecoveredSigsDb::HasRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash)
Expand Down Expand Up @@ -156,13 +240,9 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
auto k4 = std::make_tuple(std::string("rs_s"), signHash);
batch.Write(k4, (uint8_t)1);

// remove the votedForId entry as we won't need it anymore
auto k5 = std::make_tuple(std::string("rs_v"), recSig.llmqType, recSig.id);
batch.Erase(k5);

// store by current time. Allows fast cleanup of old recSigs
auto k6 = std::make_tuple(std::string("rs_t"), (uint32_t)GetAdjustedTime(), recSig.llmqType, recSig.id);
batch.Write(k6, (uint8_t)1);
auto k5 = std::make_tuple(std::string("rs_t"), (uint32_t)htobe32(GetAdjustedTime()), recSig.llmqType, recSig.id);
batch.Write(k5, (uint8_t)1);

db.WriteBatch(batch);

Expand All @@ -180,9 +260,8 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
{
std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

static const uint256 maxUint256 = uint256S("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff");
auto start = std::make_tuple(std::string("rs_t"), (uint32_t)0, (uint8_t)0, uint256());
auto end = std::make_tuple(std::string("rs_t"), (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)255, maxUint256);
uint32_t endTime = (uint32_t)(GetAdjustedTime() - maxAge);
pcursor->Seek(start);

std::vector<std::pair<Consensus::LLMQType, uint256>> toDelete;
Expand All @@ -191,10 +270,10 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
while (pcursor->Valid()) {
decltype(start) k;

if (!pcursor->GetKey(k)) {
if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_t") {
break;
}
if (k >= end) {
if (be32toh(std::get<1>(k)) >= endTime) {
break;
}

Expand Down Expand Up @@ -224,16 +303,19 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
auto k2 = std::make_tuple(std::string("rs_r"), recSig.llmqType, recSig.id, recSig.msgHash);
auto k3 = std::make_tuple(std::string("rs_h"), recSig.GetHash());
auto k4 = std::make_tuple(std::string("rs_s"), signHash);
auto k5 = std::make_tuple(std::string("rs_v"), recSig.llmqType, recSig.id);
batch.Erase(k1);
batch.Erase(k2);
batch.Erase(k3);
batch.Erase(k4);
batch.Erase(k5);

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
hasSigForHashCache.erase(recSig.GetHash());

if (batch.SizeEstimate() >= (1 << 24)) {
db.WriteBatch(batch);
batch.Clear();
}
}
}

Expand All @@ -242,6 +324,8 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
}

db.WriteBatch(batch);

LogPrint("llmq", "CRecoveredSigsDb::%d -- deleted %d entries\n", __func__, toDelete.size());
}

bool CRecoveredSigsDb::HasVotedOnId(Consensus::LLMQType llmqType, const uint256& id)
Expand All @@ -258,8 +342,55 @@ bool CRecoveredSigsDb::GetVoteForId(Consensus::LLMQType llmqType, const uint256&

void CRecoveredSigsDb::WriteVoteForId(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash)
{
auto k = std::make_tuple(std::string("rs_v"), (uint8_t)llmqType, id);
db.Write(k, msgHash);
auto k1 = std::make_tuple(std::string("rs_v"), (uint8_t)llmqType, id);
auto k2 = std::make_tuple(std::string("rs_vt"), (uint32_t)htobe32(GetAdjustedTime()), (uint8_t)llmqType, id);

CDBBatch batch(db);
batch.Write(k1, msgHash);
batch.Write(k2, (uint8_t)1);

db.WriteBatch(batch);
}

void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
{
std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

auto start = std::make_tuple(std::string("rs_vt"), (uint32_t)0, (uint8_t)0, uint256());
uint32_t endTime = (uint32_t)(GetAdjustedTime() - maxAge);
pcursor->Seek(start);

CDBBatch batch(db);
size_t cnt = 0;
while (pcursor->Valid()) {
decltype(start) k;

if (!pcursor->GetKey(k) || std::get<0>(k) != "rs_vt") {
break;
}
if (be32toh(std::get<1>(k)) >= endTime) {
break;
}

uint8_t llmqType = std::get<2>(k);
const uint256& id = std::get<3>(k);

batch.Erase(k);
batch.Erase(std::make_tuple(std::string("rs_v"), llmqType, id));

cnt++;

pcursor->Next();
}
pcursor.reset();

if (cnt == 0) {
return;
}

db.WriteBatch(batch);

LogPrint("llmq", "CRecoveredSigsDb::%d -- deleted %d entries\n", __func__, cnt);
}

//////////////////
Expand Down Expand Up @@ -559,6 +690,7 @@ void CSigningManager::Cleanup()
int64_t maxAge = GetArg("-recsigsmaxage", DEFAULT_MAX_RECOVERED_SIGS_AGE);

db.CleanupOldRecoveredSigs(maxAge);
db.CleanupOldVotes(maxAge);

lastCleanupTime = GetTimeMillis();
}
Expand Down
5 changes: 5 additions & 0 deletions src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class CRecoveredSigsDb
public:
CRecoveredSigsDb(CDBWrapper& _db);

void ConvertInvalidTimeKeys();
void AddVoteTimeKeys();

bool HasRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash);
bool HasRecoveredSigForId(Consensus::LLMQType llmqType, const uint256& id);
bool HasRecoveredSigForSession(const uint256& signHash);
Expand All @@ -89,6 +92,8 @@ class CRecoveredSigsDb
bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet);
void WriteVoteForId(Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash);

void CleanupOldVotes(int64_t maxAge);

private:
bool ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret);
};
Expand Down

0 comments on commit 240238b

Please sign in to comment.