Skip to content

Commit

Permalink
[Enhancement] Optimize mem usage of partial update (#14187)
Browse files Browse the repository at this point in the history
We have partially optimized the primary key model for large import memory usage in this pr(#12068), but the enhancement doesn't work if the load is partial update. And we also need a lot of memory if you do a large number of partial updates in one transaction. So this pr will try to reduce the memory usage of large partial update.

There are two reasons for large memory usage during partial column updates:
1. The first one is that updating a few columns may increase the segment file size and we need to load all data of segment into memory which will cost a lot of memory.
2. The second one is that doing partial update requires reading data from other columns into memory, which can take up a lot of memory if the table has many columns.

In order to reduce memory usage,  the following two adjustments are made:
1. The first one is to estimate the length of the updated partial columns in each row when importing data, thus reducing the size of the segment file
2. The second one is not to load all the data of the rowset into memory at once, but to load them one by one according to the segment.

In my test env, one BE with two HDD, using StreamLoad, create a table with 65 column, 20 buckets:
```
CREATE TABLE `partial_test` (
  `col_1` bigint(20) NOT NULL COMMENT "",
  `col_2` bigint(20) NOT NULL COMMENT "",
  `col_3` bigint(20) NOT NULL COMMENT "",
  `col_4` varchar(150) NOT NULL COMMENT "",
  `col_5` varchar(150) NOT NULL COMMENT "",
  `col_6` varchar(150) NULL COMMENT "",
  `col_7` varchar(150) NULL COMMENT "",
  `col_8` varchar(1024) NULL COMMENT "",
  `col_9` varchar(120) NULL COMMENT "",
  `col_10` varchar(60) NULL COMMENT "",
  `col_11` varchar(10) NULL COMMENT "",
  `col_12` varchar(120) NULL COMMENT "",
  `col_13` varchar(524) NULL COMMENT "",
  `col_14` varchar(100) NULL COMMENT "",
  `col_15` varchar(150) NULL COMMENT "",
  `col_16` varchar(150) NULL COMMENT "",
  `col_17` varchar(150) NULL COMMENT "",
  `col_18` bigint(20) NULL COMMENT "",
  `col_19` varchar(500) NULL COMMENT "",
  `col_20` varchar(150) NULL COMMENT "",
  `col_21` tinyint(4) NULL COMMENT "",
  `col_22` int(11) NULL COMMENT "",
  `col_23` varchar(524) NULL COMMENT "",
  `col_24` bigint(20) NULL COMMENT "",
  `col_25` bigint(20) NULL COMMENT "",
  `col_26` varchar(8) NULL COMMENT "",
  `col_27` decimal64(18, 6) NULL COMMENT "",
  `col_28` decimal64(18, 6) NULL COMMENT "",
  `col_29` decimal64(18, 6) NULL COMMENT "",
  `col_30` decimal64(18, 6) NULL COMMENT "",
  `col_31` decimal64(18, 6) NULL COMMENT "",
  `col_32` decimal64(18, 6) NULL COMMENT "",
  `col_33` bigint(20) NULL COMMENT "",
  `col_34` decimal64(18, 6) NULL COMMENT "",
  `col_35` varchar(8) NULL COMMENT "",
  `col_36` decimal64(18, 6) NULL COMMENT "",
  `col_37` decimal64(18, 6) NULL COMMENT "",
  `col_38` varchar(8) NULL COMMENT "",
  `col_39` decimal64(18, 6) NULL COMMENT "",
  `col_40` decimal64(18, 6) NULL COMMENT "",
  `col_41` varchar(8) NULL COMMENT "",
  `col_42` decimal64(18, 6) NULL COMMENT "",
  `col_43` decimal64(18, 6) NULL COMMENT "",
  `col_44` decimal64(18, 6) NULL COMMENT "",
  `col_45` decimal64(18, 6) NULL COMMENT "",
  `col_46` int(11) NULL COMMENT "",
  `col_47` int(11) NOT NULL COMMENT "",
  `col_48` tinyint(4) NULL COMMENT "",
  `col_49` varchar(200) NULL COMMENT "",
  `col_50` tinyint(4) NULL COMMENT "",
  `col_51` varchar(200) NULL COMMENT "",
  `col_52` varchar(10) NULL COMMENT "",
  `col_53` tinyint(4) NULL COMMENT "",
  `col_54` tinyint(4) NULL COMMENT "",
  `col_55` varchar(150) NULL COMMENT "",
  `col_56` varchar(150) NULL COMMENT "",
  `col_57` varchar(500) NULL COMMENT "",
  `col_58` tinyint(4) NULL COMMENT "",
  `col_59` varchar(100) NULL COMMENT "",
  `col_60` varchar(150) NULL COMMENT "",
  `col_61` varchar(150) NULL COMMENT "",
  `col_62` varchar(150) NULL COMMENT "",
  `col_63` varchar(150) NULL COMMENT "",
  `col_64` datetime NULL COMMENT "",
  `col_65` datetime NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`col_1`, `col_2`, `col_3`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`col_1`, `col_2`) BUCKETS 20
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2",
"enable_persistent_index" = "true",
"compression" = "LZ4"
);
```

|PrimaryKey Length| RowNum|BucketNum| Column Num| Partial ColumnNum | PartialUpdate RowsNum| Load time(s)| Apply time(ms)| Peak UpdateMemory usage | Note |
|---------------------|----------|------------|----------------|--------------------|------------------------------|----|-----|-----|----|
|12 Bytes| 300M | 20 |  65 | 5 | 100M | 135261 | 106693 | 78.9G | branch-main |
|12 Bytes| 300M | 20 |  65 | 5 | 100M | 166449| 149870 | 10.3G | branch-opt |
|12 Bytes| 300M | 20 |  65 | 5 | 100K | 2078 | 529 | 60.1M | branch-main |
|12 Bytes| 300M | 20 |  65 | 5 | 100K | 2211 | 541 | 60.2M | branch-opt |

(cherry picked from commit 545b7be)

# Conflicts:
#	be/src/storage/memtable.h
#	be/src/storage/rowset_update_state.cpp
#	be/src/storage/rowset_update_state.h
#	be/src/storage/tablet_updates.cpp
  • Loading branch information
sevev authored and mergify[bot] committed Dec 29, 2022
1 parent cfb27ea commit 20d524a
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 81 deletions.
12 changes: 12 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ Status DeltaWriter::_init() {
}
writer_context.referenced_column_ids.push_back(index);
}
int64_t average_row_size = _tablet->updates()->get_average_row_size();
if (average_row_size != 0) {
_memtable_buffer_row = config::write_buffer_size / average_row_size;
} else {
// If tablet is a new created tablet and has no historical data, average_row_size is 0
// And we use schema size as average row size. If there are complex type(i.e. BITMAP/ARRAY) or varchar,
// we will consider it as 16 bytes.
average_row_size = _tablet->tablet_schema().estimate_row_size(16);
_memtable_buffer_row = config::write_buffer_size / average_row_size;
}

writer_context.partial_update_tablet_schema =
TabletSchema::create(_tablet->tablet_schema(), writer_context.referenced_column_ids);
auto sort_key_idxes = _tablet->tablet_schema().sort_key_idxes();
Expand Down Expand Up @@ -388,6 +399,7 @@ void DeltaWriter::_reset_mem_table() {
_mem_table = std::make_unique<MemTable>(_tablet->tablet_id(), &_vectorized_schema, _opt.slots,
_mem_table_sink.get(), "", _mem_tracker);
}
_mem_table->set_write_buffer_row(_memtable_buffer_row);
}

Status DeltaWriter::commit() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class DeltaWriter {
std::unique_ptr<FlushToken> _flush_token;
std::unique_ptr<ReplicateToken> _replicate_token;
bool _with_rollback_log;
// initial value is max value
size_t _memtable_buffer_row = -1;
};

} // namespace vectorized
Expand Down
8 changes: 7 additions & 1 deletion be/src/storage/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ size_t MemTable::write_buffer_size() const {
return _chunk_bytes_usage + _aggregator_bytes_usage;
}

size_t MemTable::write_buffer_rows() const {
return _total_rows - _merged_rows;
}

bool MemTable::is_full() const {
return write_buffer_size() >= _max_buffer_size;
return write_buffer_size() >= _max_buffer_size || write_buffer_rows() >= _max_buffer_row;
}

bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size) {
Expand Down Expand Up @@ -147,6 +151,7 @@ bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from
if (chunk.has_rows()) {
_chunk_memory_usage += chunk.memory_usage() * size / chunk.num_rows();
_chunk_bytes_usage += _chunk->bytes_usage(cur_row_count, size);
_total_rows += chunk.num_rows();
}

// if memtable is full, push it to the flush executor,
Expand Down Expand Up @@ -303,6 +308,7 @@ void MemTable::_aggregate(bool is_final) {
// impossible finish
DCHECK(!_aggregator->is_finish());
DCHECK(_aggregator->source_exhausted());
_merged_rows = _aggregator->merged_rows();

if (is_final) {
_result_chunk.reset();
Expand Down
12 changes: 12 additions & 0 deletions be/src/storage/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class MemTable {

// buffer memory usage for write segment
size_t write_buffer_size() const;
size_t write_buffer_rows() const;

// return true suggests caller should flush this memory table
bool insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size);
Expand All @@ -51,7 +52,14 @@ class MemTable {

bool is_full() const;

<<<<<<< HEAD
static Schema convert_schema(const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs);
=======
void set_write_buffer_row(size_t max_buffer_row) { _max_buffer_row = max_buffer_row; }

static VectorizedSchema convert_schema(const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs);
>>>>>>> 545b7be0b ([Enhancement] Optimize mem usage of partial update (#14187))

private:
void _merge();
Expand Down Expand Up @@ -92,6 +100,10 @@ class MemTable {
std::string _merge_condition;

int64_t _max_buffer_size = config::write_buffer_size;
// initial value is max size
size_t _max_buffer_row = -1;
size_t _total_rows = 0;
size_t _merged_rows = 0;

// memory statistic
MemTracker* _mem_tracker = nullptr;
Expand Down
18 changes: 18 additions & 0 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,24 @@ Status Rowset::reload() {
return Status::OK();
}

Status Rowset::reload_segment(int32_t segment_id) {
DCHECK(_segments.size() > segment_id);
if (_segments.size() <= segment_id) {
LOG(WARNING) << "Error segment id: " << segment_id;
return Status::InternalError("Error segment id");
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
size_t footer_size_hint = 16 * 1024;
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id);
auto res = Segment::open(fs, seg_path, segment_id, _schema, &footer_size_hint);
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
return res.status();
}
_segments[segment_id] = std::move(res).value();
return Status::OK();
}

StatusOr<int64_t> Rowset::estimate_compaction_segment_iterator_num() {
if (num_segments() == 0) {
return 0;
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {

// reload this rowset after the underlying segment file is changed
Status reload();
Status reload_segment(int32_t segment_id);

const TabletSchema& schema() const { return *_schema; }
void set_schema(const TabletSchema* schema) { _schema = schema; }
Expand Down

0 comments on commit 20d524a

Please sign in to comment.