Skip to content

Commit

Permalink
Pick fix mow index compaction and revert comparePostings (#199)
Browse files Browse the repository at this point in the history
* [fix](write)revert comparePostings due to write core (#195)

revert #156

* [fix](index compaction)Remove INT32_MAX out of destPostingQueues (#198)
  • Loading branch information
qidaye authored Mar 12, 2024
1 parent c5ba0a2 commit ef95e67
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 50 deletions.
101 changes: 60 additions & 41 deletions src/core/CLucene/index/IndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <assert.h>
#include <iostream>
#include <roaring/roaring.hh>
#include <sstream>

#define FINALLY_CLOSE_OUTPUT(x) \
try { \
Expand Down Expand Up @@ -1667,34 +1668,47 @@ void IndexWriter::mergeTerms(bool hasProx) {
std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes);
auto destPostingQueues = _CLNEW postingQueue(matchSize);
std::vector<DestDoc> destDocs(matchSize);
for (int i = 0; i < matchSize; ++i) {
smi = match[i];
TermPositions *postings = smi->getPositions();

postings->seek(smi->termEnum);
if (postings->next()) {
auto processPostings = [&](TermPositions* postings, DestDoc* destDoc, int srcIdx) {
while (postings->next()) {
int srcDoc = postings->doc();
std::pair<int32_t, uint32_t> p = _trans_vec[smi->readerIndex][srcDoc];
uint32_t destIdx = p.first;
uint32_t destDocId = p.second;

destDocs[i].srcIdx = i;
destDocs[i].destIdx = destIdx;
destDocs[i].destDocId = destDocId;
destDoc->destIdx = p.first;
destDoc->destDocId = p.second;
destDoc->srcIdx = srcIdx;
// <UINT32_MAX, UINT32_MAX> indicates current row not exist in Doris dest segment.
// So we ignore this doc here.
if (destDoc->destIdx == UINT32_MAX || destDoc->destDocId == UINT32_MAX) {
if (infoStream != nullptr) {
std::stringstream ss;
ss << "skip UINT32_MAX, srcIdx: " << smi->readerIndex << ", srcDoc: " << srcDoc
<< ", destIdx: " << destDoc->destIdx << ", destDocId: " << destDoc->destDocId;
message(ss.str());
}
continue;
}

if (hasProx) {
int32_t freq = postings->freq();
destDocs[i].destFreq = freq;
destDocs[i].destPositions.resize(freq);
destDoc->destFreq = freq;
destDoc->destPositions.resize(freq);

for (int32_t j = 0; j < freq; j++) {
int32_t position = postings->nextPosition();
destDocs[i].destPositions[j] = position;
destDoc->destPositions[j] = position;
}
}

destPostingQueues->put(&destDocs[i]);
destPostingQueues->put(destDoc);
break;
}
};

for (int i = 0; i < matchSize; ++i) {
smi = match[i];
auto* postings = smi->getPositions();
postings->seek(smi->termEnum);
processPostings(postings, &destDocs[i], i);
}

auto encode = [](IndexOutput* out, std::vector<uint32_t>& buffer, bool isDoc) {
Expand All @@ -1719,12 +1733,27 @@ void IndexWriter::mergeTerms(bool hasProx) {
auto destDocId = destDoc->destDocId;
auto destFreq = destDoc->destFreq;
auto& descPositions = destDoc->destPositions;
// <UINT32_MAX, UINT32_MAX> indicates current row not exist in Doris dest segment.
// So we ignore this doc here.
if (destIdx == UINT32_MAX || destDocId == UINT32_MAX) {
continue;
if (infoStream != nullptr) {
for (int i = 0; i < _trans_vec.size(); ++i) {
// find pair < destIdx, destDocId > in _trans_vec[i] to get the index of the pair
auto it = std::find_if(_trans_vec[i].begin(), _trans_vec[i].end(),
[destIdx, destDocId](const std::pair<uint32_t, uint32_t>& pair) {
return pair.first == destIdx && pair.second == destDocId;
});

// Check if the pair was found
if (it != _trans_vec[i].end()) {
// Calculate the index of the pair
size_t index = std::distance(_trans_vec[i].begin(), it);
std::stringstream ss;
ss << "Found pair at srcIdxId:" << i << ", srcDocId: " << index << ", destIdxId: " << destIdx
<< ", destDocId: " << destDocId << ", destFreq: " << destDoc->destFreq;
message(ss.str());
}
}
}
auto freqOut = freqOutputList[destIdx];

auto freqOut = freqOutputList[destIdx];
auto proxOut = proxOutputList[destIdx];
auto& docDeltaBuffer = docDeltaBuffers[destIdx];
auto& freqBuffer = freqBuffers[destIdx];
Expand Down Expand Up @@ -1768,32 +1797,22 @@ void IndexWriter::mergeTerms(bool hasProx) {
}

smi = match[destDoc->srcIdx];
TermPositions *postings = smi->getPositions();
if (postings->next()) {
int srcDoc = postings->doc();
std::pair<int32_t, uint32_t> p = _trans_vec[smi->readerIndex][srcDoc];
destDoc->destIdx = p.first;
destDoc->destDocId = p.second;

if (hasProx) {
int32_t freq = postings->freq();
destDoc->destFreq = freq;
destDoc->destPositions.resize(freq);

for (int32_t j = 0; j < freq; j++) {
int32_t position = postings->nextPosition();
destDoc->destPositions[j] = position;
}
}

destPostingQueues->put(destDoc);
}
processPostings(smi->getPositions(), destDoc, destDoc->srcIdx);
}
}
if (destPostingQueues != NULL) {
if (destPostingQueues != nullptr) {
destPostingQueues->close();
_CLDELETE(destPostingQueues);
}

if (infoStream != nullptr) {
std::stringstream ss;
for (const auto& df : dfs) {
ss<< "df: " << df << "\n";
}
message(ss.str());
}

for (int i = 0; i < numDestIndexes; ++i) {
DefaultSkipListWriter *skipListWriter = skipListWriterList[i];
CL_NS(store)::IndexOutput *freqOutput = freqOutputList[i];
Expand Down
26 changes: 17 additions & 9 deletions src/core/CLucene/index/SDocumentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,15 +743,23 @@ void SDocumentsWriter<T>::ThreadState::resetPostings() {

template<typename T>
int32_t SDocumentsWriter<T>::ThreadState::comparePostings(Posting *p1, Posting *p2) {
if constexpr (std::is_same_v<T, char>) {
auto n1 = p1->term_.size();
auto n2 = p2->term_.size();
auto min = std::min(n1, n2);
auto s1 = p1->term_.data();
auto s2 = p2->term_.data();
return StringUtil::string_compare(s1, n1, s2, n2, min);
} else {
return p1->term_.compare(p2->term_);
const T *pos1 = scharPool->buffers[p1->textStart >> CHAR_BLOCK_SHIFT] + (p1->textStart & CHAR_BLOCK_MASK);
const T *pos2 = scharPool->buffers[p2->textStart >> CHAR_BLOCK_SHIFT] + (p2->textStart & CHAR_BLOCK_MASK);
while (true) {
const auto c1 = static_cast<typename std::make_unsigned<T>::type>(*pos1++);
const auto c2 = static_cast<typename std::make_unsigned<T>::type>(*pos2++);
if (c1 < c2)
if (CLUCENE_END_OF_WORD == c2)
return 1;
else
return -1;
else if (c2 < c1)
if (CLUCENE_END_OF_WORD == c1)
return -1;
else
return 1;
else if (CLUCENE_END_OF_WORD == c1)
return 0;
}
}

Expand Down

0 comments on commit ef95e67

Please sign in to comment.