Skip to content

Commit

Permalink
feat(rocksdb): Support to open rocksdb with or without meta CF (#549)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 11, 2020
1 parent 7ab5fb8 commit 9185c3a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 12 deletions.
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 144 files
2 changes: 1 addition & 1 deletion rocksdb
76 changes: 67 additions & 9 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";

pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: dsn::apps::rrdb_service(r),
Expand Down Expand Up @@ -452,8 +454,7 @@ pegasus_server_impl::~pegasus_server_impl()
{
if (_is_open) {
dassert(_db != nullptr, "");
delete _db;
_db = nullptr;
release_db();
}
}

Expand Down Expand Up @@ -1647,6 +1648,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// 2, we can parse restore info from app env, which is stored in argv
// 3, restore_dir is exist
//
bool db_exist = true;
auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
if (::dsn::utils::filesystem::path_exists(path)) {
// only case 1
Expand All @@ -1662,6 +1664,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
replica_name());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
}
} else {
Expand All @@ -1688,6 +1691,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
restore_dir.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dwarn(
"%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
"it, the role of this replica must not primary, so we open a new db on the "
Expand All @@ -1702,16 +1706,36 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)

ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());

auto status = rocksdb::DB::Open(rocksdb::Options(_db_opts, _data_cf_opts), path, &_db);
bool need_open_with_meta_cf = false;
// Check meta CF only when db exist.
if (db_exist && check_meta_cf(path, &need_open_with_meta_cf) != ::dsn::ERR_OK) {
derror_replica("check meta column family failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}});
if (need_open_with_meta_cf) {
column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(
META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()));
}
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
if (status.ok()) {
dcheck_eq_replica(column_families.size(), handles_opened.size());
dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
if (handles_opened.size() == 2) {
dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
_meta_cf = handles_opened[1];
}

_last_committed_decree = _db->GetLastFlushedDecree();
_pegasus_data_version = _db->GetPegasusDataVersion();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror("%s: open app failed, unsupported data version %" PRIu32,
replica_name(),
_pegasus_data_version);
delete _db;
_db = nullptr;
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
}

Expand All @@ -1736,8 +1760,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
auto err = async_checkpoint(false);
if (err != ::dsn::ERR_OK) {
derror("%s: create checkpoint failed, error = %s", replica_name(), err.to_string());
delete _db;
_db = nullptr;
release_db();
return err;
}
dassert(last_flushed == last_durable_decree(),
Expand Down Expand Up @@ -1822,8 +1845,7 @@ ::dsn::error_code pegasus_server_impl::stop(bool clear_state)
_context_cache.clear();

_is_open = false;
delete _db;
_db = nullptr;
release_db();

std::deque<int64_t> reserved_checkpoints;
{
Expand Down Expand Up @@ -2894,5 +2916,41 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
// TODO(heyuchen): set filter _partition_version in further pr
}

::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path,
bool *need_open_with_meta_cf)
{
*need_open_with_meta_cf = false;
std::vector<std::string> column_families;
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
if (!s.ok()) {
derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}

for (const auto &column_family : column_families) {
if (column_family == DATA_COLUMN_FAMILY_NAME) {
continue;
}
if (column_family == META_COLUMN_FAMILY_NAME) {
*need_open_with_meta_cf = true;
continue;
}
dassert_replica(false, "Column family '{}' should not present");
}
return ::dsn::ERR_OK;
}

void pegasus_server_impl::release_db()
{
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
if (_meta_cf != nullptr) {
_db->DestroyColumnFamilyHandle(_meta_cf);
}
_meta_cf = nullptr;
delete _db;
_db = nullptr;
}

} // namespace server
} // namespace pegasus
10 changes: 9 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,14 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
return false;
}

::dsn::error_code check_meta_cf(const std::string &path, bool *need_open_with_meta_cf);
void release_db();

private:
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;

dsn::gpid _gpid;
std::string _primary_address;
Expand All @@ -322,7 +328,9 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;

rocksdb::DB *_db;
rocksdb::DB *_db = nullptr;
rocksdb::ColumnFamilyHandle *_data_cf = nullptr;
rocksdb::ColumnFamilyHandle *_meta_cf = nullptr;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
volatile bool _is_open;
uint32_t _pegasus_data_version;
Expand Down

0 comments on commit 9185c3a

Please sign in to comment.