Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Cherry-pick][Branch-2.5] Support sort key be able to nullable. (#15641) #29225

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 37 additions & 8 deletions be/src/storage/primary_key_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

namespace starrocks {

constexpr uint8_t SORT_KEY_NULL_FIRST_MARKER = 0x00;
constexpr uint8_t SORT_KEY_NORMAL_MARKER = 0x01;

template <class UT>
UT to_bigendian(UT v);

Expand Down Expand Up @@ -226,9 +229,6 @@ inline Status decode_slice(Slice* src, string* dest, bool is_last) {
}

bool PrimaryKeyEncoder::is_supported(const vectorized::Field& f) {
if (f.is_nullable()) {
return false;
}
switch (f.type()->type()) {
case OLAP_FIELD_TYPE_BOOL:
case OLAP_FIELD_TYPE_TINYINT:
Expand Down Expand Up @@ -260,6 +260,9 @@ FieldType PrimaryKeyEncoder::encoded_primary_key_type(const vectorized::Schema&
return OLAP_FIELD_TYPE_NONE;
}
if (key_idxes.size() == 1) {
if (!schema.sort_key_idxes().empty() && schema.field(schema.sort_key_idxes()[0])->is_nullable()) {
return OLAP_FIELD_TYPE_VARCHAR;
}
return schema.field(key_idxes[0])->type()->type();
}
return OLAP_FIELD_TYPE_VARCHAR;
Expand Down Expand Up @@ -443,13 +446,39 @@ void PrimaryKeyEncoder::encode_sort_key(const vectorized::Schema& schema, const
prepare_ops_datas(schema, schema.sort_key_idxes(), chunk, &ops, &datas);
auto& bdest = down_cast<vectorized::BinaryColumn&>(*dest);
bdest.reserve(bdest.size() + len);
std::vector<std::shared_ptr<vectorized::Column>> cols(ncol);
for (int i = 0; i < ncol; i++) {
cols[i] = chunk.get_column_by_index(schema.sort_key_idxes()[i]);
}
bool has_nullable_sort_key = false;
for (int i = 0; i < ncol; i++) {
if (schema.field(schema.sort_key_idxes()[i])->is_nullable()) {
has_nullable_sort_key = true;
break;
}
}
std::string buff;
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
ops[j](datas[j], offset + i, &buff);
if (!has_nullable_sort_key) {
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
ops[j](datas[j], offset + i, &buff);
}
bdest.append(buff);
}
} else {
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
if (cols[j]->is_null(i)) {
buff.push_back(SORT_KEY_NULL_FIRST_MARKER);
} else {
buff.push_back(SORT_KEY_NORMAL_MARKER);
ops[j](datas[j], offset + i, &buff);
}
}
bdest.append(buff);
}
bdest.append(buff);
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/rowset_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <queue>

#include "column/binary_column.h"
#include "gutil/stl_util.h"
#include "storage/chunk_helper.h"
#include "storage/empty_iterator.h"
Expand Down Expand Up @@ -263,6 +264,8 @@ class RowsetMergerImpl : public RowsetMerger {
if (!PrimaryKeyEncoder::create_column(schema, &sort_column, schema.sort_key_idxes()).ok()) {
LOG(FATAL) << "create column for primary key encoder failed";
}
} else if (schema.sort_key_idxes().size() == 1 && schema.field(schema.sort_key_idxes()[0])->is_nullable()) {
sort_column = std::make_unique<BinaryColumn>();
}
std::vector<std::unique_ptr<vector<RowSourceMask>>> rowsets_source_masks;
uint16_t order = 0;
Expand Down
201 changes: 201 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,43 @@ class TabletUpdatesTest : public testing::Test {
return *writer->build();
}

RowsetSharedPtr create_nullable_sort_key_rowset(const TabletSharedPtr& tablet,
const vector<vector<int64_t>>& all_cols) {
RowsetWriterContext writer_context;
RowsetId rowset_id = StorageEngine::instance()->next_rowset_id();
writer_context.rowset_id = rowset_id;
writer_context.tablet_id = tablet->tablet_id();
writer_context.tablet_schema_hash = tablet->schema_hash();
writer_context.partition_id = 0;
writer_context.rowset_path_prefix = tablet->schema_hash_path();
writer_context.rowset_state = COMMITTED;
writer_context.tablet_schema = &tablet->tablet_schema();
writer_context.version.first = 0;
writer_context.version.second = 0;
writer_context.segments_overlap = NONOVERLAPPING;
std::unique_ptr<RowsetWriter> writer;
EXPECT_TRUE(RowsetFactory::create_rowset_writer(writer_context, &writer).ok());
auto schema = ChunkHelper::convert_schema_to_format_v2(tablet->tablet_schema());
const auto keys_size = all_cols[0].size();
auto chunk = ChunkHelper::new_chunk(schema, keys_size);
auto& cols = chunk->columns();
for (auto i = 0; i < keys_size; ++i) {
auto append_datum_func = []<typename T>(std::shared_ptr<Column> col, T val) {
if (val == -1) {
col->append_nulls(1);
} else {
col->append_datum(vectorized::Datum(val));
}
};
append_datum_func(cols[0], static_cast<int64_t>(all_cols[0][i]));
append_datum_func(cols[1], static_cast<int16_t>(all_cols[1][i]));
append_datum_func(cols[2], static_cast<int32_t>(all_cols[2][i]));
}

CHECK_OK(writer->flush_chunk(*chunk));
return *writer->build();
}

TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash, bool multi_column_pk = false) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
Expand Down Expand Up @@ -366,6 +403,41 @@ class TabletUpdatesTest : public testing::Test {
return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
}

TabletSharedPtr create_tablet_with_nullable_sort_key(int64_t tablet_id, int32_t schema_hash,
std::vector<int32_t> sort_key_idxes) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
request.__set_version(1);
request.tablet_schema.schema_hash = schema_hash;
request.tablet_schema.short_key_column_count = 1;
request.tablet_schema.keys_type = TKeysType::PRIMARY_KEYS;
request.tablet_schema.storage_type = TStorageType::COLUMN;
request.tablet_schema.sort_key_idxes = sort_key_idxes;

TColumn k1;
k1.column_name = "pk";
k1.__set_is_key(true);
k1.column_type.type = TPrimitiveType::BIGINT;
request.tablet_schema.columns.push_back(k1);

TColumn k2;
k2.column_name = "v1";
k2.__set_is_key(false);
k2.__set_is_allow_null(true);
k2.column_type.type = TPrimitiveType::SMALLINT;
request.tablet_schema.columns.push_back(k2);

TColumn k3;
k3.column_name = "v2";
k3.__set_is_key(false);
k3.__set_is_allow_null(true);
k3.column_type.type = TPrimitiveType::INT;
request.tablet_schema.columns.push_back(k3);
auto st = StorageEngine::instance()->create_tablet(request);
CHECK(st.ok()) << st.to_string();
return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
}

TabletSharedPtr create_tablet2(int64_t tablet_id, int32_t schema_hash) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
Expand Down Expand Up @@ -876,6 +948,48 @@ static ssize_t read_tablet_and_compare_sort_key_error_encode_case(const TabletSh
return count;
}

static ssize_t read_tablet_and_compare_nullable_sort_key(const TabletSharedPtr& tablet, int64_t version,
const vector<vector<int64_t>>& all_cols) {
vectorized::Schema schema = ChunkHelper::convert_schema_to_format_v2(tablet->tablet_schema());
vectorized::TabletReader reader(tablet, Version(0, version), schema);
auto iter = create_tablet_iterator(reader, schema);
if (iter == nullptr) {
return -1;
}
const auto keys_size = all_cols[0].size();
auto full_chunk = ChunkHelper::new_chunk(iter->schema(), keys_size);
auto& cols = full_chunk->columns();
for (auto i = 0; i < keys_size; ++i) {
auto append_datum_func = []<typename T>(std::shared_ptr<Column> col, T val) {
if (val == -1) {
col->append_nulls(1);
} else {
col->append_datum(vectorized::Datum(val));
}
};
append_datum_func(cols[0], static_cast<int64_t>(all_cols[0][i]));
append_datum_func(cols[1], static_cast<int16_t>(all_cols[1][i]));
append_datum_func(cols[2], static_cast<int32_t>(all_cols[2][i]));
}
auto chunk = ChunkHelper::new_chunk(iter->schema(), 100);
size_t count = 0;
while (true) {
auto st = iter->get_next(chunk.get());
if (st.is_end_of_file()) {
break;
} else if (st.ok()) {
for (auto i = 0; i < chunk->num_rows(); i++) {
EXPECT_EQ(full_chunk->get(count + i).compare(iter->schema(), chunk->get(i)), 0);
}
count += chunk->num_rows();
chunk->reset();
} else {
return -1;
}
}
return count;
}

void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet(rand(), rand());
Expand Down Expand Up @@ -1637,6 +1751,93 @@ TEST_F(TabletUpdatesTest, horizontal_compaction_with_sort_key_error_encode_case)
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}

TEST_F(TabletUpdatesTest, horizontal_compaction_with_nullable_sort_key) {
auto orig = config::vertical_compaction_max_columns_per_group;
config::vertical_compaction_max_columns_per_group = 5;
DeferOp unset_config([&] { config::vertical_compaction_max_columns_per_group = orig; });

{
int N = 10;
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_nullable_sort_key(rand(), rand(), {2});
std::vector<int64_t> keys;
for (int i = 0; i < N; i++) {
keys.push_back(i);
}
ASSERT_TRUE(_tablet->rowset_commit(2, create_nullable_sort_key_rowset(_tablet, {{1, 2, 3, 4, 5, 6, 7, 8},
{8, 7, 6, 5, 4, 3, 2, 1},
{-1, -1, -1, -1, 5, 6, 7, 8}}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(
_tablet->rowset_commit(3, create_nullable_sort_key_rowset(_tablet, {{5, 6, 7, 8, 9, 10, 11, 12},
{12, 11, 10, 9, 8, 7, 6, 5},
{-1, -1, -1, -1, 9, 10, 11, 12}}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_EQ(_tablet->updates()->version_history_count(), 3);
ASSERT_EQ(12, read_tablet(_tablet, 3));
const auto& best_tablet = StorageEngine::instance()->tablet_manager()->find_best_tablet_to_do_update_compaction(
_tablet->data_dir());
EXPECT_EQ(best_tablet->tablet_id(), _tablet->tablet_id());
EXPECT_GT(best_tablet->updates()->get_compaction_score(), 0);
ASSERT_TRUE(best_tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(12, read_tablet_and_compare_nullable_sort_key(best_tablet, 3,
{{1, 5, 2, 6, 3, 7, 4, 8, 9, 10, 11, 12},
{8, 12, 7, 11, 6, 10, 5, 9, 8, 7, 6, 5},
{-1, -1, -1, -1, -1, -1, -1, -1, 9, 10, 11, 12}}));
ASSERT_EQ(best_tablet->updates()->num_rowsets(), 1);
ASSERT_EQ(best_tablet->updates()->version_history_count(), 4);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}

{
int N = 10;
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_nullable_sort_key(rand(), rand(), {1, 2});
std::vector<int64_t> keys;
for (int i = 0; i < N; i++) {
keys.push_back(i);
}
ASSERT_TRUE(_tablet->rowset_commit(2, create_nullable_sort_key_rowset(_tablet,
{
{1, 2, 3, 4, 5, 6, 7, 8},
{-1, -1, -1, -1, 5, 6, 7, 8},
{8, 7, 6, 5, 4, 3, 2, 1},
}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(_tablet->rowset_commit(3, create_nullable_sort_key_rowset(_tablet,
{
{5, 6, 7, 8, 9, 10, 11, 12},
{-1, -1, -1, -1, 9, 10, 11, 12},
{12, 11, 10, 9, 8, 7, 6, 5},
}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_EQ(_tablet->updates()->version_history_count(), 3);
ASSERT_EQ(12, read_tablet(_tablet, 3));
const auto& best_tablet = StorageEngine::instance()->tablet_manager()->find_best_tablet_to_do_update_compaction(
_tablet->data_dir());
EXPECT_EQ(best_tablet->tablet_id(), _tablet->tablet_id());
EXPECT_GT(best_tablet->updates()->get_compaction_score(), 0);
ASSERT_TRUE(best_tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(12, read_tablet_and_compare_nullable_sort_key(best_tablet, 3,
{
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
{-1, -1, -1, -1, -1, -1, -1, -1, 9, 10, 11, 12},
{8, 7, 6, 5, 12, 11, 10, 9, 8, 7, 6, 5},
}));
ASSERT_EQ(best_tablet->updates()->num_rowsets(), 1);
ASSERT_EQ(best_tablet->updates()->version_history_count(), 4);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}
}

void TabletUpdatesTest::test_vertical_compaction(bool enable_persistent_index) {
auto orig = config::vertical_compaction_max_columns_per_group;
config::vertical_compaction_max_columns_per_group = 1;
Expand Down