Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/olap/olap_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ OlapMeta::~OlapMeta() {
for (auto handle : _handles) {
delete handle;
}
if (_db != NULL) {
if (_db != nullptr) {
_db->Close();
delete _db;
_db= NULL;
_db= nullptr;
}
}

Expand All @@ -79,7 +79,7 @@ OLAPStatus OlapMeta::init() {
meta_column_family.prefix_extractor.reset(NewFixedPrefixTransform(PREFIX_LENGTH));
column_families.emplace_back(META_COLUMN_FAMILY, meta_column_family);
Status s = DB::Open(options, db_path, column_families, &_handles, &_db);
if (!s.ok() || _db == NULL) {
if (!s.ok() || _db == nullptr) {
LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
return OLAP_ERR_META_OPEN_DB;
}
Expand Down
16 changes: 15 additions & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ OLAPStatus StorageEngine::_start_bg_worker() {
[this] {
_garbage_sweeper_thread_callback(nullptr);
});

_garbage_sweeper_thread.detach();
// start thread for monitoring the tablet with io error
_disk_stat_monitor_thread = std::thread(
[this] {
_disk_stat_monitor_thread_callback(nullptr);
});
_disk_stat_monitor_thread.detach();

// start be and ce threads for merge data
int32_t base_compaction_num_threads = config::base_compaction_num_threads;
Expand All @@ -61,6 +62,9 @@ OLAPStatus StorageEngine::_start_bg_worker() {
_base_compaction_thread_callback(nullptr);
});
}
for (auto& thread : _base_compaction_threads) {
thread.detach();
}

int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
_cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
Expand All @@ -70,11 +74,15 @@ OLAPStatus StorageEngine::_start_bg_worker() {
_cumulative_compaction_thread_callback(nullptr);
});
}
for (auto& thread : _cumulative_compaction_threads) {
thread.detach();
}

_fd_cache_clean_thread = std::thread(
[this] {
_fd_cache_clean_callback(nullptr);
});
_fd_cache_clean_thread.detach();

// path scan and gc thread
if (config::path_gc_check) {
Expand All @@ -89,6 +97,12 @@ OLAPStatus StorageEngine::_start_bg_worker() {
_path_gc_thread_callback((void*)data_dir);
});
}
for (auto& thread : _path_scan_threads) {
thread.detach();
}
for (auto& thread : _path_gc_threads) {
thread.detach();
}
}

VLOG(10) << "init finished.";
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class RowBlock {
uint32_t _capacity;
RowBlockInfo _info;
const TabletSchema* _schema; // 内部保存的schema句柄

bool _null_supported;

size_t _field_count = 0;
Expand Down
28 changes: 14 additions & 14 deletions be/src/olap/row_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,
for (auto cid : _columns) {
const TabletColumn& column = schema[cid];
_field_array[cid] = Field::create(column);
if (_field_array[cid] == NULL) {
OLAP_LOG_WARNING("Fail to create field.");
if (_field_array[cid] == nullptr) {
LOG(WARNING) << "Fail to create field.";
return OLAP_ERR_INIT_FAILED;
}
_fixed_len += field_buf_lens[cid] + 1; //1 for null byte
Expand All @@ -99,7 +99,7 @@ OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,

_fixed_buf = new (nothrow) char[_fixed_len];
if (_fixed_buf == nullptr) {
OLAP_LOG_WARNING("Fail to malloc _fixed_buf.");
LOG(WARNING) << "Fail to malloc _fixed_buf.";
return OLAP_ERR_MALLOC_ERROR;
}
_owned_fixed_buf = _fixed_buf;
Expand Down Expand Up @@ -194,8 +194,8 @@ OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema,
}

// variable_len for null bytes
_variable_buf = new (nothrow) char[_variable_len];
if (_variable_buf == NULL) {
_variable_buf = new(nothrow) char[_variable_len];
if (_variable_buf == nullptr) {
OLAP_LOG_WARNING("Fail to malloc _variable_buf.");
return OLAP_ERR_MALLOC_ERROR;
}
Expand Down Expand Up @@ -304,7 +304,7 @@ int RowCursor::cmp(const RowCursor& other) const {
size_t common_prefix_count = min(_key_column_num, other._key_column_num);
// 只有key column才会参与比较
for (size_t i = 0; i < common_prefix_count; ++i) {
if (_field_array[i] == NULL || other._field_array[i] == NULL) {
if (_field_array[i] == nullptr || other._field_array[i] == nullptr) {
continue;
}

Expand All @@ -325,7 +325,7 @@ int RowCursor::index_cmp(const RowCursor& other) const {
size_t common_prefix_count = min(_columns.size(), other._key_column_num);
// 只有key column才会参与比较
for (size_t i = 0; i < common_prefix_count; ++i) {
if (_field_array[i] == NULL || other._field_array[i] == NULL) {
if (_field_array[i] == nullptr || other._field_array[i] == nullptr) {
continue;
}
char* left = _field_array[i]->get_field_ptr(_fixed_buf);
Expand All @@ -343,7 +343,7 @@ bool RowCursor::equal(const RowCursor& other) const {
// 按field顺序从后往前比较,有利于尽快发现不同,提升比较性能
size_t common_prefix_count = min(_key_column_num, other._key_column_num);
for (int i = common_prefix_count - 1; i >= 0; --i) {
if (_field_array[i] == NULL || other._field_array[i] == NULL) {
if (_field_array[i] == nullptr || other._field_array[i] == nullptr) {
continue;
}
char* left = _field_array[i]->get_field_ptr(_fixed_buf);
Expand All @@ -357,7 +357,7 @@ bool RowCursor::equal(const RowCursor& other) const {

void RowCursor::finalize_one_merge() {
for (size_t i = _key_column_num; i < _field_array.size(); ++i) {
if (_field_array[i] == NULL) {
if (_field_array[i] == nullptr) {
continue;
}
char* dest = _field_array[i]->get_ptr(_fixed_buf);
Expand All @@ -368,7 +368,7 @@ void RowCursor::finalize_one_merge() {
void RowCursor::aggregate(const RowCursor& other) {
// 只有value column才会参与aggregate
for (size_t i = _key_column_num; i < _field_array.size(); ++i) {
if (_field_array[i] == NULL || other._field_array[i] == NULL) {
if (_field_array[i] == nullptr || other._field_array[i] == nullptr) {
continue;
}

Expand Down Expand Up @@ -429,7 +429,7 @@ OlapTuple RowCursor::to_tuple() const {
OlapTuple tuple;

for (auto cid : _columns) {
if (_field_array[cid] != NULL) {
if (_field_array[cid] != nullptr) {
Field* field = _field_array[cid];
char* src = field->get_ptr(_fixed_buf);
if (field->is_null(_fixed_buf)) {
Expand Down Expand Up @@ -476,7 +476,7 @@ string RowCursor::to_string(string sep) const {
}

Field* field = _field_array[cid];
if (field != NULL) {
if (field != nullptr) {
char* src = field->get_ptr(_fixed_buf);
result.append(field->to_string(src));
} else {
Expand All @@ -489,7 +489,7 @@ string RowCursor::to_string(string sep) const {

OLAPStatus RowCursor::get_first_different_column_id(const RowCursor& other,
size_t* first_diff_id) const {
if (first_diff_id == NULL) {
if (first_diff_id == nullptr) {
OLAP_LOG_WARNING("input parameter 'first_diff_id' is NULL.");
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
Expand All @@ -501,7 +501,7 @@ OLAPStatus RowCursor::get_first_different_column_id(const RowCursor& other,

size_t i = 0;
for (; i < _field_array.size(); ++i) {
if (_field_array[i] == NULL || other._field_array[i] == NULL) {
if (_field_array[i] == nullptr || other._field_array[i] == nullptr) {
continue;
}

Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/row_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,10 @@ class RowCursor {
OLAPStatus init_scan_key(const TabletSchema& schema,
const std::vector<std::string>& keys);

OLAPStatus init_scan_key(const TabletSchema& schema,
const std::vector<size_t>& field_lengths);

//allocate memory for string type, which include char, varchar, hyperloglog
OLAPStatus allocate_memory_for_string_type(const TabletSchema& schema,
MemPool* mem_pool = nullptr);

// 两个RowCurosr做比较,返回-1,0,1
int cmp(const RowCursor& other) const;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ void ColumnData::set_read_params(

auto res = _cursor.init(_segment_group->get_tablet_schema());
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row_cursor");
LOG(WARNING) << "fail to init row_cursor";
}

_read_vector_batch.reset(new VectorizedRowBatch(
Expand Down
16 changes: 12 additions & 4 deletions be/src/olap/rowset/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,18 +485,26 @@ ColumnReader::ColumnReader(uint32_t column_id, uint32_t column_unique_id) :
_present_reader(NULL) {
}

ColumnReader* ColumnReader::create(uint32_t column_id,
ColumnReader* ColumnReader::create(uint32_t column_id,
const TabletSchema& schema,
const UniqueIdToColumnIdMap& included,
UniqueIdToColumnIdMap& segment_included,
const UniqueIdEncodingMap& encodings) {
if (column_id >= schema.num_columns()) {
return create(column_id, schema.columns(), included, segment_included, encodings);
}

ColumnReader* ColumnReader::create(uint32_t column_id,
const std::vector<TabletColumn>& schema,
const UniqueIdToColumnIdMap& included,
UniqueIdToColumnIdMap& segment_included,
const UniqueIdEncodingMap& encodings) {
if (column_id >= schema.size()) {
LOG(WARNING) << "invalid column_id, column_id=" << column_id
<< ", columns_size=" << schema.num_columns();
<< ", columns_size=" << schema.size();
return NULL;
}

const TabletColumn& column = schema.column(column_id);
const TabletColumn& column = schema[column_id];
ColumnReader* reader = NULL;
int32_t column_unique_id = column.unique_id();

Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ class ColumnReader {
const UniqueIdToColumnIdMap& included,
UniqueIdToColumnIdMap& segment_included,
const UniqueIdEncodingMap& encodings);

static ColumnReader* create(uint32_t column_id,
const std::vector<TabletColumn>& schema,
const UniqueIdToColumnIdMap& included,
UniqueIdToColumnIdMap& segment_included,
const UniqueIdEncodingMap& encodings);

ColumnReader(uint32_t column_id, uint32_t column_unique_id);
virtual ~ColumnReader();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ColumnWriter* ColumnWriter::create(uint32_t column_id,
OutStreamFactory* stream_factory,
size_t num_rows_per_row_block,
double bf_fpp) {
ColumnWriter* column_writer = NULL;
ColumnWriter* column_writer = nullptr;
const TabletColumn& column = schema.column(column_id);

switch (column.type()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ColumnWriter {
OutStreamFactory* stream_factory,
size_t num_rows_per_row_block,
double bf_fpp);

virtual ~ColumnWriter();
virtual OLAPStatus init();

Expand Down
10 changes: 4 additions & 6 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ OLAPStatus RowsetMetaManager::get_rowset_meta(OlapMeta* meta, int64_t rowset_id,
}

OLAPStatus RowsetMetaManager::get_json_rowset_meta(OlapMeta* meta, int64_t rowset_id, std::string* json_rowset_meta) {
RowsetMeta rowset_meta;
RowsetMetaSharedPtr rowset_meta_ptr(&rowset_meta);
RowsetMetaSharedPtr rowset_meta_ptr(new(std::nothrow) RowsetMeta());
OLAPStatus status = get_rowset_meta(meta, rowset_id, rowset_meta_ptr);
if (status != OLAP_SUCCESS) {
return status;
}
bool ret = rowset_meta.json_rowset_meta(json_rowset_meta);
bool ret = rowset_meta_ptr->json_rowset_meta(json_rowset_meta);
if (!ret) {
std::string error_msg = "get json rowset meta failed. rowset id:" + std::to_string(rowset_id);
return OLAP_ERR_SERIALIZE_PROTOBUF_ERROR;
}
return OLAP_SUCCESS;
}

OLAPStatus RowsetMetaManager::save(OlapMeta* meta, int64_t rowset_id, RowsetMetaSharedPtr rowset_meta) {
OLAPStatus RowsetMetaManager::save(OlapMeta* meta, int64_t rowset_id, RowsetMeta* rowset_meta) {
std::string key = ROWSET_PREFIX + std::to_string(rowset_id);
std::string value;
bool ret = rowset_meta->serialize(&value);
Expand Down Expand Up @@ -130,8 +129,7 @@ OLAPStatus RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta, const std::s
return OLAP_ERR_SERIALIZE_PROTOBUF_ERROR;
}
uint64_t rowset_id = rowset_meta.rowset_id();
RowsetMetaSharedPtr rowset_meta_ptr(&rowset_meta);
OLAPStatus status = save(meta, rowset_id, rowset_meta_ptr);
OLAPStatus status = save(meta, rowset_id, &rowset_meta);
return status;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RowsetMetaManager {

static OLAPStatus get_json_rowset_meta(OlapMeta* meta, int64_t rowset_id, std::string* json_rowset_meta);

static OLAPStatus save(OlapMeta* meta, int64_t rowset_id, RowsetMetaSharedPtr rowset_meta);
static OLAPStatus save(OlapMeta* meta, int64_t rowset_id, RowsetMeta* rowset_meta);

static OLAPStatus save(OlapMeta* meta, int64_t rowset_id, const string& meta_binary);

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ Tablet::Tablet(TabletMeta* tablet_meta, DataDir* data_dir)
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
_is_dropped = false;
_tablet_meta->set_data_dir(_data_dir);

_tablet_path.append(_data_dir->path());
Expand Down Expand Up @@ -455,6 +454,10 @@ OLAPStatus Tablet::capture_rs_readers(const vector<Version>& version_path,
return OLAP_SUCCESS;
}

OLAPStatus Tablet::add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version) {
return _tablet_meta->add_delete_predicate(delete_predicate, version);
}

bool Tablet::version_for_delete_predicate(const Version& version) {
return _tablet_meta->version_for_delete_predicate(version);
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
// operation for TabletState
TabletState tablet_state() const { return _state; }
inline OLAPStatus set_tablet_state(TabletState state);
void mark_dropped() { _is_dropped = true; }

// Property encapsulated in TabletMeta
inline const TabletMeta& tablet_meta();
Expand Down Expand Up @@ -142,6 +141,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
vector<RowsetReaderSharedPtr>* rs_readers) const;

DelPredicateArray delete_predicates() { return _tablet_meta->delete_predicates(); }
OLAPStatus add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version);
bool version_for_delete_predicate(const Version& version);
bool version_for_load_deletion(const Version& version);

Expand Down Expand Up @@ -222,7 +222,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
std::string _tablet_path;
RowsetGraph _rs_graph;

bool _is_dropped;
DorisInitOnce _init_once;
RWMutex _meta_lock;
Mutex _ingest_lock;
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ OLAPStatus TabletManager::add_tablet(TTabletId tablet_id, SchemaHash schema_hash
_tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_creation_time);
_tablet_map_lock.unlock();
} else {
tablet->mark_dropped();
res = OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
}
LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res
Expand Down Expand Up @@ -707,7 +706,6 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
LOG(WARNING) << "tablet not in schema change state without delta is invalid."
<< "tablet=" << tablet->full_name();
// tablet state is invalid, drop tablet
tablet->mark_dropped();
return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
}

Expand Down Expand Up @@ -1096,10 +1094,12 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
for (list<TabletSharedPtr>::iterator it = _tablet_map[tablet_id].table_arr.begin();
it != _tablet_map[tablet_id].table_arr.end();) {
if ((*it)->equal(tablet_id, schema_hash)) {
TabletSharedPtr tablet = *it;
it = _tablet_map[tablet_id].table_arr.erase(it);
if (!keep_files) {
(*it)->mark_dropped();
LOG(INFO) << "remove tablet:" << tablet_id << " path:" << tablet->tablet_path();
boost::filesystem::remove_all(tablet->tablet_path());
}
it = _tablet_map[tablet_id].table_arr.erase(it);
} else {
++it;
}
Expand Down
Loading