Skip to content

Commit

Permalink
[fix][store] Fix the issue where the lite version loses data upon reb…
Browse files Browse the repository at this point in the history
…oot.
  • Loading branch information
visualYJD authored and ketor committed Jun 4, 2024
1 parent 5fd161c commit d55ee90
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions src/engine/rocks_raw_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
#include "rocksdb/write_batch.h"

namespace dingodb {

DEFINE_bool(enable_rocksdb_sync, false, "enable rocksdb sync ");
namespace rocks {

ColumnFamily::ColumnFamily(const std::string& cf_name, const ColumnFamilyConfig& config,
Expand Down Expand Up @@ -459,6 +459,9 @@ butil::Status Writer::KvPut(const std::string& cf_name, const pb::common::KeyVal
}

rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}
rocksdb::Status s = GetDB()->Put(write_options, GetColumnFamily(cf_name)->GetHandle(), rocksdb::Slice(kv.key()),
rocksdb::Slice(kv.value()));
if (!s.ok()) {
Expand Down Expand Up @@ -506,6 +509,9 @@ butil::Status Writer::KvBatchPutAndDelete(const std::string& cf_name,
}
}
rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}
rocksdb::Status s = GetDB()->Write(write_options, &batch);
if (!s.ok()) {
DINGO_LOG(ERROR) << fmt::format("[rocksdb] write failed, error: {}", s.ToString());
Expand Down Expand Up @@ -565,6 +571,9 @@ butil::Status Writer::KvBatchPutAndDelete(
}

rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}
rocksdb::Status s = GetDB()->Write(write_options, &batch);
if (!s.ok()) {
DINGO_LOG(ERROR) << fmt::format("[rocksdb] write failed, error: {}", s.ToString());
Expand All @@ -580,7 +589,10 @@ butil::Status Writer::KvDelete(const std::string& cf_name, const std::string& ke
return butil::Status(pb::error::EKEY_EMPTY, "Key is empty");
}

rocksdb::WriteOptions const write_options;
rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}
rocksdb::Status const s =
GetDB()->Delete(write_options, GetColumnFamily(cf_name)->GetHandle(), rocksdb::Slice(key.data(), key.size()));
if (!s.ok()) {
Expand All @@ -605,8 +617,12 @@ butil::Status Writer::KvDeleteRange(const std::string& cf_name, const pb::common
DINGO_LOG(ERROR) << fmt::format("[rocksdb] delete range failed, error: {}.", s.ToString());
return butil::Status(pb::error::EINTERNAL, "Internal delete range error");
}
rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}

s = GetDB()->Write(rocksdb::WriteOptions(), &batch);
s = GetDB()->Write(write_options, &batch);
if (!s.ok()) {
DINGO_LOG(ERROR) << fmt::format("[rocksdb] write failed, error: {}.", s.ToString());
return butil::Status(pb::error::EINTERNAL, "Internal write error");
Expand Down Expand Up @@ -634,8 +650,12 @@ butil::Status Writer::KvBatchDeleteRange(const std::map<std::string, std::vector
}
}
}
rocksdb::WriteOptions write_options;
if (FLAGS_enable_rocksdb_sync) {
write_options.sync = true;
}

rocksdb::Status s = GetDB()->Write(rocksdb::WriteOptions(), &batch);
rocksdb::Status s = GetDB()->Write(write_options, &batch);
if (!s.ok()) {
DINGO_LOG(ERROR) << fmt::format("[rocksdb] write failed, error: {}.", s.ToString());
return butil::Status(pb::error::EINTERNAL, "Internal write error");
Expand Down

0 comments on commit d55ee90

Please sign in to comment.