Skip to content

Commit

Permalink
Fix: knn index returns invisible rows.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed May 7, 2024
1 parent 5ad3d30 commit e01bf9b
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 27 deletions.
1 change: 0 additions & 1 deletion python/parallel_test/test_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

class TestIndexParallel:

@pytest.mark.skip(reason="Cannot find segment id: 0, block id: 9@src/executor/operator/physical_knn_scan.cpp:542")
def test_chaos(self, get_infinity_connection_pool):
data = read_out_data()
connection_pool = get_infinity_connection_pool
Expand Down
1 change: 1 addition & 0 deletions src/common/stl.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export namespace std {
using std::is_same;
using std::fill;
using std::lower_bound;
using std::upper_bound;

using std::condition_variable;
using std::condition_variable_any;
Expand Down
2 changes: 1 addition & 1 deletion src/executor/operator/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
const u32 segment_row_actual_count = segment_entry->actual_row_count(); // count of rows in segment, exclude deleted rows

// prepare filter for deleted rows
DeleteFilter delete_filter(segment_entry, begin_ts);
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
auto result = SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count);
result.Output(output_data_blocks, segment_id, delete_filter);
Expand Down
20 changes: 15 additions & 5 deletions src/executor/operator/physical_knn_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
IVFFlatScan(filter);
}
} else {
SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
if (segment_entry->CheckAnyDelete(begin_ts)) {
DeleteFilter filter(segment_entry, begin_ts);
DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
IVFFlatScan(filter);
} else {
IVFFlatScan();
Expand All @@ -407,7 +408,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
case IndexType::kHnsw: {
const auto *index_hnsw = static_cast<const IndexHnsw *>(segment_index_entry->table_index_entry()->index_base());

auto hnsw_search = [&](BufferHandle index_handle, bool with_lock) {
auto hnsw_search = [&](BufferHandle index_handle, bool with_lock, int chunk_id = -1) {
AbstractHnsw<f32, SegmentOffset> abstract_hnsw(index_handle.GetDataMut(), index_hnsw);

for (const auto &opt_param : knn_scan_shared_data->opt_params_) {
Expand Down Expand Up @@ -436,15 +437,16 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, with_lock);
}
} else {
SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
if (segment_entry->CheckAnyDelete(begin_ts)) {
DeleteFilter filter(segment_entry, begin_ts);
DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
std::tie(result_n1, d_ptr, l_ptr) =
abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, with_lock);
} else {
if (!with_lock) {
std::tie(result_n1, d_ptr, l_ptr) = abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, false);
} else {
AppendFilter filter(block_index->GetSegmentOffset(segment_id));
AppendFilter filter(max_segment_offset);
std::tie(result_n1, d_ptr, l_ptr) = abstract_hnsw.KnnSearch(query, knn_scan_shared_data->topk_, filter, true);
}
}
Expand Down Expand Up @@ -476,16 +478,24 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
auto row_ids = MakeUniqueForOverwrite<RowID[]>(result_n);
for (i64 i = 0; i < result_n; ++i) {
row_ids[i] = RowID{segment_id, l_ptr[i]};

BlockID block_id = l_ptr[i] / DEFAULT_BLOCK_CAPACITY;
auto *block_entry = block_index->GetBlockEntry(segment_id, block_id);
if (block_entry == nullptr) {
UnrecoverableError(
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
}
}
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
}
};

auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
int i = 0;
for (auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
BufferHandle index_handle = chunk_index_entry->GetIndex();
hnsw_search(index_handle, false);
hnsw_search(index_handle, false, i++);
}
}
if (memory_index_entry.get() != nullptr) {
Expand Down
3 changes: 2 additions & 1 deletion src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ class FilterIteratorBase : public QueryIteratorT {
return {false, INVALID_ROWID};
}
if (cache_need_check_delete_) [[unlikely]] {
DeleteFilter delete_filter(cache_segment_entry_, common_query_filter_->begin_ts_);
SegmentOffset max_segment_offset = cache_segment_offset_;
DeleteFilter delete_filter(cache_segment_entry_, common_query_filter_->begin_ts_, max_segment_offset);
if (delete_filter(id.segment_offset_)) {
return {true, id};
}
Expand Down
12 changes: 9 additions & 3 deletions src/function/table/knn_filter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,26 @@ private:

export class DeleteFilter final : public FilterBase<SegmentOffset> {
public:
explicit DeleteFilter(const SegmentEntry *segment, TxnTimeStamp query_ts) : segment_(segment), query_ts_(query_ts) {}
explicit DeleteFilter(const SegmentEntry *segment, TxnTimeStamp query_ts, SegmentOffset max_segment_offset)
: segment_(segment), query_ts_(query_ts), max_segment_offset_(max_segment_offset) {}

bool operator()(const SegmentOffset &segment_offset) const final { return segment_->CheckRowVisible(segment_offset, query_ts_); }
bool operator()(const SegmentOffset &segment_offset) const final {
bool check_append = max_segment_offset_ == 0;
return segment_offset <= max_segment_offset_ && segment_->CheckRowVisible(segment_offset, query_ts_, check_append);
}

private:
const SegmentEntry *const segment_;

const TxnTimeStamp query_ts_;

const SegmentOffset max_segment_offset_;
};

export class DeleteWithBitmaskFilter final : public FilterBase<SegmentOffset> {
public:
explicit DeleteWithBitmaskFilter(const Bitmask &bitmask, const SegmentEntry *segment, TxnTimeStamp query_ts)
: bitmask_filter_(bitmask), delete_filter_(segment, query_ts) {}
: bitmask_filter_(bitmask), delete_filter_(segment, query_ts, 0) {}

bool operator()(const SegmentOffset &segment_offset) const final { return bitmask_filter_(segment_offset) && delete_filter_(segment_offset); }

Expand Down
2 changes: 1 addition & 1 deletion src/storage/common/block_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, boo
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
}
}
blocks_info.segment_offset_ = segment_entry->row_count();
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);

segment_block_index_.emplace(segment_id, std::move(blocks_info));
}
Expand Down
15 changes: 13 additions & 2 deletions src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ void BlockEntry::UpdateBlockReplay(SharedPtr<BlockEntry> block_entry, String blo
}
}

SizeT BlockEntry::row_count(TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);

auto block_version_handle = this->block_version_->Load();
const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());
return block_version->GetRowCount(check_ts);
}

Pair<BlockOffset, BlockOffset> BlockEntry::GetVisibleRange(TxnTimeStamp begin_ts, u16 block_offset_begin) const {
std::shared_lock lock(rw_locker_);
begin_ts = std::min(begin_ts, this->max_row_ts_);
Expand All @@ -138,13 +146,16 @@ Pair<BlockOffset, BlockOffset> BlockEntry::GetVisibleRange(TxnTimeStamp begin_ts
return {block_offset_begin, row_idx};
}

bool BlockEntry::CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts) const {
bool BlockEntry::CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts, bool check_append) const {
std::shared_lock lock(rw_locker_);

auto block_version_handle = this->block_version_->Load();
const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());

auto &deleted = block_version->deleted_;
if (check_append && block_version->GetRowCount(check_ts) <= block_offset) {
return false;
}
const auto &deleted = block_version->deleted_;
return deleted[block_offset] == 0 || deleted[block_offset] > check_ts;
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/meta/entry/block_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ public:

const FastRoughFilter *GetFastRoughFilter() const { return &fast_rough_filter_; }

SizeT row_count(TxnTimeStamp check_ts) const;

// Get visible range of the BlockEntry since the given row number for a txn
Pair<BlockOffset, BlockOffset> GetVisibleRange(TxnTimeStamp begin_ts, BlockOffset block_offset_begin = 0) const;

bool CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts) const;
bool CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts, bool check_append) const;

bool CheckDeleteVisible(Vector<BlockOffset> &block_offsets, TxnTimeStamp check_ts) const;

Expand Down
16 changes: 7 additions & 9 deletions src/storage/meta/entry/block_version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ bool BlockVersion::operator==(const BlockVersion &rhs) const {
}

i32 BlockVersion::GetRowCount(TxnTimeStamp begin_ts) const {
if (created_.empty())
return 0;
i64 idx = created_.size() - 1;
for (; idx >= 0; idx--) {
if (created_[idx].create_ts_ <= begin_ts)
break;
// use binary search find the last create_field that has create_ts_ <= check_ts
auto iter =
std::upper_bound(created_.begin(), created_.end(), begin_ts, [](TxnTimeStamp ts, const CreateField &field) { return ts < field.create_ts_; });
if (iter == created_.begin()) {
return false;
}
if (idx < 0)
return 0;
return created_[idx].row_count_;
--iter;
return iter->row_count_;
}

void BlockVersion::SaveToFile(TxnTimeStamp checkpoint_ts, FileHandler &file_handler) const {
Expand Down
19 changes: 17 additions & 2 deletions src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,17 @@ bool SegmentEntry::CheckDeleteConflict(Vector<Pair<SegmentEntry *, Vector<Segmen
return false;
}

bool SegmentEntry::CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp check_ts) const {
bool SegmentEntry::CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp check_ts, bool check_append) const {
// FIXME: get the block_capacity from config?
u32 block_capacity = DEFAULT_BLOCK_CAPACITY;
BlockID block_id = segment_offset / block_capacity;
BlockOffset block_offset = segment_offset % block_capacity;

auto *block_entry = GetBlockEntryByID(block_id).get();
return block_entry->CheckRowVisible(block_offset, check_ts);
if (block_entry == nullptr || block_entry->commit_ts_ > check_ts) {
return false;
}
return block_entry->CheckRowVisible(block_offset, check_ts, check_append);
}

bool SegmentEntry::CheckDeleteVisible(HashMap<BlockID, Vector<BlockOffset>> &block_offsets_map, Txn *txn) const {
Expand Down Expand Up @@ -254,6 +257,18 @@ void SegmentEntry::AppendBlockEntry(UniquePtr<BlockEntry> block_entry) {
block_entries_.emplace_back(std::move(block_entry));
}

SizeT SegmentEntry::row_count(TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (status_ == SegmentStatus::kDeprecated && check_ts > deprecate_ts_) {
return 0;
}
SizeT row_count = 0;
for (const auto &block_entry : block_entries_) {
row_count += block_entry->row_count(check_ts);
}
return row_count;
}

// One writer
u64 SegmentEntry::AppendData(TransactionID txn_id, TxnTimeStamp commit_ts, AppendState *append_state_ptr, BufferManager *buffer_mgr, Txn *txn) {
TxnTableStore *txn_store = txn->GetTxnTableStore(table_entry_);
Expand Down
4 changes: 3 additions & 1 deletion src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public:

static bool CheckDeleteConflict(Vector<Pair<SegmentEntry *, Vector<SegmentOffset>>> &&segments, TransactionID txn_id);

bool CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp check_ts) const;
bool CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp check_ts, bool check_append) const;

bool CheckDeleteVisible(HashMap<BlockID, Vector<BlockOffset>> &block_offsets_map, Txn *txn) const;

Expand Down Expand Up @@ -175,6 +175,8 @@ public:
SharedPtr<BlockEntry> GetBlockEntryByID(BlockID block_id) const;

public:
SizeT row_count(TxnTimeStamp check_ts) const;

u64 AppendData(TransactionID txn_id, TxnTimeStamp commit_ts, AppendState *append_state_ptr, BufferManager *buffer_mgr, Txn *txn);

SizeT DeleteData(TransactionID txn_id, TxnTimeStamp commit_ts, const HashMap<BlockID, Vector<BlockOffset>> &block_row_hashmap, Txn *txn);
Expand Down

0 comments on commit e01bf9b

Please sign in to comment.