Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,6 @@ enum ReaderType {
// <start_version_id, end_version_id>, such as <100, 110>
//using Version = std::pair<TupleVersion, TupleVersion>;
typedef std::pair<int64_t, int64_t> Version;

// used for hash-struct of hash_map<Version, Rowset*>.
struct HashOfVersion {
uint64_t operator()(const Version& version) const {
uint64_t value = version.first ^ version.second;
return std::hash<uint64_t>(value);
}
};

typedef std::vector<Version> Versions;

// It is used to represent Graph vertex.
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#include "olap/rowset_graph.h"

NewStatus Tablet::construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas) {
namespace doris {

OLAPStatus Tablet::construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas) {
if (rs_metas.size() == 0) {
VLOG(3) << "there is no version in the header.";
return Status::OK();
Expand All @@ -43,7 +45,7 @@ NewStatus Tablet::construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas
}

// Add vertex to graph.
NewStatus status = add_vertex_to_graph(vertex_values[i]);
OLAPStatus status = add_vertex_to_graph(vertex_values[i]);
if (!status.ok())
LOG(WARNING) << "fail to add vertex to version graph. vertex=" << vertex_values[i];
return status;
Expand All @@ -67,19 +69,19 @@ NewStatus Tablet::construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas
}
}

NewStatus RowsetGraph::add_version_to_graph(const Version& version) {
OLAPStatus RowsetGraph::add_version_to_graph(const Version& version) {
// Add version.first as new vertex of version graph if not exist.
int start_vertex_value = version.first;
int end_vertex_value = version.second + 1;

// Add vertex to graph.
NewStatus status = add_vertex_to_graph(start_vertex_value);
OLAPStatus status = add_vertex_to_graph(start_vertex_value);
if (!status.ok()) {
LOG(WARNING) << "fail to add vertex to version graph. vertex=" << start_vertex_value;
return status;
}

NewStatus status = add_vertex_to_graph(end_vertex_value);
OLAPStatus status = add_vertex_to_graph(end_vertex_value);
if (!status.ok()) {
LOG(WARNING) << "fail to add vertex to version graph. vertex=" << end_vertex_value;
return status;
Expand All @@ -100,15 +102,15 @@ NewStatus RowsetGraph::add_version_to_graph(const Version& version) {
return Status::OK();
}

NewStatus RowsetGraph::delete_version_from_graph(const Version& version) {
OLAPStatus RowsetGraph::delete_version_from_graph(const Version& version) {
int start_vertex_value = version.first;
int end_vertex_value = version.second + 1;

if (_vertex_helper_map->find(start_vertex_value) == _vertex_helper_map->end()
|| _vertex_helper_map->find(end_vertex_value) == _vertex_helper_map->end()) {
LOG(WARNING) << "vertex for version does not exists. "
<< "version=" << version.first << "-" << version.second;
return NewStatus::NotFound("vertex for version does not exists.");
return OLAPStatus::NotFound("vertex for version does not exists.");
}

int start_vertex_index = (*_vertex_helper_map)[start_vertex_value];
Expand All @@ -131,7 +133,7 @@ NewStatus RowsetGraph::delete_version_from_graph(const Version& version) {
return Status::OK();
}

NewStatus RowsetGraph::_add_vertex_to_graph(int vertex_value) {
OLAPStatus RowsetGraph::_add_vertex_to_graph(int vertex_value) {
// Vertex with vertex_value already exists.
if (_vertex_index_map->find(vertex_value) != _vertex_index_map->end()) {
VLOG(3) << "vertex with vertex value already exists. value=" << vertex_value;
Expand All @@ -140,7 +142,7 @@ NewStatus RowsetGraph::_add_vertex_to_graph(int vertex_value) {

list<int>* edges = new(std::nothrow) list<int>();
if (edges == NULL) {
return NewStatus::Corruption("malloc memory failed for edge list");
return OLAPStatus::Corruption("malloc memory failed for edge list");
}

Vertex vertex = {vertex_value, edges};
Expand All @@ -149,7 +151,8 @@ NewStatus RowsetGraph::_add_vertex_to_graph(int vertex_value) {
return Status::OK();
}

NewStatus Tablet::capture_consistent_versions(const Version& spec_version, version<Version>* version_path) {
OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path) {
if (spec_version.first > spec_version.second) {
LOG(WARNING) << "invalid specfied version. "
<< "spec_version=" << spec_version.first << "-" << spec_version.second;
Expand Down
17 changes: 11 additions & 6 deletions be/src/olap/rowset_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_GRAPH_H
#define DORIS_BE_SRC_OLAP_ROWSET_GRAPH_H

#include "olap/olap_common.h"
#include "olap/olap_define.h"

namespace doris {

class RowsetGraph {
public:
NewStatus construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas);
NewStatus add_version_to_graph(const Versrion& version);
NewStatus delete_version_from_graph(const Version& version);
NewStatus capture_consistent_versions(const Version& spec_version,
version<Version>* version_path);
OLAPStatus construct_rowset_graph(const std::vector<RowsetMeta>& rs_metas);
OLAPStatus add_version_to_graph(const Version& version);
OLAPStatus delete_version_from_graph(const Version& version);
OLAPStatus capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path);
private:
NewStatus _add_vertex_to_graph(int vertex_value);
OLAPStatus _add_vertex_to_graph(int vertex_value);

// OLAP version contains two parts, [start_version, end_version]. In order
// to construct graph, the OLAP version has two corresponding vertex, one
Expand Down
30 changes: 13 additions & 17 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ Tablet(TabletMeta* tablet_meta, DataDir* data_dir) {
}

Tablet::~Tablet() {
if (_tablet_meta == NULL) {
return; // for convenience of mock test.
}

// ensure that there is nobody using Tablet, like acquiring OLAPData(SegmentGroup)
WriteLock wrlock(_meta_lock);
WriteLock wrlock(&_meta_lock);
for (auto& it : _version_rowset_map) {
SAFE_DELETE(it.second);
}
Expand All @@ -87,7 +83,7 @@ bool Tablet::can_do_compaction() {
// 如果table正在做schema change,则通过选路判断数据是否转换完成
// 如果选路成功,则转换完成,可以进行BE
// 如果选路失败,则转换未完成,不能进行BE
ReadLock rdlock(_meta_lock);
ReadLock rdlock(&_meta_lock);
RowsetSharedPtr lastest_rowset = lastest_version();
if (lastest_rowset == NULL) {
return false;
Expand All @@ -96,7 +92,7 @@ bool Tablet::can_do_compaction() {
Version test_version = Version(0, lastest_version->end_version());
vector<Version> path_versions;
if (OLAP_SUCCESS != _rs_graph->capture_consistent_versions(test_version, &path_versions)) {
LOG(WARNING) << "tablet has missed version. tablet=" << table->full_name();
LOG(WARNING) << "tablet has missed version. tablet=" << full_name();
return false;
}

Expand Down Expand Up @@ -132,7 +128,7 @@ OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version,

void Tablet::acquire_rs_reader_by_version(const vector<Version>& version_vec,
vector<std::shared_ptr<RowsetReader>>* rs_readers) const {
DCHECK(rs_readers != NULL && rs_readers.empty());
DCHECK(rs_readers != NULL && rs_readers->empty());
for (auto version : version_vec) {
auto it2 = _rs_version_map.find(*it1);
if (it2 == _rs_version_map.end()) {
Expand All @@ -149,7 +145,7 @@ void Tablet::acquire_rs_reader_by_version(const vector<Version>& version_vec,
<< ", version=" << version.first << "-" << version.second;
release_rs_readers(rs_readers);
}
rs_reader.push_back(std::move(rs_reader)):
rs_reader->push_back(std::move(rs_reader)):
}
}

Expand All @@ -160,11 +156,11 @@ OLAPStatus Tablet::release_rs_readers(vector<std::shared_ptr<RowsetReader>>* rs_
}

rs_readers->clear();
return OLAP_SUCCESS;
return OLAP_SUCCESS;
}

OLAPStatus Tablet::add_inc_rowset(const Rowset& rowset) {
return _table_meta->add_inc_rs_meta(rowset->get_rs_meta());
return _table_meta.add_inc_rs_meta(rowset->get_rs_meta());
}

OLAPStatus Tablet::delete_expired_inc_rowset() {
Expand Down Expand Up @@ -201,7 +197,7 @@ void Tablet::calc_missed_versions(int64_t spec_version,
DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
std::list<Version> existing_versions;
for (RowsetMeta& rs : _tablet_meta->get_all_rs_metas()) {
existing_versions.emplace_back(rs->version());
existing_versions.emplace_back(rs.version());
}

// sort the existing versions in ascending order
Expand All @@ -210,7 +206,7 @@ void Tablet::calc_missed_versions(int64_t spec_version,
return a.first < b.first;
});

// find the missing version until until_version
// find the missing version until spec_version
int64_t last_version = -1;
for (const Version& version : existing_versions) {
if (version.first > last_version + 1) {
Expand All @@ -223,8 +219,8 @@ void Tablet::calc_missed_versions(int64_t spec_version,
break;
}
}
for (int64_t i = last_version + 1; i <= until_version; ++i) {
spec_versions->emplace_back(i, i);
for (int64_t i = last_version + 1; i <= spec_version; ++i) {
missed_versions->emplace_back(i, i);
}
}

Expand Down Expand Up @@ -375,7 +371,7 @@ OLAPStatus Tablet::split_range(
}

cur_start_key.attach(entry.data);
last_start_key.allocate_memory_for_string_type(_tablet_schema);
last_start_key.allocate_memory_for_string_type(_schema);
last_start_key.copy_without_pool(cur_start_key);
// start_key是last start_key, 但返回的实际上是查询层给出的key
ranges->emplace_back(start_key.to_tuple());
Expand Down Expand Up @@ -471,7 +467,7 @@ bool Tablet::is_schema_changing() {
bool is_schema_changing = false;

ReadLock rdlock(_meta_lock);
if (_tablet_meta->alter_state() != AlterTabletState::none) {
if (_tablet_meta->alter_state() != AlterTabletState::NONE) {
is_schema_changing = true;
}

Expand Down
24 changes: 18 additions & 6 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "olap/tablet_meta.h"
#include "olap/tuple.h"
#include "olap/row_cursor.h"
#include "olap/rowset_graph.h"
#include "olap/utils.h"

namespace doris {
Expand All @@ -40,6 +41,7 @@ class Rowset;
class Tablet;
class RowBlockPosition;
class DataDir;
class RowsetReader;

using TabletSharedPtr = std::shared_ptr<Tablet>;

Expand All @@ -59,7 +61,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
double bloom_filter_fpp() const;
bool equal(TTabletId tablet_id, TSchemaHash schema_hash);

Schema* schema() const;
TabletSchema* schema() const;
const std::string& full_name() const;
size_t num_fields() const;
size_t num_null_fields();
Expand Down Expand Up @@ -100,7 +102,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
vector<std::shared_ptr<RowsetReader>>* rs_readers) const;
OLAPStatus release_rs_readers(vector<std::shared_ptr<RowsetReader>>* rs_readers) const;

RMMutex* meta_lock();
RWMutex* meta_lock();
Mutex* ingest_lock();
Mutex* base_lock();
Mutex* cumulative_lock();
Expand All @@ -120,7 +122,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
bool can_do_compaction();

DeletePredicatePB* add_delete_predicates() {
return _tablet_meta->add_delete_predicates();
return _tablet_meta.add_delete_predicates();
}

const google::protobuf::RepeatedPtrField<DeletePredicatePB>&
Expand Down Expand Up @@ -151,12 +153,22 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
RowsetGraph* _rs_graph;

TabletMeta _tablet_meta;
Schema* _schema;
RMMutex _meta_lock;
TabletSchema* _schema;
RWMutex _meta_lock;
Mutex _ingest_lock;
Mutex _base_lock;
Mutex _cumulative_lock;
std::unordered_map<Version, std:shared_ptr<Rowset>, HashOfVersion> _version_rowset_map;

// used for hash-struct of hash_map<Version, Rowset*>.
struct HashOfVersion {
size_t operator()(const Version& version) const {
size_t seed = 0;
seed = HashUtil::hash64(&version.first, sizeof(version.first), seed);
seed = HashUtil::hash64(&version.second, sizeof(version.second), seed);
return seed;
}
};
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _version_rowset_map;

DISALLOW_COPY_AND_ASSIGN(Tablet);
};
Expand Down
Loading