Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
_key_coders.push_back(get_key_coder(column.type()));
_seq_coder = get_key_coder(column.type());
}
}

Expand Down Expand Up @@ -183,18 +183,19 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po

// convert column data from engine format to storage layer format
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
for (size_t id = 0; id < _column_writers.size(); ++id) {
// olap data convertor always starts from id = 0
auto converted_result = _olap_data_convertor->convert_column_data(id);
if (converted_result.first != Status::OK()) {
return converted_result.first;
}
auto cid = _column_ids[id];
if (_has_key && (cid < _num_key_columns || (_tablet_schema->has_sequence_col() &&
_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write &&
cid == _tablet_schema->sequence_col_idx()))) {
if (_has_key && cid < _num_key_columns) {
key_columns.push_back(converted_result.second);
} else if (_has_key && _tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
seq_column = converted_result.second;
}
RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(), num_rows));
Expand All @@ -203,8 +204,11 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
// create primary indexes
for (size_t pos = 0; pos < num_rows; pos++) {
RETURN_IF_ERROR(
_primary_key_index_builder->add_item(_full_encode_keys(key_columns, pos)));
std::string key = _full_encode_keys(key_columns, pos);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
}
} else {
// create short key indexes
Expand Down Expand Up @@ -240,12 +244,7 @@ std::string SegmentWriter::_full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first) {
assert(_key_index_size.size() == _num_key_columns);
if (_tablet_schema->has_sequence_col() && _opts.enable_unique_key_merge_on_write) {
assert(key_columns.size() == _num_key_columns + 1 &&
_key_coders.size() == _num_key_columns + 1);
} else {
assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);
}
assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);

std::string encoded_keys;
size_t cid = 0;
Expand All @@ -267,6 +266,22 @@ std::string SegmentWriter::_full_encode_keys(
return encoded_keys;
}

// Appends the encoding of the sequence-column cell at row `pos` to
// `*encoded_keys`.
//
// A present cell is written as KEY_NORMAL_MARKER followed by whatever
// _seq_coder->full_encode_ascending() emits for it. A missing cell
// (get_data_at() yields a value that tests false) is written as
// KEY_NULL_FIRST_MARKER followed by length() copies of KEY_MINIMAL_MARKER,
// where length() comes from the schema's sequence column. Giving a null
// sequence value the minimal encoding of the corresponding length is what
// makes the primary key index convenient to use.
void SegmentWriter::_encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column,
                                       size_t pos, string* encoded_keys) {
    auto cell = seq_column->get_data_at(pos);
    if (UNLIKELY(!cell)) {
        // Null sequence value: null marker, then minimal-value padding.
        encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
        const auto& seq_col = _tablet_schema->column(_tablet_schema->sequence_col_idx());
        const size_t pad_len = seq_col.length();
        encoded_keys->append(pad_len, KEY_MINIMAL_MARKER);
    } else {
        // Regular value: normal marker, then the coder's ascending encoding.
        encoded_keys->push_back(KEY_NORMAL_MARKER);
        _seq_coder->full_encode_ascending(cell, encoded_keys);
    }
}

std::string SegmentWriter::_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ class SegmentWriter {
Status _write_raw_data(const std::vector<Slice>& slices);
std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos, bool null_first = true);
// for unique-key merge on write and segment min_max key
// used for unique-key with merge on write and segment min_max key
std::string _full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first = true);
// used for unique-key with merge on write
void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos,
string* encoded_keys);
void set_min_max_key(const Slice& key);
void set_min_key(const Slice& key);
void set_max_key(const Slice& key);
Expand Down Expand Up @@ -144,6 +147,7 @@ class SegmentWriter {
std::unique_ptr<vectorized::OlapBlockDataConvertor> _olap_data_convertor;
// used for building short key index or primary key index during vectorized write.
std::vector<const KeyCoder*> _key_coders;
const KeyCoder* _seq_coder = nullptr;
std::vector<uint16_t> _key_index_size;
size_t _short_key_row_pos = 0;

Expand Down
14 changes: 14 additions & 0 deletions regression-test/data/primary_index/test_mow_with_null_sequence.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
a abc address2 2022-10-20
aa1234 abc address4 2022-12-11
aa1235 abc address6 \N
ab abc address6 2022-11-20

-- !sql --
a abc address2 100
aa1234 abc address4 -1
aa1235 abc address6 -1
aa1236 abc address6 0
ab abc address6 110

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite for unique-key merge-on-write tables whose sequence column
// is nullable. Each row is loaded by its own INSERT statement, so the order of
// the statements below is part of the scenario. The two query results are
// compared against the recorded output in test_mow_with_null_sequence.out.
suite("test_mow_with_null_sequence") {
def tableName = "test_null_sequence"
// Scenario 1: nullable DATE column (c_date) used as the sequence column.
sql """ DROP TABLE IF EXISTS $tableName """
sql """
CREATE TABLE `$tableName` (
`c_custkey` varchar(20) NOT NULL COMMENT "",
`c_name` varchar(20) NOT NULL COMMENT "",
`c_address` varchar(20) NOT NULL COMMENT "",
`c_date` date NULL COMMENT ""
)
UNIQUE KEY (`c_custkey`)
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 10
PROPERTIES (
"function_column.sequence_col" = 'c_date',
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
"""


// Key 'a': NULL, then a date, then NULL again.
// Recorded result keeps address2 / 2022-10-20.
sql """ insert into $tableName values('a', 'abc', 'address1', NULL) """
sql """ insert into $tableName values('a', 'abc', 'address2', '2022-10-20') """
sql """ insert into $tableName values('a', 'abc', 'address3', NULL) """
// Key 'ab': NULL, then increasing dates, then an older date.
// Recorded result keeps address6 / 2022-11-20.
sql """ insert into $tableName values('ab', 'abc', 'address4', NULL) """
sql """ insert into $tableName values('ab', 'abc', 'address5', '2022-10-20') """
sql """ insert into $tableName values('ab', 'abc', 'address6', '2022-11-20') """
sql """ insert into $tableName values('ab', 'abc', 'address6', '2022-11-15') """
// Key 'aa1234': a date followed by NULL.
// Recorded result keeps address4 / 2022-12-11.
sql """ insert into $tableName values('aa1234', 'abc', 'address4', '2022-12-11') """
sql """ insert into $tableName values('aa1234', 'abc', 'address5', NULL) """
// Key 'aa1235': only a NULL sequence value.
// Recorded result keeps the row with \N.
sql """ insert into $tableName values('aa1235', 'abc', 'address6', NULL) """

order_qt_sql "select * from $tableName"

sql """ DROP TABLE IF EXISTS $tableName """

// Scenario 2: nullable INT column (c_int) used as the sequence column,
// including negative values and zero alongside NULL.
sql """
CREATE TABLE `$tableName` (
`c_custkey` varchar(20) NOT NULL COMMENT "",
`c_name` varchar(20) NOT NULL COMMENT "",
`c_address` varchar(20) NOT NULL COMMENT "",
`c_int` int NULL COMMENT ""
)
UNIQUE KEY (`c_custkey`)
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 10
PROPERTIES (
"function_column.sequence_col" = 'c_int',
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
"""
// Key 'a': NULL, 100, NULL. Recorded result keeps address2 / 100.
sql """ insert into $tableName values('a', 'abc', 'address1', NULL) """
sql """ insert into $tableName values('a', 'abc', 'address2', 100) """
sql """ insert into $tableName values('a', 'abc', 'address3', NULL) """

// Key 'ab': NULL, -10, 110, 100. Recorded result keeps address6 / 110.
sql """ insert into $tableName values('ab', 'abc', 'address4', NULL) """
sql """ insert into $tableName values('ab', 'abc', 'address5', -10) """
sql """ insert into $tableName values('ab', 'abc', 'address6', 110) """
sql """ insert into $tableName values('ab', 'abc', 'address6', 100) """

// Key 'aa1234': -1 followed by NULL. Recorded result keeps address4 / -1,
// i.e. the later NULL does not replace a negative sequence value.
sql """ insert into $tableName values('aa1234', 'abc', 'address4', -1) """
sql """ insert into $tableName values('aa1234', 'abc', 'address5', NULL) """

// Key 'aa1235': NULL followed by -1. Recorded result keeps address6 / -1.
sql """ insert into $tableName values('aa1235', 'abc', 'address6', NULL) """
sql """ insert into $tableName values('aa1235', 'abc', 'address6', -1) """

// Key 'aa1236': NULL, 0, NULL. Recorded result keeps address6 / 0.
sql """ insert into $tableName values('aa1236', 'abc', 'address6', NULL) """
sql """ insert into $tableName values('aa1236', 'abc', 'address6', 0) """
sql """ insert into $tableName values('aa1236', 'abc', 'address6', NULL) """

order_qt_sql "select * from $tableName"

// Clean up the table created by this suite.
sql """ DROP TABLE IF EXISTS $tableName """
}