Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ namespace config {
// txn commit rpc timeout
CONF_Int32(txn_commit_rpc_timeout_ms, "10000");
// path gc
CONF_Bool(path_gc_check, "true");
CONF_Int32(path_gc_check_interval_second, "1800");
CONF_Int32(path_gc_check_step, "1000");
CONF_Int32(path_gc_check_step, "-1");
CONF_Int32(path_gc_check_step_interval_ms, "10");
CONF_Int32(path_scan_interval_second, "1800");
} // namespace config
Expand Down
53 changes: 25 additions & 28 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,29 @@ OLAPStatus BaseCompaction::run() {
// 3. 执行base compaction
// 执行过程可能会持续比较长时间
stage_watch.reset();
RowsetId rowset_id = 0;
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
RowsetWriterContext context;
context.rowset_id = rowset_id;
context.tablet_id = _tablet->tablet_id();
context.partition_id = _tablet->partition_id();
context.tablet_schema_hash = _tablet->schema_hash();
context.rowset_type = ALPHA_ROWSET;
context.rowset_path_prefix = _tablet->tablet_path();
context.tablet_schema = &(_tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.data_dir = _tablet->data_dir();
context.version = _new_base_version;
context.version_hash = new_base_version_hash;

RowsetWriterSharedPtr rs_writer(new (std::nothrow)AlphaRowsetWriter());
if (rs_writer == nullptr) {
LOG(WARNING) << "fail to new rowset.";
return OLAP_ERR_MALLOC_ERROR;
}
RETURN_NOT_OK(rs_writer->init(context));
res = _do_base_compaction(new_base_version_hash, rowsets);
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(rs_writer->rowset_id()));
// 释放不再使用的ColumnData对象
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to do base version. tablet=" << _tablet->full_name()
Expand Down Expand Up @@ -311,30 +333,6 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,

OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash,
const vector<RowsetSharedPtr>& rowsets) {
// 1. 生成新base文件对应的olap index
RowsetId rowset_id = 0;
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
RowsetWriterContextBuilder context_builder;
context_builder.set_rowset_id(rowset_id)
.set_tablet_id(_tablet->tablet_id())
.set_partition_id(_tablet->partition_id())
.set_tablet_schema_hash(_tablet->schema_hash())
.set_rowset_type(ALPHA_ROWSET)
.set_rowset_path_prefix(_tablet->tablet_path())
.set_tablet_schema(&(_tablet->tablet_schema()))
.set_data_dir(_tablet->data_dir())
.set_rowset_state(VISIBLE)
.set_version(_new_base_version)
.set_version_hash(new_base_version_hash);
RowsetWriterContext context = context_builder.build();

RowsetWriterSharedPtr rs_writer(new (std::nothrow)AlphaRowsetWriter());
if (rs_writer == nullptr) {
LOG(WARNING) << "fail to new rowset.";
return OLAP_ERR_MALLOC_ERROR;
}
rs_writer->init(context);

vector<RowsetReaderSharedPtr> rs_readers;
for (auto& rowset : rowsets) {
RowsetReaderSharedPtr rs_reader(rowset->create_reader());
Expand All @@ -360,9 +358,8 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
uint64_t filted_rows = 0;
OLAPStatus res = OLAP_SUCCESS;

Merger merger(_tablet, rs_writer, READER_BASE_COMPACTION);
Merger merger(_tablet, _rs_writer, READER_BASE_COMPACTION);
res = merger.merge(rs_readers, &merged_rows, &filted_rows);
StorageEngine::instance()->remove_pending_paths(rs_writer->rowset_id());
// 3. 如果merge失败,执行清理工作,返回错误码退出
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to make new base version. res=" << res
Expand All @@ -371,10 +368,10 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
<< "-" << _new_base_version.second;
return OLAP_ERR_BE_MERGE_ERROR;
}
RowsetSharedPtr new_base = rs_writer->build();
RowsetSharedPtr new_base = _rs_writer->build();
if (new_base == nullptr) {
LOG(WARNING) << "rowset writer build failed. writer version:"
<< rs_writer->version().first << "-" << rs_writer->version().second;
<< _rs_writer->version().first << "-" << _rs_writer->version().second;
return OLAP_ERR_MALLOC_ERROR;
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class BaseCompaction {
BaseCompaction() :
_new_base_version(0, 0),
_old_base_version(0, 0),
_base_compaction_locked(false) {}
_base_compaction_locked(false),
_rs_writer(nullptr) {}

virtual ~BaseCompaction() {
_release_base_compaction_lock();
Expand Down Expand Up @@ -175,8 +176,8 @@ class BaseCompaction {
std::vector<Version> _need_merged_versions;
// 需要新增的版本对应Rowset的
std::vector<RowsetSharedPtr> _new_rowsets;

bool _base_compaction_locked;
RowsetWriterSharedPtr _rs_writer;

DISALLOW_COPY_AND_ASSIGN(BaseCompaction);
};
Expand Down
27 changes: 13 additions & 14 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,18 @@ OLAPStatus CumulativeCompaction::run() {
// 3. 生成新cumulative文件对应的olap index
RowsetId rowset_id = 0;
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
RowsetWriterContextBuilder context_builder;
context_builder.set_rowset_id(rowset_id)
.set_tablet_id(_tablet->tablet_id())
.set_partition_id(_tablet->partition_id())
.set_tablet_schema_hash(_tablet->schema_hash())
.set_rowset_type(ALPHA_ROWSET)
.set_rowset_path_prefix(_tablet->tablet_path())
.set_tablet_schema(&(_tablet->tablet_schema()))
.set_rowset_state(VISIBLE)
.set_data_dir(_tablet->data_dir())
.set_version(_cumulative_version)
.set_version_hash(_cumulative_version_hash);
RowsetWriterContext context = context_builder.build();
RowsetWriterContext context;
context.rowset_id = rowset_id;
context.tablet_id = _tablet->tablet_id();
context.partition_id = _tablet->partition_id();
context.tablet_schema_hash = _tablet->schema_hash();
context.rowset_type = ALPHA_ROWSET;
context.rowset_path_prefix = _tablet->tablet_path();
context.tablet_schema = &(_tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.data_dir = _tablet->data_dir();
context.version = _cumulative_version;
context.version_hash = _cumulative_version_hash;
_rs_writer->init(context);

// 4. 执行cumulative compaction合并过程
Expand All @@ -159,6 +158,7 @@ OLAPStatus CumulativeCompaction::run() {
_rs_readers.push_back(rs_reader);
}
res = _do_cumulative_compaction();
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rs_writer->rowset_id()));
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to do cumulative compaction."
<< ", tablet=" << _tablet->full_name()
Expand Down Expand Up @@ -388,7 +388,6 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
}

_rowset = _rs_writer->build();
StorageEngine::instance()->remove_pending_paths(_rs_writer->rowset_id());
if (_rowset == nullptr) {
LOG(WARNING) << "rowset writer build failed. writer version:"
<< _rs_writer->version().first << "-" << _rs_writer->version().second;
Expand Down
165 changes: 158 additions & 7 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ namespace doris {
static const char* const kMtabPath = "/etc/mtab";
static const char* const kTestFilePath = "/.testfile";

DataDir::DataDir(const std::string& path, int64_t capacity_bytes)
DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
TabletManager* tablet_manager, TxnManager* txn_manager)
: _path(path),
_cluster_id(-1),
_capacity_bytes(capacity_bytes),
_tablet_manager(tablet_manager),
_txn_manager(txn_manager),
_cluster_id(-1),
_available_bytes(0),
_used_bytes(0),
_current_shard(0),
_is_used(false),
_to_be_deleted(false),
_test_file_read_buf(nullptr),
_test_file_write_buf(nullptr),
_meta((nullptr)) {
_meta(nullptr) {
}

DataDir::~DataDir() {
Expand Down Expand Up @@ -660,7 +663,8 @@ OLAPStatus DataDir::_remove_old_meta_and_files() {
alpha_rowset_meta->init_from_pb(inc_rs_meta);
AlphaRowset rowset(&tablet_schema, data_path_prefix, this, alpha_rowset_meta);
rowset.init();
rowset.load(); // check if the rowset is successfully converted
rowset.load();
// check if the rowset is successfully converted
// incremental delta is saved in a seperate incremental folder
// create a mock rowset to delete its files
AlphaRowset inc_rowset(&tablet_schema, data_path_prefix + "/incremental_delta",
Expand Down Expand Up @@ -742,7 +746,7 @@ OLAPStatus DataDir::load() {
LOG(INFO) << "begin loading tablet from meta";
auto load_tablet_func = [this](long tablet_id,
long schema_hash, const std::string& value) -> bool {
OLAPStatus status = StorageEngine::instance()->tablet_manager()->load_tablet_from_meta(
OLAPStatus status = _tablet_manager->load_tablet_from_meta(
this, tablet_id, schema_hash, value);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "load tablet from header failed. status:" << status
Expand All @@ -762,7 +766,7 @@ OLAPStatus DataDir::load() {
// 2. add visible rowset to tablet
// ignore any errors when load tablet or rowset, because fe will repair them after report
for (auto rowset_meta : dir_rowset_metas) {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
TabletSharedPtr tablet = _tablet_manager->get_tablet(
rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash());
// tablet maybe dropped, but not drop related rowset meta
if (tablet.get() == NULL) {
Expand All @@ -785,7 +789,7 @@ OLAPStatus DataDir::load() {
continue;
}
if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED) {
OLAPStatus commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
OLAPStatus commit_txn_status = _txn_manager->commit_txn(
_meta,
rowset_meta->partition_id(), rowset_meta->txn_id(),
rowset_meta->tablet_id(), rowset_meta->tablet_schema_hash(),
Expand Down Expand Up @@ -828,4 +832,151 @@ OLAPStatus DataDir::load() {
return OLAP_SUCCESS;
}

// Register `id` as pending so the path GC will not reclaim the on-disk
// data it refers to while the owning operation is still in flight.
void DataDir::add_pending_ids(const std::string& id) {
    WriteLock guard(&_pending_path_mutex);
    _pending_path_ids.emplace(id);
}

// Drop `id` from the pending set once the owning operation has finished,
// making any orphaned path with this id eligible for GC again.
void DataDir::remove_pending_ids(const std::string& id) {
    WriteLock guard(&_pending_path_mutex);
    _pending_path_ids.erase(id);
}

// path consumer
void DataDir::perform_path_gc() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add unit test for this two function

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to fix the unit tests and add the unit test case for these two function later

// init the set of valid path
// validate the path in data dir
std::unique_lock<std::mutex> lck(_check_path_mutex);
cv.wait(lck, [this]{return _all_check_paths.size() > 0;});
LOG(INFO) << "start to path gc.";
int counter = 0;
for (auto& path : _all_check_paths) {
++counter;
if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
usleep(config::path_gc_check_step_interval_ms * 1000);
}
TTabletId tablet_id = -1;
TSchemaHash schema_hash = -1;
bool is_valid = _tablet_manager->get_tablet_id_and_schema_hash_from_path(path,
&tablet_id, &schema_hash);
if (!is_valid) {
LOG(WARNING) << "unknown path:" << path;
continue;
}
if (tablet_id > 0 && schema_hash > 0) {
// tablet schema hash path or rowset file path
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id, schema_hash);
if (tablet == nullptr) {
std::string tablet_path_id = TABLET_ID_PREFIX + std::to_string(tablet_id);
bool exist_in_pending = _check_pending_ids(tablet_path_id);
if (!exist_in_pending) {
_process_garbage_path(path);
}
} else {
bool valid = tablet->check_path(path);
if (!valid) {
RowsetId rowset_id = -1;
bool is_rowset_file = _tablet_manager->get_rowset_id_from_path(path, &rowset_id);
if (is_rowset_file) {
std::string rowset_path_id = ROWSET_ID_PREFIX + std::to_string(rowset_id);
bool exist_in_pending = _check_pending_ids(rowset_path_id);
if (!exist_in_pending) {
_process_garbage_path(path);
}
}
}
}
} else if (tablet_id > 0 && schema_hash <= 0) {
// tablet id path
if (!FileUtils::is_dir(path)) {
LOG(WARNING) << "unknown path:" << path;
continue;
}
bool exist = _tablet_manager->check_tablet_id_exist(tablet_id);
if (!exist) {
std::string tablet_path_id = TABLET_ID_PREFIX + std::to_string(tablet_id);
bool exist_in_pending = _check_pending_ids(tablet_path_id);
if (!exist_in_pending) {
_process_garbage_path(path);
}
}
}
}
_all_check_paths.clear();
LOG(INFO) << "finished one time path gc.";
}

// path producer
// Path GC producer: walks the data directory tree (shard / tablet-id /
// schema-hash / rowset file) and records every path it finds into
// _all_check_paths, then wakes the GC consumer. A scan is skipped while a
// previous batch is still unconsumed. Early returns inside the locked
// scope intentionally skip the notify, since no new batch was produced.
void DataDir::perform_path_scan() {
    {
        std::unique_lock<std::mutex> lock(_check_path_mutex);
        if (_all_check_paths.size() > 0) {
            // Previous batch not yet consumed by perform_path_gc().
            return;
        }
        LOG(INFO) << "start to scan data dir path:" << _path;
        std::string data_path = _path + DATA_PREFIX;
        std::set<std::string> shard_names;
        if (dir_walk(data_path, &shard_names, nullptr) != OLAP_SUCCESS) {
            LOG(WARNING) << "fail to walk dir. [path=" << data_path << "]";
            return;
        }
        for (const auto& shard_name : shard_names) {
            std::string shard_path = data_path + "/" + shard_name;
            std::set<std::string> tablet_id_names;
            if (dir_walk(shard_path, &tablet_id_names, nullptr) != OLAP_SUCCESS) {
                LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "]";
                continue;
            }
            for (const auto& tablet_id_name : tablet_id_names) {
                std::string tablet_id_path = shard_path + "/" + tablet_id_name;
                _all_check_paths.insert(tablet_id_path);
                std::set<std::string> schema_hash_names;
                if (dir_walk(tablet_id_path, &schema_hash_names, nullptr) != OLAP_SUCCESS) {
                    LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]";
                    continue;
                }
                for (const auto& schema_hash_name : schema_hash_names) {
                    std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash_name;
                    _all_check_paths.insert(tablet_schema_hash_path);
                    std::set<std::string> rowset_file_names;
                    if (dir_walk(tablet_schema_hash_path, nullptr, &rowset_file_names) != OLAP_SUCCESS) {
                        LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path << "]";
                        continue;
                    }
                    for (const auto& rowset_file_name : rowset_file_names) {
                        _all_check_paths.insert(tablet_schema_hash_path + "/" + rowset_file_name);
                    }
                }
            }
        }
        LOG(INFO) << "scan data dir path:" << _path << " finished. path size:" << _all_check_paths.size();
    }
    // Wake the GC consumer outside the lock.
    cv.notify_one();
}

void DataDir::_process_garbage_path(const std::string& path) {
if (check_dir_existed(path)) {
LOG(INFO) << "collect garbage dir path:" << path;
OLAPStatus status = remove_all_dir(path);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "remove garbage dir path:" << path << " failed";
}
}
}

// Returns true when `id` is currently registered as pending
// (see add_pending_ids / remove_pending_ids).
bool DataDir::_check_pending_ids(const std::string& id) {
    ReadLock guard(&_pending_path_mutex);
    return _pending_path_ids.count(id) != 0;
}

// Erase every path in `paths` from the pending-check set.
// Caller must already hold _check_path_mutex (hence the "_no_lock" suffix).
void DataDir::_remove_check_paths_no_lock(const std::set<std::string>& paths) {
    for (const auto& path : paths) {
        // erase(key) is a no-op when the key is absent, so the original
        // find()-then-erase(iterator) pattern performed a redundant lookup.
        _all_check_paths.erase(path);
    }
}

} // namespace doris
Loading