Skip to content

Commit

Permalink
Config store performance improvement
Browse files Browse the repository at this point in the history
Summary:
This improves the performance of the config store. The improvement is compatible with the old version.
Previously, every write to the database saved the whole database to local disk.
The improvement is to append only the changes to the disk whenever a write/override/delete occurs. The following procedure shows the steps of this improvement in detail:

1. Write/Override to database.
2. Append a PersistentObject: ('add', key, value) to the disk. This step requires serializing the PersistentObject to an iobuf. As another example, deleting a key from the database appends the PersistentObject: ('del', key, optional value).
3. An example of the file on the disk is:
```
('add', key1, value1)
('add', key2, value2)
('del', key1, value1)
....
```
Periodically, we need to compact the file on the disk. In the above example, after compaction, we have
```
('add', key2, value2)
...
```

This step requires loading the file from the disk and merging the changes made to the same key, which is a deserialization procedure. The loaded file is put into an iobuf, deserialized into many PersistentObjects, and then applied to the database.
```
iobuf -> PersistentObjects -> database
```

Reviewed By: saifhhasan

Differential Revision: D15806558

fbshipit-source-id: b82018603ff3bef01c972d5a3b56e4bd3a3b83f9
  • Loading branch information
yunhongxu authored and facebook-github-bot committed Jun 22, 2019
1 parent 9c53cbd commit 8e04a93
Show file tree
Hide file tree
Showing 3 changed files with 473 additions and 44 deletions.
299 changes: 270 additions & 29 deletions openr/config-store/PersistentStore.cpp
Expand Up @@ -14,9 +14,15 @@

#include <openr/common/Util.h>

namespace openr {
using std::exception;

namespace {

static const long kDbFlushRatio = 10000;

using namespace std::chrono_literals;
} // anonymous namespace

namespace openr {

PersistentStore::PersistentStore(
const std::string& nodeName,
Expand All @@ -41,7 +47,7 @@ PersistentStore::PersistentStore(
saveInitialBackoff, saveMaxBackoff);

saveDbTimer_ = fbzmq::ZmqTimeout::make(this, [this]() noexcept {
if (saveDatabaseToDisk()) {
if (savePersistentObjectToDisk()) {
saveDbTimerBackoff_->reportSuccess();
} else {
// Report error and schedule next-try
Expand Down Expand Up @@ -76,10 +82,12 @@ PersistentStore::processRequestMsg(fbzmq::Message&& requestMsg) {

// Generate response
response.key = request->key;
PersistentObject pObject;
switch (request->requestType) {
case thrift::StoreRequestType::STORE: {
// Override previous value if any
database_.keyVals[request->key] = request->data;
pObject = toPersistentObject(ActionType::ADD, request->key, request->data);
response.success = true;
break;
}
Expand All @@ -92,6 +100,7 @@ PersistentStore::processRequestMsg(fbzmq::Message&& requestMsg) {
}
case thrift::StoreRequestType::ERASE: {
response.success = database_.keyVals.erase(request->key) > 0;
pObject = toPersistentObject(ActionType::DEL, request->key, request->data);
break;
}
default: {
Expand All @@ -104,10 +113,12 @@ PersistentStore::processRequestMsg(fbzmq::Message&& requestMsg) {
// Schedule database save
if (response.success and
(request->requestType != thrift::StoreRequestType::LOAD)) {
pObjects_.emplace_back(std::move(pObject));

if (not saveDbTimerBackoff_) {
// This is primarily used for unit testing to save DB immediately
// Block the response till file is saved
saveDatabaseToDisk();
savePersistentObjectToDisk();
} else if (not saveDbTimer_->isScheduled()) {
saveDbTimer_->scheduleTimeout(
saveDbTimerBackoff_->getTimeRemainingUntilRetry());
Expand All @@ -119,38 +130,96 @@ PersistentStore::processRequestMsg(fbzmq::Message&& requestMsg) {
}

bool
PersistentStore::saveDatabaseToDisk() noexcept {
// Write database_ to ioBuf
auto queue = folly::IOBufQueue();
serializer_.serialize(database_, &queue);
auto ioBuf = queue.move();
ioBuf->coalesce();
PersistentStore::savePersistentObjectToDisk() noexcept {
if (not dryrun_) {
// Write PersistentObject to ioBuf
std::vector<PersistentObject> newObjects;
newObjects = std::move(pObjects_);

try {
if (not dryrun_) {
LOG(INFO) << "Updating database on disk";
auto queue = folly::IOBufQueue(folly::IOBufQueue::cacheChainLength());

for (auto& pObject : newObjects) {
auto buf = encodePersistentObject(pObject);
if (buf.hasError()) {
LOG(ERROR) << "Failed to encode PersistentObject to ioBuf. Error: "
<< folly::exceptionStr(buf.error());
return false;
}
queue.append(std::move(**buf));
}

// Append IoBuf to disk
auto ioBuf = queue.move();
auto success = writeIoBufToDisk(ioBuf, WriteType::APPEND);
if (success.hasError()) {
LOG(ERROR) << "Failed to write PersistentObject to file '"
<< storageFilePath_
<< "'. Error: " << folly::exceptionStr(success.error());
return false;
}

numOfNewWritesToDisk_++;

// Write the whole database to disk periodically
if (numOfNewWritesToDisk_ >= kDbFlushRatio) {
numOfNewWritesToDisk_ = 0;
const auto startTs = std::chrono::steady_clock::now();
// Write ioBuf to disk atomically
auto fileData = ioBuf->moveToFbString().toStdString();
folly::writeFileAtomic(storageFilePath_, fileData, 0666);
if (not saveDatabaseToDisk()) {
return false;
}
LOG(INFO) << "Updated database on disk. Took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - startTs)
.count()
<< "ms";
} else {
VLOG(1) << "Skipping writing to disk in dryrun mode";
}
numOfWritesToDisk_++;
} catch (std::exception const& err) {
LOG(ERROR) << "Failed to write data to file '" << storageFilePath_ << "'. "
<< folly::exceptionStr(err);
return false;
} else {
VLOG(1) << "Skipping writing to disk in dryrun mode";
}
numOfWritesToDisk_++;

return true;
}

bool
PersistentStore::saveDatabaseToDisk() noexcept {
std::unique_ptr<folly::IOBuf> ioBuf;
// If database is empty, write 'kTlvFormatMarker' to disk and return
if (database_.keyVals.size() == 0) {
ioBuf = folly::IOBuf::copyBuffer(
kTlvFormatMarker.data(), kTlvFormatMarker.size());
} else {
// Append kTlvFormatMarker to queue
auto queue = folly::IOBufQueue(folly::IOBufQueue::cacheChainLength());
queue.append(kTlvFormatMarker.data(), kTlvFormatMarker.size());

// Encode database_ and append to queue
for (auto& keyPair : database_.keyVals) {
PersistentObject pObject;
pObject =
toPersistentObject(ActionType::ADD, keyPair.first, keyPair.second);

auto buf = encodePersistentObject(pObject);
if (buf.hasError()) {
LOG(ERROR) << "Failed to encode PersistentObject to ioBuf. Error: "
<< folly::exceptionStr(buf.error());
return false;
}
queue.append(std::move(*buf));
}
// Write queue to disk
ioBuf = queue.move();
}

auto success = writeIoBufToDisk(ioBuf, WriteType::WRITE);
if (success.hasError()) {
LOG(ERROR) << "Failed to write database to file '" << storageFilePath_
<< "'. Error: " << folly::exceptionStr(success.error());
return false;
}
return true;
}

bool
PersistentStore::loadDatabaseFromDisk() noexcept {
// Check if file exists
Expand All @@ -168,18 +237,190 @@ PersistentStore::loadDatabaseFromDisk() noexcept {
return false;
}

// Parse data into `database_`
// Create IoBuf and cursor for loading data from disk
auto ioBuf = folly::IOBuf::wrapBuffer(fileData.c_str(), fileData.size());
folly::io::Cursor cursor(ioBuf.get());

// Read 'kTlvFormatMarker' from ioBuf
if (not cursor.canAdvance(kTlvFormatMarker.size()) or
cursor.readFixedString(kTlvFormatMarker.size()) != kTlvFormatMarker) {
// Load old Format and write TlvFormat
auto oldSuccess = loadDatabaseOldFormat(ioBuf);
if (oldSuccess.hasError()) {
LOG(ERROR) << "Failed to read old-format file contents from '"
<< storageFilePath_
<< "'. Error: " << folly::exceptionStr(oldSuccess.error());
return false;
}
return true;
}
// Load TlvFormat
auto tlvSuccess = loadDatabaseTlvFormat(ioBuf);
if (tlvSuccess.hasError()) {
LOG(ERROR) << "Failed to read Tlv-format file contents from '"
<< storageFilePath_
<< "'. Error: " << folly::exceptionStr(tlvSuccess.error());
return false;
}
return true;
}

folly::Expected<folly::Unit, std::string>
PersistentStore::loadDatabaseOldFormat(
const std::unique_ptr<folly::IOBuf>& ioBuf) noexcept {
// Parse ioBuf into `database_`
try {
auto ioBuf = folly::IOBuf::wrapBuffer(fileData.c_str(), fileData.size());
thrift::StoreDatabase newDatabase;
serializer_.deserialize(ioBuf.get(), newDatabase);
database_ = std::move(newDatabase);
return true;
// Write Tlv format to disk
saveDatabaseToDisk();
} catch (std::exception const& e) {
LOG(ERROR) << "Failed to decode file content into StoreDatabase."
<< ". Error: " << folly::exceptionStr(e);
return false;
return folly::makeUnexpected<std::string>(
folly::exceptionStr(e).toStdString());
}
return folly::Unit();
}

// Rebuild `database_` from a TLV-format buffer: skip the leading
// 'kTlvFormatMarker', then replay every decoded record in order (ADD stores
// or overrides a key, DEL removes it). `database_` is replaced only after the
// whole buffer has been decoded; on any decode error the error string is
// returned and `database_` is left untouched.
folly::Expected<folly::Unit, std::string>
PersistentStore::loadDatabaseTlvFormat(
    const std::unique_ptr<folly::IOBuf>& ioBuf) noexcept {
  folly::io::Cursor reader(ioBuf.get());
  thrift::StoreDatabase rebuilt;

  // Consume the format marker that precedes the records
  try {
    reader.readFixedString(kTlvFormatMarker.size());
  } catch (std::out_of_range& e) {
    return folly::makeUnexpected<std::string>(
        folly::exceptionStr(e).toStdString());
  }

  // Replay records until the decoder reports there is nothing left to read
  for (;;) {
    auto decoded = decodePersistentObject(reader);
    if (decoded.hasError()) {
      return folly::makeUnexpected(decoded.error());
    }
    if (not decoded->hasValue()) {
      break;
    }

    auto record = std::move(decoded->value());
    switch (record.type) {
    case ActionType::ADD: {
      // A record without data is stored as an empty value
      auto& slot = rebuilt.keyVals[record.key];
      if (record.data.has_value()) {
        slot = record.data.value();
      } else {
        slot = "";
      }
      break;
    }
    case ActionType::DEL:
      rebuilt.keyVals.erase(record.key);
      break;
    default:
      // Records with any other action type are ignored
      break;
    }
  }

  database_ = std::move(rebuilt);
  return folly::Unit();
}

// Write the contents of `ioBuf` to the storage file.
//
// WriteType::WRITE replaces the whole file via folly::writeFileAtomic; any
// other write type appends to the file (creating it if needed) with a plain
// folly::writeFile, which is NOT atomic.
//
// `ioBuf` is consumed: it is coalesced and its bytes are moved out. A null
// `ioBuf` (e.g. the result of IOBufQueue::move() on an empty queue) is treated
// as an empty payload instead of being dereferenced.
//
// Returns folly::Unit on success, or an error string if the write throws or
// the append reports failure.
folly::Expected<folly::Unit, std::string>
PersistentStore::writeIoBufToDisk(
    const std::unique_ptr<folly::IOBuf>& ioBuf, WriteType writeType) noexcept {
  try {
    std::string fileData;
    if (ioBuf) {
      ioBuf->coalesce();
      fileData = ioBuf->moveToFbString().toStdString();
    }

    if (writeType == WriteType::WRITE) {
      // Write over; writeFileAtomic throws on failure
      folly::writeFileAtomic(storageFilePath_, fileData, 0666);
    } else {
      // Append to file; writeFile signals failure through its return value
      // rather than by throwing, so it must be checked explicitly
      const bool appended = folly::writeFile(
          fileData,
          storageFilePath_.c_str(),
          O_WRONLY | O_APPEND | O_CREAT,
          0666);
      if (not appended) {
        return folly::makeUnexpected<std::string>(
            "Failed to append data to file");
      }
    }
  } catch (std::exception const& e) {
    return folly::makeUnexpected<std::string>(
        folly::exceptionStr(e).toStdString());
  }
  return folly::Unit();
}

// Serialize a PersistentObject into a freshly allocated IOBuf using an ad-hoc
// layout, all integers big-endian:
//   [type: uint8][key length: uint32][key bytes][data length: uint32][data]
// An object without data is written with a data length of 0 and no data
// bytes. Returns the buffer, or an error string if appending throws.
folly::Expected<std::unique_ptr<folly::IOBuf>, std::string>
PersistentStore::encodePersistentObject(
    const PersistentObject& pObject) noexcept {
  const bool hasData = pObject.data.has_value();
  const size_t dataLen = hasData ? pObject.data.value().size() : 0;

  // Reserve room for the whole record up front; the appender below is
  // created with a growth size of 0
  const size_t capacity = sizeof(uint8_t) + sizeof(uint32_t) +
      pObject.key.size() + sizeof(uint32_t) + dataLen;
  auto buf = folly::IOBuf::create(capacity);

  folly::io::Appender appender(buf.get(), 0);
  try {
    // Action type
    appender.writeBE(static_cast<uint8_t>(pObject.type));

    // Key: length prefix, then bytes
    appender.writeBE<uint32_t>(pObject.key.size());
    appender.push(folly::StringPiece(pObject.key));

    // Data: length prefix (0 when absent), then bytes when present
    appender.writeBE<uint32_t>(dataLen);
    if (hasData) {
      appender.push(folly::StringPiece(pObject.data.value()));
    }
    return buf;
  } catch (const std::exception& e) {
    return folly::makeUnexpected<std::string>(
        folly::exceptionStr(e).toStdString());
  }
}

// Decode one PersistentObject from `cursor`, advancing it past the record.
// Expected layout, all integers big-endian:
//   [type: uint8][key length: uint32][key bytes][data length: uint32][data]
// Returns folly::none when the cursor has no bytes left, the decoded object
// otherwise (its `data` is left unset when the data length is 0), or an error
// string if the record is cut short.
folly::Expected<folly::Optional<PersistentObject>, std::string>
PersistentStore::decodePersistentObject(folly::io::Cursor& cursor) noexcept {
  // End of input: nothing more to decode
  if (not cursor.canAdvance(1)) {
    return folly::none;
  }

  try {
    PersistentObject record;

    // Action type
    record.type = static_cast<ActionType>(cursor.readBE<uint8_t>());

    // Key: length prefix, then bytes
    const auto keyLen = cursor.readBE<uint32_t>();
    record.key = cursor.readFixedString(keyLen);

    // Data: length prefix, then bytes only when the length is non-zero
    const auto dataLen = cursor.readBE<uint32_t>();
    if (dataLen != 0) {
      record.data = cursor.readFixedString(dataLen);
    }
    return record;
  } catch (std::out_of_range& e) {
    return folly::makeUnexpected<std::string>(
        folly::exceptionStr(e).toStdString());
  }
}

// Build a PersistentObject from an action type and a key. `data` is copied
// into the object only for ActionType::ADD; for every other action the
// object's data is left unset.
PersistentObject
PersistentStore::toPersistentObject(
    const ActionType type, const std::string& key, const std::string& data) {
  PersistentObject result;
  result.key = key;
  result.type = type;
  if (type != ActionType::ADD) {
    return result;
  }
  result.data = data;
  return result;
}

} // namespace openr

0 comments on commit 8e04a93

Please sign in to comment.