Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 2 additions & 32 deletions be/src/storage/row_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,6 @@ void RowCursor::_init_schema(TabletSchemaSPtr schema, uint32_t column_count) {
_schema.reset(new Schema(schema->columns(), columns));
}

// Replaces _schema with a newly allocated copy of *shared_schema.
// `column_count` is not used by this overload; the copied Schema keeps
// whatever column set `shared_schema` already describes.
// `shared_schema` is dereferenced without a null check.
void RowCursor::_init_schema(const std::shared_ptr<Schema>& shared_schema, uint32_t column_count) {
_schema.reset(new Schema(*shared_schema));
}

// Prepares the cursor for the first `num_columns` columns of `schema`.
// Returns INVALID_ARGUMENT when `num_columns` exceeds the number of columns
// in `schema`; otherwise sets up the cursor schema, sizes `_fields` to
// `num_columns` entries and returns OK.
Status RowCursor::init(TabletSchemaSPtr schema, size_t num_columns) {
    const auto schema_column_count = schema->num_columns();
    const bool fits_in_schema = num_columns <= schema_column_count;
    if (!fits_in_schema) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                num_columns, schema_column_count);
    }

    const auto cursor_width = cast_set<uint32_t>(num_columns);
    _init_schema(schema, cursor_width);

    // One Field slot per requested column; per the original note these start
    // out as null (TYPE_NULL) fields.
    _fields.resize(num_columns);
    return Status::OK();
}

Status RowCursor::init(TabletSchemaSPtr schema, const OlapTuple& tuple) {
size_t key_size = tuple.size();
if (key_size > schema->num_columns()) {
Expand All @@ -74,20 +57,7 @@ Status RowCursor::init(TabletSchemaSPtr schema, const OlapTuple& tuple) {
key_size, schema->num_columns());
}
_init_schema(schema, cast_set<uint32_t>(key_size));
return from_tuple(tuple);
}

Status RowCursor::init(TabletSchemaSPtr schema, const OlapTuple& tuple,
const std::shared_ptr<Schema>& shared_schema) {
size_t key_size = tuple.size();
if (key_size > schema->num_columns()) {
return Status::Error<INVALID_ARGUMENT>(
"Input param are invalid. Column count is bigger than num_columns of schema. "
"column_count={}, schema.num_columns={}",
key_size, schema->num_columns());
}
_init_schema(shared_schema, cast_set<uint32_t>(key_size));
return from_tuple(tuple);
return _from_tuple(tuple);
}

Status RowCursor::init_scan_key(TabletSchemaSPtr schema, std::vector<Field> fields) {
Expand All @@ -103,7 +73,7 @@ Status RowCursor::init_scan_key(TabletSchemaSPtr schema, std::vector<Field> fiel
return Status::OK();
}

Status RowCursor::from_tuple(const OlapTuple& tuple) {
Status RowCursor::_from_tuple(const OlapTuple& tuple) {
if (tuple.size() != _schema->num_column_ids()) {
return Status::Error<INVALID_ARGUMENT>(
"column count does not match. tuple_size={}, field_count={}", tuple.size(),
Expand Down
9 changes: 1 addition & 8 deletions be/src/storage/row_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ class RowCursor {
// Initialize from OlapTuple (which now stores Fields).
// Sets up the schema and copies Fields from the tuple.
Status init(TabletSchemaSPtr schema, const OlapTuple& tuple);
Status init(TabletSchemaSPtr schema, const OlapTuple& tuple,
const std::shared_ptr<Schema>& shared_schema);

// Initialize with schema and num_columns, creating null Fields.
// Caller sets individual fields via mutable_field().
Status init(TabletSchemaSPtr schema, size_t num_columns);

// Initialize from typed Fields directly.
Status init_scan_key(TabletSchemaSPtr schema, std::vector<Field> fields);
Expand Down Expand Up @@ -99,10 +93,9 @@ class RowCursor {

private:
// Copy Fields from an OlapTuple into this cursor.
Status from_tuple(const OlapTuple& tuple);
Status _from_tuple(const OlapTuple& tuple);

void _init_schema(TabletSchemaSPtr schema, uint32_t column_count);
void _init_schema(const std::shared_ptr<Schema>& shared_schema, uint32_t column_count);

// Helper: encode a single non-null field for the given column.
// Converts the core::Field to storage format and calls KeyCoder.
Expand Down
2 changes: 0 additions & 2 deletions be/src/storage/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ Schema& Schema::operator=(const Schema& other) {

void Schema::_copy_from(const Schema& other) {
_col_ids = other._col_ids;
_unique_ids = other._unique_ids;
_num_key_columns = other._num_key_columns;
_delete_sign_idx = other._delete_sign_idx;
_has_sequence_col = other._has_sequence_col;
_rowid_col_idx = other._rowid_col_idx;
_version_col_idx = other._version_col_idx;
_lsn_col_idx = other._lsn_col_idx;
_tso_col_idx = other._tso_col_idx;
_mem_size = other._mem_size;

_cols.resize(other._cols.size());
for (auto cid : _col_ids) {
Expand Down
57 changes: 0 additions & 57 deletions be/src/storage/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,9 @@ class Schema;
using SchemaSPtr = std::shared_ptr<const Schema>;
class Schema {
public:
// Builds a Schema over the columns of `tablet_schema`, using the identity
// mapping cid -> cid for the column ids. A trailing column named
// BeConsts::ROW_STORE_COL is left out. While walking the columns this also
// records each column's unique id, counts key columns, and remembers the
// index of the rowid / version / binlog LSN / binlog timestamp columns when
// present.
Schema(TabletSchemaSPtr tablet_schema) {
    size_t num_columns = tablet_schema->num_columns();
    // Ignore the trailing row-store column. The `num_columns > 0` guard keeps
    // back() from being called on an empty column list (undefined behavior)
    // and prevents `num_columns` from wrapping around below zero.
    if (num_columns > 0 && tablet_schema->columns().back()->name() == BeConsts::ROW_STORE_COL) {
        --num_columns;
    }
    std::vector<ColumnId> col_ids(num_columns);
    _unique_ids.resize(num_columns);
    std::vector<TabletColumnPtr> columns;
    columns.reserve(num_columns);

    size_t num_key_columns = 0;
    for (uint32_t cid = 0; cid < num_columns; ++cid) {
        col_ids[cid] = cid;
        const TabletColumn& column = tablet_schema->column(cid);
        _unique_ids[cid] = column.unique_id();
        if (column.is_key()) {
            ++num_key_columns;
        }
        // Both the plain rowid column and any column whose name starts with
        // the global rowid prefix are tracked through the same index.
        if (column.name() == BeConsts::ROWID_COL ||
            column.name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
            _rowid_col_idx = cid;
        }
        if (column.name() == VERSION_COL) {
            _version_col_idx = cid;
        }
        if (column.name() == std::string(kRowBinlogLsnColName)) {
            _lsn_col_idx = cid;
        }
        if (column.name() == std::string(kRowBinlogTimestampColName)) {
            _tso_col_idx = cid;
        }
        // Each column is copied so this Schema owns its own TabletColumn objects.
        columns.push_back(std::make_shared<TabletColumn>(column));
    }
    _delete_sign_idx = tablet_schema->delete_sign_idx();
    // A sequence mapping counts as having a sequence column as well.
    if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
        _has_sequence_col = true;
    }
    _init(columns, col_ids, num_key_columns);
}

// All the columns of one table may exist in the columns param, but col_ids is only a subset.
Schema(const std::vector<TabletColumnPtr>& columns, const std::vector<ColumnId>& col_ids) {
size_t num_key_columns = 0;
_unique_ids.resize(columns.size());
for (int i = 0; i < columns.size(); ++i) {
if (columns[i]->is_key()) {
++num_key_columns;
Expand All @@ -114,23 +72,10 @@ class Schema {
if (columns[i]->name() == std::string(kRowBinlogTimestampColName)) {
_tso_col_idx = i;
}
_unique_ids[i] = columns[i]->unique_id();
}
_init(columns, col_ids, num_key_columns);
}

// Only for UT
Schema(const std::vector<TabletColumnPtr>& columns, size_t num_key_columns) {
std::vector<ColumnId> col_ids(columns.size());
_unique_ids.resize(columns.size());
for (uint32_t cid = 0; cid < columns.size(); ++cid) {
col_ids[cid] = cid;
_unique_ids[cid] = columns[cid]->unique_id();
}

_init(columns, col_ids, num_key_columns);
}

Schema(const Schema&);
Schema& operator=(const Schema& other);

Expand All @@ -151,7 +96,6 @@ class Schema {
size_t num_column_ids() const { return _col_ids.size(); }
const std::vector<ColumnId>& column_ids() const { return _col_ids; }
ColumnId column_id(size_t index) const { return _col_ids[index]; }
int32_t unique_id(size_t index) const { return _unique_ids[index]; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
bool has_sequence_col() const { return _has_sequence_col; }
int32_t rowid_col_idx() const { return _rowid_col_idx; }
Expand All @@ -172,7 +116,6 @@ class Schema {
// NOTE: The ColumnId here represents the sequential index number (starting from 0) of
// a column in current row, not the unique id-identifier of each column
std::vector<ColumnId> _col_ids;
std::vector<int32_t> _unique_ids;
// NOTE: _cols[cid] can only be accessed when the cid is
// contained in _col_ids
std::vector<TabletColumnPtr> _cols;
Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/segment/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,9 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
for (const TabletColumn* col : key_columns) {
cols.emplace_back(std::make_shared<TabletColumn>(*col));
}
_seek_schema = std::make_unique<Schema>(cols, cols.size());
std::vector<uint32_t> column_ids(cols.size());
std::iota(column_ids.begin(), column_ids.end(), 0);
_seek_schema = std::make_unique<Schema>(cols, column_ids);
}
// todo(wb) need refactor here, when using pk to search, _seek_block is useless
if (_seek_block.size() == 0) {
Expand Down
10 changes: 2 additions & 8 deletions be/src/storage/tablet/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,14 @@ Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
scan_key_size, _tablet_schema->num_columns());
}

std::vector<uint32_t> columns(scan_key_size);
std::iota(columns.begin(), columns.end(), 0);

std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet_schema->columns(), columns);

for (size_t i = 0; i < start_key_size; ++i) {
if (read_params.start_key[i].size() != scan_key_size) {
return Status::Error<INVALID_ARGUMENT>(
"The start_key.at({}).size={}, not equals the scan_key_size={}", i,
read_params.start_key[i].size(), scan_key_size);
}

Status res =
_keys_param.start_keys[i].init(_tablet_schema, read_params.start_key[i], schema);
Status res = _keys_param.start_keys[i].init(_tablet_schema, read_params.start_key[i]);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor. res = " << res;
return res;
Expand All @@ -378,7 +372,7 @@ Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
read_params.end_key[i].size(), scan_key_size);
}

Status res = _keys_param.end_keys[i].init(_tablet_schema, read_params.end_key[i], schema);
Status res = _keys_param.end_keys[i].init(_tablet_schema, read_params.end_key[i]);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor. res = " << res;
return res;
Expand Down
21 changes: 14 additions & 7 deletions be/test/exec/scan/vgeneric_iterators_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,26 @@ class VGenericIteratorsTest : public testing::Test {

static Schema create_schema() {
std::vector<TabletColumnPtr> col_schemas;
col_schemas.emplace_back(
std::make_shared<TabletColumn>(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE,
FieldType::OLAP_FIELD_TYPE_SMALLINT, true));
auto c1 = std::make_shared<TabletColumn>(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE,
FieldType::OLAP_FIELD_TYPE_SMALLINT, true);
c1->set_is_key(true);
col_schemas.emplace_back(c1);
// c2: int
col_schemas.emplace_back(
std::make_shared<TabletColumn>(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE,
FieldType::OLAP_FIELD_TYPE_INT, true));
auto c2 = std::make_shared<TabletColumn>(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE,
FieldType::OLAP_FIELD_TYPE_INT, true);
c2->set_is_key(true);
col_schemas.emplace_back(c2);
// c3: big int
col_schemas.emplace_back(
std::make_shared<TabletColumn>(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_SUM,
FieldType::OLAP_FIELD_TYPE_BIGINT, true));

Schema schema(col_schemas, 2);
std::vector<ColumnId> column_ids(col_schemas.size());
for (uint32_t cid = 0; cid < column_ids.size(); ++cid) {
column_ids[cid] = cid;
}

Schema schema(col_schemas, column_ids);
return schema;
}

Expand Down
15 changes: 14 additions & 1 deletion be/test/load/delta_writer/delta_writer_cluster_key_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ class OlapMeta;
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* engine_ref = nullptr;

static std::shared_ptr<Schema> create_full_schema(const TabletSchemaSPtr& tablet_schema) {
size_t num_columns = tablet_schema->num_columns();
if (num_columns > 0 && tablet_schema->columns().back()->name() == BeConsts::ROW_STORE_COL) {
--num_columns;
}

std::vector<ColumnId> column_ids(num_columns);
for (uint32_t cid = 0; cid < num_columns; ++cid) {
column_ids[cid] = cid;
}
return std::make_shared<Schema>(tablet_schema->columns(), column_ids);
}

static void set_up() {
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
Expand Down Expand Up @@ -340,7 +353,7 @@ TEST_F(TestDeltaWriterClusterKey, vec_sequence_col) {
opts.tablet_schema = rowset->tablet_schema();

std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset->tablet_schema());
std::shared_ptr<Schema> schema = create_full_schema(rowset->tablet_schema());
auto s = segments[0]->new_iterator(schema, opts, &iter);
ASSERT_TRUE(s.ok());
auto read_block = rowset->tablet_schema()->create_block();
Expand Down
19 changes: 16 additions & 3 deletions be/test/load/delta_writer/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ class OlapMeta;
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* engine_ref = nullptr;

static std::shared_ptr<Schema> create_full_schema(const TabletSchemaSPtr& tablet_schema) {
size_t num_columns = tablet_schema->num_columns();
if (num_columns > 0 && tablet_schema->columns().back()->name() == BeConsts::ROW_STORE_COL) {
--num_columns;
}

std::vector<ColumnId> column_ids(num_columns);
for (uint32_t cid = 0; cid < num_columns; ++cid) {
column_ids[cid] = cid;
}
return std::make_shared<Schema>(tablet_schema->columns(), column_ids);
}

static void set_up() {
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
Expand Down Expand Up @@ -824,7 +837,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
opts.tablet_schema = rowset->tablet_schema();

std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset->tablet_schema());
std::shared_ptr<Schema> schema = create_full_schema(rowset->tablet_schema());
auto s = segments[0]->new_iterator(schema, opts, &iter);
ASSERT_TRUE(s.ok());
auto read_block = rowset->tablet_schema()->create_block();
Expand Down Expand Up @@ -1032,7 +1045,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset1->rowset_id(), 0, cur_version}));
std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset1->tablet_schema());
std::shared_ptr<Schema> schema = create_full_schema(rowset1->tablet_schema());
std::vector<segment_v2::SegmentSharedPtr> segments;
static_cast<void>(((BetaRowset*)rowset1.get())->load_segments(&segments));
auto s = segments[0]->new_iterator(schema, opts, &iter);
Expand Down Expand Up @@ -1060,7 +1073,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset2->rowset_id(), 0, cur_version}));
std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset2->tablet_schema());
std::shared_ptr<Schema> schema = create_full_schema(rowset2->tablet_schema());
std::vector<segment_v2::SegmentSharedPtr> segments;
static_cast<void>(((BetaRowset*)rowset2.get())->load_segments(&segments));
auto s = segments[0]->new_iterator(schema, opts, &iter);
Expand Down
10 changes: 9 additions & 1 deletion be/test/storage/compaction/ordered_data_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,15 @@ class OrderedDataCompactionTest : public ::testing::Test {

void block_create(TabletSchemaSPtr tablet_schema, Block* block) {
block->clear();
Schema schema(tablet_schema);
size_t num_columns = tablet_schema->num_columns();
if (num_columns > 0 && tablet_schema->columns().back()->name() == BeConsts::ROW_STORE_COL) {
--num_columns;
}
std::vector<ColumnId> schema_column_ids(num_columns);
for (uint32_t cid = 0; cid < num_columns; ++cid) {
schema_column_ids[cid] = cid;
}
Schema schema(tablet_schema->columns(), schema_column_ids);
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
auto column_desc = schema.column(column_ids[i]);
Expand Down
10 changes: 9 additions & 1 deletion be/test/storage/compaction/vertical_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,15 @@ class VerticalCompactionTest : public ::testing::Test {

void block_create(TabletSchemaSPtr tablet_schema, Block* block) {
block->clear();
Schema schema(tablet_schema);
size_t num_columns = tablet_schema->num_columns();
if (num_columns > 0 && tablet_schema->columns().back()->name() == BeConsts::ROW_STORE_COL) {
--num_columns;
}
std::vector<ColumnId> schema_column_ids(num_columns);
for (uint32_t cid = 0; cid < num_columns; ++cid) {
schema_column_ids[cid] = cid;
}
Schema schema(tablet_schema->columns(), schema_column_ids);
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
auto column_desc = schema.column(column_ids[i]);
Expand Down
3 changes: 2 additions & 1 deletion be/test/storage/delete/delete_bitmap_calculator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ void build_segment(SegmentWriterOptions opts, TabletSchemaSPtr build_schema, siz
EXPECT_TRUE(st.ok());

RowCursor row;
auto olap_st = row.init(build_schema, build_schema->num_columns());
std::vector<Field> fields(build_schema->num_columns(), Field(PrimitiveType::TYPE_NULL));
auto olap_st = row.init_scan_key(build_schema, std::move(fields));
EXPECT_EQ(Status::OK(), olap_st);

for (size_t rid = 0; rid < nrows; ++rid) {
Expand Down
Loading
Loading