Skip to content

Commit

Permalink
Optimize index creation time (#1240)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
* Use loser tree to replace normal heap
* Optimize io and use mmap to replace fread
* Reconstruct merge and predict threads using ring queues
* Reconstruct merge and output threads using output queues
* Adjust the intermediate file output format and reduce the size of the
intermediate file
* Reconstruct the locks between merge, predict, and output threads to
reduce lock granularity

### Type of change
- [x] Refactoring
- [x] Performance Improvement
- [x] Test cases
  • Loading branch information
Ma-cat committed May 24, 2024
1 parent a6d5c6a commit 42be3d7
Show file tree
Hide file tree
Showing 12 changed files with 1,010 additions and 184 deletions.
40 changes: 26 additions & 14 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import infinity_exception;
import third_party;
import status;
import logger;
import buf_writer;
import profiler;
import third_party;

namespace infinity {

Expand Down Expand Up @@ -253,24 +256,32 @@ void ColumnInverter::SortForOfflineDump() {
// +-----------+ +----------------++--------------------++--------------------------++-------------------------------------------------------+
// ----------------------------------------------------------------------------------------------------------------------------+
// Data within each group
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {

void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer) {
// spill sort results for external merge sort
if (positions_.empty()) {
return;
}
SizeT spill_file_tell = ftell(spill_file);
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = ftell(spill_file);
fwrite(&data_size, sizeof(u32), 1, spill_file);
u64 data_size_pos = spill_file_tell;
buf_writer->Write((const char*)&data_size, sizeof(u32));
spill_file_tell += sizeof(u32);

// number of tuples
u32 num_of_tuples = positions_.size();
tuple_count += num_of_tuples;
fwrite(&num_of_tuples, sizeof(u32), 1, spill_file);
buf_writer->Write((const char*)&num_of_tuples, sizeof(u32));
spill_file_tell += sizeof(u32);

// start offset for next spill
u64 next_start_offset = 0;
u64 next_start_offset_pos = ftell(spill_file);
fwrite(&next_start_offset, sizeof(u64), 1, spill_file);
u64 data_start_offset = ftell(spill_file);
u64 next_start_offset_pos = spill_file_tell;
buf_writer->Write((const char*)&next_start_offset, sizeof(u64));
spill_file_tell += sizeof(u64);

u64 data_start_offset = spill_file_tell;
// sorted data
u32 last_term_num = std::numeric_limits<u32>::max();
StringRef term;
Expand All @@ -282,15 +293,16 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
term = GetTermFromNum(last_term_num);
}
record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1;
fwrite(&record_length, sizeof(u32), 1, spill_file);
fwrite(term.data(), term.size(), 1, spill_file);
fwrite(&str_null, sizeof(char), 1, spill_file);
fwrite(&i.doc_id_, sizeof(docid_t), 1, spill_file);
fwrite(&i.term_pos_, sizeof(u32), 1, spill_file);
}

buf_writer->Write((const char*)&record_length, sizeof(u32));
buf_writer->Write(term.data(), term.size());
buf_writer->Write((const char*)&str_null, sizeof(char));
buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32));
}
buf_writer->Flush();
// update data size
next_start_offset = ftell(spill_file);
next_start_offset = buf_writer->Tell();
data_size = next_start_offset - data_start_offset;
fseek(spill_file, data_size_pos, SEEK_SET);
fwrite(&data_size, sizeof(u32), 1, spill_file); // update offset for next spill
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_inverter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import string_ref;
import internal_types;
import posting_writer;
import vector_with_lock;
import buf_writer;

namespace infinity {

Expand Down Expand Up @@ -73,7 +74,7 @@ public:
}
};

void SpillSortResults(FILE *spill_file, u64 &tuple_count);
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer);

private:
using TermBuffer = Vector<char>;
Expand Down
42 changes: 42 additions & 0 deletions src/storage/invertedindex/common/buf_writer.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
module;

#include <cstring>
#include <cstdio>

export module buf_writer;
import stl;

namespace infinity {
// A simple buffered writer that batches small sequential writes in memory
// before handing them to fwrite, reducing stdio call overhead.
// Now only used for ColumnInverter: sequential data goes through BufWriter,
// while the raw spill_file pointer is used for random-access patching
// (fseek + fwrite) of previously written headers.
export struct BufWriter {
    // spill_file: destination stream (borrowed; caller retains ownership).
    // spill_buf_size: capacity of the in-memory staging buffer, in bytes.
    BufWriter(FILE *spill_file, SizeT spill_buf_size) : spill_file_(spill_file), spill_buf_size_(spill_buf_size) {
        spill_buffer_ = MakeUnique<char_t[]>(spill_buf_size_);
    }

    // Append data_size bytes, flushing first if they would not fit.
    // A payload larger than the whole buffer bypasses staging entirely:
    // the previous unconditional memcpy overflowed spill_buffer_ whenever
    // data_size > spill_buf_size_.
    void Write(const char* data, SizeT data_size) {
        if (spill_buf_idx_ + data_size > spill_buf_size_) {
            Flush();
            if (data_size > spill_buf_size_) {
                // Oversized write: send straight to the file, nothing staged.
                fwrite(data, data_size, 1, spill_file_);
                return;
            }
        }
        memcpy(spill_buffer_.get() + spill_buf_idx_, data, data_size);
        spill_buf_idx_ += data_size;
    }

    // Drain any staged bytes to the underlying file.
    void Flush() {
        if (spill_buf_idx_) {
            fwrite(spill_buffer_.get(), spill_buf_idx_, 1, spill_file_);
            spill_buf_idx_ = 0;
        }
    }

    // Logical write position: file offset plus bytes still staged in the
    // buffer. (Previously returned ftell alone, which under-reported the
    // position while unflushed data was pending; callers that Flush()
    // before Tell() — as ColumnInverter does — see identical results.)
    SizeT Tell() {
        return ftell(spill_file_) + spill_buf_idx_;
    }

    FILE *spill_file_{nullptr};           // borrowed output stream, not owned
    SizeT spill_buf_idx_{0};              // bytes currently staged in spill_buffer_
    UniquePtr<char_t[]> spill_buffer_{};  // staging buffer of spill_buf_size_ bytes
    SizeT spill_buf_size_{0};             // staging buffer capacity in bytes
};
}
Loading

0 comments on commit 42be3d7

Please sign in to comment.